From 3cb1951dfc7be72d7151cb8d86e299f1201a6acd Mon Sep 17 00:00:00 2001 From: V2Ray Date: Thu, 8 Oct 2015 17:41:38 +0200 Subject: [PATCH] Use []byte in pool instead of buffer --- common/alloc/buffer.go | 53 ++++++++++++-------------- common/net/transport.go | 2 - proxy/freedom/freedom.go | 2 - proxy/socks/udp.go | 6 ++- proxy/vmess/vmessin.go | 13 ++++--- proxy/vmess/vmessin_udp.go | 2 +- proxy/vmess/vmessout.go | 2 - release/config/vpoint_socks_vmess.json | 2 +- 8 files changed, 38 insertions(+), 44 deletions(-) diff --git a/common/alloc/buffer.go b/common/alloc/buffer.go index 25c343a6..8d241fc3 100644 --- a/common/alloc/buffer.go +++ b/common/alloc/buffer.go @@ -12,10 +12,13 @@ type Buffer struct { func (b *Buffer) Release() { b.pool.free(b) + b.head = nil + b.Value = nil + b.pool = nil } func (b *Buffer) Clear() { - b.Value = b.Value[:0] + b.Value = b.head[:0] } func (b *Buffer) Append(data []byte) { @@ -35,38 +38,41 @@ func (b *Buffer) Len() int { } type bufferPool struct { - chain chan *Buffer - allocator func(*bufferPool) *Buffer - elements2Keep int + chain chan []byte + bufferSize int + buffers2Keep int } -func newBufferPool(allocator func(*bufferPool) *Buffer, elements2Keep, size int) *bufferPool { +func newBufferPool(bufferSize, buffers2Keep, poolSize int) *bufferPool { pool := &bufferPool{ - chain: make(chan *Buffer, size), - allocator: allocateSmall, - elements2Keep: elements2Keep, + chain: make(chan []byte, poolSize), + bufferSize: bufferSize, + buffers2Keep: buffers2Keep, } - for i := 0; i < elements2Keep; i++ { - pool.chain <- allocator(pool) + for i := 0; i < buffers2Keep; i++ { + pool.chain <- make([]byte, bufferSize) } go pool.cleanup(time.Tick(1 * time.Second)) return pool } func (p *bufferPool) allocate() *Buffer { - var b *Buffer + var b []byte select { case b = <-p.chain: default: - b = p.allocator(p) + b = make([]byte, p.bufferSize) + } + return &Buffer{ + head: b, + pool: p, + Value: b, } - b.Value = b.head - return b } func (p *bufferPool) free(buffer *Buffer) { select { - case p.chain <- buffer: + case p.chain <- buffer.head: default: } } @@ -74,26 +80,17 @@ func (p *bufferPool) free(buffer *Buffer) { func (p *bufferPool) cleanup(tick <-chan time.Time) { for range tick { pSize := len(p.chain) - if pSize > p.elements2Keep { + if pSize > p.buffers2Keep { <-p.chain continue } - for delta := pSize - p.elements2Keep; delta > 0; delta-- { - p.chain <- p.allocator(p) + for delta := pSize - p.buffers2Keep; delta > 0; delta-- { + p.chain <- make([]byte, p.bufferSize) } } } -func allocateSmall(pool *bufferPool) *Buffer { - b := &Buffer{ - head: make([]byte, 8*1024), - } - b.Value = b.head - b.pool = pool - return b -} - -var smallPool = newBufferPool(allocateSmall, 256, 2048) +var smallPool = newBufferPool(8*1024, 256, 2048) func NewBuffer() *Buffer { return smallPool.allocate() diff --git a/common/net/transport.go b/common/net/transport.go index 4e718565..98e2ab5e 100644 --- a/common/net/transport.go +++ b/common/net/transport.go @@ -23,7 +23,6 @@ func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error { stream <- buffer } else { buffer.Release() - buffer = nil } if err != nil { return err @@ -36,7 +35,6 @@ func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error { for buffer := range stream { _, err := writer.Write(buffer.Value) buffer.Release() - buffer = nil if err != nil { return err } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 2e6f6234..adc8e97a 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -36,7 +36,6 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb if chunk := firstPacket.Chunk(); chunk != nil { conn.Write(chunk.Value) chunk.Release() - chunk = nil } if !firstPacket.MoreChunks() { @@ -74,7 +73,6 @@ func dumpOutput(conn net.Conn, output chan<- *alloc.Buffer, finish *sync.Mutex, output <- response } else { response.Release() - response = nil } if err != nil { return diff --git a/proxy/socks/udp.go b/proxy/socks/udp.go index 054e9b3b..9d5e97e7 100644 --- a/proxy/socks/udp.go +++ b/proxy/socks/udp.go @@ -40,22 +40,25 @@ func (server *SocksServer) getUDPAddr() v2net.Address { func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error { for { buffer := alloc.NewBuffer() - defer buffer.Release() nBytes, addr, err := conn.ReadFromUDP(buffer.Value) if err != nil { log.Error("Socks failed to read UDP packets: %v", err) + buffer.Release() continue } buffer.Slice(0, nBytes) log.Info("Client UDP connection from %v", addr) request, err := protocol.ReadUDPRequest(buffer.Value) + buffer.Release() if err != nil { log.Error("Socks failed to parse UDP request: %v", err) + request.Data.Release() continue } if request.Fragment != 0 { log.Warning("Dropping framented UDP packets.") // TODO handle fragments + request.Data.Release() continue } @@ -79,7 +82,6 @@ func (server *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, udpMessage := response.Bytes(nil) nBytes, err := conn.WriteToUDP(udpMessage, clientAddr) response.Data.Release() - response.Data = nil if err != nil { log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err) } diff --git a/proxy/vmess/vmessin.go b/proxy/vmess/vmessin.go index 291669bd..bda9dc88 100644 --- a/proxy/vmess/vmessin.go +++ b/proxy/vmess/vmessin.go @@ -91,14 +91,15 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er } // Optimize for small response packet - buffer := make([]byte, 0, 4*1024) - buffer = append(buffer, request.ResponseHeader...) + buffer := alloc.NewBuffer() + buffer.Clear() + buffer.Append(request.ResponseHeader) if data, open := <-output; open { - buffer = append(buffer, data.Value...) - data = nil - responseWriter.Write(buffer) - buffer = nil + buffer.Append(data.Value) + data.Release() + responseWriter.Write(buffer.Value) + buffer.Release() go handleOutput(request, responseWriter, output, &writeFinish) writeFinish.Lock() } diff --git a/proxy/vmess/vmessin_udp.go b/proxy/vmess/vmessin_udp.go index 3c9729d3..7c84fd1b 100644 --- a/proxy/vmess/vmessin_udp.go +++ b/proxy/vmess/vmessin_udp.go @@ -60,6 +60,7 @@ func (handler *VMessInboundHandler) AcceptPackets(conn *net.UDPConn) { nBytes, err = cryptReader.Read(data.Value) if err != nil { log.Warning("VMessIn: Unable to decrypt data: %v", err) + data.Release() continue } data.Slice(0, nBytes) @@ -91,7 +92,6 @@ func (handler *VMessInboundHandler) handlePacket(conn *net.UDPConn, request *pro hasData = true responseWriter.Write(data.Value) data.Release() - data = nil } if hasData { diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index 1ef4e0eb..6846d259 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -150,11 +150,9 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2 encryptRequestWriter.Crypt(firstChunk.Value) requestBytes = append(requestBytes, firstChunk.Value...) firstChunk.Release() - firstChunk = nil _, err = conn.Write(requestBytes) buffer.Release() - buffer = nil if err != nil { log.Error("VMessOut: Failed to write VMess request: %v", err) return diff --git a/release/config/vpoint_socks_vmess.json b/release/config/vpoint_socks_vmess.json index fce0dfc8..dc561501 100644 --- a/release/config/vpoint_socks_vmess.json +++ b/release/config/vpoint_socks_vmess.json @@ -12,7 +12,7 @@ "settings": { "vnext": [ { - "address": "127.0.0.1", + "address": "45.78.9.54", "port": 27183, "users": [ {"id": "ad937d9d-6e23-4a5a-ba23-bce5092a7c51"}