From f6433441540bfc2ec35da56397e92146d7bb5832 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Mon, 27 Mar 2017 11:26:44 +0200 Subject: [PATCH] merge buf stream --- common/buf/merge_reader.go | 23 ++++++++++++---------- common/buf/merge_reader_test.go | 33 ++++++++++++++++++++++++++++++++ proxy/shadowsocks/client.go | 7 +++++-- proxy/vmess/outbound/outbound.go | 11 +++++++++-- 4 files changed, 60 insertions(+), 14 deletions(-) create mode 100644 common/buf/merge_reader_test.go diff --git a/common/buf/merge_reader.go b/common/buf/merge_reader.go index 2a03b66b..5dc8df65 100644 --- a/common/buf/merge_reader.go +++ b/common/buf/merge_reader.go @@ -31,17 +31,20 @@ func (r *MergingReader) Read() (*Buffer, error) { return b, nil } - b2, err := r.timeoutReader.ReadTimeout(0) - if err != nil { - return b, nil - } + for { + b2, err := r.timeoutReader.ReadTimeout(0) + if err != nil { + break + } - nBytes := b.Append(b2.Bytes()) - b2.SliceFrom(nBytes) - if b2.IsEmpty() { - b2.Release() - } else { - r.leftover = b2 + nBytes := b.Append(b2.Bytes()) + b2.SliceFrom(nBytes) + if b2.IsEmpty() { + b2.Release() + } else { + r.leftover = b2 + break + } } return b, nil diff --git a/common/buf/merge_reader_test.go b/common/buf/merge_reader_test.go new file mode 100644 index 00000000..57cbdfc4 --- /dev/null +++ b/common/buf/merge_reader_test.go @@ -0,0 +1,33 @@ +package buf_test + +import ( + "testing" + + "context" + + . "v2ray.com/core/common/buf" + "v2ray.com/core/testing/assert" + "v2ray.com/core/transport/ray" +) + +func TestMergingReader(t *testing.T) { + assert := assert.On(t) + + stream := ray.NewStream(context.Background()) + b1 := New() + b1.AppendBytes('a', 'b', 'c') + stream.Write(b1) + + b2 := New() + b2.AppendBytes('e', 'f', 'g') + stream.Write(b2) + + b3 := New() + b3.AppendBytes('h', 'i', 'j') + stream.Write(b3) + + reader := NewMergingReader(stream) + b, err := reader.Read() + assert.Error(err).IsNil() + assert.String(b.String()).Equals("abcefghij") +} diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index d9f2d710..55530e37 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -101,10 +101,13 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale return err } - bufferedWriter.SetBuffered(false) + if err := bufferedWriter.SetBuffered(false); err != nil { + return err + } requestDone := signal.ExecuteAsync(func() error { - if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), bodyWriter); err != nil { + mergedInput := buf.NewMergingReader(outboundRay.OutboundInput()) + if err := buf.PipeUntilEOF(timer, mergedInput, bodyWriter); err != nil { return err } return nil diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 192cfbf3..c90fd535 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -124,9 +124,16 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial firstPayload.Release() } - writer.SetBuffered(false) + if err := writer.SetBuffered(false); err != nil { + return err + } - if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil { + var inputReader buf.Reader = input + if request.Command == protocol.RequestCommandTCP { + inputReader = buf.NewMergingReader(input) + } + + if err := buf.PipeUntilEOF(timer, inputReader, bodyWriter); err != nil { return err }