From 5e8cf7bde6c5360275bf2c6d271fd5cc4252bbde Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 17 Nov 2017 20:54:24 +0100 Subject: [PATCH] redo http upload performance --- proxy/http/server.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/proxy/http/server.go b/proxy/http/server.go index d61f70ec..df426314 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -87,7 +87,7 @@ func parseBasicAuth(auth string) (username, password string, ok bool) { } func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher dispatcher.Interface) error { - reader := bufio.NewReaderSize(conn, 2048) + reader := bufio.NewReaderSize(conn, buf.Size) Start: conn.SetReadDeadline(time.Now().Add(time.Second * 16)) @@ -132,7 +132,7 @@ Start: keepAlive := (strings.TrimSpace(strings.ToLower(request.Header.Get("Proxy-Connection"))) == "keep-alive") - err = s.handlePlainHTTP(ctx, request, reader, conn, dest, dispatcher) + err = s.handlePlainHTTP(ctx, request, conn, dest, dispatcher) if err == errWaitAnother { if keepAlive { goto Start @@ -143,8 +143,8 @@ Start: return err } -func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error { - _, err := writer.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) +func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher dispatcher.Interface) error { + _, err := conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) if err != nil { return newError("failed to write back OK response").Base(err) } @@ -160,10 +160,20 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade return err } + if reader.Buffered() > 0 { + payload := buf.New() + common.Must(payload.Reset(func(b []byte) (int, error) { + return reader.Read(b[:reader.Buffered()]) + })) + if err := ray.InboundInput().WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil { + return err + } + } + requestDone := signal.ExecuteAsync(func() error { defer ray.InboundInput().Close() - v2reader := buf.NewReader(reader) + v2reader := buf.NewReader(conn) if err := buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil { return err } @@ -171,7 +181,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade }) responseDone := signal.ExecuteAsync(func() error { - v2writer := buf.NewWriter(writer) + v2writer := buf.NewWriter(conn) if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil { return err } @@ -219,7 +229,7 @@ func StripHopByHopHeaders(header http.Header) { var errWaitAnother = newError("keep alive") -func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error { +func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error { if !s.config.AllowTransparent && len(request.URL.Host) <= 0 { // RFC 2068 (HTTP/1.1) requires URL to be absolute URL in HTTP proxy. response := &http.Response{