diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index bd71ef0e..609a453e 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -92,9 +92,14 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in requestDone := func() error { defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) - chunkReader := buf.NewReader(conn) + var reader buf.Reader + if plcy.Buffer.PerConnection == 0 { + reader = &buf.SingleReader{Reader: conn} + } else { + reader = buf.NewReader(conn) + } - if err := buf.Copy(chunkReader, link.Writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport request").Base(err) } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 4206d312..e5a44989 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -118,11 +118,12 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia } defer conn.Close() // nolint: errcheck + plcy := h.policy() ctx, cancel := context.WithCancel(ctx) - timer := signal.CancelAfterInactivity(ctx, cancel, h.policy().Timeouts.ConnectionIdle) + timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle) requestDone := func() error { - defer timer.SetTimeout(h.policy().Timeouts.DownlinkOnly) + defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) var writer buf.Writer if destination.Network == net.Network_TCP { @@ -138,10 +139,15 @@ func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia } responseDone := func() error { - defer timer.SetTimeout(h.policy().Timeouts.UplinkOnly) + defer timer.SetTimeout(plcy.Timeouts.UplinkOnly) - v2reader := buf.NewReader(conn) - if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil { + var reader buf.Reader + if plcy.Buffer.PerConnection == 0 { + reader = &buf.SingleReader{Reader: conn} + } else { + reader = buf.NewReader(conn) + } + if err := buf.Copy(reader, output, buf.UpdateActivity(timer)); err != nil { return newError("failed to process response").Base(err) } diff --git a/proxy/http/server.go b/proxy/http/server.go index 5f6a05f7..ad2938bf 100755 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -194,14 +194,19 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade } requestDone := func() error { - defer timer.SetTimeout(s.policy().Timeouts.DownlinkOnly) + defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) - v2reader := buf.NewReader(conn) - return buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)) + var reader buf.Reader + if plcy.Buffer.PerConnection == 0 { + reader = &buf.SingleReader{Reader: conn} + } else { + reader = buf.NewReader(conn) + } + return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)) } responseDone := func() error { - defer timer.SetTimeout(s.policy().Timeouts.UplinkOnly) + defer timer.SetTimeout(plcy.Timeouts.UplinkOnly) v2writer := buf.NewWriter(conn) if err := buf.Copy(link.Reader, v2writer, buf.UpdateActivity(timer)); err != nil { diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 779ea79c..bfdca366 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -149,7 +149,17 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level) conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)) - bufferedReader := buf.BufferedReader{Reader: buf.NewReader(conn)} + + var bufferedReader buf.BufferedReader + { + var reader buf.Reader + if sessionPolicy.Buffer.PerConnection == 0 { + reader = &buf.SingleReader{Reader: conn} + } else { + reader = buf.NewReader(conn) + } + bufferedReader = buf.BufferedReader{Reader: reader} + } request, bodyReader, err := ReadTCPSession(s.user, &bufferedReader) if err != nil { log.Record(&log.AccessMessage{ diff --git a/proxy/socks/client.go b/proxy/socks/client.go index e837b53c..97169a97 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -113,7 +113,13 @@ func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dial } responseFunc = func() error { defer timer.SetTimeout(p.Timeouts.UplinkOnly) - return buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)) + var reader buf.Reader + if p.Buffer.PerConnection == 0 { + reader = &buf.SingleReader{Reader: conn} + } else { + reader = buf.NewReader(conn) + } + return buf.Copy(reader, link.Writer, buf.UpdateActivity(timer)) } } else if request.Command == protocol.RequestCommandUDP { udpConn, err := dialer.Dial(ctx, udpRequest.Destination()) diff --git a/proxy/socks/server.go b/proxy/socks/server.go index a97f90f1..794c5fb3 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -68,11 +68,21 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { - if err := conn.SetReadDeadline(time.Now().Add(s.policy().Timeouts.Handshake)); err != nil { + plcy := s.policy() + if err := conn.SetReadDeadline(time.Now().Add(plcy.Timeouts.Handshake)); err != nil { newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) } - reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} + var reader *buf.BufferedReader + { + var r buf.Reader + if plcy.Buffer.PerConnection == 0 { + r = &buf.SingleReader{Reader: conn} + } else { + r = buf.NewReader(conn) + } + reader = &buf.BufferedReader{Reader: r} + } inboundDest, ok := proxy.InboundEntryPointFromContext(ctx) if !ok { @@ -141,9 +151,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ requestDone := func() error { defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) - - v2reader := buf.NewReader(reader) - if err := buf.Copy(v2reader, link.Writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(buf.NewReader(reader), link.Writer, buf.UpdateActivity(timer)); err != nil { return newError("failed to transport all TCP request").Base(err) } diff --git a/proxy/vmess/encoding/client.go b/proxy/vmess/encoding/client.go index 45f2c761..31506dcd 100644 --- a/proxy/vmess/encoding/client.go +++ b/proxy/vmess/encoding/client.go @@ -140,7 +140,7 @@ func (c *ClientSession) EncodeRequestBody(request *protocol.RequestHeader, write return crypto.NewAuthenticationWriter(auth, sizeParser, cryptionWriter, request.Command.TransferType(), padding) } - return buf.NewWriter(cryptionWriter) + return &buf.SequentialWriter{Writer: cryptionWriter} case protocol.SecurityType_AES128_GCM: block, _ := aes.NewCipher(c.requestBodyKey[:]) aead, _ := cipher.NewGCM(block) diff --git a/proxy/vmess/encoding/server.go b/proxy/vmess/encoding/server.go index a035a693..7bc18590 100644 --- a/proxy/vmess/encoding/server.go +++ b/proxy/vmess/encoding/server.go @@ -332,7 +332,7 @@ func (s *ServerSession) EncodeResponseBody(request *protocol.RequestHeader, writ return crypto.NewAuthenticationWriter(auth, sizeParser, s.responseWriter, request.Command.TransferType(), padding) } - return buf.NewWriter(s.responseWriter) + return &buf.SequentialWriter{Writer: s.responseWriter} case protocol.SecurityType_AES128_GCM: block, _ := aes.NewCipher(s.responseBodyKey[:]) aead, _ := cipher.NewGCM(block) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index b63c80b8..187ee3c5 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -224,7 +224,16 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i return newError("unable to set read deadline").Base(err).AtWarning() } - reader := &buf.BufferedReader{Reader: buf.NewReader(connection)} + var reader *buf.BufferedReader + { + var r buf.Reader + if sessionPolicy.Buffer.PerConnection == 0 { + r = &buf.SingleReader{Reader: connection} + } else { + r = buf.NewReader(connection) + } + reader = &buf.BufferedReader{Reader: r} + } svrSession := encoding.NewServerSession(h.clients, h.sessionHistory) request, err := svrSession.DecodeRequestHeader(reader) diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 71d973d3..fe278304 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -144,7 +144,16 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia responseDone := func() error { defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} + var reader *buf.BufferedReader + { + var r buf.Reader + if sessionPolicy.Buffer.PerConnection == 0 { + r = &buf.SingleReader{Reader: conn} + } else { + r = buf.NewReader(conn) + } + reader = &buf.BufferedReader{Reader: r} + } header, err := session.DecodeResponseHeader(reader) if err != nil { return newError("failed to read header").Base(err)