Massive fixes

pull/168/head v1.12.9
v2ray 2016-05-12 17:20:07 -07:00
parent 3c02805186
commit 634c4964cc
8 changed files with 71 additions and 60 deletions

View File

@ -69,12 +69,14 @@ func (b *Buffer) Bytes() []byte {
// Slice cuts the buffer at the given position.
func (b *Buffer) Slice(from, to int) *Buffer {
b.offset += from
b.Value = b.Value[from:to]
return b
}
// SliceFrom cuts the buffer at the given position.
func (b *Buffer) SliceFrom(from int) *Buffer {
b.offset += from
b.Value = b.Value[from:]
return b
}
@ -121,9 +123,10 @@ func (b *Buffer) Read(data []byte) (int, error) {
}
nBytes := copy(data, b.Value)
if nBytes == b.Len() {
b.Value = b.Value[:0]
b.Clear()
} else {
b.Value = b.Value[nBytes:]
b.offset += nBytes
}
return nBytes, nil
}
@ -132,7 +135,9 @@ func (b *Buffer) FillFrom(reader io.Reader) (int, error) {
begin := b.Len()
b.Value = b.Value[:cap(b.Value)]
nBytes, err := reader.Read(b.Value[begin:])
b.Value = b.Value[:begin+nBytes]
if err == nil {
b.Value = b.Value[:begin+nBytes]
}
return nBytes, err
}

View File

