From fc92b6295a3129f0b31612451698a3b8e2268a41 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 27 Dec 2018 16:36:48 +0100 Subject: [PATCH] compact buffers --- common/buf/multi_buffer.go | 25 ++++++++++++++++++ transport/internet/quic/conn.go | 30 +++++----------------- transport/internet/websocket/connection.go | 28 ++++++++++++-------- 3 files changed, 50 insertions(+), 33 deletions(-) diff --git a/common/buf/multi_buffer.go b/common/buf/multi_buffer.go index 57ccf682..312853e2 100644 --- a/common/buf/multi_buffer.go +++ b/common/buf/multi_buffer.go @@ -3,6 +3,7 @@ package buf import ( "io" + "v2ray.com/core/common" "v2ray.com/core/common/errors" "v2ray.com/core/common/serial" ) @@ -121,6 +122,30 @@ func SplitBytes(mb MultiBuffer, b []byte) (MultiBuffer, int) { return mb, totalBytes } +// Compact returns another MultiBuffer by merging all content of the given one together. +func Compact(mb MultiBuffer) MultiBuffer { + if len(mb) == 0 { + return mb + } + + mb2 := make(MultiBuffer, 0, len(mb)) + last := mb[0] + + for i := 1; i < len(mb); i++ { + curr := mb[i] + if last.Len()+curr.Len() > Size { + mb2 = append(mb2, last) + last = curr + } else { + common.Must2(last.ReadFrom(curr)) + curr.Release() + } + } + + mb2 = append(mb2, last) + return mb2 +} + // SplitFirst splits the first Buffer from the beginning of the MultiBuffer. func SplitFirst(mb MultiBuffer) (MultiBuffer, *Buffer) { if len(mb) == 0 { diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index 631954f3..afefbc09 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -171,33 +171,17 @@ func (c *interConn) ReadMultiBuffer() (buf.MultiBuffer, error) { } func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { - if mb.IsEmpty() { - return nil - } - - if len(mb) == 1 { - _, err := c.Write(mb[0].Bytes()) - buf.ReleaseMulti(mb) - return err - } - - b := getBuffer() - defer putBuffer(b) - - reader := buf.MultiBufferContainer{ - MultiBuffer: mb, - } - defer reader.Close() + mb = buf.Compact(mb) for { - nBytes, err := reader.Read(b[:1200]) - if err != nil { + mb2, b := buf.SplitFirst(mb) + mb = mb2 + if b == nil { break } - if nBytes == 0 { - continue - } - if _, err := c.Write(b[:nBytes]); err != nil { + + if err := buf.WriteAllBytes(c, b.Bytes()); err != nil { + buf.ReleaseMulti(mb) return err } } diff --git a/transport/internet/websocket/connection.go b/transport/internet/websocket/connection.go index 8ffce7d3..c15e7f7d 100644 --- a/transport/internet/websocket/connection.go +++ b/transport/internet/websocket/connection.go @@ -18,10 +18,9 @@ var ( // connection is a wrapper for net.Conn over WebSocket connection. type connection struct { - conn *websocket.Conn - reader io.Reader - mergingWriter *buf.BufferedWriter - remoteAddr net.Addr + conn *websocket.Conn + reader io.Reader + remoteAddr net.Addr } func newConnection(conn *websocket.Conn, remoteAddr net.Addr) *connection { @@ -70,13 +69,22 @@ func (c *connection) Write(b []byte) (int, error) { } func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error { - if c.mergingWriter == nil { - c.mergingWriter = buf.NewBufferedWriter(&buf.BufferToBytesWriter{Writer: c}) + mb = buf.Compact(mb) + + for { + mb2, b := buf.SplitFirst(mb) + mb = mb2 + if b == nil { + break + } + + if err := buf.WriteAllBytes(c, b.Bytes()); err != nil { + buf.ReleaseMulti(mb) + return err + } } - if err := c.mergingWriter.WriteMultiBuffer(mb); err != nil { - return err - } - return c.mergingWriter.Flush() + + return nil } func (c *connection) Close() error {