diff --git a/common/buf/buffer.go b/common/buf/buffer.go index ea56cd3d..857b815d 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -41,14 +41,15 @@ func (b *Buffer) Clear() { } // AppendBytes appends one or more bytes to the end of the buffer. -func (b *Buffer) AppendBytes(bytes ...byte) { - b.Append(bytes) +func (b *Buffer) AppendBytes(bytes ...byte) int { + return b.Append(bytes) } // Append appends a byte array to the end of the buffer. -func (b *Buffer) Append(data []byte) { +func (b *Buffer) Append(data []byte) int { nBytes := copy(b.v[b.end:], data) b.end += nBytes + return nBytes } // AppendSupplier appends the content of a BytesWriter to the buffer. diff --git a/common/buf/io.go b/common/buf/io.go index 0e24241f..fc739040 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -2,6 +2,7 @@ package buf import ( "io" + "time" "v2ray.com/core/common/errors" "v2ray.com/core/common/signal" @@ -13,6 +14,12 @@ type Reader interface { Read() (*Buffer, error) } +var ErrReadTimeout = errors.New("Buf: IO timeout.") + +type TimeoutReader interface { + ReadTimeout(time.Duration) (*Buffer, error) +} + // Writer extends io.Writer with alloc.Buffer. type Writer interface { // Write writes an alloc.Buffer into underlying writer. diff --git a/common/buf/merge_reader.go b/common/buf/merge_reader.go new file mode 100644 index 00000000..33e97c39 --- /dev/null +++ b/common/buf/merge_reader.go @@ -0,0 +1,42 @@ +package buf + +type MergingReader struct { + reader Reader + leftover *Buffer +} + +func NewMergingReader(reader Reader) Reader { + return &MergingReader{ + reader: reader, + } +} + +func (r *MergingReader) Read() (*Buffer, error) { + if r.leftover != nil { + return r.leftover, nil + } + + b, err := r.reader.Read() + if err != nil { + return nil, err + } + + if b.IsFull() { + return b, nil + } + + b2, err := r.reader.Read() + if err != nil { + return b, nil + } + + nBytes := b.Append(b2.Bytes()) + b2.SliceFrom(nBytes) + if b2.IsEmpty() { + b2.Release() + } else { + r.leftover = b2 + } + + return b, nil +} diff --git a/transport/ray/direct.go b/transport/ray/direct.go index df773d5f..5daa0379 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -2,7 +2,6 @@ package ray import ( "context" - "errors" "io" "time" @@ -13,8 +12,6 @@ const ( bufferSize = 512 ) -var ErrReadTimeout = errors.New("Ray: timeout.") - // NewRay creates a new Ray for direct traffic transport. func NewRay(ctx context.Context) Ray { return &directRay{ @@ -101,7 +98,7 @@ func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) { case <-v.err: return nil, io.ErrClosedPipe case <-time.After(timeout): - return nil, ErrReadTimeout + return nil, buf.ErrReadTimeout } } }