From abe790181e6e4baacca9b11db8ed598edf12f5f4 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sat, 15 Apr 2017 22:22:29 +0200 Subject: [PATCH] multi reader --- app/proxyman/outbound/handler.go | 16 +++++++++++----- common/buf/io.go | 6 ++++++ common/buf/multi_buffer.go | 4 ++++ common/buf/reader.go | 23 +++++++++++++++++++++++ common/crypto/auth.go | 17 +++++++++++++++++ 5 files changed, 61 insertions(+), 5 deletions(-) diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 0e037a1b..804788fe 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -128,8 +128,9 @@ type Connection struct { localAddr net.Addr remoteAddr net.Addr - reader io.Reader - writer buf.Writer + bytesReader io.Reader + reader buf.Reader + writer buf.Writer } func NewConnection(stream ray.Ray) *Connection { @@ -143,8 +144,9 @@ func NewConnection(stream ray.Ray) *Connection { IP: []byte{0, 0, 0, 0}, Port: 0, }, - reader: buf.ToBytesReader(stream.InboundOutput()), - writer: stream.InboundInput(), + bytesReader: buf.ToBytesReader(stream.InboundOutput()), + reader: stream.InboundOutput(), + writer: stream.InboundInput(), } } @@ -153,7 +155,11 @@ func (v *Connection) Read(b []byte) (int, error) { if v.closed { return 0, io.EOF } - return v.reader.Read(b) + return v.bytesReader.Read(b) +} + +func (v *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) { + return v.reader.Read() } // Write implements net.Conn.Write(). diff --git a/common/buf/io.go b/common/buf/io.go index 18d7029c..a3615849 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -76,6 +76,12 @@ func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) erro // NewReader creates a new Reader. // The Reader instance doesn't take the ownership of reader. func NewReader(reader io.Reader) Reader { + if mr, ok := reader.(MultiBufferReader); ok { + return &readerAdpater{ + MultiBufferReader: mr, + } + } + return &BytesToBufferReader{ reader: reader, buffer: NewLocal(32 * 1024), diff --git a/common/buf/multi_buffer.go b/common/buf/multi_buffer.go index 93eecfa3..d1fe389b 100644 --- a/common/buf/multi_buffer.go +++ b/common/buf/multi_buffer.go @@ -6,6 +6,10 @@ type MultiBufferWriter interface { WriteMultiBuffer(MultiBuffer) (int, error) } +type MultiBufferReader interface { + ReadMultiBuffer() (MultiBuffer, error) +} + type MultiBuffer []*Buffer func NewMultiBuffer() MultiBuffer { diff --git a/common/buf/reader.go b/common/buf/reader.go index 4f050b69..097e459c 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -23,6 +23,14 @@ func (v *BytesToBufferReader) Read() (MultiBuffer, error) { return mb, nil } +type readerAdpater struct { + MultiBufferReader +} + +func (r *readerAdpater) Read() (MultiBuffer, error) { + return r.ReadMultiBuffer() +} + type bufferToBytesReader struct { stream Reader current MultiBuffer @@ -57,3 +65,18 @@ func (v *bufferToBytesReader) Read(b []byte) (int, error) { } return nBytes, err } + +func (v *bufferToBytesReader) ReadMultiBuffer() (MultiBuffer, error) { + if v.err != nil { + return nil, v.err + } + if v.current == nil { + v.fill() + if v.err != nil { + return nil, v.err + } + } + b := v.current + v.current = nil + return b, nil +} diff --git a/common/crypto/auth.go b/common/crypto/auth.go index 4e21592e..52c84ebc 100644 --- a/common/crypto/auth.go +++ b/common/crypto/auth.go @@ -199,6 +199,23 @@ func (v *AuthenticationReader) Read(b []byte) (int, error) { return v.copyChunk(b), nil } +func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) { + err := r.ensureChunk() + if err != nil { + return nil, err + } + + mb := buf.NewMultiBuffer() + for len(r.chunk) > 0 { + b := buf.New() + nBytes, _ := b.Write(r.chunk) + mb.Append(b) + r.chunk = r.chunk[nBytes:] + } + r.chunk = nil + return mb, nil +} + type AuthenticationWriter struct { auth Authenticator buffer []byte