From b1c6471eebd2ec4ef5e0ef6af4b6ceb5f4a06734 Mon Sep 17 00:00:00 2001 From: ll11l1lIllIl1lll <88377095+ll11l1lIllIl1lll@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:42:01 +0000 Subject: [PATCH] SplitHTTP client: Add xmux (multiplex controller) for H3 & H2 (#3613) https://github.com/XTLS/Xray-core/pull/3613#issuecomment-2351954957 Closes https://github.com/XTLS/Xray-core/issues/3560#issuecomment-2247495778 --------- Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com> --- infra/conf/transport_internet.go | 22 +++ transport/internet/splithttp/client.go | 7 +- transport/internet/splithttp/config.go | 43 +++++ transport/internet/splithttp/config.pb.go | 187 +++++++++++++++++---- transport/internet/splithttp/config.proto | 8 + transport/internet/splithttp/connection.go | 5 + transport/internet/splithttp/dialer.go | 77 +++++---- transport/internet/splithttp/mux.go | 102 +++++++++++ transport/internet/splithttp/mux_test.go | 88 ++++++++++ 9 files changed, 475 insertions(+), 64 deletions(-) create mode 100644 transport/internet/splithttp/mux.go create mode 100644 transport/internet/splithttp/mux_test.go diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index b61914ae..67e57c0c 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -231,6 +231,14 @@ type SplitHTTPConfig struct { ScMinPostsIntervalMs *Int32Range `json:"scMinPostsIntervalMs"` NoSSEHeader bool `json:"noSSEHeader"` XPaddingBytes *Int32Range `json:"xPaddingBytes"` + Xmux Xmux `json:"xmux"` +} + +type Xmux struct { + maxConnections *Int32Range `json:"maxConnections"` + maxConcurrency *Int32Range `json:"maxConcurrency"` + cMaxReuseTimes *Int32Range `json:"cMaxReuseTimes"` + cMaxLifetimeMs *Int32Range `json:"cMaxLifetimeMs"` } func splithttpNewRandRangeConfig(input *Int32Range) *splithttp.RandRangeConfig { @@ -254,6 +262,19 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { } else if c.Host == "" && c.Headers["Host"] != "" { c.Host = c.Headers["Host"] } + + if c.Xmux.maxConnections != nil && c.Xmux.maxConcurrency != nil { + return nil, errors.New("maxConnections cannot be specified together with maxConcurrency") + } + + // Multiplexing config + muxProtobuf := splithttp.Multiplexing{ + MaxConnections: splithttpNewRandRangeConfig(c.Xmux.maxConnections), + MaxConcurrency: splithttpNewRandRangeConfig(c.Xmux.maxConcurrency), + CMaxReuseTimes: splithttpNewRandRangeConfig(c.Xmux.cMaxReuseTimes), + CMaxLifetimeMs: splithttpNewRandRangeConfig(c.Xmux.cMaxLifetimeMs), + } + config := &splithttp.Config{ Path: c.Path, Host: c.Host, @@ -263,6 +284,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs), NoSSEHeader: c.NoSSEHeader, XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes), + Xmux: &muxProtobuf, } return config, nil } diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 7fc26945..3f278aa7 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -30,8 +30,7 @@ type DialerClient interface { // implements splithttp.DialerClient in terms of direct network connections type DefaultDialerClient struct { transportConfig *Config - download *http.Client - upload *http.Client + client *http.Client isH2 bool isH3 bool // pool of net.Conn, created using dialUploadConn @@ -80,7 +79,7 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) req.Header = c.transportConfig.GetRequestHeader() - response, err := c.download.Do(req) + response, err := c.client.Do(req) gotConn.Close() if err != nil { errors.LogInfoInner(ctx, err, "failed to send download http request") @@ -138,7 +137,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, req.Header = c.transportConfig.GetRequestHeader() if c.isH2 || c.isH3 { - resp, err := c.upload.Do(req) + resp, err := c.client.Do(req) if err != nil { return err } diff --git a/transport/internet/splithttp/config.go b/transport/internet/splithttp/config.go index 6b5a2005..5309b180 100644 --- a/transport/internet/splithttp/config.go +++ b/transport/internet/splithttp/config.go @@ -105,6 +105,49 @@ func (c *Config) GetNormalizedXPaddingBytes() RandRangeConfig { return *c.XPaddingBytes } +func (m *Multiplexing) GetNormalizedCMaxReuseTimes() RandRangeConfig { + if m.CMaxReuseTimes == nil { + return RandRangeConfig{ + From: 0, + To: 0, + } + } + + return *m.CMaxReuseTimes +} + +func (m *Multiplexing) GetNormalizedCMaxLifetimeMs() RandRangeConfig { + if m.CMaxLifetimeMs == nil || m.CMaxLifetimeMs.To == 0 { + return RandRangeConfig{ + From: 0, + To: 0, + } + } + return *m.CMaxLifetimeMs +} + +func (m *Multiplexing) GetNormalizedMaxConnections() RandRangeConfig { + if m.MaxConnections == nil { + return RandRangeConfig{ + From: 0, + To: 0, + } + } + + return *m.MaxConnections +} + +func (m *Multiplexing) GetNormalizedMaxConcurrency() RandRangeConfig { + if m.MaxConcurrency == nil { + return RandRangeConfig{ + From: 0, + To: 0, + } + } + + return *m.MaxConcurrency +} + func init() { common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} { return new(Config) diff --git a/transport/internet/splithttp/config.pb.go b/transport/internet/splithttp/config.pb.go index 24768490..51fc2091 100644 --- a/transport/internet/splithttp/config.pb.go +++ b/transport/internet/splithttp/config.pb.go @@ -33,6 +33,7 @@ type Config struct { ScMinPostsIntervalMs *RandRangeConfig `protobuf:"bytes,6,opt,name=scMinPostsIntervalMs,proto3" json:"scMinPostsIntervalMs,omitempty"` NoSSEHeader bool `protobuf:"varint,7,opt,name=noSSEHeader,proto3" json:"noSSEHeader,omitempty"` XPaddingBytes *RandRangeConfig `protobuf:"bytes,8,opt,name=xPaddingBytes,proto3" json:"xPaddingBytes,omitempty"` + Xmux *Multiplexing `protobuf:"bytes,9,opt,name=xmux,proto3" json:"xmux,omitempty"` } func (x *Config) Reset() { @@ -123,6 +124,13 @@ func (x *Config) GetXPaddingBytes() *RandRangeConfig { return nil } +func (x *Config) GetXmux() *Multiplexing { + if x != nil { + return x.Xmux + } + return nil +} + type RandRangeConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -178,6 +186,77 @@ func (x *RandRangeConfig) GetTo() int32 { return 0 } +type Multiplexing struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MaxConnections *RandRangeConfig `protobuf:"bytes,1,opt,name=maxConnections,proto3" json:"maxConnections,omitempty"` + MaxConcurrency *RandRangeConfig `protobuf:"bytes,2,opt,name=maxConcurrency,proto3" json:"maxConcurrency,omitempty"` + CMaxReuseTimes *RandRangeConfig `protobuf:"bytes,3,opt,name=cMaxReuseTimes,proto3" json:"cMaxReuseTimes,omitempty"` + CMaxLifetimeMs *RandRangeConfig `protobuf:"bytes,4,opt,name=cMaxLifetimeMs,proto3" json:"cMaxLifetimeMs,omitempty"` +} + +func (x *Multiplexing) Reset() { + *x = Multiplexing{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Multiplexing) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Multiplexing) ProtoMessage() {} + +func (x *Multiplexing) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Multiplexing.ProtoReflect.Descriptor instead. +func (*Multiplexing) Descriptor() ([]byte, []int) { + return file_transport_internet_splithttp_config_proto_rawDescGZIP(), []int{2} +} + +func (x *Multiplexing) GetMaxConnections() *RandRangeConfig { + if x != nil { + return x.MaxConnections + } + return nil +} + +func (x *Multiplexing) GetMaxConcurrency() *RandRangeConfig { + if x != nil { + return x.MaxConcurrency + } + return nil +} + +func (x *Multiplexing) GetCMaxReuseTimes() *RandRangeConfig { + if x != nil { + return x.CMaxReuseTimes + } + return nil +} + +func (x *Multiplexing) GetCMaxLifetimeMs() *RandRangeConfig { + if x != nil { + return x.CMaxLifetimeMs + } + return nil +} + var File_transport_internet_splithttp_config_proto protoreflect.FileDescriptor var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ @@ -185,8 +264,8 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0xea, - 0x04, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0xaf, + 0x05, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x4d, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, @@ -221,23 +300,51 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0d, 0x78, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x42, 0x79, 0x74, 0x65, 0x73, - 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, - 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, 0x0a, 0x0f, 0x52, - 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, - 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x66, 0x72, - 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, - 0x74, 0x6f, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, + 0x12, 0x43, 0x0a, 0x04, 0x78, 0x6d, 0x75, 0x78, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, + 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, + 0x74, 0x70, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x52, + 0x04, 0x78, 0x6d, 0x75, 0x78, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0x35, 0x0a, 0x0f, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x02, 0x74, 0x6f, 0x22, 0xfe, 0x02, 0x0a, 0x0c, 0x4d, 0x75, 0x6c, 0x74, + 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x12, 0x5a, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x43, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, + 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, + 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x5a, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, + 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, + 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, + 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x52, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, + 0x12, 0x5a, 0x0a, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x75, 0x73, 0x65, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, 0x01, 0x5a, 0x36, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, - 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, - 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, - 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, - 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, + 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x63, 0x4d, + 0x61, 0x78, 0x52, 0x65, 0x75, 0x73, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x12, 0x5a, 0x0a, 0x0e, + 0x63, 0x4d, 0x61, 0x78, 0x4c, 0x69, 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x4d, 0x73, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, + 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, + 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, + 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0e, 0x63, 0x4d, 0x61, 0x78, 0x4c, 0x69, + 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x4d, 0x73, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, + 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, + 0x74, 0x70, 0x50, 0x01, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, + 0x72, 0x61, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -252,23 +359,29 @@ func file_transport_internet_splithttp_config_proto_rawDescGZIP() []byte { return file_transport_internet_splithttp_config_proto_rawDescData } -var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_transport_internet_splithttp_config_proto_goTypes = []any{ (*Config)(nil), // 0: xray.transport.internet.splithttp.Config (*RandRangeConfig)(nil), // 1: xray.transport.internet.splithttp.RandRangeConfig - nil, // 2: xray.transport.internet.splithttp.Config.HeaderEntry + (*Multiplexing)(nil), // 2: xray.transport.internet.splithttp.Multiplexing + nil, // 3: xray.transport.internet.splithttp.Config.HeaderEntry } var file_transport_internet_splithttp_config_proto_depIdxs = []int32{ - 2, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry - 1, // 1: xray.transport.internet.splithttp.Config.scMaxConcurrentPosts:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 2: xray.transport.internet.splithttp.Config.scMaxEachPostBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 3: xray.transport.internet.splithttp.Config.scMinPostsIntervalMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 4: xray.transport.internet.splithttp.Config.xPaddingBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 3, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry + 1, // 1: xray.transport.internet.splithttp.Config.scMaxConcurrentPosts:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 2: xray.transport.internet.splithttp.Config.scMaxEachPostBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 3: xray.transport.internet.splithttp.Config.scMinPostsIntervalMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 4: xray.transport.internet.splithttp.Config.xPaddingBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // 5: xray.transport.internet.splithttp.Config.xmux:type_name -> xray.transport.internet.splithttp.Multiplexing + 1, // 6: xray.transport.internet.splithttp.Multiplexing.maxConnections:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 7: xray.transport.internet.splithttp.Multiplexing.maxConcurrency:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 8: xray.transport.internet.splithttp.Multiplexing.cMaxReuseTimes:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 1, // 9: xray.transport.internet.splithttp.Multiplexing.cMaxLifetimeMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 10, // [10:10] is the sub-list for method output_type + 10, // [10:10] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_transport_internet_splithttp_config_proto_init() } @@ -301,6 +414,18 @@ func file_transport_internet_splithttp_config_proto_init() { return nil } } + file_transport_internet_splithttp_config_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*Multiplexing); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -308,7 +433,7 @@ func file_transport_internet_splithttp_config_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_transport_internet_splithttp_config_proto_rawDesc, NumEnums: 0, - NumMessages: 3, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/transport/internet/splithttp/config.proto b/transport/internet/splithttp/config.proto index 3f24cfd3..019f4d00 100644 --- a/transport/internet/splithttp/config.proto +++ b/transport/internet/splithttp/config.proto @@ -15,9 +15,17 @@ message Config { RandRangeConfig scMinPostsIntervalMs = 6; bool noSSEHeader = 7; RandRangeConfig xPaddingBytes = 8; + Multiplexing xmux = 9; } message RandRangeConfig { int32 from = 1; int32 to = 2; } + +message Multiplexing { + RandRangeConfig maxConnections = 1; + RandRangeConfig maxConcurrency = 2; + RandRangeConfig cMaxReuseTimes = 3; + RandRangeConfig cMaxLifetimeMs = 4; +} diff --git a/transport/internet/splithttp/connection.go b/transport/internet/splithttp/connection.go index 697381d4..613e1f36 100644 --- a/transport/internet/splithttp/connection.go +++ b/transport/internet/splithttp/connection.go @@ -11,6 +11,7 @@ type splitConn struct { reader io.ReadCloser remoteAddr net.Addr localAddr net.Addr + onClose func() } func (c *splitConn) Write(b []byte) (int, error) { @@ -22,6 +23,10 @@ func (c *splitConn) Read(b []byte) (int, error) { } func (c *splitConn) Close() error { + if c.onClose != nil { + c.onClose() + } + err := c.writer.Close() err2 := c.reader.Close() if err != nil { diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index ff1501a8..60f8c628 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -41,32 +41,51 @@ type dialerConf struct { } var ( - globalDialerMap map[dialerConf]DialerClient + globalDialerMap map[dialerConf]*muxManager globalDialerAccess sync.Mutex ) -func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient { +func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (DialerClient, *muxResource) { if browser_dialer.HasBrowserDialer() { - return &BrowserDialerClient{} + return &BrowserDialerClient{}, nil } - tlsConfig := tls.ConfigFromStreamSettings(streamSettings) - isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1") - isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3") - globalDialerAccess.Lock() defer globalDialerAccess.Unlock() if globalDialerMap == nil { - globalDialerMap = make(map[dialerConf]DialerClient) + globalDialerMap = make(map[dialerConf]*muxManager) + } + + key := dialerConf{dest, streamSettings} + + muxManager, found := globalDialerMap[key] + + if !found { + transportConfig := streamSettings.ProtocolSettings.(*Config) + var mux Multiplexing + if transportConfig.Xmux != nil { + mux = *transportConfig.Xmux + } + + muxManager = NewMuxManager(mux, func() interface{} { + return createHTTPClient(dest, streamSettings) + }) + globalDialerMap[key] = muxManager } + res := muxManager.GetResource(ctx) + return res.Resource.(DialerClient), res +} + +func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient { + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1") + isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3") + if isH3 { dest.Network = net.Network_UDP } - if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found { - return client - } var gotlsConfig *gotls.Config @@ -74,6 +93,8 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest)) } + transportConfig := streamSettings.ProtocolSettings.(*Config) + dialContext := func(ctxInner context.Context) (net.Conn, error) { conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings) if err != nil { @@ -94,8 +115,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in return conn, nil } - var downloadTransport http.RoundTripper - var uploadTransport http.RoundTripper + var transport http.RoundTripper if isH3 { quicConfig := &quic.Config{ @@ -107,7 +127,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in MaxIncomingStreams: -1, KeepAlivePeriod: h3KeepalivePeriod, } - roundTripper := &http3.RoundTripper{ + transport = &http3.RoundTripper{ QUICConfig: quicConfig, TLSClientConfig: gotlsConfig, Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) { @@ -147,23 +167,20 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in return quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg) }, } - downloadTransport = roundTripper - uploadTransport = roundTripper } else if isH2 { - downloadTransport = &http2.Transport{ + transport = &http2.Transport{ DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) { return dialContext(ctxInner) }, IdleConnTimeout: connIdleTimeout, ReadIdleTimeout: h2KeepalivePeriod, } - uploadTransport = downloadTransport } else { httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) { return dialContext(ctxInner) } - downloadTransport = &http.Transport{ + transport = &http.Transport{ DialTLSContext: httpDialContext, DialContext: httpDialContext, IdleConnTimeout: connIdleTimeout, @@ -171,17 +188,12 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in // http.Client and our custom dial context. DisableKeepAlives: true, } - // we use uploadRawPool for that - uploadTransport = nil } client := &DefaultDialerClient{ - transportConfig: streamSettings.ProtocolSettings.(*Config), - download: &http.Client{ - Transport: downloadTransport, - }, - upload: &http.Client{ - Transport: uploadTransport, + transportConfig: transportConfig, + client: &http.Client{ + Transport: transport, }, isH2: isH2, isH3: isH3, @@ -189,7 +201,6 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in dialUploadConn: dialContext, } - globalDialerMap[dialerConf{dest, streamSettings}] = client return client } @@ -223,7 +234,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me requestURL.Path = transportConfiguration.GetNormalizedPath() + sessionIdUuid.String() requestURL.RawQuery = transportConfiguration.GetNormalizedQuery() - httpClient := getHTTPClient(ctx, dest, streamSettings) + httpClient, muxResource := getHTTPClient(ctx, dest, streamSettings) maxUploadSize := scMaxEachPostBytes.roll() // WithSizeLimit(0) will still allow single bytes to pass, and a lot of @@ -231,7 +242,15 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me // uploadWriter wrapper, exact size limits can be enforced uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1)) + if muxResource != nil { + muxResource.OpenRequests.Add(1) + } + go func() { + if muxResource != nil { + defer muxResource.OpenRequests.Add(-1) + } + requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll())) var requestCounter int64 diff --git a/transport/internet/splithttp/mux.go b/transport/internet/splithttp/mux.go new file mode 100644 index 00000000..83ab25b0 --- /dev/null +++ b/transport/internet/splithttp/mux.go @@ -0,0 +1,102 @@ +package splithttp + +import ( + "context" + "math/rand" + "sync/atomic" + "time" + + "github.com/xtls/xray-core/common/errors" +) + +type muxResource struct { + Resource interface{} + OpenRequests atomic.Int32 + leftUsage int32 + expirationTime time.Time +} + +type muxManager struct { + newResourceFn func() interface{} + config Multiplexing + concurrency int32 + connections int32 + instances []*muxResource +} + +func NewMuxManager(config Multiplexing, newResource func() interface{}) *muxManager { + return &muxManager{ + config: config, + concurrency: config.GetNormalizedMaxConcurrency().roll(), + connections: config.GetNormalizedMaxConnections().roll(), + newResourceFn: newResource, + instances: make([]*muxResource, 0), + } +} + +func (m *muxManager) GetResource(ctx context.Context) *muxResource { + m.removeExpiredConnections(ctx) + + if m.connections > 0 && len(m.instances) < int(m.connections) { + errors.LogDebug(ctx, "xmux: creating client, connections=", len(m.instances)) + return m.newResource() + } + + if len(m.instances) == 0 { + errors.LogDebug(ctx, "xmux: creating client because instances is empty, connections=", len(m.instances)) + return m.newResource() + } + + clients := make([]*muxResource, 0) + if m.concurrency > 0 { + for _, client := range m.instances { + openRequests := client.OpenRequests.Load() + if openRequests < m.concurrency { + clients = append(clients, client) + } + } + } else { + clients = m.instances + } + + if len(clients) == 0 { + errors.LogDebug(ctx, "xmux: creating client because concurrency was hit, total clients=", len(m.instances)) + return m.newResource() + } + + client := clients[rand.Intn(len(clients))] + if client.leftUsage > 0 { + client.leftUsage -= 1 + } + return client +} + +func (m *muxManager) newResource() *muxResource { + leftUsage := int32(-1) + if x := m.config.GetNormalizedCMaxReuseTimes().roll(); x > 0 { + leftUsage = x - 1 + } + expirationTime := time.UnixMilli(0) + if x := m.config.GetNormalizedCMaxLifetimeMs().roll(); x > 0 { + expirationTime = time.Now().Add(time.Duration(x) * time.Millisecond) + } + + client := &muxResource{ + Resource: m.newResourceFn(), + leftUsage: leftUsage, + expirationTime: expirationTime, + } + m.instances = append(m.instances, client) + return client +} + +func (m *muxManager) removeExpiredConnections(ctx context.Context) { + for i := 0; i < len(m.instances); i++ { + client := m.instances[i] + if client.leftUsage == 0 || (client.expirationTime != time.UnixMilli(0) && time.Now().After(client.expirationTime)) { + errors.LogDebug(ctx, "xmux: removing client, leftUsage = ", client.leftUsage, ", expirationTime = ", client.expirationTime) + m.instances = append(m.instances[:i], m.instances[i+1:]...) + i-- + } + } +} diff --git a/transport/internet/splithttp/mux_test.go b/transport/internet/splithttp/mux_test.go new file mode 100644 index 00000000..e59eff0e --- /dev/null +++ b/transport/internet/splithttp/mux_test.go @@ -0,0 +1,88 @@ +package splithttp_test + +import ( + "context" + "testing" + + . "github.com/xtls/xray-core/transport/internet/splithttp" +) + +type fakeRoundTripper struct{} + +func TestMaxConnections(t *testing.T) { + config := Multiplexing{ + MaxConnections: &RandRangeConfig{From: 4, To: 4}, + } + + mux := NewMuxManager(config, func() interface{} { + return &fakeRoundTripper{} + }) + + clients := make(map[interface{}]struct{}) + for i := 0; i < 8; i++ { + clients[mux.GetResource(context.Background())] = struct{}{} + } + + if len(clients) != 4 { + t.Error("did not get 4 distinct clients, got ", len(clients)) + } +} + +func TestCMaxReuseTimes(t *testing.T) { + config := Multiplexing{ + CMaxReuseTimes: &RandRangeConfig{From: 2, To: 2}, + } + + mux := NewMuxManager(config, func() interface{} { + return &fakeRoundTripper{} + }) + + clients := make(map[interface{}]struct{}) + for i := 0; i < 64; i++ { + clients[mux.GetResource(context.Background())] = struct{}{} + } + + if len(clients) != 32 { + t.Error("did not get 32 distinct clients, got ", len(clients)) + } +} + +func TestMaxConcurrency(t *testing.T) { + config := Multiplexing{ + MaxConcurrency: &RandRangeConfig{From: 2, To: 2}, + } + + mux := NewMuxManager(config, func() interface{} { + return &fakeRoundTripper{} + }) + + clients := make(map[interface{}]struct{}) + for i := 0; i < 64; i++ { + client := mux.GetResource(context.Background()) + client.OpenRequests.Add(1) + clients[client] = struct{}{} + } + + if len(clients) != 32 { + t.Error("did not get 32 distinct clients, got ", len(clients)) + } +} + +func TestDefault(t *testing.T) { + config := Multiplexing{} + + mux := NewMuxManager(config, func() interface{} { + return &fakeRoundTripper{} + }) + + clients := make(map[interface{}]struct{}) + for i := 0; i < 64; i++ { + client := mux.GetResource(context.Background()) + client.OpenRequests.Add(1) + clients[client] = struct{}{} + } + + if len(clients) != 1 { + t.Error("did not get 1 distinct clients, got ", len(clients)) + } +}