@ -35,16 +35,15 @@ func (this *BufferedWriter) Write(b []byte) (int, error) {
}
func (this *BufferedWriter) Flush() error {
nBytes, err := this.writer.Write(this.buffer.Value)
this.buffer.SliceFrom(nBytes)
if !this.buffer.IsEmpty() {
nBytes, err = this.writer.Write(this.buffer.Value)
defer this.buffer.Clear()
for !this.buffer.IsEmpty() {
nBytes, err := this.writer.Write(this.buffer.Value)
if err != nil {
return err
}
this.buffer.SliceFrom(nBytes)
}
if this.buffer.IsEmpty() {
this.buffer.Clear()
}
return err
return nil
}
func (this *BufferedWriter) Cached() bool {
@ -59,6 +58,7 @@ func (this *BufferedWriter) SetCached(cached bool) {
}
func (this *BufferedWriter) Release() {
this.Flush()
this.buffer.Release()
this.buffer = nil
this.writer = nil

View File

@ -1,9 +1,14 @@
package io
import (
"github.com/v2ray/v2ray-core/common/log"
)
func Pipe(reader Reader, writer Writer) error {
for {
buffer, err := reader.Read()
if err != nil {
log.Debug("IO: Pipe exits as ", err)
return err
}
@ -14,6 +19,7 @@ func Pipe(reader Reader, writer Writer) error {
err = writer.Write(buffer)
if err != nil {
log.Debug("IO: Pipe exits as ", err)
return err
}
}

View File

@ -143,9 +143,8 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
defer input.Close()
defer output.Release()
var readFinish, writeFinish sync.Mutex
var readFinish sync.Mutex
readFinish.Lock()
writeFinish.Lock()
userSettings := protocol.GetUserSettings(request.User.Level)
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
@ -177,27 +176,21 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
// Optimize for small response packet
if data, err := output.Read(); err == nil {
var v2writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
vmessio.Authenticate(data)
v2writer = vmessio.NewAuthChunkWriter(v2writer)
}
bodyWriter.Write(data.Value)
data.Release()
v2writer.Write(data)
writer.SetCached(false)
go func(finish *sync.Mutex) {
var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
writer = vmessio.NewAuthChunkWriter(writer)
}
v2io.Pipe(output, writer)
output.Release()
if request.Option.IsChunkStream() {
writer.Write(alloc.NewSmallBuffer().Clear())
}
writer.Release()
finish.Unlock()
}(&writeFinish)
writeFinish.Lock()
v2io.Pipe(output, v2writer)
output.Release()
if request.Option.IsChunkStream() {
v2writer.Write(alloc.NewSmallBuffer().Clear())
}
v2writer.Release()
}
readFinish.Lock()

View File

@ -6,6 +6,7 @@ import (
"io"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/transport"
)
@ -36,6 +37,7 @@ func (this *Validator) Consume(b []byte) {
}
func (this *Validator) Validate() bool {
log.Debug("VMess Reader: Expected auth ", this.expectedAuth, " actual auth: ", this.actualAuth.Sum32())
return this.actualAuth.Sum32() == this.expectedAuth
}
@ -70,6 +72,7 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
return nil, err
}
}
log.Debug("VMess Reader: raw buffer: ", buffer.Value)
length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
this.chunkLength = int(length) - 4
this.validator = NewValidator(serial.BytesLiteral(buffer.Value[2:6]).Uint32Value())
@ -87,17 +90,9 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
return nil, io.EOF
}
if buffer.Len() <= this.chunkLength {
if buffer.Len() < this.chunkLength {
this.validator.Consume(buffer.Value)
this.chunkLength -= buffer.Len()
if this.chunkLength == 0 {
if !this.validator.Validate() {
buffer.Release()
return nil, transport.ErrorCorruptedPacket
}
this.chunkLength = -1
this.validator = nil
}
} else {
this.validator.Consume(buffer.Value[:this.chunkLength])
if !this.validator.Validate() {
@ -105,9 +100,11 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
return nil, transport.ErrorCorruptedPacket
}
leftLength := buffer.Len() - this.chunkLength
this.last = AllocBuffer(leftLength).Clear()
this.last.Append(buffer.Value[this.chunkLength:])
buffer.Slice(0, this.chunkLength)
if leftLength > 0 {
this.last = AllocBuffer(leftLength).Clear()
this.last.Append(buffer.Value[this.chunkLength:])
buffer.Slice(0, this.chunkLength)
}
this.chunkLength = -1
this.validator = nil

View File

@ -69,25 +69,19 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
defer finish.Unlock()
defer payload.Release()
writer := v2io.NewBufferedWriter(conn)
defer writer.Release()
session.EncodeRequestHeader(request, writer)
if request.Option.IsChunkStream() {
vmessio.Authenticate(payload)
}
bodyWriter := session.EncodeRequestBody(writer)
bodyWriter.Write(payload.Value)
writer.SetCached(false)
var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
}
streamWriter.Write(payload)
writer.SetCached(false)
v2io.Pipe(input, streamWriter)
if request.Option.IsChunkStream() {
streamWriter.Write(alloc.NewSmallBuffer().Clear())
@ -110,7 +104,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
go this.handleCommand(dest, header.Command)
reader.SetCached(false)
decryptReader := session.DecodeResponseBody(conn)
decryptReader := session.DecodeResponseBody(reader)
var bodyReader v2io.Reader
if request.Option.IsChunkStream() {

View File

@ -196,7 +196,7 @@ func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link r
payload, err := link.OutboundInput().Read()
if err != nil {
log.Info("Point: No payload to dispatch, stopping dispatching now.")
link.OutboundOutput().Close()
link.OutboundOutput().Release()
link.OutboundInput().Release()
return
}

View File

@ -6,6 +6,7 @@ import (
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/ray"
)
@ -32,7 +33,7 @@ func NewTimedInboundRay(name string, inboundRay ray.InboundRay) *TimedInboundRay
func (this *TimedInboundRay) Monitor() {
for {
time.Sleep(16 * time.Second)
time.Sleep(time.Second * 16)
select {
case <-this.accessed:
default:
@ -58,7 +59,7 @@ func (this *TimedInboundRay) InboundInput() ray.OutputStream {
func (this *TimedInboundRay) InboundOutput() ray.InputStream {
this.RLock()
this.RUnlock()
defer this.RUnlock()
if this.inboundRay == nil {
return nil
}
@ -70,6 +71,7 @@ func (this *TimedInboundRay) InboundOutput() ray.InputStream {
}
func (this *TimedInboundRay) Release() {
log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name)
this.Lock()
defer this.Unlock()
if this.server == nil {
@ -102,12 +104,17 @@ func (this *UDPServer) RemoveRay(name string) {
}
func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
log.Debug("UDP Server: Locating existing connection for ", name)
this.RLock()
defer this.RUnlock()
if entry, found := this.conns[name]; found {
err := entry.InboundInput().Write(payload)
outputStream := entry.InboundInput()
if outputStream == nil {
return false
}
err := outputStream.Write(payload)
if err != nil {
this.RemoveRay(name)
go this.RemoveRay(name)
return false
}
return true
@ -116,16 +123,21 @@ func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buf
}
func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
destString := source.Address().String() + "-" + destination.Address().String()
destString := source.Address().String() + "-" + destination.NetAddr()
log.Debug("UDP Server: Dispatch request: ", destString)
if this.locateExistingAndDispatch(destString, payload) {
return
}
this.Lock()
log.Info("UDP Server: establishing new connection for ", destString)
inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
inboundRay.InboundInput().Write(payload)
timedInboundRay := NewTimedInboundRay(destString, inboundRay)
outputStream := timedInboundRay.InboundInput()
if outputStream != nil {
outputStream.Write(payload)
}
this.Lock()
this.conns[destString] = timedInboundRay
this.Unlock()
go this.handleConnection(timedInboundRay, source, callback)
@ -133,6 +145,10 @@ func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Dest
func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
for {
inputStream := inboundRay.InboundOutput()
if inputStream == nil {
break
}
data, err := inboundRay.InboundOutput().Read()
if err != nil {
break