diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index ef5eed0e..361c88ad 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -108,6 +108,8 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou } h.proxyConfig = proxyConfig + ctx = session.ContextWithHandler(ctx, h) + rawProxyHandler, err := common.CreateObject(ctx, proxyConfig) if err != nil { return nil, err diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go index 20719ef1..25f039be 100644 --- a/app/reverse/bridge.go +++ b/app/reverse/bridge.go @@ -94,9 +94,9 @@ func (b *Bridge) Close() error { } type BridgeWorker struct { - tag string - worker *mux.ServerWorker - dispatcher routing.Dispatcher + Tag string + Worker *mux.ServerWorker + Dispatcher routing.Dispatcher state Control_State } @@ -115,15 +115,15 @@ func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWo } w := &BridgeWorker{ - dispatcher: d, - tag: tag, + Dispatcher: d, + Tag: tag, } worker, err := mux.NewServerWorker(context.Background(), w, link) if err != nil { return nil, err } - w.worker = worker + w.Worker = worker return w, nil } @@ -141,11 +141,11 @@ func (w *BridgeWorker) Close() error { } func (w *BridgeWorker) IsActive() bool { - return w.state == Control_ACTIVE && !w.worker.Closed() + return w.state == Control_ACTIVE && !w.Worker.Closed() } func (w *BridgeWorker) Connections() uint32 { - return w.worker.ActiveConnections() + return w.Worker.ActiveConnections() } func (w *BridgeWorker) handleInternalConn(link *transport.Link) { @@ -171,9 +171,9 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) { func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) { if !isInternalDomain(dest) { ctx = session.ContextWithInbound(ctx, &session.Inbound{ - Tag: w.tag, + Tag: w.Tag, }) - return w.dispatcher.Dispatch(ctx, dest) + return w.Dispatcher.Dispatch(ctx, dest) } opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} @@ -194,12 +194,12 @@ func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*tra func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error { if !isInternalDomain(dest) { ctx = session.ContextWithInbound(ctx, &session.Inbound{ - Tag: w.tag, + Tag: w.Tag, }) - return w.dispatcher.DispatchLink(ctx, dest, link) + return w.Dispatcher.DispatchLink(ctx, dest, link) } - link = w.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) + link = w.Dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) w.handleInternalConn(link) return nil diff --git a/app/reverse/portal.go b/app/reverse/portal.go index 5104238a..11bfc514 100644 --- a/app/reverse/portal.go +++ b/app/reverse/portal.go @@ -97,7 +97,6 @@ func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) err link.Writer = &buf.EndpointOverrideWriter{Writer: link.Writer, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address} } - return p.client.Dispatch(ctx, link) } diff --git a/common/protocol/headers.go b/common/protocol/headers.go index fb785d73..f6614ec7 100644 --- a/common/protocol/headers.go +++ b/common/protocol/headers.go @@ -16,11 +16,12 @@ const ( RequestCommandTCP = RequestCommand(0x01) RequestCommandUDP = RequestCommand(0x02) RequestCommandMux = RequestCommand(0x03) + RequestCommandRvs = RequestCommand(0x04) ) func (c RequestCommand) TransferType() TransferType { switch c { - case RequestCommandTCP, RequestCommandMux: + case RequestCommandTCP, RequestCommandMux, RequestCommandRvs: return TransferTypeStream case RequestCommandUDP: return TransferTypePacket diff --git a/common/session/context.go b/common/session/context.go index ba3530b5..6a812f99 100644 --- a/common/session/context.go +++ b/common/session/context.go @@ -6,6 +6,7 @@ import ( "github.com/xtls/xray-core/common/ctx" "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/features/outbound" "github.com/xtls/xray-core/features/routing" ) @@ -16,13 +17,13 @@ const ( inboundSessionKey ctx.SessionKey = 1 outboundSessionKey ctx.SessionKey = 2 contentSessionKey ctx.SessionKey = 3 - muxPreferredSessionKey ctx.SessionKey = 4 // unused - sockoptSessionKey ctx.SessionKey = 5 // used by dokodemo to only receive sockopt.Mark - trackedConnectionErrorKey ctx.SessionKey = 6 // used by observer to get outbound error - dispatcherKey ctx.SessionKey = 7 // used by ss2022 inbounds to get dispatcher - timeoutOnlyKey ctx.SessionKey = 8 // mux context's child contexts to only cancel when its own traffic times out - allowedNetworkKey ctx.SessionKey = 9 // muxcool server control incoming request tcp/udp - handlerSessionKey ctx.SessionKey = 10 // unused + muxPreferredSessionKey ctx.SessionKey = 4 // unused + sockoptSessionKey ctx.SessionKey = 5 // used by dokodemo to only receive sockopt.Mark + trackedConnectionErrorKey ctx.SessionKey = 6 // used by observer to get outbound error + dispatcherKey ctx.SessionKey = 7 // used by ss2022 inbounds to get dispatcher + timeoutOnlyKey ctx.SessionKey = 8 // mux context's child contexts to only cancel when its own traffic times out + allowedNetworkKey ctx.SessionKey = 9 // muxcool server control incoming request tcp/udp + handlerSessionKey ctx.SessionKey = 10 // outbound gets full handler mitmAlpn11Key ctx.SessionKey = 11 // used by TLS dialer mitmServerNameKey ctx.SessionKey = 12 // used by TLS dialer ) @@ -163,6 +164,17 @@ func AllowedNetworkFromContext(ctx context.Context) net.Network { return net.Network_Unknown } +func ContextWithHandler(ctx context.Context, handler outbound.Handler) context.Context { + return context.WithValue(ctx, handlerSessionKey, handler) +} + +func HandlerFromContext(ctx context.Context) outbound.Handler { + if val, ok := ctx.Value(handlerSessionKey).(outbound.Handler); ok { + return val + } + return nil +} + func ContextWithMitmAlpn11(ctx context.Context, alpn11 bool) context.Context { return context.WithValue(ctx, mitmAlpn11Key, alpn11) } diff --git a/infra/conf/vless.go b/infra/conf/vless.go index 999e3b9b..fb4ee464 100644 --- a/infra/conf/vless.go +++ b/infra/conf/vless.go @@ -77,6 +77,10 @@ func (c *VLessInboundConfig) Build() (proto.Message, error) { return nil, errors.New(`VLESS clients: "encryption" should not be in inbound settings`) } + if account.Reverse != nil && account.Reverse.Tag == "" { + return nil, errors.New(`VLESS clients: "tag" can't be empty for "reverse"`) + } + user.Account = serial.ToTypedMessage(account) config.Clients[idx] = user } @@ -288,6 +292,10 @@ func (c *VLessOutboundConfig) Build() (proto.Message, error) { return nil, errors.New(`VLESS users: unsupported "encryption": ` + account.Encryption) } + if account.Reverse != nil && account.Reverse.Tag == "" { + return nil, errors.New(`VLESS clients: "tag" can't be empty for "reverse"`) + } + user.Account = serial.ToTypedMessage(account) spec.User[idx] = user } diff --git a/proxy/vless/account.go b/proxy/vless/account.go index ce3eca27..ac00ea53 100644 --- a/proxy/vless/account.go +++ b/proxy/vless/account.go @@ -21,6 +21,7 @@ func (a *Account) AsAccount() (protocol.Account, error) { XorMode: a.XorMode, Seconds: a.Seconds, Padding: a.Padding, + Reverse: a.Reverse, }, nil } @@ -35,6 +36,8 @@ type MemoryAccount struct { XorMode uint32 Seconds uint32 Padding string + + Reverse *Reverse } // Equals implements protocol.Account.Equals(). @@ -54,5 +57,6 @@ func (a *MemoryAccount) ToProto() proto.Message { XorMode: a.XorMode, Seconds: a.Seconds, Padding: a.Padding, + Reverse: a.Reverse, } } diff --git a/proxy/vless/account.pb.go b/proxy/vless/account.pb.go index b3027c47..5822f512 100644 --- a/proxy/vless/account.pb.go +++ b/proxy/vless/account.pb.go @@ -20,6 +20,51 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Reverse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` +} + +func (x *Reverse) Reset() { + *x = Reverse{} + mi := &file_proxy_vless_account_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Reverse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Reverse) ProtoMessage() {} + +func (x *Reverse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_vless_account_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Reverse.ProtoReflect.Descriptor instead. +func (*Reverse) Descriptor() ([]byte, []int) { + return file_proxy_vless_account_proto_rawDescGZIP(), []int{0} +} + +func (x *Reverse) GetTag() string { + if x != nil { + return x.Tag + } + return "" +} + type Account struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -28,16 +73,17 @@ type Account struct { // ID of the account, in the form of a UUID, e.g., "66ad4540-b58c-4ad2-9926-ea63445a9b57". Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Flow settings. May be "xtls-rprx-vision". - Flow string `protobuf:"bytes,2,opt,name=flow,proto3" json:"flow,omitempty"` - Encryption string `protobuf:"bytes,3,opt,name=encryption,proto3" json:"encryption,omitempty"` - XorMode uint32 `protobuf:"varint,4,opt,name=xorMode,proto3" json:"xorMode,omitempty"` - Seconds uint32 `protobuf:"varint,5,opt,name=seconds,proto3" json:"seconds,omitempty"` - Padding string `protobuf:"bytes,6,opt,name=padding,proto3" json:"padding,omitempty"` + Flow string `protobuf:"bytes,2,opt,name=flow,proto3" json:"flow,omitempty"` + Encryption string `protobuf:"bytes,3,opt,name=encryption,proto3" json:"encryption,omitempty"` + XorMode uint32 `protobuf:"varint,4,opt,name=xorMode,proto3" json:"xorMode,omitempty"` + Seconds uint32 `protobuf:"varint,5,opt,name=seconds,proto3" json:"seconds,omitempty"` + Padding string `protobuf:"bytes,6,opt,name=padding,proto3" json:"padding,omitempty"` + Reverse *Reverse `protobuf:"bytes,7,opt,name=reverse,proto3" json:"reverse,omitempty"` } func (x *Account) Reset() { *x = Account{} - mi := &file_proxy_vless_account_proto_msgTypes[0] + mi := &file_proxy_vless_account_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -49,7 +95,7 @@ func (x *Account) String() string { func (*Account) ProtoMessage() {} func (x *Account) ProtoReflect() protoreflect.Message { - mi := &file_proxy_vless_account_proto_msgTypes[0] + mi := &file_proxy_vless_account_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -62,7 +108,7 @@ func (x *Account) ProtoReflect() protoreflect.Message { // Deprecated: Use Account.ProtoReflect.Descriptor instead. func (*Account) Descriptor() ([]byte, []int) { - return file_proxy_vless_account_proto_rawDescGZIP(), []int{0} + return file_proxy_vless_account_proto_rawDescGZIP(), []int{1} } func (x *Account) GetId() string { @@ -107,28 +153,40 @@ func (x *Account) GetPadding() string { return "" } +func (x *Account) GetReverse() *Reverse { + if x != nil { + return x.Reverse + } + return nil +} + var File_proxy_vless_account_proto protoreflect.FileDescriptor var file_proxy_vless_account_proto_rawDesc = []byte{ 0x0a, 0x19, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x76, 0x6c, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x78, 0x72, 0x61, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x6c, 0x65, 0x73, 0x73, 0x22, 0x9b, 0x01, - 0x0a, 0x07, 0x41, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x6c, 0x6f, - 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x1e, 0x0a, - 0x0a, 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0a, 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, - 0x07, 0x78, 0x6f, 0x72, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, - 0x78, 0x6f, 0x72, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, - 0x64, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, - 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x42, 0x52, 0x0a, 0x14, 0x63, - 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x6c, - 0x65, 0x73, 0x73, 0x50, 0x01, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, - 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x76, 0x6c, 0x65, 0x73, 0x73, 0xaa, 0x02, 0x10, 0x58, - 0x72, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x56, 0x6c, 0x65, 0x73, 0x73, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x6c, 0x65, 0x73, 0x73, 0x22, 0x1b, 0x0a, + 0x07, 0x52, 0x65, 0x76, 0x65, 0x72, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x22, 0xd0, 0x01, 0x0a, 0x07, 0x41, + 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x1e, 0x0a, 0x0a, 0x65, 0x6e, + 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, + 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x78, 0x6f, + 0x72, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x78, 0x6f, 0x72, + 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x18, + 0x0a, 0x07, 0x70, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x70, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x12, 0x33, 0x0a, 0x07, 0x72, 0x65, 0x76, 0x65, + 0x72, 0x73, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x78, 0x72, 0x61, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x6c, 0x65, 0x73, 0x73, 0x2e, 0x52, 0x65, 0x76, + 0x65, 0x72, 0x73, 0x65, 0x52, 0x07, 0x72, 0x65, 0x76, 0x65, 0x72, 0x73, 0x65, 0x42, 0x52, 0x0a, + 0x14, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, + 0x76, 0x6c, 0x65, 0x73, 0x73, 0x50, 0x01, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, + 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x76, 0x6c, 0x65, 0x73, 0x73, 0xaa, 0x02, + 0x10, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x56, 0x6c, 0x65, 0x73, + 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -143,16 +201,18 @@ func file_proxy_vless_account_proto_rawDescGZIP() []byte { return file_proxy_vless_account_proto_rawDescData } -var file_proxy_vless_account_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proxy_vless_account_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_proxy_vless_account_proto_goTypes = []any{ - (*Account)(nil), // 0: xray.proxy.vless.Account + (*Reverse)(nil), // 0: xray.proxy.vless.Reverse + (*Account)(nil), // 1: xray.proxy.vless.Account } var file_proxy_vless_account_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: xray.proxy.vless.Account.reverse:type_name -> xray.proxy.vless.Reverse + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_proxy_vless_account_proto_init() } @@ -166,7 +226,7 @@ func file_proxy_vless_account_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proxy_vless_account_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/proxy/vless/account.proto b/proxy/vless/account.proto index dab861bc..047311dd 100644 --- a/proxy/vless/account.proto +++ b/proxy/vless/account.proto @@ -6,6 +6,10 @@ option go_package = "github.com/xtls/xray-core/proxy/vless"; option java_package = "com.xray.proxy.vless"; option java_multiple_files = true; +message Reverse { + string tag = 1; +} + message Account { // ID of the account, in the form of a UUID, e.g., "66ad4540-b58c-4ad2-9926-ea63445a9b57". string id = 1; @@ -16,4 +20,6 @@ message Account { uint32 xorMode = 4; uint32 seconds = 5; string padding = 6; + + Reverse reverse = 7; } diff --git a/proxy/vless/encoding/encoding.go b/proxy/vless/encoding/encoding.go index c830ac62..fe2c6bc8 100644 --- a/proxy/vless/encoding/encoding.go +++ b/proxy/vless/encoding/encoding.go @@ -46,7 +46,7 @@ func EncodeRequestHeader(writer io.Writer, request *protocol.RequestHeader, requ return errors.New("failed to write request command").Base(err) } - if request.Command != protocol.RequestCommandMux { + if request.Command != protocol.RequestCommandMux && request.Command != protocol.RequestCommandRvs { if err := addrParser.WriteAddressPort(&buffer, request.Address, request.Port); err != nil { return errors.New("failed to write request address and port").Base(err) } @@ -112,7 +112,8 @@ func DecodeRequestHeader(isfb bool, first *buf.Buffer, reader io.Reader, validat switch request.Command { case protocol.RequestCommandMux: request.Address = net.DomainAddress("v1.mux.cool") - request.Port = 0 + case protocol.RequestCommandRvs: + request.Address = net.DomainAddress("v1.rvs.cool") case protocol.RequestCommandTCP, protocol.RequestCommandUDP: if addr, port, err := addrParser.ReadAddressPort(&buffer, reader); err == nil { request.Address = addr diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 98db4f71..0ab9df71 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -12,19 +12,24 @@ import ( "time" "unsafe" + app_dispatcher "github.com/xtls/xray-core/app/dispatcher" + "github.com/xtls/xray-core/app/reverse" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/log" + "github.com/xtls/xray-core/common/mux" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/retry" + "github.com/xtls/xray-core/common/serial" "github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/signal" "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/dns" feature_inbound "github.com/xtls/xray-core/features/inbound" + "github.com/xtls/xray-core/features/outbound" "github.com/xtls/xray-core/features/policy" "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/proxy" @@ -179,6 +184,7 @@ func (h *Handler) Close() error { if h.decryption != nil { h.decryption.Close() } + // TODO: remove reverse's handlers (needs ctx) return errors.Combine(common.Close(h.validator)) } @@ -189,6 +195,19 @@ func (h *Handler) AddUser(ctx context.Context, u *protocol.MemoryUser) error { // RemoveUser implements proxy.UserManager.RemoveUser(). func (h *Handler) RemoveUser(ctx context.Context, e string) error { + u := h.validator.GetByEmail(e) + if u != nil { + a := u.Account.(*vless.MemoryAccount) + if a.Reverse != nil && a.Reverse.Tag != "" { + core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { // not sure whether it works or not + go func() { + time.Sleep(time.Minute) // TODO: check firstLen + om.RemoveHandler(ctx, a.Reverse.Tag) + }() + return nil + }) + } + } return h.validator.Del(e) } @@ -565,6 +584,33 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, false, ctx, connection, nil) bufferWriter.SetFlushNext() + if request.Command == protocol.RequestCommandRvs { + if account.Reverse == nil || account.Reverse.Tag == "" { + return errors.New("account " + account.ID.String() + " can not use reverse proxy") + } + var rd routing.Dispatcher + var obm outbound.Manager + if err := core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { + rd = d + obm = om + return nil + }); err != nil { + return err + } + r := obm.GetHandler(account.Reverse.Tag) + if r == nil { + picker, _ := reverse.NewStaticMuxPicker() + r = &Reverse{tag: account.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}} + if err := obm.AddHandler(ctx, r); err != nil { + return err + } + } + if r, ok := r.(*Reverse); ok { + return r.NewMux(ctx, rd.(*app_dispatcher.DefaultDispatcher).WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) + } + return errors.New("mismatched reverse tag") + } + if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ Reader: clientReader, Writer: clientWriter}, @@ -573,3 +619,58 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s } return nil } + +type Reverse struct { + tag string + picker *reverse.StaticMuxPicker + client *mux.ClientManager +} + +func (r *Reverse) Tag() string { + return r.tag +} + +func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { + muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{}) + if err != nil { + return errors.New("failed to create mux client worker").Base(err).AtWarning() + } + worker, err := reverse.NewPortalWorker(muxClient) + if err != nil { + return errors.New("failed to create portal worker").Base(err) + } + r.picker.AddWorker(worker) + select { + case <-ctx.Done(): + case <-muxClient.WaitClosed(): + } + return nil +} + +func (r *Reverse) Dispatch(ctx context.Context, link *transport.Link) { + outbounds := session.OutboundsFromContext(ctx) + ob := outbounds[len(outbounds)-1] + if ob != nil { + if ob.Target.Network == net.Network_UDP && ob.OriginalTarget.Address != nil && ob.OriginalTarget.Address != ob.Target.Address { + link.Reader = &buf.EndpointOverrideReader{Reader: link.Reader, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address} + link.Writer = &buf.EndpointOverrideWriter{Writer: link.Writer, Dest: ob.Target.Address, OriginalDest: ob.OriginalTarget.Address} + } + r.client.Dispatch(ctx, link) + } +} + +func (r *Reverse) Start() error { + return nil +} + +func (r *Reverse) Close() error { + return nil +} + +func (r *Reverse) SenderSettings() *serial.TypedMessage { + return nil +} + +func (r *Reverse) ProxySettings() *serial.TypedMessage { + return nil +} diff --git a/proxy/vless/outbound/outbound.go b/proxy/vless/outbound/outbound.go index 58c1646d..08902932 100644 --- a/proxy/vless/outbound/outbound.go +++ b/proxy/vless/outbound/outbound.go @@ -11,9 +11,12 @@ import ( "unsafe" utls "github.com/refraction-networking/utls" + proxyman "github.com/xtls/xray-core/app/proxyman/outbound" + "github.com/xtls/xray-core/app/reverse" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/mux" "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/retry" @@ -23,6 +26,7 @@ import ( "github.com/xtls/xray-core/common/xudp" "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/policy" + "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/proxy" "github.com/xtls/xray-core/proxy/vless" "github.com/xtls/xray-core/proxy/vless/encoding" @@ -32,6 +36,7 @@ import ( "github.com/xtls/xray-core/transport/internet/reality" "github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/tls" + "github.com/xtls/xray-core/transport/pipe" ) func init() { @@ -47,6 +52,7 @@ type Handler struct { policyManager policy.Manager cone bool encryption *encryption.ClientInstance + reverse *Reverse } // New creates a new VLess outbound handler. @@ -82,9 +88,42 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } } + if a.Reverse != nil { + if err := core.RequireFeatures(ctx, func(d routing.Dispatcher) error { + ctx = session.ContextWithInbound(ctx, &session.Inbound{ + Tag: a.Reverse.Tag, + }) + ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{ + Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")}, + }}) + handler.reverse = &Reverse{ + tag: a.Reverse.Tag, + dispatcher: d, + ctx: ctx, + handler: handler, + } + handler.reverse.monitorTask = &task.Periodic{ + Execute: handler.reverse.monitor, + Interval: time.Second * 2, + } + handler.reverse.Start() + return nil + }); err != nil { + return nil, err + } + } + return handler, nil } +// Close implements common.Closable.Close(). +func (h *Handler) Close() error { + if h.reverse != nil { + return h.reverse.Close() + } + return nil +} + // Process implements proxy.Outbound.Process(). func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { outbounds := session.OutboundsFromContext(ctx) @@ -127,8 +166,16 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte if target.Network == net.Network_UDP { command = protocol.RequestCommandUDP } - if target.Address.Family().IsDomain() && target.Address.Domain() == "v1.mux.cool" { - command = protocol.RequestCommandMux + if target.Address.Family().IsDomain() { + switch target.Address.Domain() { + case "v1.mux.cool": + command = protocol.RequestCommandMux + case "v1.rvs.cool": + if target.Network != net.Network_Unknown { + return errors.New("nice try baby").AtError() + } + command = protocol.RequestCommandRvs + } } request := &protocol.RequestHeader{ @@ -321,3 +368,66 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte return nil } + +type Reverse struct { + tag string + dispatcher routing.Dispatcher + ctx context.Context + handler *Handler + workers []*reverse.BridgeWorker + monitorTask *task.Periodic +} + +func (r *Reverse) monitor() error { + var activeWorkers []*reverse.BridgeWorker + for _, w := range r.workers { + if w.IsActive() { + activeWorkers = append(activeWorkers, w) + } + } + if len(activeWorkers) != len(r.workers) { + r.workers = activeWorkers + } + + var numConnections uint32 + var numWorker uint32 + for _, w := range r.workers { + if w.IsActive() { + numConnections += w.Connections() + numWorker++ + } + } + if numWorker == 0 || numConnections/numWorker > 16 { + reader1, writer1 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) + reader2, writer2 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) + link1 := &transport.Link{ + Reader: reader1, + Writer: writer2, + } + link2 := &transport.Link{ + Reader: reader2, + Writer: writer1, + } + w := &reverse.BridgeWorker{ + Tag: r.tag, + Dispatcher: r.dispatcher, + } + worker, err := mux.NewServerWorker(r.ctx, w, link1) + if err != nil { + errors.LogWarningInner(context.Background(), err, "failed to create bridge worker") + return nil + } + w.Worker = worker + r.workers = append(r.workers, w) + go r.handler.Process(r.ctx, link2, session.HandlerFromContext(r.ctx).(*proxyman.Handler)) + } + return nil +} + +func (r *Reverse) Start() error { + return r.monitorTask.Start() +} + +func (r *Reverse) Close() error { + return r.monitorTask.Close() +}