From 3156c4586c4961d9d03436f779c3909a1f5460d5 Mon Sep 17 00:00:00 2001 From: v2ray Date: Wed, 25 May 2016 09:32:26 +0200 Subject: [PATCH] Allow data stream passing through http proxy --- common/alloc/buffer.go | 12 ++++++++++ common/alloc/buffer_pool.go | 1 + common/io/chain_writer.go | 48 +++++++++++++++++++++++++++++++++++++ proxy/http/server.go | 39 +++++++++++++++++++----------- 4 files changed, 86 insertions(+), 14 deletions(-) create mode 100644 common/io/chain_writer.go diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 9cd6a64b..64147d25 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -159,3 +159,15 @@ func NewBuffer() *Buffer { func NewLargeBuffer() *Buffer { return largePool.Allocate() } + +func NewBufferWithSize(size int) *Buffer { + if size <= SmallBufferSize { + return NewSmallBuffer() + } + + if size <= BufferSize { + return NewBuffer() + } + + return NewLargeBuffer() +} diff --git a/common/alloc/buffer_pool.go b/common/alloc/buffer_pool.go index 36e72e3a..68f3ed4f 100644 --- a/common/alloc/buffer_pool.go +++ b/common/alloc/buffer_pool.go @@ -50,6 +50,7 @@ func (p *BufferPool) Free(buffer *Buffer) { } const ( + SmallBufferSize = 1024 - defaultOffset BufferSize = 8*1024 - defaultOffset LargeBufferSize = 64*1024 - defaultOffset ) diff --git a/common/io/chain_writer.go b/common/io/chain_writer.go new file mode 100644 index 00000000..56bbce3f --- /dev/null +++ b/common/io/chain_writer.go @@ -0,0 +1,48 @@ +package io + +import ( + "io" + "sync" + + "github.com/v2ray/v2ray-core/common/alloc" +) + +type ChainWriter struct { + sync.Mutex + writer Writer +} + +func NewChainWriter(writer Writer) *ChainWriter { + return &ChainWriter{ + writer: writer, + } +} + +func (this *ChainWriter) Write(payload []byte) (int, error) { + if this.writer == nil { + return 0, io.EOF + } + + size := len(payload) + buffer := alloc.NewBufferWithSize(size).Clear() + buffer.Append(payload) + + this.Lock() + defer this.Unlock() + if this.writer == nil { + return 0, io.EOF + } + + err := this.writer.Write(buffer) + if err != nil { + return 0, err + } + return size, nil +} + +func (this *ChainWriter) Release() { + this.Lock() + this.writer.Release() + this.writer = nil + this.Unlock() +} diff --git a/proxy/http/server.go b/proxy/http/server.go index bf365bc9..1310e22d 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -228,30 +228,41 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D request.Host = request.URL.Host StripHopByHopHeaders(request) - requestBuffer := alloc.NewBuffer().Clear() // Don't release this buffer as it is passed into a Packet. - request.Write(requestBuffer) - log.Debug("Request to remote:\n", requestBuffer.Value) - ray := this.packetDispatcher.DispatchToOutbound(dest) - ray.InboundInput().Write(requestBuffer) defer ray.InboundInput().Close() + defer ray.InboundOutput().Release() - var wg sync.WaitGroup - wg.Add(1) + var finish sync.WaitGroup + finish.Add(1) go func() { - defer wg.Done() + defer finish.Done() + requestWriter := v2io.NewBufferedWriter(v2io.NewChainWriter(ray.InboundInput())) + err := request.Write(requestWriter) + if err != nil { + log.Warning("HTTP: Failed to write request: ", err) + return + } + requestWriter.Flush() + }() + + finish.Add(1) + go func() { + defer finish.Done() responseReader := bufio.NewReader(NewChanReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) if err != nil { + log.Warning("HTTP: Failed to read response: ", err) + return + } + responseWriter := v2io.NewBufferedWriter(writer) + err = response.Write(responseWriter) + if err != nil { + log.Warning("HTTP: Failed to write response: ", err) return } - responseBuffer := alloc.NewBuffer().Clear() - defer responseBuffer.Release() - response.Write(responseBuffer) - writer.Write(responseBuffer.Value) - response.Body.Close() + responseWriter.Flush() }() - wg.Wait() + finish.Wait() } func init() {