mirror of https://github.com/v2ray/v2ray-core
Use []byte in pool instead of buffer
parent
472cf7f523
commit
3cb1951dfc
|
@ -12,10 +12,13 @@ type Buffer struct {
|
||||||
|
|
||||||
func (b *Buffer) Release() {
|
func (b *Buffer) Release() {
|
||||||
b.pool.free(b)
|
b.pool.free(b)
|
||||||
|
b.head = nil
|
||||||
|
b.Value = nil
|
||||||
|
b.pool = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) Clear() {
|
func (b *Buffer) Clear() {
|
||||||
b.Value = b.Value[:0]
|
b.Value = b.head[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Buffer) Append(data []byte) {
|
func (b *Buffer) Append(data []byte) {
|
||||||
|
@ -35,38 +38,41 @@ func (b *Buffer) Len() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
type bufferPool struct {
|
type bufferPool struct {
|
||||||
chain chan *Buffer
|
chain chan []byte
|
||||||
allocator func(*bufferPool) *Buffer
|
bufferSize int
|
||||||
elements2Keep int
|
buffers2Keep int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBufferPool(allocator func(*bufferPool) *Buffer, elements2Keep, size int) *bufferPool {
|
func newBufferPool(bufferSize, buffers2Keep, poolSize int) *bufferPool {
|
||||||
pool := &bufferPool{
|
pool := &bufferPool{
|
||||||
chain: make(chan *Buffer, size),
|
chain: make(chan []byte, poolSize),
|
||||||
allocator: allocateSmall,
|
bufferSize: bufferSize,
|
||||||
elements2Keep: elements2Keep,
|
buffers2Keep: buffers2Keep,
|
||||||
}
|
}
|
||||||
for i := 0; i < elements2Keep; i++ {
|
for i := 0; i < buffers2Keep; i++ {
|
||||||
pool.chain <- allocator(pool)
|
pool.chain <- make([]byte, bufferSize)
|
||||||
}
|
}
|
||||||
go pool.cleanup(time.Tick(1 * time.Second))
|
go pool.cleanup(time.Tick(1 * time.Second))
|
||||||
return pool
|
return pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *bufferPool) allocate() *Buffer {
|
func (p *bufferPool) allocate() *Buffer {
|
||||||
var b *Buffer
|
var b []byte
|
||||||
select {
|
select {
|
||||||
case b = <-p.chain:
|
case b = <-p.chain:
|
||||||
default:
|
default:
|
||||||
b = p.allocator(p)
|
b = make([]byte, p.bufferSize)
|
||||||
|
}
|
||||||
|
return &Buffer{
|
||||||
|
head: b,
|
||||||
|
pool: p,
|
||||||
|
Value: b,
|
||||||
}
|
}
|
||||||
b.Value = b.head
|
|
||||||
return b
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *bufferPool) free(buffer *Buffer) {
|
func (p *bufferPool) free(buffer *Buffer) {
|
||||||
select {
|
select {
|
||||||
case p.chain <- buffer:
|
case p.chain <- buffer.head:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -74,26 +80,17 @@ func (p *bufferPool) free(buffer *Buffer) {
|
||||||
func (p *bufferPool) cleanup(tick <-chan time.Time) {
|
func (p *bufferPool) cleanup(tick <-chan time.Time) {
|
||||||
for range tick {
|
for range tick {
|
||||||
pSize := len(p.chain)
|
pSize := len(p.chain)
|
||||||
if pSize > p.elements2Keep {
|
if pSize > p.buffers2Keep {
|
||||||
<-p.chain
|
<-p.chain
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for delta := pSize - p.elements2Keep; delta > 0; delta-- {
|
for delta := pSize - p.buffers2Keep; delta > 0; delta-- {
|
||||||
p.chain <- p.allocator(p)
|
p.chain <- make([]byte, p.bufferSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func allocateSmall(pool *bufferPool) *Buffer {
|
var smallPool = newBufferPool(8*1024, 256, 2048)
|
||||||
b := &Buffer{
|
|
||||||
head: make([]byte, 8*1024),
|
|
||||||
}
|
|
||||||
b.Value = b.head
|
|
||||||
b.pool = pool
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
var smallPool = newBufferPool(allocateSmall, 256, 2048)
|
|
||||||
|
|
||||||
func NewBuffer() *Buffer {
|
func NewBuffer() *Buffer {
|
||||||
return smallPool.allocate()
|
return smallPool.allocate()
|
||||||
|
|
|
@ -23,7 +23,6 @@ func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
|
||||||
stream <- buffer
|
stream <- buffer
|
||||||
} else {
|
} else {
|
||||||
buffer.Release()
|
buffer.Release()
|
||||||
buffer = nil
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -36,7 +35,6 @@ func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
|
||||||
for buffer := range stream {
|
for buffer := range stream {
|
||||||
_, err := writer.Write(buffer.Value)
|
_, err := writer.Write(buffer.Value)
|
||||||
buffer.Release()
|
buffer.Release()
|
||||||
buffer = nil
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,6 @@ func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.Outb
|
||||||
if chunk := firstPacket.Chunk(); chunk != nil {
|
if chunk := firstPacket.Chunk(); chunk != nil {
|
||||||
conn.Write(chunk.Value)
|
conn.Write(chunk.Value)
|
||||||
chunk.Release()
|
chunk.Release()
|
||||||
chunk = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !firstPacket.MoreChunks() {
|
if !firstPacket.MoreChunks() {
|
||||||
|
@ -74,7 +73,6 @@ func dumpOutput(conn net.Conn, output chan<- *alloc.Buffer, finish *sync.Mutex,
|
||||||
output <- response
|
output <- response
|
||||||
} else {
|
} else {
|
||||||
response.Release()
|
response.Release()
|
||||||
response = nil
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
@ -40,22 +40,25 @@ func (server *SocksServer) getUDPAddr() v2net.Address {
|
||||||
func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error {
|
func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error {
|
||||||
for {
|
for {
|
||||||
buffer := alloc.NewBuffer()
|
buffer := alloc.NewBuffer()
|
||||||
defer buffer.Release()
|
|
||||||
nBytes, addr, err := conn.ReadFromUDP(buffer.Value)
|
nBytes, addr, err := conn.ReadFromUDP(buffer.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Socks failed to read UDP packets: %v", err)
|
log.Error("Socks failed to read UDP packets: %v", err)
|
||||||
|
buffer.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
buffer.Slice(0, nBytes)
|
buffer.Slice(0, nBytes)
|
||||||
log.Info("Client UDP connection from %v", addr)
|
log.Info("Client UDP connection from %v", addr)
|
||||||
request, err := protocol.ReadUDPRequest(buffer.Value)
|
request, err := protocol.ReadUDPRequest(buffer.Value)
|
||||||
|
buffer.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Socks failed to parse UDP request: %v", err)
|
log.Error("Socks failed to parse UDP request: %v", err)
|
||||||
|
request.Data.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if request.Fragment != 0 {
|
if request.Fragment != 0 {
|
||||||
log.Warning("Dropping framented UDP packets.")
|
log.Warning("Dropping framented UDP packets.")
|
||||||
// TODO handle fragments
|
// TODO handle fragments
|
||||||
|
request.Data.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +82,6 @@ func (server *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet,
|
||||||
udpMessage := response.Bytes(nil)
|
udpMessage := response.Bytes(nil)
|
||||||
nBytes, err := conn.WriteToUDP(udpMessage, clientAddr)
|
nBytes, err := conn.WriteToUDP(udpMessage, clientAddr)
|
||||||
response.Data.Release()
|
response.Data.Release()
|
||||||
response.Data = nil
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err)
|
log.Error("Socks failed to write UDP message (%d bytes) to %s: %v", nBytes, clientAddr.String(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,14 +91,15 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optimize for small response packet
|
// Optimize for small response packet
|
||||||
buffer := make([]byte, 0, 4*1024)
|
buffer := alloc.NewBuffer()
|
||||||
buffer = append(buffer, request.ResponseHeader...)
|
buffer.Clear()
|
||||||
|
buffer.Append(request.ResponseHeader)
|
||||||
|
|
||||||
if data, open := <-output; open {
|
if data, open := <-output; open {
|
||||||
buffer = append(buffer, data.Value...)
|
buffer.Append(data.Value)
|
||||||
data = nil
|
data.Release()
|
||||||
responseWriter.Write(buffer)
|
responseWriter.Write(buffer.Value)
|
||||||
buffer = nil
|
buffer.Release()
|
||||||
go handleOutput(request, responseWriter, output, &writeFinish)
|
go handleOutput(request, responseWriter, output, &writeFinish)
|
||||||
writeFinish.Lock()
|
writeFinish.Lock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ func (handler *VMessInboundHandler) AcceptPackets(conn *net.UDPConn) {
|
||||||
nBytes, err = cryptReader.Read(data.Value)
|
nBytes, err = cryptReader.Read(data.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warning("VMessIn: Unable to decrypt data: %v", err)
|
log.Warning("VMessIn: Unable to decrypt data: %v", err)
|
||||||
|
data.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
data.Slice(0, nBytes)
|
data.Slice(0, nBytes)
|
||||||
|
@ -91,7 +92,6 @@ func (handler *VMessInboundHandler) handlePacket(conn *net.UDPConn, request *pro
|
||||||
hasData = true
|
hasData = true
|
||||||
responseWriter.Write(data.Value)
|
responseWriter.Write(data.Value)
|
||||||
data.Release()
|
data.Release()
|
||||||
data = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if hasData {
|
if hasData {
|
||||||
|
|
|
@ -150,11 +150,9 @@ func handleRequest(conn net.Conn, request *protocol.VMessRequest, firstPacket v2
|
||||||
encryptRequestWriter.Crypt(firstChunk.Value)
|
encryptRequestWriter.Crypt(firstChunk.Value)
|
||||||
requestBytes = append(requestBytes, firstChunk.Value...)
|
requestBytes = append(requestBytes, firstChunk.Value...)
|
||||||
firstChunk.Release()
|
firstChunk.Release()
|
||||||
firstChunk = nil
|
|
||||||
|
|
||||||
_, err = conn.Write(requestBytes)
|
_, err = conn.Write(requestBytes)
|
||||||
buffer.Release()
|
buffer.Release()
|
||||||
buffer = nil
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("VMessOut: Failed to write VMess request: %v", err)
|
log.Error("VMessOut: Failed to write VMess request: %v", err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
"settings": {
|
"settings": {
|
||||||
"vnext": [
|
"vnext": [
|
||||||
{
|
{
|
||||||
"address": "127.0.0.1",
|
"address": "45.78.9.54",
|
||||||
"port": 27183,
|
"port": 27183,
|
||||||
"users": [
|
"users": [
|
||||||
{"id": "ad937d9d-6e23-4a5a-ba23-bce5092a7c51"}
|
{"id": "ad937d9d-6e23-4a5a-ba23-bce5092a7c51"}
|
||||||
|
|
Loading…
Reference in New Issue