diff --git a/common/buf/writer.go b/common/buf/writer.go index 8c348a46..6ff8a6b6 100644 --- a/common/buf/writer.go +++ b/common/buf/writer.go @@ -75,9 +75,10 @@ func (w *BufferToBytesWriter) ReadFrom(reader io.Reader) (int64, error) { // BufferedWriter is a Writer with internal buffer. type BufferedWriter struct { sync.Mutex - writer Writer - buffer *Buffer - buffered bool + writer Writer + buffer *Buffer + buffered bool + flushNext bool } // NewBufferedWriter creates a new BufferedWriter. @@ -161,6 +162,12 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error { } } + if w.flushNext { + w.buffered = false + w.flushNext = false + return w.flushInternal() + } + return nil } @@ -201,6 +208,13 @@ func (w *BufferedWriter) SetBuffered(f bool) error { return nil } +// SetFlushNext will wait the next WriteMultiBuffer to flush and set buffered = false +func (w *BufferedWriter) SetFlushNext() { + w.Lock() + defer w.Unlock() + w.flushNext = true +} + // ReadFrom implements io.ReaderFrom. func (w *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) { if err := w.SetBuffered(false); err != nil { diff --git a/proxy/proxy.go b/proxy/proxy.go index edfa63d0..157493bf 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -177,63 +177,109 @@ type VisionReader struct { trafficState *TrafficState ctx context.Context isUplink bool + conn net.Conn + input *bytes.Reader + rawInput *bytes.Buffer + ob *session.Outbound + + // internal + directReadCounter stats.Counter } -func NewVisionReader(reader buf.Reader, state *TrafficState, isUplink bool, context context.Context) *VisionReader { +func NewVisionReader(reader buf.Reader, trafficState *TrafficState, isUplink bool, ctx context.Context, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, ob *session.Outbound) *VisionReader { return &VisionReader{ Reader: reader, - trafficState: state, - ctx: context, + trafficState: trafficState, + ctx: ctx, isUplink: isUplink, + conn: conn, + input: input, + rawInput: rawInput, + ob: ob, } } func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) { buffer, err := w.Reader.ReadMultiBuffer() - if !buffer.IsEmpty() { - var withinPaddingBuffers *bool - var remainingContent *int32 - var remainingPadding *int32 - var currentCommand *int - var switchToDirectCopy *bool - if w.isUplink { - withinPaddingBuffers = &w.trafficState.Inbound.WithinPaddingBuffers - remainingContent = &w.trafficState.Inbound.RemainingContent - remainingPadding = &w.trafficState.Inbound.RemainingPadding - currentCommand = &w.trafficState.Inbound.CurrentCommand - switchToDirectCopy = &w.trafficState.Inbound.UplinkReaderDirectCopy - } else { - withinPaddingBuffers = &w.trafficState.Outbound.WithinPaddingBuffers - remainingContent = &w.trafficState.Outbound.RemainingContent - remainingPadding = &w.trafficState.Outbound.RemainingPadding - currentCommand = &w.trafficState.Outbound.CurrentCommand - switchToDirectCopy = &w.trafficState.Outbound.DownlinkReaderDirectCopy - } + if buffer.IsEmpty() { + return buffer, err + } - if *withinPaddingBuffers || w.trafficState.NumberOfPacketToFilter > 0 { - mb2 := make(buf.MultiBuffer, 0, len(buffer)) - for _, b := range buffer { - newbuffer := XtlsUnpadding(b, w.trafficState, w.isUplink, w.ctx) - if newbuffer.Len() > 0 { - mb2 = append(mb2, newbuffer) - } - } - buffer = mb2 - if *remainingContent > 0 || *remainingPadding > 0 || *currentCommand == 0 { - *withinPaddingBuffers = true - } else if *currentCommand == 1 { - *withinPaddingBuffers = false - } else if *currentCommand == 2 { - *withinPaddingBuffers = false - *switchToDirectCopy = true - } else { - errors.LogInfo(w.ctx, "XtlsRead unknown command ", *currentCommand, buffer.Len()) + var withinPaddingBuffers *bool + var remainingContent *int32 + var remainingPadding *int32 + var currentCommand *int + var switchToDirectCopy *bool + if w.isUplink { + withinPaddingBuffers = &w.trafficState.Inbound.WithinPaddingBuffers + remainingContent = &w.trafficState.Inbound.RemainingContent + remainingPadding = &w.trafficState.Inbound.RemainingPadding + currentCommand = &w.trafficState.Inbound.CurrentCommand + switchToDirectCopy = &w.trafficState.Inbound.UplinkReaderDirectCopy + } else { + withinPaddingBuffers = &w.trafficState.Outbound.WithinPaddingBuffers + remainingContent = &w.trafficState.Outbound.RemainingContent + remainingPadding = &w.trafficState.Outbound.RemainingPadding + currentCommand = &w.trafficState.Outbound.CurrentCommand + switchToDirectCopy = &w.trafficState.Outbound.DownlinkReaderDirectCopy + } + + if *switchToDirectCopy { + if w.directReadCounter != nil { + w.directReadCounter.Add(int64(buffer.Len())) + } + return buffer, err + } + + if *withinPaddingBuffers || w.trafficState.NumberOfPacketToFilter > 0 { + mb2 := make(buf.MultiBuffer, 0, len(buffer)) + for _, b := range buffer { + newbuffer := XtlsUnpadding(b, w.trafficState, w.isUplink, w.ctx) + if newbuffer.Len() > 0 { + mb2 = append(mb2, newbuffer) } } - if w.trafficState.NumberOfPacketToFilter > 0 { - XtlsFilterTls(buffer, w.trafficState, w.ctx) + buffer = mb2 + if *remainingContent > 0 || *remainingPadding > 0 || *currentCommand == 0 { + *withinPaddingBuffers = true + } else if *currentCommand == 1 { + *withinPaddingBuffers = false + } else if *currentCommand == 2 { + *withinPaddingBuffers = false + *switchToDirectCopy = true + } else { + errors.LogInfo(w.ctx, "XtlsRead unknown command ", *currentCommand, buffer.Len()) } } + if w.trafficState.NumberOfPacketToFilter > 0 { + XtlsFilterTls(buffer, w.trafficState, w.ctx) + } + + if *switchToDirectCopy { + // XTLS Vision processes TLS-like conn's input and rawInput + if inputBuffer, err := buf.ReadFrom(w.input); err == nil && !inputBuffer.IsEmpty() { + buffer, _ = buf.MergeMulti(buffer, inputBuffer) + } + if rawInputBuffer, err := buf.ReadFrom(w.rawInput); err == nil && !rawInputBuffer.IsEmpty() { + buffer, _ = buf.MergeMulti(buffer, rawInputBuffer) + } + *w.input = bytes.Reader{} // release memory + w.input = nil + *w.rawInput = bytes.Buffer{} // release memory + w.rawInput = nil + + if inbound := session.InboundFromContext(w.ctx); inbound != nil && inbound.Conn != nil { + if w.isUplink && inbound.CanSpliceCopy == 2 { + inbound.CanSpliceCopy = 1 + } + if !w.isUplink && w.ob != nil && w.ob.CanSpliceCopy == 2 { // ob need to be passed in due to context can have more than one ob + w.ob.CanSpliceCopy = 1 + } + } + readerConn, readCounter, _ := UnwrapRawConn(w.conn) + w.directReadCounter = readCounter + w.Reader = buf.NewReader(readerConn) + } return buffer, err } @@ -241,28 +287,32 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) { // Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic type VisionWriter struct { buf.Writer - trafficState *TrafficState - ctx context.Context - writeOnceUserUUID []byte - isUplink bool + trafficState *TrafficState + ctx context.Context + isUplink bool + conn net.Conn + ob *session.Outbound + + // internal + writeOnceUserUUID []byte + directWriteCounter stats.Counter } -func NewVisionWriter(writer buf.Writer, state *TrafficState, isUplink bool, context context.Context) *VisionWriter { - w := make([]byte, len(state.UserUUID)) - copy(w, state.UserUUID) +func NewVisionWriter(writer buf.Writer, trafficState *TrafficState, isUplink bool, ctx context.Context, conn net.Conn, ob *session.Outbound) *VisionWriter { + w := make([]byte, len(trafficState.UserUUID)) + copy(w, trafficState.UserUUID) return &VisionWriter{ Writer: writer, - trafficState: state, - ctx: context, + trafficState: trafficState, + ctx: ctx, writeOnceUserUUID: w, isUplink: isUplink, + conn: conn, + ob: ob, } } func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { - if w.trafficState.NumberOfPacketToFilter > 0 { - XtlsFilterTls(mb, w.trafficState, w.ctx) - } var isPadding *bool var switchToDirectCopy *bool if w.isUplink { @@ -272,6 +322,29 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error { isPadding = &w.trafficState.Inbound.IsPadding switchToDirectCopy = &w.trafficState.Inbound.DownlinkWriterDirectCopy } + + if *switchToDirectCopy { + if inbound := session.InboundFromContext(w.ctx); inbound != nil { + if !w.isUplink && inbound.CanSpliceCopy == 2 { + inbound.CanSpliceCopy = 1 + } + if w.isUplink && w.ob != nil && w.ob.CanSpliceCopy == 2 { + w.ob.CanSpliceCopy = 1 + } + } + rawConn, _, writerCounter := UnwrapRawConn(w.conn) + w.Writer = buf.NewWriter(rawConn) + w.directWriteCounter = writerCounter + *switchToDirectCopy = false + } + if !mb.IsEmpty() && w.directWriteCounter != nil { + w.directWriteCounter.Add(int64(mb.Len())) + } + + if w.trafficState.NumberOfPacketToFilter > 0 { + XtlsFilterTls(mb, w.trafficState, w.ctx) + } + if *isPadding { if len(mb) == 1 && mb[0] == nil { mb[0] = XtlsPadding(nil, CommandPaddingContinue, &w.writeOnceUserUUID, true, w.ctx) // we do a long padding to hide vless header diff --git a/proxy/vless/encoding/addons.go b/proxy/vless/encoding/addons.go index 4474e3c9..77b09861 100644 --- a/proxy/vless/encoding/addons.go +++ b/proxy/vless/encoding/addons.go @@ -3,10 +3,12 @@ package encoding import ( "context" "io" + "net" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/protocol" + "github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/proxy" "github.com/xtls/xray-core/proxy/vless" "google.golang.org/protobuf/proto" @@ -61,15 +63,14 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*Addons, error) { } // EncodeBodyAddons returns a Writer that auto-encrypt content written by caller. -func EncodeBodyAddons(writer io.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, isUplink bool, context context.Context) buf.Writer { +func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, isUplink bool, context context.Context, conn net.Conn, ob *session.Outbound) buf.Writer { if request.Command == protocol.RequestCommandUDP { - return NewMultiLengthPacketWriter(writer.(buf.Writer)) + return NewMultiLengthPacketWriter(writer) } - w := buf.NewWriter(writer) if requestAddons.Flow == vless.XRV { - w = proxy.NewVisionWriter(w, state, isUplink, context) + return proxy.NewVisionWriter(writer, state, isUplink, context, conn, ob) } - return w + return writer } // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body. diff --git a/proxy/vless/encoding/encoding.go b/proxy/vless/encoding/encoding.go index da3a2517..c830ac62 100644 --- a/proxy/vless/encoding/encoding.go +++ b/proxy/vless/encoding/encoding.go @@ -1,7 +1,6 @@ package encoding import ( - "bytes" "context" "io" @@ -11,7 +10,6 @@ import ( "github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/signal" - "github.com/xtls/xray-core/features/stats" "github.com/xtls/xray-core/proxy" "github.com/xtls/xray-core/proxy/vless" ) @@ -171,8 +169,8 @@ func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*A return responseAddons, nil } -// XtlsRead filter and read xtls protocol -func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, isUplink bool, ctx context.Context) error { +// XtlsRead can switch to splice copy +func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, trafficState *proxy.TrafficState, isUplink bool, ctx context.Context) error { err := func() error { for { if isUplink && trafficState.Inbound.UplinkReaderDirectCopy || !isUplink && trafficState.Outbound.DownlinkReaderDirectCopy { @@ -181,74 +179,11 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil { writerConn = inbound.Conn inTimer = inbound.Timer - if isUplink && inbound.CanSpliceCopy == 2 { - inbound.CanSpliceCopy = 1 - } - if !isUplink && ob != nil && ob.CanSpliceCopy == 2 { // ob need to be passed in due to context can change - ob.CanSpliceCopy = 1 - } } return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer) } buffer, err := reader.ReadMultiBuffer() if !buffer.IsEmpty() { - timer.Update() - if isUplink && trafficState.Inbound.UplinkReaderDirectCopy || !isUplink && trafficState.Outbound.DownlinkReaderDirectCopy { - // XTLS Vision processes TLS-like conn's input and rawInput - if inputBuffer, err := buf.ReadFrom(input); err == nil && !inputBuffer.IsEmpty() { - buffer, _ = buf.MergeMulti(buffer, inputBuffer) - } - if rawInputBuffer, err := buf.ReadFrom(rawInput); err == nil && !rawInputBuffer.IsEmpty() { - buffer, _ = buf.MergeMulti(buffer, rawInputBuffer) - } - *input = bytes.Reader{} // release memory - input = nil - *rawInput = bytes.Buffer{} // release memory - rawInput = nil - } - if werr := writer.WriteMultiBuffer(buffer); werr != nil { - return werr - } - } - if err != nil { - return err - } - } - }() - if err != nil && errors.Cause(err) != io.EOF { - return err - } - return nil -} - -// XtlsWrite filter and write xtls protocol -func XtlsWrite(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn, trafficState *proxy.TrafficState, ob *session.Outbound, isUplink bool, ctx context.Context) error { - err := func() error { - var ct stats.Counter - for { - buffer, err := reader.ReadMultiBuffer() - if isUplink && trafficState.Outbound.UplinkWriterDirectCopy || !isUplink && trafficState.Inbound.DownlinkWriterDirectCopy { - if inbound := session.InboundFromContext(ctx); inbound != nil { - if !isUplink && inbound.CanSpliceCopy == 2 { - inbound.CanSpliceCopy = 1 - } - if isUplink && ob != nil && ob.CanSpliceCopy == 2 { - ob.CanSpliceCopy = 1 - } - } - rawConn, _, writerCounter := proxy.UnwrapRawConn(conn) - writer = buf.NewWriter(rawConn) - ct = writerCounter - if isUplink { - trafficState.Outbound.UplinkWriterDirectCopy = false - } else { - trafficState.Inbound.DownlinkWriterDirectCopy = false - } - } - if !buffer.IsEmpty() { - if ct != nil { - ct.Add(int64(buffer.Len())) - } timer.Update() if werr := writer.WriteMultiBuffer(buffer); werr != nil { return werr diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 617313bd..54c7e76b 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -31,6 +31,7 @@ import ( "github.com/xtls/xray-core/proxy/vless" "github.com/xtls/xray-core/proxy/vless/encoding" "github.com/xtls/xray-core/proxy/vless/encryption" + "github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport/internet/reality" "github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/tls" @@ -551,89 +552,24 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s ctx = session.ContextWithAllowedNetwork(ctx, net.Network_UDP) } - sessionPolicy = h.policyManager.ForLevel(request.User.Level) - ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) - inbound.Timer = timer - ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer) - - link, err := dispatcher.Dispatch(ctx, request.Destination()) - if err != nil { - return errors.New("failed to dispatch request to ", request.Destination()).Base(err).AtWarning() - } - - serverReader := link.Reader // .(*pipe.Reader) - serverWriter := link.Writer // .(*pipe.Writer) trafficState := proxy.NewTrafficState(userSentID) - postRequest := func() error { - defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) - - // default: clientReader := reader - clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons) - - var err error - - if requestAddons.Flow == vless.XRV { - ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice - clientReader = proxy.NewVisionReader(clientReader, trafficState, true, ctx1) - err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, nil, true, ctx1) - } else { - // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer - err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)) - } - - if err != nil { - return errors.New("failed to transfer request payload").Base(err).AtInfo() - } - - return nil + clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons) + if requestAddons.Flow == vless.XRV { + clientReader = proxy.NewVisionReader(clientReader, trafficState, true, ctx, connection, input, rawInput, nil) } - getResponse := func() error { - defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - - bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection)) - if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil { - return errors.New("failed to encode response header").Base(err).AtWarning() - } - - // default: clientWriter := bufferWriter - clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, false, ctx) - multiBuffer, err1 := serverReader.ReadMultiBuffer() - if err1 != nil { - return err1 // ... - } - if err := clientWriter.WriteMultiBuffer(multiBuffer); err != nil { - return err // ... - } - // Flush; bufferWriter.WriteMultiBuffer now is bufferWriter.writer.WriteMultiBuffer - if err := bufferWriter.SetBuffered(false); err != nil { - return errors.New("failed to write A response payload").Base(err).AtWarning() - } - - var err error - if requestAddons.Flow == vless.XRV { - err = encoding.XtlsWrite(serverReader, clientWriter, timer, connection, trafficState, nil, false, ctx) - } else { - // from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer - err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer)) - } - if err != nil { - return errors.New("failed to transfer response payload").Base(err).AtInfo() - } - // Indicates the end of response payload. - switch responseAddons.Flow { - default: - } - - return nil + bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection)) + if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil { + return errors.New("failed to encode response header").Base(err).AtWarning() } + clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, false, ctx, connection, nil) + bufferWriter.SetFlushNext() - if err := task.Run(ctx, task.OnSuccess(postRequest, task.Close(serverWriter)), getResponse); err != nil { - common.Interrupt(serverReader) - common.Interrupt(serverWriter) - return errors.New("connection ends").Base(err).AtInfo() + if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ + Reader: &buf.TimeoutWrapperReader{Reader: clientReader}, + Writer: clientWriter}, + ); err != nil { + return errors.New("failed to dispatch request").Base(err) } - return nil } diff --git a/proxy/vless/outbound/outbound.go b/proxy/vless/outbound/outbound.go index 76352a0a..e04a0e23 100644 --- a/proxy/vless/outbound/outbound.go +++ b/proxy/vless/outbound/outbound.go @@ -225,7 +225,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte } // default: serverWriter := bufferWriter - serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, true, ctx) + serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, true, ctx, conn, ob) if request.Command == protocol.RequestCommandMux && request.Port == 666 { serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx)) } @@ -253,7 +253,6 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte return errors.New("failed to write A request payload").Base(err).AtWarning() } - var err error if requestAddons.Flow == vless.XRV { if tlsConn, ok := iConn.(*tls.Conn); ok { if tlsConn.ConnectionState().Version != gotls.VersionTLS13 { @@ -264,12 +263,8 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte return errors.New(`failed to use `+requestAddons.Flow+`, found outer tls version `, utlsConn.ConnectionState().Version).AtWarning() } } - ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice - err = encoding.XtlsWrite(clientReader, serverWriter, timer, conn, trafficState, ob, true, ctx1) - } else { - // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer - err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)) } + err := buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)) if err != nil { return errors.New("failed to transfer request payload").Base(err).AtInfo() } @@ -292,7 +287,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte // default: serverReader := buf.NewReader(conn) serverReader := encoding.DecodeBodyAddons(conn, request, responseAddons) if requestAddons.Flow == vless.XRV { - serverReader = proxy.NewVisionReader(serverReader, trafficState, false, ctx) + serverReader = proxy.NewVisionReader(serverReader, trafficState, false, ctx, conn, input, rawInput, ob) } if request.Command == protocol.RequestCommandMux && request.Port == 666 { if requestAddons.Flow == vless.XRV { @@ -303,7 +298,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte } if requestAddons.Flow == vless.XRV { - err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, input, rawInput, trafficState, ob, false, ctx) + err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, trafficState, false, ctx) } else { // from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer))