diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 335841b6..441d5b92 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -169,7 +169,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { } else { requestReader = v2io.NewAdaptiveReader(bodyReader) } - v2io.Pipe(requestReader, input) + err := v2io.Pipe(requestReader, input) + if err != vmessio.ErrorStreamCompleted { + connection.SetReusable(false) + } + requestReader.Release() input.Close() readFinish.Unlock() @@ -197,7 +201,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { writer.SetCached(false) - v2io.Pipe(output, v2writer) + err = v2io.Pipe(output, v2writer) + if err != vmessio.ErrorStreamCompleted { + connection.SetReusable(false) + } + output.Release() if request.Option.IsChunkStream() { v2writer.Write(alloc.NewSmallBuffer().Clear()) diff --git a/proxy/vmess/io/reader.go b/proxy/vmess/io/reader.go index f984f87d..d027bc3a 100644 --- a/proxy/vmess/io/reader.go +++ b/proxy/vmess/io/reader.go @@ -1,6 +1,7 @@ package io import ( + "errors" "hash" "hash/fnv" "io" @@ -10,6 +11,10 @@ import ( "github.com/v2ray/v2ray-core/transport" ) +var ( + ErrorStreamCompleted = errors.New("Stream completed.") +) + // @Private type Validator struct { actualAuth hash.Hash32 @@ -76,7 +81,7 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) { if this.chunkLength == 0 { buffer.Release() - return nil, io.EOF + return nil, ErrorStreamCompleted } if buffer.Len() < this.chunkLength { diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 60870d9d..05721b0e 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -1,7 +1,6 @@ package outbound import ( - "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -70,7 +69,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al return nil } -func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { defer finish.Unlock() writer := v2io.NewBufferedWriter(conn) @@ -85,7 +84,11 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn streamWriter.Write(payload) writer.SetCached(false) - v2io.Pipe(input, streamWriter) + err := v2io.Pipe(input, streamWriter) + if err != vmessio.ErrorStreamCompleted { + conn.SetReusable(false) + } + if request.Option.IsChunkStream() { streamWriter.Write(alloc.NewSmallBuffer().Clear()) } @@ -93,7 +96,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn return } -func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { defer finish.Unlock() reader := v2io.NewBufferedReader(conn) @@ -116,7 +119,11 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con bodyReader = v2io.NewAdaptiveReader(decryptReader) } - v2io.Pipe(bodyReader, output) + err = v2io.Pipe(bodyReader, output) + if err != vmessio.ErrorStreamCompleted { + conn.SetReusable(false) + } + bodyReader.Release() return