Browse Source

compact buffers

pull/1869/head^2
Darien Raymond 6 years ago
parent
commit
fc92b6295a
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
  1. 25
      common/buf/multi_buffer.go
  2. 30
      transport/internet/quic/conn.go
  3. 28
      transport/internet/websocket/connection.go

25
common/buf/multi_buffer.go

@ -3,6 +3,7 @@ package buf
import (
"io"
"v2ray.com/core/common"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/serial"
)
@ -121,6 +122,30 @@ func SplitBytes(mb MultiBuffer, b []byte) (MultiBuffer, int) {
return mb, totalBytes
}
// Compact returns another MultiBuffer by merging all content of the given one together.
func Compact(mb MultiBuffer) MultiBuffer {
if len(mb) == 0 {
return mb
}
mb2 := make(MultiBuffer, 0, len(mb))
last := mb[0]
for i := 1; i < len(mb); i++ {
curr := mb[i]
if last.Len()+curr.Len() > Size {
mb2 = append(mb2, last)
last = curr
} else {
common.Must2(last.ReadFrom(curr))
curr.Release()
}
}
mb2 = append(mb2, last)
return mb2
}
// SplitFirst splits the first Buffer from the beginning of the MultiBuffer.
func SplitFirst(mb MultiBuffer) (MultiBuffer, *Buffer) {
if len(mb) == 0 {

30
transport/internet/quic/conn.go

@ -171,33 +171,17 @@ func (c *interConn) ReadMultiBuffer() (buf.MultiBuffer, error) {
}
func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
if mb.IsEmpty() {
return nil
}
if len(mb) == 1 {
_, err := c.Write(mb[0].Bytes())
buf.ReleaseMulti(mb)
return err
}
b := getBuffer()
defer putBuffer(b)
reader := buf.MultiBufferContainer{
MultiBuffer: mb,
}
defer reader.Close()
mb = buf.Compact(mb)
for {
nBytes, err := reader.Read(b[:1200])
if err != nil {
mb2, b := buf.SplitFirst(mb)
mb = mb2
if b == nil {
break
}
if nBytes == 0 {
continue
}
if _, err := c.Write(b[:nBytes]); err != nil {
if err := buf.WriteAllBytes(c, b.Bytes()); err != nil {
buf.ReleaseMulti(mb)
return err
}
}

28
transport/internet/websocket/connection.go

@ -18,10 +18,9 @@ var (
// connection is a wrapper for net.Conn over WebSocket connection.
type connection struct {
conn *websocket.Conn
reader io.Reader
mergingWriter *buf.BufferedWriter
remoteAddr net.Addr
conn *websocket.Conn
reader io.Reader
remoteAddr net.Addr
}
func newConnection(conn *websocket.Conn, remoteAddr net.Addr) *connection {
@ -70,13 +69,22 @@ func (c *connection) Write(b []byte) (int, error) {
}
func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
if c.mergingWriter == nil {
c.mergingWriter = buf.NewBufferedWriter(&buf.BufferToBytesWriter{Writer: c})
}
if err := c.mergingWriter.WriteMultiBuffer(mb); err != nil {
return err
mb = buf.Compact(mb)
for {
mb2, b := buf.SplitFirst(mb)
mb = mb2
if b == nil {
break
}
if err := buf.WriteAllBytes(c, b.Bytes()); err != nil {
buf.ReleaseMulti(mb)
return err
}
}
return c.mergingWriter.Flush()
return nil
}
func (c *connection) Close() error {

Loading…
Cancel
Save