From 1948d0738f63b65f1afe29d64ed8eb5f74a22bf6 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 9 Dec 2016 13:17:34 +0100 Subject: [PATCH] refactor io package --- app/proxy/proxy.go | 10 +- common/buf/io.go | 77 +++++++++++++- common/buf/reader.go | 102 +++++++++++++++++++ common/{io => buf}/reader_test.go | 11 +- common/buf/writer.go | 68 +++++++++++++ common/{io => buf}/writer_test.go | 14 +-- common/{io => bufio}/buffered_reader.go | 4 +- common/{io => bufio}/buffered_reader_test.go | 6 +- common/{io => bufio}/buffered_writer.go | 4 +- common/{io => bufio}/buffered_writer_test.go | 8 +- common/bufio/bufio.go | 15 +++ common/common.go | 2 +- common/io/chain_writer.go | 50 --------- common/io/chan_reader.go | 62 ----------- common/io/reader.go | 69 ------------- common/io/transport.go | 40 -------- common/io/writer.go | 48 --------- common/serial/hash.go | 8 +- common/serial/numbers.go | 10 +- common/serial/string.go | 4 +- proxy/blackhole/config.go | 11 +- proxy/blackhole/config_test.go | 3 +- proxy/dokodemo/dokodemo.go | 9 +- proxy/freedom/freedom.go | 10 +- proxy/http/server.go | 22 ++-- proxy/shadowsocks/client.go | 13 +-- proxy/shadowsocks/protocol.go | 21 ++-- proxy/shadowsocks/server.go | 11 +- proxy/socks/server.go | 21 ++-- proxy/testing/mocks/inboundhandler.go | 10 +- proxy/testing/mocks/outboundhandler.go | 9 +- proxy/vmess/encoding/client.go | 10 +- proxy/vmess/encoding/server.go | 10 +- proxy/vmess/inbound/inbound.go | 10 +- proxy/vmess/outbound/outbound.go | 14 +-- transport/ray/ray.go | 8 +- 36 files changed, 390 insertions(+), 414 deletions(-) create mode 100644 common/buf/reader.go rename common/{io => buf}/reader_test.go (67%) create mode 100644 common/buf/writer.go rename common/{io => buf}/writer_test.go (58%) rename common/{io => bufio}/buffered_reader.go (92%) rename common/{io => bufio}/buffered_reader_test.go (88%) rename common/{io => bufio}/buffered_writer.go (96%) rename common/{io => bufio}/buffered_writer_test.go (88%) create mode 100644 common/bufio/bufio.go delete mode 100644 common/io/chain_writer.go delete mode 100644 common/io/chan_reader.go delete mode 100644 common/io/reader.go delete mode 100644 common/io/transport.go delete mode 100644 common/io/writer.go diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index e1e32362..d2d0f0b5 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -7,8 +7,8 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/proxyman" + "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/transport/internet" @@ -63,8 +63,8 @@ type ProxyConnection struct { localAddr net.Addr remoteAddr net.Addr - reader *v2io.ChanReader - writer *v2io.ChainWriter + reader *buf.BufferToBytesReader + writer *buf.BytesToBufferWriter } func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *ProxyConnection { @@ -78,8 +78,8 @@ func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ra IP: []byte{0, 0, 0, 0}, Port: 0, }, - reader: v2io.NewChanReader(stream.InboundOutput()), - writer: v2io.NewChainWriter(stream.InboundInput()), + reader: buf.NewBytesReader(stream.InboundOutput()), + writer: buf.NewBytesWriter(stream.InboundInput()), } } diff --git a/common/buf/io.go b/common/buf/io.go index 29e760a7..8a417eb2 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -1,6 +1,24 @@ package buf -import "io" +import ( + "io" + + "v2ray.com/core/common/errors" +) + +// Reader extends io.Reader with alloc.Buffer. +type Reader interface { + Release() + // Read reads content from underlying reader, and put it into an alloc.Buffer. + Read() (*Buffer, error) +} + +// Writer extends io.Writer with alloc.Buffer. +type Writer interface { + Release() + // Write writes an alloc.Buffer into underlying writer. + Write(*Buffer) error +} func ReadFrom(reader io.Reader) Supplier { return func(b []byte) (int, error) { @@ -13,3 +31,60 @@ func ReadFullFrom(reader io.Reader, size int) Supplier { return io.ReadFull(reader, b[:size]) } } + +// Pipe dumps all content from reader to writer, until an error happens. +func Pipe(reader Reader, writer Writer) error { + for { + buffer, err := reader.Read() + if err != nil { + return err + } + + if buffer.IsEmpty() { + buffer.Release() + continue + } + + err = writer.Write(buffer) + if err != nil { + buffer.Release() + return err + } + } +} + +// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF. +func PipeUntilEOF(reader Reader, writer Writer) error { + err := Pipe(reader, writer) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + return nil +} + +// NewReader creates a new Reader. +// The Reader instance doesn't take the ownership of reader. +func NewReader(reader io.Reader) Reader { + return &BytesToBufferReader{ + reader: reader, + } +} + +func NewBytesReader(stream Reader) *BufferToBytesReader { + return &BufferToBytesReader{ + stream: stream, + } +} + +// NewWriter creates a new Writer. +func NewWriter(writer io.Writer) Writer { + return &BufferToBytesWriter{ + writer: writer, + } +} + +func NewBytesWriter(writer Writer) *BytesToBufferWriter { + return &BytesToBufferWriter{ + writer: writer, + } +} diff --git a/common/buf/reader.go b/common/buf/reader.go new file mode 100644 index 00000000..7fd5790e --- /dev/null +++ b/common/buf/reader.go @@ -0,0 +1,102 @@ +package buf + +import ( + "io" + "sync" +) + +// BytesToBufferReader is a Reader that adjusts its reading speed automatically. +type BytesToBufferReader struct { + reader io.Reader + largeBuffer *Buffer + highVolumn bool +} + +// Read implements Reader.Read(). +func (v *BytesToBufferReader) Read() (*Buffer, error) { + if v.highVolumn && v.largeBuffer.IsEmpty() { + if v.largeBuffer == nil { + v.largeBuffer = NewLocal(32 * 1024) + } + err := v.largeBuffer.AppendSupplier(ReadFrom(v.reader)) + if err != nil { + return nil, err + } + if v.largeBuffer.Len() < Size { + v.highVolumn = false + } + } + + buffer := New() + if !v.largeBuffer.IsEmpty() { + buffer.AppendSupplier(ReadFrom(v.largeBuffer)) + return buffer, nil + } + + err := buffer.AppendSupplier(ReadFrom(v.reader)) + if err != nil { + buffer.Release() + return nil, err + } + + if buffer.IsFull() { + v.highVolumn = true + } + + return buffer, nil +} + +// Release implements Releasable.Release(). +func (v *BytesToBufferReader) Release() { + v.reader = nil +} + +type BufferToBytesReader struct { + sync.Mutex + stream Reader + current *Buffer + eof bool +} + + + +// Private: Visible for testing. +func (v *BufferToBytesReader) Fill() { + b, err := v.stream.Read() + v.current = b + if err != nil { + v.eof = true + v.current = nil + } +} + +func (v *BufferToBytesReader) Read(b []byte) (int, error) { + if v.eof { + return 0, io.EOF + } + + v.Lock() + defer v.Unlock() + if v.current == nil { + v.Fill() + if v.eof { + return 0, io.EOF + } + } + nBytes, err := v.current.Read(b) + if v.current.IsEmpty() { + v.current.Release() + v.current = nil + } + return nBytes, err +} + +func (v *BufferToBytesReader) Release() { + v.Lock() + defer v.Unlock() + + v.eof = true + v.current.Release() + v.current = nil + v.stream = nil +} diff --git a/common/io/reader_test.go b/common/buf/reader_test.go similarity index 67% rename from common/io/reader_test.go rename to common/buf/reader_test.go index f991a2af..b6234a06 100644 --- a/common/io/reader_test.go +++ b/common/buf/reader_test.go @@ -1,11 +1,10 @@ -package io_test +package buf_test import ( "bytes" "testing" - "v2ray.com/core/common/buf" - . "v2ray.com/core/common/io" + . "v2ray.com/core/common/buf" "v2ray.com/core/testing/assert" ) @@ -15,12 +14,12 @@ func TestAdaptiveReader(t *testing.T) { rawContent := make([]byte, 1024*1024) buffer := bytes.NewBuffer(rawContent) - reader := NewAdaptiveReader(buffer) + reader := NewReader(buffer) b1, err := reader.Read() assert.Error(err).IsNil() assert.Bool(b1.IsFull()).IsTrue() - assert.Int(b1.Len()).Equals(buf.Size) - assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.Size) + assert.Int(b1.Len()).Equals(Size) + assert.Int(buffer.Len()).Equals(cap(rawContent) - Size) b2, err := reader.Read() assert.Error(err).IsNil() diff --git a/common/buf/writer.go b/common/buf/writer.go new file mode 100644 index 00000000..acf943ba --- /dev/null +++ b/common/buf/writer.go @@ -0,0 +1,68 @@ +package buf + +import ( + "io" + "sync" +) + +// BufferToBytesWriter is a Writer that writes alloc.Buffer into underlying writer. +type BufferToBytesWriter struct { + writer io.Writer +} + +// Write implements Writer.Write(). Write() takes ownership of the given buffer. +func (v *BufferToBytesWriter) Write(buffer *Buffer) error { + defer buffer.Release() + for { + nBytes, err := v.writer.Write(buffer.Bytes()) + if err != nil { + return err + } + if nBytes == buffer.Len() { + break + } + buffer.SliceFrom(nBytes) + } + return nil +} + +// Release implements Releasable.Release(). +func (v *BufferToBytesWriter) Release() { + v.writer = nil +} + +type BytesToBufferWriter struct { + sync.Mutex + writer Writer +} + +func (v *BytesToBufferWriter) Write(payload []byte) (int, error) { + v.Lock() + defer v.Unlock() + if v.writer == nil { + return 0, io.ErrClosedPipe + } + + bytesWritten := 0 + size := len(payload) + for size > 0 { + buffer := New() + nBytes, _ := buffer.Write(payload) + size -= nBytes + payload = payload[nBytes:] + bytesWritten += nBytes + err := v.writer.Write(buffer) + if err != nil { + return bytesWritten, err + } + } + + return bytesWritten, nil +} + +func (v *BytesToBufferWriter) Release() { + v.Lock() + v.writer.Release() + v.writer = nil + v.Unlock() +} diff --git a/common/io/writer_test.go b/common/buf/writer_test.go similarity index 58% rename from common/io/writer_test.go rename to common/buf/writer_test.go index 1ac93ad1..3f82e7a1 100644 --- a/common/io/writer_test.go +++ b/common/buf/writer_test.go @@ -1,26 +1,26 @@ -package io_test +package buf_test import ( "bytes" "crypto/rand" "testing" - "v2ray.com/core/common/buf" - . "v2ray.com/core/common/io" + . "v2ray.com/core/common/buf" + "v2ray.com/core/common/bufio" "v2ray.com/core/testing/assert" ) -func TestAdaptiveWriter(t *testing.T) { +func TestWriter(t *testing.T) { assert := assert.On(t) - lb := buf.New() - lb.AppendSupplier(buf.ReadFrom(rand.Reader)) + lb := New() + lb.AppendSupplier(ReadFrom(rand.Reader)) expectedBytes := append([]byte(nil), lb.Bytes()...) writeBuffer := bytes.NewBuffer(make([]byte, 0, 1024*1024)) - writer := NewAdaptiveWriter(NewBufferedWriter(writeBuffer)) + writer := NewWriter(bufio.NewWriter(writeBuffer)) err := writer.Write(lb) assert.Error(err).IsNil() assert.Bytes(expectedBytes).Equals(writeBuffer.Bytes()) diff --git a/common/io/buffered_reader.go b/common/bufio/buffered_reader.go similarity index 92% rename from common/io/buffered_reader.go rename to common/bufio/buffered_reader.go index 5cebe90a..1c53dae9 100644 --- a/common/io/buffered_reader.go +++ b/common/bufio/buffered_reader.go @@ -1,4 +1,4 @@ -package io +package bufio import ( "io" @@ -14,7 +14,7 @@ type BufferedReader struct { cached bool } -func NewBufferedReader(rawReader io.Reader) *BufferedReader { +func NewReader(rawReader io.Reader) *BufferedReader { return &BufferedReader{ reader: rawReader, buffer: buf.New(), diff --git a/common/io/buffered_reader_test.go b/common/bufio/buffered_reader_test.go similarity index 88% rename from common/io/buffered_reader_test.go rename to common/bufio/buffered_reader_test.go index 24bf6f11..0bf0bf50 100644 --- a/common/io/buffered_reader_test.go +++ b/common/bufio/buffered_reader_test.go @@ -1,11 +1,11 @@ -package io_test +package bufio_test import ( "crypto/rand" "testing" "v2ray.com/core/common/buf" - . "v2ray.com/core/common/io" + . "v2ray.com/core/common/bufio" "v2ray.com/core/testing/assert" ) @@ -17,7 +17,7 @@ func TestBufferedReader(t *testing.T) { len := content.Len() - reader := NewBufferedReader(content) + reader := NewReader(content) assert.Bool(reader.Cached()).IsTrue() payload := make([]byte, 16) diff --git a/common/io/buffered_writer.go b/common/bufio/buffered_writer.go similarity index 96% rename from common/io/buffered_writer.go rename to common/bufio/buffered_writer.go index 4a868930..497a5964 100644 --- a/common/io/buffered_writer.go +++ b/common/bufio/buffered_writer.go @@ -1,4 +1,4 @@ -package io +package bufio import ( "io" @@ -15,7 +15,7 @@ type BufferedWriter struct { cached bool } -func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter { +func NewWriter(rawWriter io.Writer) *BufferedWriter { return &BufferedWriter{ writer: rawWriter, buffer: buf.NewSmall(), diff --git a/common/io/buffered_writer_test.go b/common/bufio/buffered_writer_test.go similarity index 88% rename from common/io/buffered_writer_test.go rename to common/bufio/buffered_writer_test.go index 56e0997a..09994f38 100644 --- a/common/io/buffered_writer_test.go +++ b/common/bufio/buffered_writer_test.go @@ -1,11 +1,11 @@ -package io_test +package bufio_test import ( "crypto/rand" "testing" "v2ray.com/core/common/buf" - . "v2ray.com/core/common/io" + . "v2ray.com/core/common/bufio" "v2ray.com/core/testing/assert" ) @@ -14,7 +14,7 @@ func TestBufferedWriter(t *testing.T) { content := buf.New() - writer := NewBufferedWriter(content) + writer := NewWriter(content) assert.Bool(writer.Cached()).IsTrue() payload := make([]byte, 16) @@ -34,7 +34,7 @@ func TestBufferedWriterLargePayload(t *testing.T) { content := buf.NewLocal(128 * 1024) - writer := NewBufferedWriter(content) + writer := NewWriter(content) assert.Bool(writer.Cached()).IsTrue() payload := make([]byte, 64*1024) diff --git a/common/bufio/bufio.go b/common/bufio/bufio.go new file mode 100644 index 00000000..70a7c4b7 --- /dev/null +++ b/common/bufio/bufio.go @@ -0,0 +1,15 @@ +// Package bufio is a replacement of the standard golang package bufio. +package bufio + +import ( + "bufio" + "io" +) + +func OriginalReader(reader io.Reader) *bufio.Reader { + return bufio.NewReader(reader) +} + +func OriginalReaderSize(reader io.Reader, size int) *bufio.Reader { + return bufio.NewReaderSize(reader, size) +} diff --git a/common/common.go b/common/common.go index dba9e802..d0b70ead 100644 --- a/common/common.go +++ b/common/common.go @@ -3,7 +3,7 @@ package common import ( - "v2ray.com/core/common/errors" + "errors" ) var ( diff --git a/common/io/chain_writer.go b/common/io/chain_writer.go deleted file mode 100644 index 446191fc..00000000 --- a/common/io/chain_writer.go +++ /dev/null @@ -1,50 +0,0 @@ -package io - -import ( - "io" - "sync" - - "v2ray.com/core/common/buf" -) - -type ChainWriter struct { - sync.Mutex - writer Writer -} - -func NewChainWriter(writer Writer) *ChainWriter { - return &ChainWriter{ - writer: writer, - } -} - -func (v *ChainWriter) Write(payload []byte) (int, error) { - v.Lock() - defer v.Unlock() - if v.writer == nil { - return 0, io.ErrClosedPipe - } - - bytesWritten := 0 - size := len(payload) - for size > 0 { - buffer := buf.New() - nBytes, _ := buffer.Write(payload) - size -= nBytes - payload = payload[nBytes:] - bytesWritten += nBytes - err := v.writer.Write(buffer) - if err != nil { - return bytesWritten, err - } - } - - return bytesWritten, nil -} - -func (v *ChainWriter) Release() { - v.Lock() - v.writer.Release() - v.writer = nil - v.Unlock() -} diff --git a/common/io/chan_reader.go b/common/io/chan_reader.go deleted file mode 100644 index 23d0215a..00000000 --- a/common/io/chan_reader.go +++ /dev/null @@ -1,62 +0,0 @@ -package io - -import ( - "io" - "sync" - - "v2ray.com/core/common/buf" -) - -type ChanReader struct { - sync.Mutex - stream Reader - current *buf.Buffer - eof bool -} - -func NewChanReader(stream Reader) *ChanReader { - return &ChanReader{ - stream: stream, - } -} - -// Private: Visible for testing. -func (v *ChanReader) Fill() { - b, err := v.stream.Read() - v.current = b - if err != nil { - v.eof = true - v.current = nil - } -} - -func (v *ChanReader) Read(b []byte) (int, error) { - if v.eof { - return 0, io.EOF - } - - v.Lock() - defer v.Unlock() - if v.current == nil { - v.Fill() - if v.eof { - return 0, io.EOF - } - } - nBytes, err := v.current.Read(b) - if v.current.IsEmpty() { - v.current.Release() - v.current = nil - } - return nBytes, err -} - -func (v *ChanReader) Release() { - v.Lock() - defer v.Unlock() - - v.eof = true - v.current.Release() - v.current = nil - v.stream = nil -} diff --git a/common/io/reader.go b/common/io/reader.go deleted file mode 100644 index 3e6c2199..00000000 --- a/common/io/reader.go +++ /dev/null @@ -1,69 +0,0 @@ -package io - -import ( - "io" - - "v2ray.com/core/common" - "v2ray.com/core/common/buf" -) - -// Reader extends io.Reader with alloc.Buffer. -type Reader interface { - common.Releasable - // Read reads content from underlying reader, and put it into an alloc.Buffer. - Read() (*buf.Buffer, error) -} - -// AdaptiveReader is a Reader that adjusts its reading speed automatically. -type AdaptiveReader struct { - reader io.Reader - largeBuffer *buf.Buffer - highVolumn bool -} - -// NewAdaptiveReader creates a new AdaptiveReader. -// The AdaptiveReader instance doesn't take the ownership of reader. -func NewAdaptiveReader(reader io.Reader) *AdaptiveReader { - return &AdaptiveReader{ - reader: reader, - } -} - -// Read implements Reader.Read(). -func (v *AdaptiveReader) Read() (*buf.Buffer, error) { - if v.highVolumn && v.largeBuffer.IsEmpty() { - if v.largeBuffer == nil { - v.largeBuffer = buf.NewLocal(32 * 1024) - } - err := v.largeBuffer.AppendSupplier(buf.ReadFrom(v.reader)) - if err != nil { - return nil, err - } - if v.largeBuffer.Len() < buf.Size { - v.highVolumn = false - } - } - - buffer := buf.New() - if !v.largeBuffer.IsEmpty() { - buffer.AppendSupplier(buf.ReadFrom(v.largeBuffer)) - return buffer, nil - } - - err := buffer.AppendSupplier(buf.ReadFrom(v.reader)) - if err != nil { - buffer.Release() - return nil, err - } - - if buffer.IsFull() { - v.highVolumn = true - } - - return buffer, nil -} - -// Release implements Releasable.Release(). -func (v *AdaptiveReader) Release() { - v.reader = nil -} diff --git a/common/io/transport.go b/common/io/transport.go deleted file mode 100644 index a56561a7..00000000 --- a/common/io/transport.go +++ /dev/null @@ -1,40 +0,0 @@ -package io - -import ( - "io" - - "v2ray.com/core/common/errors" - "v2ray.com/core/common/log" -) - -// Pipe dumps all content from reader to writer, until an error happens. -func Pipe(reader Reader, writer Writer) error { - for { - buffer, err := reader.Read() - if err != nil { - log.Debug("IO: Pipe exits as ", err) - return err - } - - if buffer.IsEmpty() { - buffer.Release() - continue - } - - err = writer.Write(buffer) - if err != nil { - log.Debug("IO: Pipe exits as ", err) - buffer.Release() - return err - } - } -} - -// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF. -func PipeUntilEOF(reader Reader, writer Writer) error { - err := Pipe(reader, writer) - if err != nil && errors.Cause(err) != io.EOF { - return err - } - return nil -} diff --git a/common/io/writer.go b/common/io/writer.go deleted file mode 100644 index 8c7ba3f6..00000000 --- a/common/io/writer.go +++ /dev/null @@ -1,48 +0,0 @@ -package io - -import ( - "io" - - "v2ray.com/core/common" - "v2ray.com/core/common/buf" -) - -// Writer extends io.Writer with alloc.Buffer. -type Writer interface { - common.Releasable - // Write writes an alloc.Buffer into underlying writer. - Write(*buf.Buffer) error -} - -// AdaptiveWriter is a Writer that writes alloc.Buffer into underlying writer. -type AdaptiveWriter struct { - writer io.Writer -} - -// NewAdaptiveWriter creates a new AdaptiveWriter. -func NewAdaptiveWriter(writer io.Writer) *AdaptiveWriter { - return &AdaptiveWriter{ - writer: writer, - } -} - -// Write implements Writer.Write(). Write() takes ownership of the given buffer. -func (v *AdaptiveWriter) Write(buffer *buf.Buffer) error { - defer buffer.Release() - for { - nBytes, err := v.writer.Write(buffer.Bytes()) - if err != nil { - return err - } - if nBytes == buffer.Len() { - break - } - buffer.SliceFrom(nBytes) - } - return nil -} - -// Release implements Releasable.Release(). -func (v *AdaptiveWriter) Release() { - v.writer = nil -} diff --git a/common/serial/hash.go b/common/serial/hash.go index ffb57637..63aa4c02 100644 --- a/common/serial/hash.go +++ b/common/serial/hash.go @@ -1,12 +1,8 @@ package serial -import ( - "hash" +import "hash" - "v2ray.com/core/common/buf" -) - -func WriteHash(h hash.Hash) buf.Supplier { +func WriteHash(h hash.Hash) func([]byte) (int, error) { return func(b []byte) (int, error) { h.Sum(b[:0]) return h.Size(), nil diff --git a/common/serial/numbers.go b/common/serial/numbers.go index 692942e0..05a98bbe 100644 --- a/common/serial/numbers.go +++ b/common/serial/numbers.go @@ -1,10 +1,6 @@ package serial -import ( - "strconv" - - "v2ray.com/core/common/buf" -) +import "strconv" func Uint16ToBytes(value uint16, b []byte) []byte { return append(b, byte(value>>8), byte(value)) @@ -14,7 +10,7 @@ func Uint16ToString(value uint16) string { return strconv.Itoa(int(value)) } -func WriteUint16(value uint16) buf.Supplier { +func WriteUint16(value uint16) func([]byte) (int, error) { return func(b []byte) (int, error) { b = Uint16ToBytes(value, b[:0]) return 2, nil @@ -29,7 +25,7 @@ func Uint32ToString(value uint32) string { return strconv.FormatUint(uint64(value), 10) } -func WriteUint32(value uint32) buf.Supplier { +func WriteUint32(value uint32) func([]byte) (int, error) { return func(b []byte) (int, error) { b = Uint32ToBytes(value, b[:0]) return 4, nil diff --git a/common/serial/string.go b/common/serial/string.go index 823d9665..6a7ea918 100644 --- a/common/serial/string.go +++ b/common/serial/string.go @@ -3,8 +3,6 @@ package serial import ( "fmt" "strings" - - "v2ray.com/core/common/buf" ) func ToString(v interface{}) string { @@ -36,7 +34,7 @@ func Concat(v ...interface{}) string { return strings.Join(values, "") } -func WriteString(s string) buf.Supplier { +func WriteString(s string) func([]byte) (int, error) { return func(b []byte) (int, error) { return copy(b, []byte(s)), nil } diff --git a/proxy/blackhole/config.go b/proxy/blackhole/config.go index c73be808..78d669e4 100644 --- a/proxy/blackhole/config.go +++ b/proxy/blackhole/config.go @@ -1,11 +1,10 @@ package blackhole import ( - "v2ray.com/core/common/buf" - v2io "v2ray.com/core/common/io" - "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" + + "v2ray.com/core/common/buf" "v2ray.com/core/common/serial" ) @@ -21,17 +20,17 @@ Content-Length: 0 type ResponseConfig interface { AsAny() *any.Any - WriteTo(v2io.Writer) + WriteTo(buf.Writer) } -func (v *NoneResponse) WriteTo(v2io.Writer) {} +func (v *NoneResponse) WriteTo(buf.Writer) {} func (v *NoneResponse) AsAny() *any.Any { r, _ := ptypes.MarshalAny(v) return r } -func (v *HTTPResponse) WriteTo(writer v2io.Writer) { +func (v *HTTPResponse) WriteTo(writer buf.Writer) { b := buf.NewLocal(512) b.AppendSupplier(serial.WriteString(http403response)) writer.Write(b) diff --git a/proxy/blackhole/config_test.go b/proxy/blackhole/config_test.go index e0bc06cf..7678d7cf 100644 --- a/proxy/blackhole/config_test.go +++ b/proxy/blackhole/config_test.go @@ -6,7 +6,6 @@ import ( "testing" "v2ray.com/core/common/buf" - v2io "v2ray.com/core/common/io" . "v2ray.com/core/proxy/blackhole" "v2ray.com/core/testing/assert" ) @@ -17,7 +16,7 @@ func TestHTTPResponse(t *testing.T) { buffer := buf.New() httpResponse := new(HTTPResponse) - httpResponse.WriteTo(v2io.NewAdaptiveWriter(buffer)) + httpResponse.WriteTo(buf.NewWriter(buffer)) reader := bufio.NewReader(buffer) response, err := http.ReadResponse(reader, nil) diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 8fbd5467..06370c39 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -7,7 +7,6 @@ import ( "v2ray.com/core/app/dispatcher" "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -176,10 +175,10 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { wg.Add(1) go func() { - v2reader := v2io.NewAdaptiveReader(reader) + v2reader := buf.NewReader(reader) defer v2reader.Release() - if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { + if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { log.Info("Dokodemo: Failed to transport all TCP request: ", err) } wg.Done() @@ -188,10 +187,10 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { wg.Add(1) go func() { - v2writer := v2io.NewAdaptiveWriter(conn) + v2writer := buf.NewWriter(conn) defer v2writer.Release() - if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { + if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { log.Info("Dokodemo: Failed to transport all TCP response: ", err) } wg.Done() diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index e14f760c..a67503df 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -2,12 +2,12 @@ package freedom import ( "io" + "v2ray.com/core/app" "v2ray.com/core/app/dns" "v2ray.com/core/common/buf" "v2ray.com/core/common/dice" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -100,10 +100,10 @@ func (v *FreedomConnection) Dispatch(destination v2net.Destination, payload *buf } go func() { - v2writer := v2io.NewAdaptiveWriter(conn) + v2writer := buf.NewWriter(conn) defer v2writer.Release() - if err := v2io.PipeUntilEOF(input, v2writer); err != nil { + if err := buf.PipeUntilEOF(input, v2writer); err != nil { log.Info("Freedom: Failed to transport all TCP request: ", err) } if tcpConn, ok := conn.(*tcp.RawConnection); ok { @@ -121,8 +121,8 @@ func (v *FreedomConnection) Dispatch(destination v2net.Destination, payload *buf reader = v2net.NewTimeOutReader(timeout /* seconds */, conn) } - v2reader := v2io.NewAdaptiveReader(reader) - if err := v2io.PipeUntilEOF(v2reader, output); err != nil { + v2reader := buf.NewReader(reader) + if err := buf.PipeUntilEOF(v2reader, output); err != nil { log.Info("Freedom: Failed to transport all TCP response: ", err) } v2reader.Release() diff --git a/proxy/http/server.go b/proxy/http/server.go index 4558f586..ad6184db 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -1,7 +1,6 @@ package http import ( - "bufio" "io" "net" "net/http" @@ -12,8 +11,9 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" "v2ray.com/core/common" + "v2ray.com/core/common/buf" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -98,7 +98,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error func (v *Server) handleConnection(conn internet.Connection) { defer conn.Close() timedReader := v2net.NewTimeOutReader(v.config.Timeout, conn) - reader := bufio.NewReaderSize(timedReader, 2048) + reader := bufio.OriginalReaderSize(timedReader, 2048) request, err := http.ReadRequest(reader) if err != nil { @@ -158,10 +158,10 @@ func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay defer wg.Wait() go func() { - v2reader := v2io.NewAdaptiveReader(input) + v2reader := buf.NewReader(input) defer v2reader.Release() - if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { + if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { log.Info("HTTP: Failed to transport all TCP request: ", err) } ray.InboundInput().Close() @@ -169,10 +169,10 @@ func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay }() go func() { - v2writer := v2io.NewAdaptiveWriter(output) + v2writer := buf.NewWriter(output) defer v2writer.Release() - if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { + if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { log.Info("HTTP: Failed to transport all TCP response: ", err) } ray.InboundOutput().Release() @@ -221,7 +221,7 @@ func (v *Server) GenerateResponse(statusCode int, status string) *http.Response } } -func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionInfo, reader *bufio.Reader, writer io.Writer) { +func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionInfo, reader io.Reader, writer io.Writer) { if len(request.URL.Host) <= 0 { response := v.GenerateResponse(400, "Bad Request") response.Write(writer) @@ -240,7 +240,7 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn finish.Add(1) go func() { defer finish.Done() - requestWriter := v2io.NewBufferedWriter(v2io.NewChainWriter(ray.InboundInput())) + requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput())) err := request.Write(requestWriter) if err != nil { log.Warning("HTTP: Failed to write request: ", err) @@ -252,13 +252,13 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn finish.Add(1) go func() { defer finish.Done() - responseReader := bufio.NewReader(v2io.NewChanReader(ray.InboundOutput())) + responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) if err != nil { log.Warning("HTTP: Failed to read response: ", err) response = v.GenerateResponse(503, "Service Unavailable") } - responseWriter := v2io.NewBufferedWriter(writer) + responseWriter := bufio.NewWriter(writer) err = response.Write(responseWriter) if err != nil { log.Warning("HTTP: Failed to write response: ", err) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index d7192ba7..ca833304 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -2,9 +2,10 @@ package shadowsocks import ( "sync" + "v2ray.com/core/app" "v2ray.com/core/common/buf" - v2io "v2ray.com/core/common/io" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -87,7 +88,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra } if request.Command == protocol.RequestCommandTCP { - bufferedWriter := v2io.NewBufferedWriter(conn) + bufferedWriter := bufio.NewWriter(conn) defer bufferedWriter.Release() bodyWriter, err := WriteTCPRequest(request, bufferedWriter) @@ -115,13 +116,13 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra return } - if err := v2io.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil { + if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil { log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err) } }() bufferedWriter.SetCached(false) - if err := v2io.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { + if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) } @@ -141,7 +142,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra User: user, } - if err := v2io.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil { + if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil { log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) } }() @@ -156,7 +157,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra return } } - if err := v2io.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { + if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) } diff --git a/proxy/shadowsocks/protocol.go b/proxy/shadowsocks/protocol.go index e57cb5f7..4de2f51f 100644 --- a/proxy/shadowsocks/protocol.go +++ b/proxy/shadowsocks/protocol.go @@ -8,7 +8,6 @@ import ( "v2ray.com/core/common/buf" "v2ray.com/core/common/crypto" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/serial" @@ -23,7 +22,7 @@ const ( AddrTypeDomain = 3 ) -func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHeader, v2io.Reader, error) { +func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHeader, buf.Reader, error) { rawAccount, err := user.GetTypedAccount() if err != nil { return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to parse account.") @@ -121,17 +120,17 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea } } - var chunkReader v2io.Reader + var chunkReader buf.Reader if request.Option.Has(RequestOptionOneTimeAuth) { chunkReader = NewChunkReader(reader, NewAuthenticator(ChunkKeyGenerator(iv))) } else { - chunkReader = v2io.NewAdaptiveReader(reader) + chunkReader = buf.NewReader(reader) } return request, chunkReader, nil } -func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Writer, error) { +func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (buf.Writer, error) { user := request.User rawAccount, err := user.GetTypedAccount() if err != nil { @@ -183,17 +182,17 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to write header.") } - var chunkWriter v2io.Writer + var chunkWriter buf.Writer if request.Option.Has(RequestOptionOneTimeAuth) { chunkWriter = NewChunkWriter(writer, NewAuthenticator(ChunkKeyGenerator(iv))) } else { - chunkWriter = v2io.NewAdaptiveWriter(writer) + chunkWriter = buf.NewWriter(writer) } return chunkWriter, nil } -func ReadTCPResponse(user *protocol.User, reader io.Reader) (v2io.Reader, error) { +func ReadTCPResponse(user *protocol.User, reader io.Reader) (buf.Reader, error) { rawAccount, err := user.GetTypedAccount() if err != nil { return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to parse account.") @@ -210,10 +209,10 @@ func ReadTCPResponse(user *protocol.User, reader io.Reader) (v2io.Reader, error) if err != nil { return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to initialize decoding stream.") } - return v2io.NewAdaptiveReader(crypto.NewCryptionReader(stream, reader)), nil + return buf.NewReader(crypto.NewCryptionReader(stream, reader)), nil } -func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (v2io.Writer, error) { +func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (buf.Writer, error) { user := request.User rawAccount, err := user.GetTypedAccount() if err != nil { @@ -233,7 +232,7 @@ func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (v2io.W return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to create encoding stream.") } - return v2io.NewAdaptiveWriter(crypto.NewCryptionWriter(stream, writer)), nil + return buf.NewWriter(crypto.NewCryptionWriter(stream, writer)), nil } func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf.Buffer, error) { diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 473df9f3..52be34d3 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -3,12 +3,13 @@ package shadowsocks import ( "sync" + "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" "v2ray.com/core/common" "v2ray.com/core/common/buf" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -150,7 +151,7 @@ func (v *Server) handleConnection(conn internet.Connection) { timedReader := v2net.NewTimeOutReader(16, conn) defer timedReader.Release() - bufferedReader := v2io.NewBufferedReader(timedReader) + bufferedReader := bufio.NewReader(timedReader) defer bufferedReader.Release() request, bodyReader, err := ReadTCPSession(v.user, bufferedReader) @@ -183,7 +184,7 @@ func (v *Server) handleConnection(conn internet.Connection) { go func() { defer writeFinish.Unlock() - bufferedWriter := v2io.NewBufferedWriter(conn) + bufferedWriter := bufio.NewWriter(conn) defer bufferedWriter.Release() responseWriter, err := WriteTCPResponse(request, bufferedWriter) @@ -197,13 +198,13 @@ func (v *Server) handleConnection(conn internet.Connection) { responseWriter.Write(payload) bufferedWriter.SetCached(false) - if err := v2io.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { + if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) } } }() - if err := v2io.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil { + if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil { log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err) } ray.InboundInput().Close() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 61936853..94d7f659 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -7,9 +7,10 @@ import ( "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" + "v2ray.com/core/common/buf" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/crypto" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -106,10 +107,10 @@ func (v *Server) handleConnection(connection internet.Connection) { defer connection.Close() timedReader := v2net.NewTimeOutReader(v.config.Timeout, connection) - reader := v2io.NewBufferedReader(timedReader) + reader := bufio.NewReader(timedReader) defer reader.Release() - writer := v2io.NewBufferedWriter(connection) + writer := bufio.NewWriter(connection) defer writer.Release() auth, auth4, err := protocol.ReadAuthentication(reader) @@ -128,7 +129,7 @@ func (v *Server) handleConnection(connection internet.Connection) { } } -func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error { +func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.BufferedReader, writer *bufio.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error { expectedAuthMethod := protocol.AuthNotRequired if v.config.AuthType == AuthType_PASSWORD { expectedAuthMethod = protocol.AuthUserPass @@ -232,7 +233,7 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *v2io.Buffere return nil } -func (v *Server) handleUDP(reader io.Reader, writer *v2io.BufferedWriter) error { +func (v *Server) handleUDP(reader io.Reader, writer *bufio.BufferedWriter) error { response := protocol.NewSocks5Response() response.Error = protocol.ErrorSuccess @@ -264,7 +265,7 @@ func (v *Server) handleUDP(reader io.Reader, writer *v2io.BufferedWriter) error return nil } -func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error { +func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.BufferedReader, writer *bufio.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error { result := protocol.Socks4RequestGranted if auth.Command == protocol.CmdBind { result = protocol.Socks4RequestRejected @@ -302,19 +303,19 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se defer output.Release() go func() { - v2reader := v2io.NewAdaptiveReader(reader) + v2reader := buf.NewReader(reader) defer v2reader.Release() - if err := v2io.PipeUntilEOF(v2reader, input); err != nil { + if err := buf.PipeUntilEOF(v2reader, input); err != nil { log.Info("Socks|Server: Failed to transport all TCP request: ", err) } input.Close() }() - v2writer := v2io.NewAdaptiveWriter(writer) + v2writer := buf.NewWriter(writer) defer v2writer.Release() - if err := v2io.PipeUntilEOF(output, v2writer); err != nil { + if err := buf.PipeUntilEOF(output, v2writer); err != nil { log.Info("Socks|Server: Failed to transport all TCP response: ", err) } output.Release() diff --git a/proxy/testing/mocks/inboundhandler.go b/proxy/testing/mocks/inboundhandler.go index 948fcbf2..b90eb330 100644 --- a/proxy/testing/mocks/inboundhandler.go +++ b/proxy/testing/mocks/inboundhandler.go @@ -5,7 +5,7 @@ import ( "sync" "v2ray.com/core/app/dispatcher" - v2io "v2ray.com/core/common/io" + "v2ray.com/core/common/buf" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" ) @@ -49,19 +49,19 @@ func (v *InboundConnectionHandler) Communicate(destination v2net.Destination) er writeFinish.Lock() go func() { - v2reader := v2io.NewAdaptiveReader(v.ConnInput) + v2reader := buf.NewReader(v.ConnInput) defer v2reader.Release() - v2io.Pipe(v2reader, input) + buf.Pipe(v2reader, input) input.Close() readFinish.Unlock() }() go func() { - v2writer := v2io.NewAdaptiveWriter(v.ConnOutput) + v2writer := buf.NewWriter(v.ConnOutput) defer v2writer.Release() - v2io.Pipe(output, v2writer) + buf.Pipe(output, v2writer) output.Release() writeFinish.Unlock() }() diff --git a/proxy/testing/mocks/outboundhandler.go b/proxy/testing/mocks/outboundhandler.go index 1b21c5dc..fce69d82 100644 --- a/proxy/testing/mocks/outboundhandler.go +++ b/proxy/testing/mocks/outboundhandler.go @@ -6,7 +6,6 @@ import ( "v2ray.com/core/app" "v2ray.com/core/common/buf" - v2io "v2ray.com/core/common/io" v2net "v2ray.com/core/common/net" "v2ray.com/core/proxy" "v2ray.com/core/transport/ray" @@ -33,20 +32,20 @@ func (v *OutboundConnectionHandler) Dispatch(destination v2net.Destination, payl writeFinish.Lock() go func() { - v2writer := v2io.NewAdaptiveWriter(v.ConnOutput) + v2writer := buf.NewWriter(v.ConnOutput) defer v2writer.Release() - v2io.Pipe(input, v2writer) + buf.Pipe(input, v2writer) writeFinish.Unlock() input.Release() }() writeFinish.Lock() - v2reader := v2io.NewAdaptiveReader(v.ConnInput) + v2reader := buf.NewReader(v.ConnInput) defer v2reader.Release() - v2io.Pipe(v2reader, output) + buf.Pipe(v2reader, output) output.Close() } diff --git a/proxy/vmess/encoding/client.go b/proxy/vmess/encoding/client.go index 287988e8..24329ed0 100644 --- a/proxy/vmess/encoding/client.go +++ b/proxy/vmess/encoding/client.go @@ -11,9 +11,9 @@ import ( "golang.org/x/crypto/chacha20poly1305" + "v2ray.com/core/common/buf" "v2ray.com/core/common/crypto" "v2ray.com/core/common/dice" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -109,7 +109,7 @@ func (v *ClientSession) EncodeRequestHeader(header *protocol.RequestHeader, writ return } -func (v *ClientSession) EncodeRequestBody(request *protocol.RequestHeader, writer io.Writer) v2io.Writer { +func (v *ClientSession) EncodeRequestBody(request *protocol.RequestHeader, writer io.Writer) buf.Writer { var authWriter io.Writer if request.Security.Is(protocol.SecurityType_NONE) { if request.Option.Has(protocol.RequestOptionChunkStream) { @@ -162,7 +162,7 @@ func (v *ClientSession) EncodeRequestBody(request *protocol.RequestHeader, write authWriter = crypto.NewAuthenticationWriter(auth, writer) } - return v2io.NewAdaptiveWriter(authWriter) + return buf.NewWriter(authWriter) } @@ -204,7 +204,7 @@ func (v *ClientSession) DecodeResponseHeader(reader io.Reader) (*protocol.Respon return header, nil } -func (v *ClientSession) DecodeResponseBody(request *protocol.RequestHeader, reader io.Reader) v2io.Reader { +func (v *ClientSession) DecodeResponseBody(request *protocol.RequestHeader, reader io.Reader) buf.Reader { aggressive := (request.Command == protocol.RequestCommandTCP) var authReader io.Reader if request.Security.Is(protocol.SecurityType_NONE) { @@ -256,7 +256,7 @@ func (v *ClientSession) DecodeResponseBody(request *protocol.RequestHeader, read authReader = crypto.NewAuthenticationReader(auth, reader, aggressive) } - return v2io.NewAdaptiveReader(authReader) + return buf.NewReader(authReader) } type ChunkNonceGenerator struct { diff --git a/proxy/vmess/encoding/server.go b/proxy/vmess/encoding/server.go index 082a89a2..04fc40eb 100644 --- a/proxy/vmess/encoding/server.go +++ b/proxy/vmess/encoding/server.go @@ -9,9 +9,9 @@ import ( "golang.org/x/crypto/chacha20poly1305" + "v2ray.com/core/common/buf" "v2ray.com/core/common/crypto" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" @@ -155,7 +155,7 @@ func (v *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.Request return request, nil } -func (v *ServerSession) DecodeRequestBody(request *protocol.RequestHeader, reader io.Reader) v2io.Reader { +func (v *ServerSession) DecodeRequestBody(request *protocol.RequestHeader, reader io.Reader) buf.Reader { aggressive := (request.Command == protocol.RequestCommandTCP) var authReader io.Reader if request.Security.Is(protocol.SecurityType_NONE) { @@ -209,7 +209,7 @@ func (v *ServerSession) DecodeRequestBody(request *protocol.RequestHeader, reade authReader = crypto.NewAuthenticationReader(auth, reader, aggressive) } - return v2io.NewAdaptiveReader(authReader) + return buf.NewReader(authReader) } func (v *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader, writer io.Writer) { @@ -229,7 +229,7 @@ func (v *ServerSession) EncodeResponseHeader(header *protocol.ResponseHeader, wr } } -func (v *ServerSession) EncodeResponseBody(request *protocol.RequestHeader, writer io.Writer) v2io.Writer { +func (v *ServerSession) EncodeResponseBody(request *protocol.RequestHeader, writer io.Writer) buf.Writer { var authWriter io.Writer if request.Security.Is(protocol.SecurityType_NONE) { if request.Option.Has(protocol.RequestOptionChunkStream) { @@ -280,5 +280,5 @@ func (v *ServerSession) EncodeResponseBody(request *protocol.RequestHeader, writ authWriter = crypto.NewAuthenticationWriter(auth, writer) } - return v2io.NewAdaptiveWriter(authWriter) + return buf.NewWriter(authWriter) } diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 8b0f562b..1c212ab2 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -9,8 +9,8 @@ import ( "v2ray.com/core/app/proxyman" "v2ray.com/core/common" "v2ray.com/core/common/buf" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/errors" - v2io "v2ray.com/core/common/io" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -139,7 +139,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { connReader := v2net.NewTimeOutReader(8, connection) defer connReader.Release() - reader := v2io.NewBufferedReader(connReader) + reader := bufio.NewReader(connReader) defer reader.Release() v.RLock() @@ -186,7 +186,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { go func() { bodyReader := session.DecodeRequestBody(request, reader) - if err := v2io.PipeUntilEOF(bodyReader, input); err != nil { + if err := buf.PipeUntilEOF(bodyReader, input); err != nil { connection.SetReusable(false) } bodyReader.Release() @@ -195,7 +195,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { readFinish.Unlock() }() - writer := v2io.NewBufferedWriter(connection) + writer := bufio.NewWriter(connection) defer writer.Release() response := &protocol.ResponseHeader{ @@ -218,7 +218,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { writer.SetCached(false) - if err := v2io.PipeUntilEOF(output, bodyWriter); err != nil { + if err := buf.PipeUntilEOF(output, bodyWriter); err != nil { connection.SetReusable(false) } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index f7a02f80..9bca067e 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -5,7 +5,7 @@ import ( "v2ray.com/core/app" "v2ray.com/core/common/buf" - v2io "v2ray.com/core/common/io" + "v2ray.com/core/common/bufio" "v2ray.com/core/common/loader" "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" @@ -92,10 +92,10 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B return } -func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input v2io.Reader, finish *sync.Mutex) { +func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input buf.Reader, finish *sync.Mutex) { defer finish.Unlock() - writer := v2io.NewBufferedWriter(conn) + writer := bufio.NewWriter(conn) defer writer.Release() session.EncodeRequestHeader(request, writer) @@ -111,7 +111,7 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co } writer.SetCached(false) - if err := v2io.PipeUntilEOF(input, bodyWriter); err != nil { + if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { conn.SetReusable(false) } @@ -124,10 +124,10 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co return } -func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { +func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output buf.Writer, finish *sync.Mutex) { defer finish.Unlock() - reader := v2io.NewBufferedReader(conn) + reader := bufio.NewReader(conn) defer reader.Release() header, err := session.DecodeResponseHeader(reader) @@ -146,7 +146,7 @@ func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, c bodyReader := session.DecodeResponseBody(request, reader) defer bodyReader.Release() - if err := v2io.PipeUntilEOF(bodyReader, output); err != nil { + if err := buf.PipeUntilEOF(bodyReader, output); err != nil { conn.SetReusable(false) } diff --git a/transport/ray/ray.go b/transport/ray/ray.go index 7ff8a28f..b949d119 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -1,8 +1,6 @@ package ray -import ( - v2io "v2ray.com/core/common/io" -) +import "v2ray.com/core/common/buf" // OutboundRay is a transport interface for outbound connections. type OutboundRay interface { @@ -36,11 +34,11 @@ type Ray interface { } type InputStream interface { - v2io.Reader + buf.Reader Close() } type OutputStream interface { - v2io.Writer + buf.Writer Close() }