From 34f34bb25d2abdd4e0190583221fe4c3bf7cf479 Mon Sep 17 00:00:00 2001 From: v2ray Date: Tue, 15 Dec 2015 22:13:09 +0100 Subject: [PATCH] refine http proxy --- proxy/http/chan_reader.go | 47 +++++++++++++++++++++++++++++++++++++++ proxy/http/http.go | 28 ++++++++++++++++++----- 2 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 proxy/http/chan_reader.go diff --git a/proxy/http/chan_reader.go b/proxy/http/chan_reader.go new file mode 100644 index 00000000..f169cce7 --- /dev/null +++ b/proxy/http/chan_reader.go @@ -0,0 +1,47 @@ +package http + +import ( + "io" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +type ChanReader struct { + stream <-chan *alloc.Buffer + current *alloc.Buffer + eof bool +} + +func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader { + this := &ChanReader{ + stream: stream, + } + this.fill() + return this +} + +func (this *ChanReader) fill() { + b, ok := <-this.stream + this.current = b + if !ok { + this.eof = true + this.current = nil + } +} + +func (this *ChanReader) Read(b []byte) (int, error) { + if this.current == nil { + this.fill() + if this.eof { + return 0, io.EOF + } + } + nBytes := copy(b, this.current.Value) + if nBytes == this.current.Len() { + this.current.Release() + this.current = nil + } else { + this.current.SliceFrom(nBytes) + } + return nBytes, nil +} diff --git a/proxy/http/http.go b/proxy/http/http.go index 00a24463..326bb4e5 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/common/alloc" @@ -131,9 +132,21 @@ func (this *HttpProxyServer) handleRequest(request *http.Request, reader io.Read buffer := alloc.NewBuffer().Clear() request.Write(buffer) log.Info("Request to remote: %s", string(buffer.Value)) - packet := v2net.NewPacket(v2net.NewTCPDestination(address), buffer, false) + packet := v2net.NewPacket(v2net.NewTCPDestination(address), buffer, true) ray := this.space.PacketDispatcher().DispatchToOutbound(packet) - this.transport(nil, writer, ray) + defer close(ray.InboundInput()) + + responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput())) + response, err := http.ReadResponse(responseReader, request) + if err != nil { + return + } + + responseBuffer := alloc.NewBuffer().Clear() + defer responseBuffer.Release() + response.Write(responseBuffer) + writer.Write(responseBuffer.Value) + } else { response := &http.Response{ Status: "400 Bad Request", @@ -155,17 +168,17 @@ func (this *HttpProxyServer) handleRequest(request *http.Request, reader io.Read func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ray.InboundRay) { var inputFinish, outputFinish sync.Mutex - inputFinish.Lock() outputFinish.Lock() if input != nil { + inputFinish.Lock() go func() { v2net.ReaderToChan(ray.InboundInput(), input) inputFinish.Unlock() - close(ray.InboundInput()) }() } else { - close(ray.InboundInput()) + // TODO: We can not close write so quickly, as some HTTP server will stop responding if so. + } go func() { @@ -173,5 +186,10 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra outputFinish.Unlock() }() + inputFinish.Lock() + go func() { + <-time.After(10 * time.Second) + close(ray.InboundInput()) + }() outputFinish.Lock() }