From 13e83c17a58f24e3c2e2260c741c272d5fa033f7 Mon Sep 17 00:00:00 2001 From: v2ray Date: Tue, 5 Jul 2016 10:28:23 +0200 Subject: [PATCH] apply sync.Pool to segments --- transport/internet/kcp/kcp.go | 11 +++--- transport/internet/kcp/receiving.go | 2 +- transport/internet/kcp/segment.go | 52 +++++++++++++++++++++++++++-- transport/internet/kcp/sending.go | 5 ++- 4 files changed, 58 insertions(+), 12 deletions(-) diff --git a/transport/internet/kcp/kcp.go b/transport/internet/kcp/kcp.go index 5673dc8e..4c0963fb 100644 --- a/transport/internet/kcp/kcp.go +++ b/transport/internet/kcp/kcp.go @@ -204,12 +204,11 @@ func (kcp *KCP) flush() { kcp.sendingWorker.Flush() if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 { - seg := &CmdOnlySegment{ - Conv: kcp.conv, - Cmd: SegmentCommandPing, - ReceivinNext: kcp.receivingWorker.nextNumber, - SendingNext: kcp.sendingWorker.firstUnacknowledged, - } + seg := NewCmdOnlySegment() + seg.Conv = kcp.conv + seg.Cmd = SegmentCommandPing + seg.ReceivinNext = kcp.receivingWorker.nextNumber + seg.SendingNext = kcp.sendingWorker.firstUnacknowledged if kcp.state == StateReadyToClose { seg.Opt = SegmentOptionClose } diff --git a/transport/internet/kcp/receiving.go b/transport/internet/kcp/receiving.go index 4e08c4df..b281441a 100644 --- a/transport/internet/kcp/receiving.go +++ b/transport/internet/kcp/receiving.go @@ -200,7 +200,7 @@ func (this *AckList) Clear(una uint32) { } func (this *AckList) Flush(current uint32, rto uint32) { - seg := new(AckSegment) + seg := NewAckSegment() this.Lock() for i := 0; i < len(this.numbers); i++ { if this.nextFlush[i] <= current { diff --git a/transport/internet/kcp/segment.go b/transport/internet/kcp/segment.go index 3c5c8144..1cbbb18a 100644 --- a/transport/internet/kcp/segment.go +++ b/transport/internet/kcp/segment.go @@ -1,12 +1,26 @@ package kcp import ( + "sync" + "github.com/v2ray/v2ray-core/common" "github.com/v2ray/v2ray-core/common/alloc" _ "github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/serial" ) +var ( + dataSegmentPool = &sync.Pool{ + New: func() interface{} { return new(DataSegment) }, + } + ackSegmentPool = &sync.Pool{ + New: func() interface{} { return new(AckSegment) }, + } + cmdSegmentPool = &sync.Pool{ + New: func() interface{} { return new(CmdOnlySegment) }, + } +) + type SegmentCommand byte const ( @@ -45,6 +59,10 @@ type DataSegment struct { transmit uint32 } +func NewDataSegment() *DataSegment { + return dataSegmentPool.Get().(*DataSegment) +} + func (this *DataSegment) Bytes(b []byte) []byte { b = serial.Uint16ToBytes(this.Conv, b) b = append(b, byte(SegmentCommandData), byte(this.Opt)) @@ -62,6 +80,12 @@ func (this *DataSegment) ByteSize() int { func (this *DataSegment) Release() { this.Data.Release() + this.Data = nil + this.Opt = 0 + this.timeout = 0 + this.ackSkipped = 0 + this.transmit = 0 + dataSegmentPool.Put(this) } type AckSegment struct { @@ -74,6 +98,17 @@ type AckSegment struct { TimestampList []uint32 } +func NewAckSegment() *AckSegment { + seg := ackSegmentPool.Get().(*AckSegment) + if seg.NumberList == nil { + seg.NumberList = make([]uint32, 0, 128) + } + if seg.TimestampList == nil { + seg.TimestampList = make([]uint32, 0, 128) + } + return seg +} + func (this *AckSegment) ByteSize() int { return 2 + 1 + 1 + 4 + 4 + 1 + int(this.Count)*4 + int(this.Count)*4 } @@ -91,7 +126,13 @@ func (this *AckSegment) Bytes(b []byte) []byte { return b } -func (this *AckSegment) Release() {} +func (this *AckSegment) Release() { + this.Opt = 0 + this.Count = 0 + this.NumberList = this.NumberList[:0] + this.TimestampList = this.TimestampList[:0] + ackSegmentPool.Put(this) +} type CmdOnlySegment struct { Conv uint16 @@ -101,6 +142,10 @@ type CmdOnlySegment struct { ReceivinNext uint32 } +func NewCmdOnlySegment() *CmdOnlySegment { + return cmdSegmentPool.Get().(*CmdOnlySegment) +} + func (this *CmdOnlySegment) ByteSize() int { return 2 + 1 + 1 + 4 + 4 } @@ -113,7 +158,10 @@ func (this *CmdOnlySegment) Bytes(b []byte) []byte { return b } -func (this *CmdOnlySegment) Release() {} +func (this *CmdOnlySegment) Release() { + this.Opt = 0 + cmdSegmentPool.Put(this) +} func ReadSegment(buf []byte) (Segment, []byte) { if len(buf) <= 4 { diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index 0ff2f19c..5c1c7fa5 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -313,9 +313,8 @@ func (this *SendingWorker) Push(b []byte) int { } else { size = len(b) } - seg := &DataSegment{ - Data: alloc.NewSmallBuffer().Clear().Append(b[:size]), - } + seg := NewDataSegment() + seg.Data = alloc.NewSmallBuffer().Clear().Append(b[:size]) this.Lock() this.queue.Push(seg) this.Unlock()