remove use of buf.NewSize()

pull/1331/head
Darien Raymond 6 years ago
parent 053fc38d38
commit fdb3a7b57d
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -51,7 +51,11 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
return nil, err return nil, err
} }
b := buf.NewSize(int32(size)) if size > buf.Size {
return nil, newError("packet size too large: ", size)
}
b := buf.New()
if err := b.Reset(buf.ReadFullFrom(r.reader, int32(size))); err != nil { if err := b.Reset(buf.ReadFullFrom(r.reader, int32(size))); err != nil {
b.Release() b.Release()
return nil, err return nil, err

@ -2,6 +2,13 @@ package buf
import ( import (
"io" "io"
"v2ray.com/core/common/bytespool"
)
const (
// Size of a regular buffer.
Size = 2048
) )
// Supplier is a writer that writes contents into the given buffer. // Supplier is a writer that writes contents into the given buffer.
@ -11,8 +18,7 @@ type Supplier func([]byte) (int, error)
// the buffer into an internal buffer pool, in order to recreate a buffer more // the buffer into an internal buffer pool, in order to recreate a buffer more
// quickly. // quickly.
type Buffer struct { type Buffer struct {
v []byte v []byte
start int32 start int32
end int32 end int32
} }
@ -22,10 +28,9 @@ func (b *Buffer) Release() {
if b == nil || b.v == nil { if b == nil || b.v == nil {
return return
} }
freeBytes(b.v) bytespool.Free(b.v)
b.v = nil b.v = nil
b.start = 0 b.Clear()
b.end = 0
} }
// Clear clears the content of the buffer, results an empty buffer with // Clear clears the content of the buffer, results an empty buffer with
@ -167,13 +172,6 @@ func (b *Buffer) String() string {
// New creates a Buffer with 0 length and 2K capacity. // New creates a Buffer with 0 length and 2K capacity.
func New() *Buffer { func New() *Buffer {
return &Buffer{ return &Buffer{
v: pool[0].Get().([]byte), v: bytespool.Alloc(Size),
}
}
// NewSize creates and returns a buffer with 0 length and at least the given capacity. Capacity must be positive.
func NewSize(capacity int32) *Buffer {
return &Buffer{
v: newBytes(capacity),
} }
} }

@ -47,10 +47,3 @@ func BenchmarkNewBuffer(b *testing.B) {
buffer.Release() buffer.Release()
} }
} }
func BenchmarkNewLocalBuffer(b *testing.B) {
for i := 0; i < b.N; i++ {
buffer := NewSize(Size)
buffer.Release()
}
}

@ -229,7 +229,7 @@ func (mb *MultiBuffer) SliceBySize(size int32) MultiBuffer {
} }
*mb = (*mb)[endIndex:] *mb = (*mb)[endIndex:]
if endIndex == 0 && len(*mb) > 0 { if endIndex == 0 && len(*mb) > 0 {
b := NewSize(size) b := New()
common.Must(b.Reset(ReadFullFrom((*mb)[0], size))) common.Must(b.Reset(ReadFullFrom((*mb)[0], size)))
return NewMultiBufferValue(b) return NewMultiBufferValue(b)
} }

