diff --git a/transport/internet/kcp/sending.go b/transport/internet/kcp/sending.go index 6cefe6db..0a877377 100644 --- a/transport/internet/kcp/sending.go +++ b/transport/internet/kcp/sending.go @@ -1,6 +1,7 @@ package kcp import ( + "container/list" "sync" "v2ray.com/core/common" @@ -8,31 +9,15 @@ import ( ) type SendingWindow struct { - start uint32 - cap uint32 - len uint32 - last uint32 - - data []DataSegment - inuse []bool - prev []uint32 - next []uint32 - + cache *list.List totalInFlightSize uint32 writer SegmentWriter onPacketLoss func(uint32) } -func NewSendingWindow(size uint32, writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow { +func NewSendingWindow(writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow { window := &SendingWindow{ - start: 0, - cap: size, - len: 0, - last: 0, - data: make([]DataSegment, size), - prev: make([]uint32, size), - next: make([]uint32, size), - inuse: make([]bool, size), + cache: list.New(), writer: writer, onPacketLoss: onPacketLoss, } @@ -43,82 +28,42 @@ func (sw *SendingWindow) Release() { if sw == nil { return } - sw.len = 0 - for _, seg := range sw.data { + for sw.cache.Len() > 0 { + seg := sw.cache.Front().Value.(*DataSegment) seg.Release() + sw.cache.Remove(sw.cache.Front()) } } -func (sw *SendingWindow) Len() int { - return int(sw.len) +func (sw *SendingWindow) Len() uint32 { + return uint32(sw.cache.Len()) } func (sw *SendingWindow) IsEmpty() bool { - return sw.len == 0 -} - -func (sw *SendingWindow) Size() uint32 { - return sw.cap -} - -func (sw *SendingWindow) IsFull() bool { - return sw.len == sw.cap + return sw.cache.Len() == 0 } func (sw *SendingWindow) Push(number uint32) *buf.Buffer { - pos := (sw.start + sw.len) % sw.cap - sw.data[pos].Number = number - sw.data[pos].timeout = 0 - sw.data[pos].transmit = 0 - sw.inuse[pos] = true - if sw.len > 0 { - sw.next[sw.last] = pos - sw.prev[pos] = sw.last - } - sw.last = pos - sw.len++ - return sw.data[pos].Data() + seg := NewDataSegment() + seg.Number = number + + sw.cache.PushBack(seg) + return seg.Data() } func (sw *SendingWindow) FirstNumber() uint32 { - return sw.data[sw.start].Number + return sw.cache.Front().Value.(*DataSegment).Number } func (sw *SendingWindow) Clear(una uint32) { - for !sw.IsEmpty() && sw.data[sw.start].Number < una { - sw.Remove(0) - } -} - -func (sw *SendingWindow) Remove(idx uint32) bool { - if sw.IsEmpty() { - return false - } - - pos := (sw.start + idx) % sw.cap - if !sw.inuse[pos] { - return false - } - sw.inuse[pos] = false - sw.totalInFlightSize-- - if pos == sw.start && pos == sw.last { - sw.len = 0 - sw.start = 0 - sw.last = 0 - } else if pos == sw.start { - delta := sw.next[pos] - sw.start - if sw.next[pos] < sw.start { - delta = sw.next[pos] + sw.cap - sw.start + for !sw.IsEmpty() { + seg := sw.cache.Front().Value.(*DataSegment) + if seg.Number >= una { + break } - sw.start = sw.next[pos] - sw.len -= delta - } else if pos == sw.last { - sw.last = sw.prev[pos] - } else { - sw.next[sw.prev[pos]] = sw.next[pos] - sw.prev[sw.next[pos]] = sw.prev[pos] + seg.Release() + sw.cache.Remove(sw.cache.Front()) } - return true } func (sw *SendingWindow) HandleFastAck(number uint32, rto uint32) { @@ -143,8 +88,9 @@ func (sw *SendingWindow) Visit(visitor func(seg *DataSegment) bool) { return } - for i := sw.start; ; i = sw.next[i] { - if !visitor(&sw.data[i]) || i == sw.last { + for e := sw.cache.Front(); e != nil; e = e.Next() { + seg := e.Value.(*DataSegment) + if !visitor(seg) { break } } @@ -186,6 +132,23 @@ func (sw *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint3 } } +func (sw *SendingWindow) Remove(number uint32) bool { + if sw.IsEmpty() { + return false + } + + for e := sw.cache.Front(); e != nil; e = e.Next() { + seg := e.Value.(*DataSegment) + if seg.Number > number { + return false + } else if seg.Number == number { + return true + } + } + + return false +} + type SendingWorker struct { sync.RWMutex conn *Connection @@ -195,6 +158,7 @@ type SendingWorker struct { remoteNextNumber uint32 controlWindow uint32 fastResend uint32 + windowSize uint32 firstUnacknowledgedUpdated bool closed bool } @@ -205,8 +169,9 @@ func NewSendingWorker(kcp *Connection) *SendingWorker { fastResend: 2, remoteNextNumber: 32, controlWindow: kcp.Config.GetSendingInFlightSize(), + windowSize: kcp.Config.GetSendingBufferSize(), } - worker.window = NewSendingWindow(kcp.Config.GetSendingBufferSize(), worker, worker.OnPacketLoss) + worker.window = NewSendingWindow(worker, worker.OnPacketLoss) return worker } @@ -247,7 +212,7 @@ func (w *SendingWorker) processAck(number uint32) bool { return false } - removed := w.window.Remove(number - w.firstUnacknowledged) + removed := w.window.Remove(number) if removed { w.FindFirstUnacknowledged() } @@ -299,7 +264,7 @@ func (w *SendingWorker) Push(f buf.Supplier) bool { return false } - if w.window.IsFull() { + if w.window.Len() > w.windowSize { return false } diff --git a/transport/internet/kcp/sending_test.go b/transport/internet/kcp/sending_test.go deleted file mode 100644 index 1c9013dd..00000000 --- a/transport/internet/kcp/sending_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package kcp_test - -import ( - "testing" - - . "v2ray.com/core/transport/internet/kcp" - . "v2ray.com/ext/assert" -) - -func TestSendingWindow(t *testing.T) { - assert := With(t) - - window := NewSendingWindow(5, nil, nil) - window.Push(0) - window.Push(1) - window.Push(2) - assert(window.Len(), Equals, 3) - - window.Remove(1) - assert(window.Len(), Equals, 3) - assert(window.FirstNumber(), Equals, uint32(0)) - - window.Remove(0) - assert(window.Len(), Equals, 1) - assert(window.FirstNumber(), Equals, uint32(2)) - - window.Remove(0) - assert(window.Len(), Equals, 0) - - window.Push(4) - assert(window.Len(), Equals, 1) - assert(window.FirstNumber(), Equals, uint32(4)) - - window.Push(5) - assert(window.Len(), Equals, 2) - - window.Remove(1) - assert(window.Len(), Equals, 2) - - window.Remove(0) - assert(window.Len(), Equals, 0) -}