From 634c4964cc0f4dbe29cca5bf3a9f3840cf77ba4c Mon Sep 17 00:00:00 2001 From: v2ray Date: Thu, 12 May 2016 17:20:07 -0700 Subject: [PATCH] Massive fixes --- common/alloc/buffer.go | 9 +++++++-- common/io/buffered_writer.go | 16 ++++++++-------- common/io/transport.go | 6 ++++++ proxy/vmess/inbound/inbound.go | 31 ++++++++++++------------------- proxy/vmess/io/reader.go | 21 +++++++++------------ proxy/vmess/outbound/outbound.go | 14 ++++---------- shell/point/point.go | 2 +- transport/hub/udp_server.go | 32 ++++++++++++++++++++++++-------- 8 files changed, 71 insertions(+), 60 deletions(-) diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 111d3fa1..9cd6a64b 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -69,12 +69,14 @@ func (b *Buffer) Bytes() []byte { // Slice cuts the buffer at the given position. func (b *Buffer) Slice(from, to int) *Buffer { + b.offset += from b.Value = b.Value[from:to] return b } // SliceFrom cuts the buffer at the given position. func (b *Buffer) SliceFrom(from int) *Buffer { + b.offset += from b.Value = b.Value[from:] return b } @@ -121,9 +123,10 @@ func (b *Buffer) Read(data []byte) (int, error) { } nBytes := copy(data, b.Value) if nBytes == b.Len() { - b.Value = b.Value[:0] + b.Clear() } else { b.Value = b.Value[nBytes:] + b.offset += nBytes } return nBytes, nil } @@ -132,7 +135,9 @@ func (b *Buffer) FillFrom(reader io.Reader) (int, error) { begin := b.Len() b.Value = b.Value[:cap(b.Value)] nBytes, err := reader.Read(b.Value[begin:]) - b.Value = b.Value[:begin+nBytes] + if err == nil { + b.Value = b.Value[:begin+nBytes] + } return nBytes, err } diff --git a/common/io/buffered_writer.go b/common/io/buffered_writer.go index e7652521..a48a5ac2 100644 --- a/common/io/buffered_writer.go +++ b/common/io/buffered_writer.go @@ -35,16 +35,15 @@ func (this *BufferedWriter) Write(b []byte) (int, error) { } func (this *BufferedWriter) Flush() error { - nBytes, err := this.writer.Write(this.buffer.Value) - this.buffer.SliceFrom(nBytes) - if !this.buffer.IsEmpty() { - nBytes, err = this.writer.Write(this.buffer.Value) + defer this.buffer.Clear() + for !this.buffer.IsEmpty() { + nBytes, err := this.writer.Write(this.buffer.Value) + if err != nil { + return err + } this.buffer.SliceFrom(nBytes) } - if this.buffer.IsEmpty() { - this.buffer.Clear() - } - return err + return nil } func (this *BufferedWriter) Cached() bool { @@ -59,6 +58,7 @@ func (this *BufferedWriter) SetCached(cached bool) { } func (this *BufferedWriter) Release() { + this.Flush() this.buffer.Release() this.buffer = nil this.writer = nil diff --git a/common/io/transport.go b/common/io/transport.go index e318bb42..4cb045f0 100644 --- a/common/io/transport.go +++ b/common/io/transport.go @@ -1,9 +1,14 @@ package io +import ( + "github.com/v2ray/v2ray-core/common/log" +) + func Pipe(reader Reader, writer Writer) error { for { buffer, err := reader.Read() if err != nil { + log.Debug("IO: Pipe exits as ", err) return err } @@ -14,6 +19,7 @@ func Pipe(reader Reader, writer Writer) error { err = writer.Write(buffer) if err != nil { + log.Debug("IO: Pipe exits as ", err) return err } } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 95e35dab..1bc36e75 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -143,9 +143,8 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { defer input.Close() defer output.Release() - var readFinish, writeFinish sync.Mutex + var readFinish sync.Mutex readFinish.Lock() - writeFinish.Lock() userSettings := protocol.GetUserSettings(request.User.Level) connReader.SetTimeOut(userSettings.PayloadReadTimeout) @@ -177,27 +176,21 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { // Optimize for small response packet if data, err := output.Read(); err == nil { + var v2writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) if request.Option.IsChunkStream() { - vmessio.Authenticate(data) + v2writer = vmessio.NewAuthChunkWriter(v2writer) } - bodyWriter.Write(data.Value) - data.Release() + + v2writer.Write(data) writer.SetCached(false) - go func(finish *sync.Mutex) { - var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) - if request.Option.IsChunkStream() { - writer = vmessio.NewAuthChunkWriter(writer) - } - v2io.Pipe(output, writer) - output.Release() - if request.Option.IsChunkStream() { - writer.Write(alloc.NewSmallBuffer().Clear()) - } - writer.Release() - finish.Unlock() - }(&writeFinish) - writeFinish.Lock() + + v2io.Pipe(output, v2writer) + output.Release() + if request.Option.IsChunkStream() { + v2writer.Write(alloc.NewSmallBuffer().Clear()) + } + v2writer.Release() } readFinish.Lock() diff --git a/proxy/vmess/io/reader.go b/proxy/vmess/io/reader.go index 27e622af..ddbe76b2 100644 --- a/proxy/vmess/io/reader.go +++ b/proxy/vmess/io/reader.go @@ -6,6 +6,7 @@ import ( "io" "github.com/v2ray/v2ray-core/common/alloc" + "github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/serial" "github.com/v2ray/v2ray-core/transport" ) @@ -36,6 +37,7 @@ func (this *Validator) Consume(b []byte) { } func (this *Validator) Validate() bool { + log.Debug("VMess Reader: Expected auth ", this.expectedAuth, " actual auth: ", this.actualAuth.Sum32()) return this.actualAuth.Sum32() == this.expectedAuth } @@ -70,6 +72,7 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) { return nil, err } } + log.Debug("VMess Reader: raw buffer: ", buffer.Value) length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value() this.chunkLength = int(length) - 4 this.validator = NewValidator(serial.BytesLiteral(buffer.Value[2:6]).Uint32Value()) @@ -87,17 +90,9 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) { return nil, io.EOF } - if buffer.Len() <= this.chunkLength { + if buffer.Len() < this.chunkLength { this.validator.Consume(buffer.Value) this.chunkLength -= buffer.Len() - if this.chunkLength == 0 { - if !this.validator.Validate() { - buffer.Release() - return nil, transport.ErrorCorruptedPacket - } - this.chunkLength = -1 - this.validator = nil - } } else { this.validator.Consume(buffer.Value[:this.chunkLength]) if !this.validator.Validate() { @@ -105,9 +100,11 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) { return nil, transport.ErrorCorruptedPacket } leftLength := buffer.Len() - this.chunkLength - this.last = AllocBuffer(leftLength).Clear() - this.last.Append(buffer.Value[this.chunkLength:]) - buffer.Slice(0, this.chunkLength) + if leftLength > 0 { + this.last = AllocBuffer(leftLength).Clear() + this.last.Append(buffer.Value[this.chunkLength:]) + buffer.Slice(0, this.chunkLength) + } this.chunkLength = -1 this.validator = nil diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index d346e4c3..2134874d 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -69,25 +69,19 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { defer finish.Unlock() - defer payload.Release() writer := v2io.NewBufferedWriter(conn) defer writer.Release() session.EncodeRequestHeader(request, writer) - if request.Option.IsChunkStream() { - vmessio.Authenticate(payload) - } - bodyWriter := session.EncodeRequestBody(writer) - bodyWriter.Write(payload.Value) - - writer.SetCached(false) - var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter) if request.Option.IsChunkStream() { streamWriter = vmessio.NewAuthChunkWriter(streamWriter) } + streamWriter.Write(payload) + writer.SetCached(false) + v2io.Pipe(input, streamWriter) if request.Option.IsChunkStream() { streamWriter.Write(alloc.NewSmallBuffer().Clear()) @@ -110,7 +104,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con go this.handleCommand(dest, header.Command) reader.SetCached(false) - decryptReader := session.DecodeResponseBody(conn) + decryptReader := session.DecodeResponseBody(reader) var bodyReader v2io.Reader if request.Option.IsChunkStream() { diff --git a/shell/point/point.go b/shell/point/point.go index cc810469..0a784f87 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -196,7 +196,7 @@ func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link r payload, err := link.OutboundInput().Read() if err != nil { log.Info("Point: No payload to dispatch, stopping dispatching now.") - link.OutboundOutput().Close() + link.OutboundOutput().Release() link.OutboundInput().Release() return } diff --git a/transport/hub/udp_server.go b/transport/hub/udp_server.go index 1e755be2..2c8c6649 100644 --- a/transport/hub/udp_server.go +++ b/transport/hub/udp_server.go @@ -6,6 +6,7 @@ import ( "github.com/v2ray/v2ray-core/app/dispatcher" "github.com/v2ray/v2ray-core/common/alloc" + "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -32,7 +33,7 @@ func NewTimedInboundRay(name string, inboundRay ray.InboundRay) *TimedInboundRay func (this *TimedInboundRay) Monitor() { for { - time.Sleep(16 * time.Second) + time.Sleep(time.Second * 16) select { case <-this.accessed: default: @@ -58,7 +59,7 @@ func (this *TimedInboundRay) InboundInput() ray.OutputStream { func (this *TimedInboundRay) InboundOutput() ray.InputStream { this.RLock() - this.RUnlock() + defer this.RUnlock() if this.inboundRay == nil { return nil } @@ -70,6 +71,7 @@ func (this *TimedInboundRay) InboundOutput() ray.InputStream { } func (this *TimedInboundRay) Release() { + log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name) this.Lock() defer this.Unlock() if this.server == nil { @@ -102,12 +104,17 @@ func (this *UDPServer) RemoveRay(name string) { } func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool { + log.Debug("UDP Server: Locating existing connection for ", name) this.RLock() defer this.RUnlock() if entry, found := this.conns[name]; found { - err := entry.InboundInput().Write(payload) + outputStream := entry.InboundInput() + if outputStream == nil { + return false + } + err := outputStream.Write(payload) if err != nil { - this.RemoveRay(name) + go this.RemoveRay(name) return false } return true @@ -116,16 +123,21 @@ func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buf } func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) { - destString := source.Address().String() + "-" + destination.Address().String() + destString := source.Address().String() + "-" + destination.NetAddr() + log.Debug("UDP Server: Dispatch request: ", destString) if this.locateExistingAndDispatch(destString, payload) { return } - this.Lock() + log.Info("UDP Server: establishing new connection for ", destString) inboundRay := this.packetDispatcher.DispatchToOutbound(destination) - inboundRay.InboundInput().Write(payload) - timedInboundRay := NewTimedInboundRay(destString, inboundRay) + outputStream := timedInboundRay.InboundInput() + if outputStream != nil { + outputStream.Write(payload) + } + + this.Lock() this.conns[destString] = timedInboundRay this.Unlock() go this.handleConnection(timedInboundRay, source, callback) @@ -133,6 +145,10 @@ func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Dest func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) { for { + inputStream := inboundRay.InboundOutput() + if inputStream == nil { + break + } data, err := inboundRay.InboundOutput().Read() if err != nil { break