diff --git a/common/buf/copy.go b/common/buf/copy.go index 4cc3be88..95f09e8f 100644 --- a/common/buf/copy.go +++ b/common/buf/copy.go @@ -11,8 +11,8 @@ import ( type dataHandler func(MultiBuffer) -type copyHandler struct { - onData []dataHandler +type CopyHandler struct { + OnData []dataHandler } // SizeCounter is for counting bytes copied by Copy(). @@ -21,12 +21,12 @@ type SizeCounter struct { } // CopyOption is an option for copying data. -type CopyOption func(*copyHandler) +type CopyOption func(*CopyHandler) // UpdateActivity is a CopyOption to update activity on each data copy operation. func UpdateActivity(timer signal.ActivityUpdater) CopyOption { - return func(handler *copyHandler) { - handler.onData = append(handler.onData, func(MultiBuffer) { + return func(handler *CopyHandler) { + handler.OnData = append(handler.OnData, func(MultiBuffer) { timer.Update() }) } @@ -34,8 +34,8 @@ func UpdateActivity(timer signal.ActivityUpdater) CopyOption { // CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter. func CountSize(sc *SizeCounter) CopyOption { - return func(handler *copyHandler) { - handler.onData = append(handler.onData, func(b MultiBuffer) { + return func(handler *CopyHandler) { + handler.OnData = append(handler.OnData, func(b MultiBuffer) { sc.Size += int64(b.Len()) }) } @@ -43,8 +43,8 @@ func CountSize(sc *SizeCounter) CopyOption { // AddToStatCounter a CopyOption add to stat counter func AddToStatCounter(sc stats.Counter) CopyOption { - return func(handler *copyHandler) { - handler.onData = append(handler.onData, func(b MultiBuffer) { + return func(handler *CopyHandler) { + handler.OnData = append(handler.OnData, func(b MultiBuffer) { if sc != nil { sc.Add(int64(b.Len())) } @@ -88,18 +88,17 @@ func IsWriteError(err error) bool { return ok } -func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { +func copyInternal(reader Reader, writer Writer, handler *CopyHandler) error { for { buffer, err := reader.ReadMultiBuffer() if !buffer.IsEmpty() { - for _, handler := range handler.onData { - handler(buffer) - } - if werr := writer.WriteMultiBuffer(buffer); werr != nil { return writeError{werr} } } + for _, handler := range handler.OnData { + handler(buffer) + } if err != nil { return readError{err} @@ -109,7 +108,7 @@ func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { // Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF. func Copy(reader Reader, writer Writer, options ...CopyOption) error { - var handler copyHandler + var handler CopyHandler for _, option := range options { option(&handler) } diff --git a/proxy/addons.pb.go b/proxy/addons.pb.go index e55bba6a..02f804ff 100644 --- a/proxy/addons.pb.go +++ b/proxy/addons.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.23.1 +// protoc-gen-go v1.34.2 +// protoc v5.27.0 // source: proxy/addons.proto package proxy @@ -307,6 +307,7 @@ type SchedulerConfig struct { unknownFields protoimpl.UnknownFields TimeoutMillis uint32 `protobuf:"varint,1,opt,name=TimeoutMillis,proto3" json:"TimeoutMillis,omitempty"` // original traffic will not be sent right away but when scheduler want to send or pending buffer times out + PingPong bool `protobuf:"varint,2,opt,name=PingPong,proto3" json:"PingPong,omitempty"` // Other TBD } func (x *SchedulerConfig) Reset() { @@ -348,6 +349,13 @@ func (x *SchedulerConfig) GetTimeoutMillis() uint32 { return 0 } +func (x *SchedulerConfig) GetPingPong() bool { + if x != nil { + return x.PingPong + } + return false +} + var File_proxy_addons_proto protoreflect.FileDescriptor var file_proxy_addons_proto_rawDesc = []byte{ @@ -385,21 +393,22 @@ var file_proxy_addons_proto_rawDesc = []byte{ 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x69, 0x6e, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x4d, 0x61, 0x78, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x4d, 0x61, 0x78, 0x4d, - 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x37, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, + 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x53, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x2a, 0x58, - 0x0a, 0x08, 0x53, 0x65, 0x65, 0x64, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, - 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x61, 0x64, 0x64, 0x69, - 0x6e, 0x67, 0x4f, 0x6e, 0x6c, 0x79, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x50, 0x61, 0x64, 0x64, - 0x69, 0x6e, 0x67, 0x50, 0x6c, 0x75, 0x73, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x10, 0x02, 0x12, 0x18, - 0x0a, 0x14, 0x49, 0x6e, 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x68, - 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x10, 0x03, 0x42, 0x40, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, - 0x78, 0x72, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x50, 0x01, 0x5a, 0x1f, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, - 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0xaa, 0x02, 0x0a, - 0x58, 0x72, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x0d, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x12, 0x1a, + 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x6f, 0x6e, 0x67, 0x2a, 0x58, 0x0a, 0x08, 0x53, 0x65, + 0x65, 0x64, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, + 0x6e, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x4f, 0x6e, + 0x6c, 0x79, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x50, + 0x6c, 0x75, 0x73, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x10, 0x02, 0x12, 0x18, 0x0a, 0x14, 0x49, 0x6e, + 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, + 0x65, 0x72, 0x10, 0x03, 0x42, 0x40, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x50, 0x01, 0x5a, 0x1f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, + 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0xaa, 0x02, 0x0a, 0x58, 0x72, 0x61, 0x79, + 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -416,7 +425,7 @@ func file_proxy_addons_proto_rawDescGZIP() []byte { var file_proxy_addons_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_proxy_addons_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_proxy_addons_proto_goTypes = []interface{}{ +var file_proxy_addons_proto_goTypes = []any{ (SeedMode)(0), // 0: xray.proxy.SeedMode (*Addons)(nil), // 1: xray.proxy.Addons (*PaddingConfig)(nil), // 2: xray.proxy.PaddingConfig @@ -441,7 +450,7 @@ func file_proxy_addons_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_proxy_addons_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_proxy_addons_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*Addons); i { case 0: return &v.state @@ -453,7 +462,7 @@ func file_proxy_addons_proto_init() { return nil } } - file_proxy_addons_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_proxy_addons_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*PaddingConfig); i { case 0: return &v.state @@ -465,7 +474,7 @@ func file_proxy_addons_proto_init() { return nil } } - file_proxy_addons_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_proxy_addons_proto_msgTypes[2].Exporter = func(v any, i int) any { switch v := v.(*DelayConfig); i { case 0: return &v.state @@ -477,7 +486,7 @@ func file_proxy_addons_proto_init() { return nil } } - file_proxy_addons_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_proxy_addons_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*SchedulerConfig); i { case 0: return &v.state diff --git a/proxy/addons.proto b/proxy/addons.proto index 1542299e..6f7d8b57 100644 --- a/proxy/addons.proto +++ b/proxy/addons.proto @@ -38,5 +38,6 @@ message DelayConfig { message SchedulerConfig { uint32 TimeoutMillis = 1; // original traffic will not be sent right away but when scheduler want to send or pending buffer times out + bool PingPong = 2; // Other TBD } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index f2d68eea..0bc4d377 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -231,7 +231,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte inTimer = inbound.Timer } if !isTLSConn(conn) { // it would be tls conn in special use case of MITM, we need to let link handle traffic - return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer) + return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer, nil) } } var reader buf.Reader diff --git a/proxy/proxy.go b/proxy/proxy.go index df10d1f9..b785613d 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -216,7 +216,7 @@ type VisionWriter struct { trafficState *TrafficState ctx context.Context writeOnceUserUUID *[]byte - scheduler *Scheduler + Scheduler *Scheduler } func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter { @@ -228,7 +228,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont trafficState: state, ctx: context, writeOnceUserUUID: &w, - scheduler: NewScheduler(writer, addon, state, &w, context), + Scheduler: NewScheduler(writer, addon, state, &w, context), } } @@ -281,12 +281,24 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { if w.trafficState.StartTime.IsZero() { w.trafficState.StartTime = time.Now() } - w.scheduler.Buffer <- mb - if w.addons.Scheduler == nil { - w.scheduler.Trigger <- -1 // send all buffers + w.Scheduler.Buffer <- mb + w.Scheduler.Trigger <- -1 // send all buffers if no independent scheduler + if w.addons.Scheduler != nil { + w.Scheduler.TimeoutLock.Lock() + w.Scheduler.TimeoutCounter++ + w.Scheduler.TimeoutLock.Unlock() + go func() { + time.Sleep(time.Duration(w.addons.Scheduler.TimeoutMillis) * time.Millisecond) + w.Scheduler.TimeoutLock.Lock() + w.Scheduler.TimeoutCounter-- + if w.Scheduler.TimeoutCounter == 0 { + w.Scheduler.Trigger <- 0 // send when the latest buffer timeout + } + w.Scheduler.TimeoutLock.Unlock() + }() } - if len(w.scheduler.Error) > 0 { - return <-w.scheduler.Error + if len(w.Scheduler.Error) > 0 { + return <-w.Scheduler.Error } return nil } @@ -518,7 +530,7 @@ func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) { // CopyRawConnIfExist use the most efficient copy method. // - If caller don't want to turn on splice, do not pass in both reader conn and writer conn // - writer are from *transport.Link -func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer) error { +func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer, scheduler *Scheduler) error { readerConn, readCounter, _ := UnwrapRawConn(readerConn) writerConn, _, writeCounter := UnwrapRawConn(writerConn) reader := buf.NewReader(readerConn) @@ -581,10 +593,13 @@ func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net if readCounter != nil { readCounter.Add(int64(buffer.Len())) } - timer.Update() if werr := writer.WriteMultiBuffer(buffer); werr != nil { return werr } + timer.Update() + } + if scheduler != nil { + scheduler.Trigger <- 2 } if err != nil { return err diff --git a/proxy/scheduler.go b/proxy/scheduler.go index 24c59888..6bc75772 100644 --- a/proxy/scheduler.go +++ b/proxy/scheduler.go @@ -15,6 +15,8 @@ type Scheduler struct { Buffer chan buf.MultiBuffer Trigger chan int Error chan error + TimeoutCounter int + TimeoutLock *sync.Mutex closed chan int bufferReadLock *sync.Mutex writer buf.Writer @@ -24,11 +26,21 @@ type Scheduler struct { ctx context.Context } +func TriggerScheduler(scheduler *Scheduler) buf.CopyOption { + return func(handler *buf.CopyHandler) { + handler.OnData = append(handler.OnData, func(buf.MultiBuffer) { + scheduler.Trigger <- 2 // send fake buffer if no pending + }) + } +} + func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler { var s = Scheduler{ Buffer: make(chan buf.MultiBuffer, 100), Trigger: make(chan int), Error: make(chan error, 100), + TimeoutCounter: 0, + TimeoutLock: new(sync.Mutex), closed: make(chan int), bufferReadLock: new(sync.Mutex), writer: w, @@ -37,11 +49,14 @@ func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[] writeOnceUserUUID: userUUID, ctx: context, } + return &s +} + +func(s *Scheduler) Start() { go s.mainLoop() - if s.addons.Scheduler != nil { + if s.addons.Scheduler != nil && !s.addons.Scheduler.PingPong { go s.exampleIndependentScheduler() } - return &s } func(s *Scheduler) mainLoop() { @@ -49,6 +64,12 @@ func(s *Scheduler) mainLoop() { if len(s.closed) > 0 { return } + if trigger == -1 && s.addons.Scheduler != nil { + continue + } + if trigger == 2 && (s.addons.Scheduler == nil || !s.addons.Scheduler.PingPong) { + continue + } go func() { // each trigger has independent delay, trigger does not block var d = 0 * time.Millisecond if s.addons.Delay != nil { diff --git a/proxy/vless/encoding/addons.go b/proxy/vless/encoding/addons.go index feee6e87..fd9cb240 100644 --- a/proxy/vless/encoding/addons.go +++ b/proxy/vless/encoding/addons.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "strings" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" @@ -54,15 +55,6 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*proxy.Addons, er return addons, nil } -// EncodeBodyAddons returns a Writer that auto-encrypt content written by caller. -func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Writer { - w := proxy.NewVisionWriter(writer, addons, state, context) - if request.Command == protocol.RequestCommandUDP { - return NewMultiLengthPacketWriter(w) - } - return w -} - // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body. func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Reader { r := proxy.NewVisionReader(buf.NewReader(reader), addons, state, context) @@ -181,7 +173,7 @@ func (r *LengthPacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) { func PopulateSeed(seed string, addons *proxy.Addons) { if len(seed) > 0 { addons.Seed = []byte {1} // only turn on, more TBD - addons.Mode = proxy.SeedMode_PaddingPlusDelay + addons.Mode = proxy.SeedMode_IndependentScheduler addons.Duration = "0-8" addons.Padding = &proxy.PaddingConfig{ RegularMin: 0, @@ -196,6 +188,7 @@ func PopulateSeed(seed string, addons *proxy.Addons) { // } addons.Scheduler = &proxy.SchedulerConfig{ TimeoutMillis: 600, + PingPong: strings.Contains(seed, "pingpong"), } } else if addons.Flow == vless.XRV { addons.Seed = []byte {1} // only turn on, more TBD @@ -244,7 +237,8 @@ func CheckSeed(requestAddons *proxy.Addons, responseAddons *proxy.Addons) error return errors.New("Delay of one is nil but the other is not nil") } if requestAddons.Scheduler != nil && responseAddons.Scheduler != nil { - if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis { + if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis || + requestAddons.Scheduler.PingPong != responseAddons.Scheduler.PingPong { return errors.New("Scheduler not match") } } else if requestAddons.Scheduler != nil || responseAddons.Scheduler != nil { diff --git a/proxy/vless/encoding/encoding.go b/proxy/vless/encoding/encoding.go index 3e4d8009..f95ba560 100644 --- a/proxy/vless/encoding/encoding.go +++ b/proxy/vless/encoding/encoding.go @@ -172,7 +172,7 @@ func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*p } // XtlsRead filter and read xtls protocol -func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, ctx context.Context) error { +func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, scheduler *proxy.Scheduler, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, ctx context.Context) error { err := func() error { for { if trafficState.ReaderSwitchToDirectCopy { @@ -188,11 +188,10 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, ob.CanSpliceCopy = 1 } } - return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer) + return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer, scheduler) } buffer, err := reader.ReadMultiBuffer() if !buffer.IsEmpty() { - timer.Update() if trafficState.ReaderSwitchToDirectCopy { // XTLS Vision processes struct TLS Conn's input and rawInput if inputBuffer, err := buf.ReadFrom(input); err == nil { @@ -209,7 +208,9 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, if werr := writer.WriteMultiBuffer(buffer); werr != nil { return werr } + timer.Update() } + scheduler.Trigger <- 2 if err != nil { return err } diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index ff397a6d..1f428802 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -532,6 +532,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s serverReader := link.Reader // .(*pipe.Reader) serverWriter := link.Writer // .(*pipe.Writer) trafficState := proxy.NewTrafficState(account.ID.Bytes(), account.Flow) + bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection)) + var clientWriter buf.Writer + v := proxy.NewVisionWriter(bufferWriter, requestAddons, trafficState, ctx) + scheduler := v.Scheduler + clientWriter = v + postRequest := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) @@ -542,10 +548,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s if requestAddons.Flow == vless.XRV { ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice - err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, nil, ctx1) + err = encoding.XtlsRead(clientReader, serverWriter, timer, scheduler, connection, input, rawInput, trafficState, nil, ctx1) } else { // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer - err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)) + err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer), proxy.TriggerScheduler(scheduler)) } if err != nil { @@ -558,13 +564,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s getResponse := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection)) if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil { return errors.New("failed to encode response header").Base(err).AtWarning() } - // default: clientWriter := bufferWriter - clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, responseAddons, trafficState, ctx) + scheduler.Start() + if request.Command == protocol.RequestCommandUDP { + clientWriter = encoding.NewMultiLengthPacketWriter(clientWriter) + } multiBuffer, err1 := serverReader.ReadMultiBuffer() if err1 != nil { return err1 // ... diff --git a/proxy/vless/outbound/outbound.go b/proxy/vless/outbound/outbound.go index ff188836..11706eb4 100644 --- a/proxy/vless/outbound/outbound.go +++ b/proxy/vless/outbound/outbound.go @@ -185,18 +185,23 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte request.Address = net.DomainAddress("v1.mux.cool") request.Port = net.Port(666) } + bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn)) + var serverWriter buf.Writer + v := proxy.NewVisionWriter(bufferWriter, requestAddons, trafficState, ctx) + scheduler := v.Scheduler + serverWriter = v postRequest := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - - bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn)) + if err := encoding.EncodeRequestHeader(bufferWriter, request, requestAddons); err != nil { return errors.New("failed to encode request header").Base(err).AtWarning() } - // default: serverWriter := bufferWriter - serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, ctx) - if request.Command == protocol.RequestCommandMux && request.Port == 666 { + scheduler.Start() + if request.Command == protocol.RequestCommandUDP { + serverWriter = encoding.NewMultiLengthPacketWriter(serverWriter) + } else if request.Command == protocol.RequestCommandMux && request.Port == 666 { serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx)) } timeoutReader, ok := clientReader.(buf.TimeoutReader) @@ -222,6 +227,11 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte if err := bufferWriter.SetBuffered(false); err != nil { return errors.New("failed to write A request payload").Base(err).AtWarning() } + if requestAddons.Scheduler != nil && requestAddons.Scheduler.PingPong { + go func() { + scheduler.Trigger <- 2 // client kickstart the pingpong! + }() + } var err error if requestAddons.Flow == vless.XRV { @@ -266,10 +276,10 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte } if requestAddons.Flow == vless.XRV { - err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, input, rawInput, trafficState, ob, ctx) + err = encoding.XtlsRead(serverReader, clientWriter, timer, scheduler, conn, input, rawInput, trafficState, ob, ctx) } else { // from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer - err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer)) + err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer), proxy.TriggerScheduler(scheduler)) } if err != nil { diff --git a/testing/scenarios/vless_test.go b/testing/scenarios/vless_test.go index 2e647424..65dbe2e7 100644 --- a/testing/scenarios/vless_test.go +++ b/testing/scenarios/vless_test.go @@ -30,6 +30,18 @@ import ( ) func TestVless(t *testing.T) { + testVlessSeed(t, "") +} + +func TestVlessSeedWithFixedTrigger(t *testing.T) { + testVlessSeed(t, "1") +} + +func TestVlessSeedWithPingPong(t *testing.T) { + testVlessSeed(t, "pingpong") +} + +func testVlessSeed(t *testing.T, seed string) { tcpServer := tcp.Server{ MsgProcessor: xor, } @@ -57,6 +69,7 @@ func TestVless(t *testing.T) { { Account: serial.ToTypedMessage(&vless.Account{ Id: userID.String(), + Seed: seed, }), }, }, @@ -102,106 +115,7 @@ func TestVless(t *testing.T) { { Account: serial.ToTypedMessage(&vless.Account{ Id: userID.String(), - }), - }, - }, - }, - }, - }), - }, - }, - } - - servers, err := InitializeServerConfigs(serverConfig, clientConfig) - common.Must(err) - defer CloseAllServers(servers) - - var errg errgroup.Group - for i := 0; i < 10; i++ { - errg.Go(testTCPConn(clientPort, 1024*1024, time.Second*30)) - } - if err := errg.Wait(); err != nil { - t.Error(err) - } -} - -func TestVlessSeedWithIndependentScheduler(t *testing.T) { - tcpServer := tcp.Server{ - MsgProcessor: xor, - } - dest, err := tcpServer.Start() - common.Must(err) - defer tcpServer.Close() - - userID := protocol.NewID(uuid.New()) - serverPort := tcp.PickPort() - serverConfig := &core.Config{ - App: []*serial.TypedMessage{ - serial.ToTypedMessage(&log.Config{ - ErrorLogLevel: clog.Severity_Debug, - ErrorLogType: log.LogType_Console, - }), - }, - Inbound: []*core.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}}, - Listen: net.NewIPOrDomain(net.LocalHostIP), - }), - ProxySettings: serial.ToTypedMessage(&inbound.Config{ - Clients: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vless.Account{ - Id: userID.String(), - Seed: "1", - }), - }, - }, - }), - }, - }, - Outbound: []*core.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&freedom.Config{}), - }, - }, - } - - clientPort := tcp.PickPort() - clientConfig := &core.Config{ - App: []*serial.TypedMessage{ - serial.ToTypedMessage(&log.Config{ - ErrorLogLevel: clog.Severity_Debug, - ErrorLogType: log.LogType_Console, - }), - }, - Inbound: []*core.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}}, - Listen: net.NewIPOrDomain(net.LocalHostIP), - }), - ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ - Address: net.NewIPOrDomain(dest.Address), - Port: uint32(dest.Port), - NetworkList: &net.NetworkList{ - Network: []net.Network{net.Network_TCP}, - }, - }), - }, - }, - Outbound: []*core.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&outbound.Config{ - Vnext: []*protocol.ServerEndpoint{ - { - Address: net.NewIPOrDomain(net.LocalHostIP), - Port: uint32(serverPort), - User: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vless.Account{ - Id: userID.String(), - Seed: "1", + Seed: seed, }), }, }, @@ -348,6 +262,18 @@ func TestVlessTls(t *testing.T) { } func TestVlessXtlsVision(t *testing.T) { + testVlessXtlsVisionWithSeed(t, "") +} + +func TestVlessXtlsVisionWithFixedTrigger(t *testing.T) { + testVlessXtlsVisionWithSeed(t, "1") +} + +func TestVlessXtlsVisionWithPingPong(t *testing.T) { + testVlessXtlsVisionWithSeed(t, "pingpong") +} + +func testVlessXtlsVisionWithSeed(t *testing.T, seed string) { tcpServer := tcp.Server{ MsgProcessor: xor, } @@ -385,6 +311,7 @@ func TestVlessXtlsVision(t *testing.T) { Account: serial.ToTypedMessage(&vless.Account{ Id: userID.String(), Flow: vless.XRV, + Seed: seed, }), }, }, @@ -431,6 +358,7 @@ func TestVlessXtlsVision(t *testing.T) { Account: serial.ToTypedMessage(&vless.Account{ Id: userID.String(), Flow: vless.XRV, + Seed: seed, }), }, }, @@ -443,134 +371,6 @@ func TestVlessXtlsVision(t *testing.T) { TransportSettings: []*internet.TransportConfig{ { ProtocolName: "tcp", - Settings: serial.ToTypedMessage(&transtcp.Config{}), - }, - }, - SecurityType: serial.GetMessageType(&tls.Config{}), - SecuritySettings: []*serial.TypedMessage{ - serial.ToTypedMessage(&tls.Config{ - AllowInsecure: true, - }), - }, - }, - }), - }, - }, - } - - servers, err := InitializeServerConfigs(serverConfig, clientConfig) - common.Must(err) - defer CloseAllServers(servers) - - var errg errgroup.Group - for i := 0; i < 10; i++ { - errg.Go(testTCPConn(clientPort, 1024*1024, time.Second*30)) - } - if err := errg.Wait(); err != nil { - t.Error(err) - } -} - -func TestVlessXtlsVisionWithSeed(t *testing.T) { - tcpServer := tcp.Server{ - MsgProcessor: xor, - } - dest, err := tcpServer.Start() - common.Must(err) - defer tcpServer.Close() - - userID := protocol.NewID(uuid.New()) - serverPort := tcp.PickPort() - serverConfig := &core.Config{ - App: []*serial.TypedMessage{ - serial.ToTypedMessage(&log.Config{ - ErrorLogLevel: clog.Severity_Debug, - ErrorLogType: log.LogType_Console, - }), - }, - Inbound: []*core.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(serverPort)}}, - Listen: net.NewIPOrDomain(net.LocalHostIP), - StreamSettings: &internet.StreamConfig{ - Protocol: internet.TransportProtocol_TCP, - SecurityType: serial.GetMessageType(&tls.Config{}), - SecuritySettings: []*serial.TypedMessage{ - serial.ToTypedMessage(&tls.Config{ - Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil))}, - }), - }, - }, - }), - ProxySettings: serial.ToTypedMessage(&inbound.Config{ - Clients: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vless.Account{ - Id: userID.String(), - Flow: vless.XRV, - Seed: "1", - }), - }, - }, - }), - }, - }, - Outbound: []*core.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&freedom.Config{}), - }, - }, - } - - clientPort := tcp.PickPort() - clientConfig := &core.Config{ - App: []*serial.TypedMessage{ - serial.ToTypedMessage(&log.Config{ - ErrorLogLevel: clog.Severity_Debug, - ErrorLogType: log.LogType_Console, - }), - }, - Inbound: []*core.InboundHandlerConfig{ - { - ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ - PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(clientPort)}}, - Listen: net.NewIPOrDomain(net.LocalHostIP), - }), - ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ - Address: net.NewIPOrDomain(dest.Address), - Port: uint32(dest.Port), - NetworkList: &net.NetworkList{ - Network: []net.Network{net.Network_TCP}, - }, - }), - }, - }, - Outbound: []*core.OutboundHandlerConfig{ - { - ProxySettings: serial.ToTypedMessage(&outbound.Config{ - Vnext: []*protocol.ServerEndpoint{ - { - Address: net.NewIPOrDomain(net.LocalHostIP), - Port: uint32(serverPort), - User: []*protocol.User{ - { - Account: serial.ToTypedMessage(&vless.Account{ - Id: userID.String(), - Flow: vless.XRV, - Seed: "1", - }), - }, - }, - }, - }, - }), - SenderSettings: serial.ToTypedMessage(&proxyman.SenderConfig{ - StreamSettings: &internet.StreamConfig{ - Protocol: internet.TransportProtocol_TCP, - TransportSettings: []*internet.TransportConfig{ - { - Protocol: internet.TransportProtocol_TCP, Settings: serial.ToTypedMessage(&transtcp.Config{}), }, },