@ -40,14 +40,14 @@ func TestMultiBufferAppend(t *testing.T) {
func TestMultiBufferSliceBySizeLarge(t *testing.T) { func TestMultiBufferSliceBySizeLarge(t *testing.T) {
assert := With(t) assert := With(t)
lb := NewSize(8 * 1024) lb := make([]byte, 8*1024)
common.Must(lb.Reset(ReadFrom(rand.Reader))) common.Must2(io.ReadFull(rand.Reader, lb))
var mb MultiBuffer var mb MultiBuffer
mb.Append(lb) common.Must2(mb.Write(lb))
mb2 := mb.SliceBySize(4 * 1024) mb2 := mb.SliceBySize(1024)
assert(mb2.Len(), Equals, int32(4*1024)) assert(mb2.Len(), Equals, int32(1024))
} }
func TestInterface(t *testing.T) { func TestInterface(t *testing.T) {

@ -4,6 +4,7 @@ import (
"io" "io"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/bytespool"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
) )
@ -23,6 +24,8 @@ func readOne(r io.Reader) (*Buffer, error) {
return nil, newError("Reader returns too many empty payloads.") return nil, newError("Reader returns too many empty payloads.")
} }
const largeSize = 128 * 1024
// BytesToBufferReader is a Reader that adjusts its reading speed automatically. // BytesToBufferReader is a Reader that adjusts its reading speed automatically.
type BytesToBufferReader struct { type BytesToBufferReader struct {
io.Reader io.Reader
@ -42,13 +45,13 @@ func (r *BytesToBufferReader) readSmall() (MultiBuffer, error) {
return nil, err return nil, err
} }
if b.IsFull() && largeSize > Size { if b.IsFull() && largeSize > Size {
r.buffer = newBytes(Size + 1) r.buffer = bytespool.Alloc(Size + 100)
} }
return NewMultiBufferValue(b), nil return NewMultiBufferValue(b), nil
} }
func (r *BytesToBufferReader) freeBuffer() { func (r *BytesToBufferReader) freeBuffer() {
freeBytes(r.buffer) bytespool.Free(r.buffer)
r.buffer = nil r.buffer = nil
} }
@ -63,8 +66,8 @@ func (r *BytesToBufferReader) ReadMultiBuffer() (MultiBuffer, error) {
mb := NewMultiBufferCap(int32(nBytes/Size) + 1) mb := NewMultiBufferCap(int32(nBytes/Size) + 1)
common.Must2(mb.Write(r.buffer[:nBytes])) common.Must2(mb.Write(r.buffer[:nBytes]))
if nBytes == len(r.buffer) && nBytes < int(largeSize) { if nBytes == len(r.buffer) && nBytes < int(largeSize) {
freeBytes(r.buffer) bytespool.Free(r.buffer)
r.buffer = newBytes(int32(nBytes) + 1) r.buffer = bytespool.Alloc(int32(nBytes) + 100)
} else if nBytes < Size { } else if nBytes < Size {
r.freeBuffer() r.freeBuffer()
} }

@ -16,7 +16,7 @@ func TestAdaptiveReader(t *testing.T) {
reader := NewReader(bytes.NewReader(make([]byte, 1024*1024))) reader := NewReader(bytes.NewReader(make([]byte, 1024*1024)))
b, err := reader.ReadMultiBuffer() b, err := reader.ReadMultiBuffer()
assert(err, IsNil) assert(err, IsNil)
assert(b.Len(), Equals, int32(2*1024)) assert(b.Len(), Equals, int32(Size))
b, err = reader.ReadMultiBuffer() b, err = reader.ReadMultiBuffer()
assert(err, IsNil) assert(err, IsNil)

@ -1,13 +1,6 @@
package buf package bytespool
import ( import "sync"
"sync"
)
const (
// Size of a regular buffer.
Size = 2 * 1024
)
func createAllocFunc(size int32) func() interface{} { func createAllocFunc(size int32) func() interface{} {
return func() interface{} { return func() interface{} {
@ -25,24 +18,23 @@ const (
) )
var ( var (
pool [numPools]sync.Pool pool [numPools]sync.Pool
poolSize [numPools]int32 poolSize [numPools]int32
largeSize int32
) )
func init() { func init() {
size := int32(Size) size := int32(2048)
for i := 0; i < numPools; i++ { for i := 0; i < numPools; i++ {
pool[i] = sync.Pool{ pool[i] = sync.Pool{
New: createAllocFunc(size), New: createAllocFunc(size),
} }
poolSize[i] = size poolSize[i] = size
largeSize = size
size *= sizeMulti size *= sizeMulti
} }
} }
func newBytes(size int32) []byte { // Alloc returns a byte slice with at least the given size. Minimum size of returned slice is 2048.
func Alloc(size int32) []byte {
for idx, ps := range poolSize { for idx, ps := range poolSize {
if size <= ps { if size <= ps {
return pool[idx].Get().([]byte) return pool[idx].Get().([]byte)
@ -51,7 +43,8 @@ func newBytes(size int32) []byte {
return make([]byte, size) return make([]byte, size)
} }
func freeBytes(b []byte) { // Free puts a byte slice into the internal pool.
func Free(b []byte) {
size := int32(cap(b)) size := int32(cap(b))
b = b[0:cap(b)] b = b[0:cap(b)]
for i := numPools - 1; i >= 0; i-- { for i := numPools - 1; i >= 0; i-- {

@ -7,6 +7,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/bytespool"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
) )
@ -122,62 +123,85 @@ func (r *AuthenticationReader) readSize() (uint16, uint16, error) {
var errSoft = newError("waiting for more data") var errSoft = newError("waiting for more data")
func (r *AuthenticationReader) readInternal(soft bool) (*buf.Buffer, error) { func (r *AuthenticationReader) readBuffer(size int32, padding int32) (*buf.Buffer, error) {
b := buf.New()
if err := b.Reset(buf.ReadFullFrom(r.reader, size)); err != nil {
b.Release()
return nil, err
}
size -= padding
rb, err := r.auth.Open(b.BytesTo(0), b.BytesTo(size))
if err != nil {
b.Release()
return nil, err
}
b.Resize(0, int32(len(rb)))
return b, nil
}
func (r *AuthenticationReader) readInternal(soft bool, mb *buf.MultiBuffer) error {
if soft && r.reader.BufferedBytes() < r.sizeParser.SizeBytes() { if soft && r.reader.BufferedBytes() < r.sizeParser.SizeBytes() {
return nil, errSoft return errSoft
} }
if r.done { if r.done {
return nil, io.EOF return io.EOF
} }
size, padding, err := r.readSize() size, padding, err := r.readSize()
if err != nil { if err != nil {
return nil, err return err
} }
if size == uint16(r.auth.Overhead())+padding { if size == uint16(r.auth.Overhead())+padding {
r.done = true r.done = true
return nil, io.EOF return io.EOF
} }
if soft && int32(size) > r.reader.BufferedBytes() { if soft && int32(size) > r.reader.BufferedBytes() {
r.size = size r.size = size
r.paddingLen = padding r.paddingLen = padding
r.hasSize = true r.hasSize = true
return nil, errSoft return errSoft
} }
b := buf.NewSize(int32(size)) if size <= buf.Size {
if err := b.Reset(buf.ReadFullFrom(r.reader, int32(size))); err != nil { b, err := r.readBuffer(int32(size), int32(padding))
b.Release() if err != nil {
return nil, err return nil
}
mb.Append(b)
return nil
}
payload := bytespool.Alloc(int32(size))
defer bytespool.Free(payload)
if _, err := io.ReadFull(r.reader, payload[:size]); err != nil {
return err
} }
size -= padding size -= padding
rb, err := r.auth.Open(b.BytesTo(0), b.BytesTo(int32(size))) rb, err := r.auth.Open(payload[:0], payload[:size])
if err != nil { if err != nil {
b.Release() return err
return nil, err
} }
b.Resize(0, int32(len(rb)))
return b, nil common.Must2(mb.Write(rb))
return nil
} }
func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) { func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
b, err := r.readInternal(false)
if err != nil {
return nil, err
}
const readSize = 16 const readSize = 16
mb := buf.NewMultiBufferCap(readSize) mb := buf.NewMultiBufferCap(readSize)
mb.Append(b) if err := r.readInternal(false, &mb); err != nil {
mb.Release()
return nil, err
}
for i := 1; i < readSize; i++ { for i := 1; i < readSize; i++ {
b, err := r.readInternal(true) err := r.readInternal(true, &mb)
if err == errSoft || err == io.EOF { if err == errSoft || err == io.EOF {
break break
} }
@ -185,7 +209,6 @@ func (r *AuthenticationReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
mb.Release() mb.Release()
return nil, err return nil, err
} }
mb.Append(b)
} }
return mb, nil return mb, nil

@ -1,6 +1,7 @@
package crypto_test package crypto_test
import ( import (
"bytes"
"crypto/aes" "crypto/aes"
"crypto/cipher" "crypto/cipher"
"crypto/rand" "crypto/rand"
@ -29,11 +30,11 @@ func TestAuthenticationReaderWriter(t *testing.T) {
rawPayload := make([]byte, payloadSize) rawPayload := make([]byte, payloadSize)
rand.Read(rawPayload) rand.Read(rawPayload)
payload := buf.NewSize(payloadSize) var payload buf.MultiBuffer
payload.Write(rawPayload) payload.Write(rawPayload)
assert(payload.Len(), Equals, int32(payloadSize)) assert(payload.Len(), Equals, int32(payloadSize))
cache := buf.NewSize(160 * 1024) cache := bytes.NewBuffer(nil)
iv := make([]byte, 12) iv := make([]byte, 12)
rand.Read(iv) rand.Read(iv)
@ -43,8 +44,8 @@ func TestAuthenticationReaderWriter(t *testing.T) {
AdditionalDataGenerator: GenerateEmptyBytes(), AdditionalDataGenerator: GenerateEmptyBytes(),
}, PlainChunkSizeParser{}, cache, protocol.TransferTypeStream, nil) }, PlainChunkSizeParser{}, cache, protocol.TransferTypeStream, nil)
assert(writer.WriteMultiBuffer(buf.NewMultiBufferValue(payload)), IsNil) assert(writer.WriteMultiBuffer(payload), IsNil)
assert(cache.Len(), Equals, int32(82658)) assert(cache.Len(), Equals, int(82658))
assert(writer.WriteMultiBuffer(buf.MultiBuffer{}), IsNil) assert(writer.WriteMultiBuffer(buf.MultiBuffer{}), IsNil)
reader := NewAuthenticationReader(&AEADAuthenticator{ reader := NewAuthenticationReader(&AEADAuthenticator{
@ -83,7 +84,7 @@ func TestAuthenticationReaderWriterPacket(t *testing.T) {
aead, err := cipher.NewGCM(block) aead, err := cipher.NewGCM(block)
assert(err, IsNil) assert(err, IsNil)
cache := buf.NewSize(1024) cache := buf.New()
iv := make([]byte, 12) iv := make([]byte, 12)
rand.Read(iv) rand.Read(iv)

@ -1,6 +1,7 @@
package crypto_test package crypto_test
import ( import (
"bytes"
"io" "io"
"testing" "testing"
@ -13,7 +14,7 @@ import (
func TestChunkStreamIO(t *testing.T) { func TestChunkStreamIO(t *testing.T) {
assert := With(t) assert := With(t)
cache := buf.NewSize(8192) cache := bytes.NewBuffer(make([]byte, 0, 8192))
writer := NewChunkStreamWriter(PlainChunkSizeParser{}, cache) writer := NewChunkStreamWriter(PlainChunkSizeParser{}, cache)
reader := NewChunkStreamReader(PlainChunkSizeParser{}, cache) reader := NewChunkStreamReader(PlainChunkSizeParser{}, cache)

@ -8,6 +8,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/bytespool"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
) )
@ -76,24 +77,26 @@ func (v *ChunkReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
} }
size += AuthSize size += AuthSize
buffer := buf.NewSize(int32(size)) buffer := bytespool.Alloc(int32(size))
if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, int32(size))); err != nil { defer bytespool.Free(buffer)
buffer.Release()
if _, err := io.ReadFull(v.reader, buffer[:size]); err != nil {
return nil, err return nil, err
} }
authBytes := buffer.BytesTo(AuthSize) authBytes := buffer[:AuthSize]
payload := buffer.BytesFrom(AuthSize) payload := buffer[AuthSize:size]
actualAuthBytes := make([]byte, AuthSize) actualAuthBytes := make([]byte, AuthSize)
v.auth.Authenticate(payload)(actualAuthBytes) v.auth.Authenticate(payload)(actualAuthBytes)
if !bytes.Equal(authBytes, actualAuthBytes) { if !bytes.Equal(authBytes, actualAuthBytes) {
buffer.Release()
return nil, newError("invalid auth") return nil, newError("invalid auth")
} }
buffer.Advance(AuthSize)
return buf.NewMultiBufferValue(buffer), nil var mb buf.MultiBuffer
common.Must2(mb.Write(payload))
return mb, nil
} }
type ChunkWriter struct { type ChunkWriter struct {

@ -24,11 +24,11 @@ func TestNormalChunkReading(t *testing.T) {
func TestNormalChunkWriting(t *testing.T) { func TestNormalChunkWriting(t *testing.T) {
assert := With(t) assert := With(t)
buffer := buf.NewSize(512) buffer := buf.New()
writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator( writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator(
[]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}))) []byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36})))
b := buf.NewSize(256) b := buf.New()
b.Write([]byte{11, 12, 13, 14, 15, 16, 17, 18}) b.Write([]byte{11, 12, 13, 14, 15, 16, 17, 18})
err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
assert(err, IsNil) assert(err, IsNil)

@ -3,6 +3,7 @@ package shadowsocks_test
import ( import (
"testing" "testing"
"v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
@ -29,7 +30,7 @@ func TestUDPEncoding(t *testing.T) {
}, },
} }
data := buf.NewSize(256) data := buf.New()
data.AppendSupplier(serial.WriteString("test string")) data.AppendSupplier(serial.WriteString("test string"))
encodedData, err := EncodeUDPPacket(request, data.Bytes()) encodedData, err := EncodeUDPPacket(request, data.Bytes())
assert(err, IsNil) assert(err, IsNil)
@ -104,8 +105,7 @@ func TestTCPRequest(t *testing.T) {
runTest := func(request *protocol.RequestHeader, payload []byte) { runTest := func(request *protocol.RequestHeader, payload []byte) {
data := buf.New() data := buf.New()
defer data.Release() common.Must2(data.Write(payload))
data.Write(payload)
cache := buf.New() cache := buf.New()
defer cache.Release() defer cache.Release()
@ -142,6 +142,8 @@ func TestUDPReaderWriter(t *testing.T) {
}), }),
} }
cache := buf.New() cache := buf.New()
defer cache.Release()
writer := &buf.SequentialWriter{Writer: &UDPWriter{ writer := &buf.SequentialWriter{Writer: &UDPWriter{
Writer: cache, Writer: cache,
Request: &protocol.RequestHeader{ Request: &protocol.RequestHeader{
@ -158,21 +160,25 @@ func TestUDPReaderWriter(t *testing.T) {
User: user, User: user,
} }
b := buf.New() {
b.AppendSupplier(serial.WriteString("test payload")) b := buf.New()
err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) b.AppendSupplier(serial.WriteString("test payload"))
assert(err, IsNil) err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
assert(err, IsNil)
payload, err := reader.ReadMultiBuffer() payload, err := reader.ReadMultiBuffer()
assert(err, IsNil) assert(err, IsNil)
assert(payload[0].String(), Equals, "test payload") assert(payload[0].String(), Equals, "test payload")
}
b = buf.New() {
b.AppendSupplier(serial.WriteString("test payload 2")) b := buf.New()
err = writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) b.AppendSupplier(serial.WriteString("test payload 2"))
assert(err, IsNil) err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
assert(err, IsNil)
payload, err = reader.ReadMultiBuffer() payload, err := reader.ReadMultiBuffer()
assert(err, IsNil) assert(err, IsNil)
assert(payload[0].String(), Equals, "test payload 2") assert(payload[0].String(), Equals, "test payload 2")
}
} }

@ -10,6 +10,7 @@ import (
"v2ray.com/core" "v2ray.com/core"
"v2ray.com/core/app/proxyman" "v2ray.com/core/app/proxyman"
"v2ray.com/core/common"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/protocol/tls/cert" "v2ray.com/core/common/protocol/tls/cert"
@ -36,7 +37,7 @@ func TestSimpleTLSConnection(t *testing.T) {
MsgProcessor: xor, MsgProcessor: xor,
} }
dest, err := tcpServer.Start() dest, err := tcpServer.Start()
assert(err, IsNil) common.Must(err)
defer tcpServer.Close() defer tcpServer.Close()
userID := protocol.NewID(uuid.New()) userID := protocol.NewID(uuid.New())
@ -123,24 +124,25 @@ func TestSimpleTLSConnection(t *testing.T) {
} }
servers, err := InitializeServerConfigs(serverConfig, clientConfig) servers, err := InitializeServerConfigs(serverConfig, clientConfig)
assert(err, IsNil) common.Must(err)
defer CloseAllServers(servers)
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: int(clientPort),
})
assert(err, IsNil)
payload := "dokodemo request." {
nBytes, err := conn.Write([]byte(payload)) conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
assert(err, IsNil) IP: []byte{127, 0, 0, 1},
assert(nBytes, Equals, len(payload)) Port: int(clientPort),
})
assert(err, IsNil)
response := readFrom(conn, time.Second*2, len(payload)) payload := "dokodemo request."
assert(response, Equals, xor([]byte(payload))) nBytes, err := conn.Write([]byte(payload))
assert(conn.Close(), IsNil) assert(err, IsNil)
assert(nBytes, Equals, len(payload))
CloseAllServers(servers) response := readFrom(conn, time.Second*2, len(payload))
assert(response, Equals, xor([]byte(payload)))
assert(conn.Close(), IsNil)
}
} }
func TestAutoIssuingCertificate(t *testing.T) { func TestAutoIssuingCertificate(t *testing.T) {

@ -16,7 +16,7 @@ func TestReaderWriter(t *testing.T) {
assert := With(t) assert := With(t)
cache := buf.New() cache := buf.New()
b := buf.NewSize(256) b := buf.New()
b.AppendSupplier(serial.WriteString("abcd" + ENDING)) b.AppendSupplier(serial.WriteString("abcd" + ENDING))
writer := NewHeaderWriter(b) writer := NewHeaderWriter(b)
err := writer.Write(cache) err := writer.Write(cache)

@ -70,7 +70,7 @@ func (w *KCPPacketWriter) Overhead() int {
} }
func (w *KCPPacketWriter) Write(b []byte) (int, error) { func (w *KCPPacketWriter) Write(b []byte) (int, error) {
bb := buf.NewSize(int32(len(b) + w.Overhead())) bb := buf.New()
defer bb.Release() defer bb.Release()
if w.Header != nil { if w.Header != nil {

Loading…
Cancel
Save