You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
v2ray-core/transport/internet/kcp/receiving.go

260 lines
4.8 KiB

package kcp
9 years ago
import (
"sync"
6 years ago
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
9 years ago
)
type ReceivingWindow struct {
cache map[uint32]*DataSegment
}
func NewReceivingWindow() *ReceivingWindow {
return &ReceivingWindow{
cache: make(map[uint32]*DataSegment),
}
}
func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
_, f := w.cache[id]
if f {
return false
}
w.cache[id] = value
return true
}
func (w *ReceivingWindow) Has(id uint32) bool {
_, f := w.cache[id]
return f
}
func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
v, f := w.cache[id]
if !f {
return nil
}
delete(w.cache, id)
return v
}
9 years ago
9 years ago
type AckList struct {
9 years ago
writer SegmentWriter
9 years ago
timestamps []uint32
numbers []uint32
nextFlush []uint32
flushCandidates []uint32
dirty bool
}
func NewAckList(writer SegmentWriter) *AckList {
9 years ago
return &AckList{
writer: writer,
timestamps: make([]uint32, 0, 128),
numbers: make([]uint32, 0, 128),
nextFlush: make([]uint32, 0, 128),
flushCandidates: make([]uint32, 0, 128),
}
9 years ago
}
7 years ago
func (l *AckList) Add(number uint32, timestamp uint32) {
l.timestamps = append(l.timestamps, timestamp)
l.numbers = append(l.numbers, number)
l.nextFlush = append(l.nextFlush, 0)
l.dirty = true
9 years ago
}
7 years ago
func (l *AckList) Clear(una uint32) {
9 years ago
count := 0
7 years ago
for i := 0; i < len(l.numbers); i++ {
if l.numbers[i] < una {
continue
9 years ago
}
if i != count {
7 years ago
l.numbers[count] = l.numbers[i]
l.timestamps[count] = l.timestamps[i]
l.nextFlush[count] = l.nextFlush[i]
}
count++
9 years ago
}
7 years ago
if count < len(l.numbers) {
l.numbers = l.numbers[:count]
l.timestamps = l.timestamps[:count]
l.nextFlush = l.nextFlush[:count]
l.dirty = true
}
9 years ago
}
7 years ago
func (l *AckList) Flush(current uint32, rto uint32) {
l.flushCandidates = l.flushCandidates[:0]
seg := NewAckSegment()
7 years ago
for i := 0; i < len(l.numbers); i++ {
if l.nextFlush[i] > current {
if len(l.flushCandidates) < cap(l.flushCandidates) {
l.flushCandidates = append(l.flushCandidates, l.numbers[i])
}
continue
}
7 years ago
seg.PutNumber(l.numbers[i])
seg.PutTimestamp(l.timestamps[i])
timeout := rto / 2
if timeout < 20 {
timeout = 20
}
7 years ago
l.nextFlush[i] = current + timeout
if seg.IsFull() {
7 years ago
l.writer.Write(seg)
seg.Release()
seg = NewAckSegment()
7 years ago
l.dirty = false
}
9 years ago
}
7 years ago
if l.dirty || !seg.IsEmpty() {
for _, number := range l.flushCandidates {
if seg.IsFull() {
break
}
seg.PutNumber(number)
}
7 years ago
l.writer.Write(seg)
l.dirty = false
}
seg.Release()
9 years ago
}
type ReceivingWorker struct {
sync.RWMutex
conn *Connection
leftOver buf.MultiBuffer
window *ReceivingWindow
acklist *AckList
nextNumber uint32
windowSize uint32
9 years ago
}
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
9 years ago
worker := &ReceivingWorker{
conn: kcp,
window: NewReceivingWindow(),
windowSize: kcp.Config.GetReceivingInFlightSize(),
9 years ago
}
worker.acklist = NewAckList(worker)
9 years ago
return worker
}
7 years ago
func (w *ReceivingWorker) Release() {
w.Lock()
w.leftOver.Release()
w.Unlock()
}
7 years ago
func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
w.Lock()
defer w.Unlock()
7 years ago
w.acklist.Clear(number)
9 years ago
}
7 years ago
func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
w.Lock()
defer w.Unlock()
9 years ago
number := seg.Number
7 years ago
idx := number - w.nextNumber
if idx >= w.windowSize {
9 years ago
return
}
7 years ago
w.acklist.Clear(seg.SendingNext)
w.acklist.Add(number, seg.Timestamp)
9 years ago
if !w.window.Set(seg.Number, seg) {
9 years ago
seg.Release()
}
}
7 years ago
func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
if w.leftOver != nil {
mb := w.leftOver
w.leftOver = nil
return mb
}
9 years ago
mb := buf.NewMultiBufferCap(32)
7 years ago
w.Lock()
defer w.Unlock()
for {
seg := w.window.Remove(w.nextNumber)
9 years ago
if seg == nil {
break
}
7 years ago
w.nextNumber++
mb.Append(seg.Detach())
seg.Release()
}
return mb
}
7 years ago
func (w *ReceivingWorker) Read(b []byte) int {
mb := w.ReadMultiBuffer()
6 years ago
if mb.IsEmpty() {
return 0
}
6 years ago
nBytes, err := mb.Read(b)
common.Must(err)
if !mb.IsEmpty() {
7 years ago
w.leftOver = mb
}
return nBytes
9 years ago
}
func (w *ReceivingWorker) IsDataAvailable() bool {
w.RLock()
defer w.RUnlock()
return w.window.Has(w.nextNumber)
}
func (w *ReceivingWorker) NextNumber() uint32 {
w.RLock()
defer w.RUnlock()
return w.nextNumber
}
7 years ago
func (w *ReceivingWorker) Flush(current uint32) {
w.Lock()
defer w.Unlock()
7 years ago
w.acklist.Flush(current, w.conn.roundTrip.Timeout())
9 years ago
}
7 years ago
func (w *ReceivingWorker) Write(seg Segment) error {
ackSeg := seg.(*AckSegment)
ackSeg.Conv = w.conn.meta.Conversation
7 years ago
ackSeg.ReceivingNext = w.nextNumber
ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
ackSeg.Option = 0
7 years ago
if w.conn.State() == StateReadyToClose {
8 years ago
ackSeg.Option = SegmentOptionClose
9 years ago
}
7 years ago
return w.conn.output.Write(ackSeg)
9 years ago
}
7 years ago
func (*ReceivingWorker) CloseRead() {
9 years ago
}
7 years ago
func (w *ReceivingWorker) UpdateNecessary() bool {
w.RLock()
defer w.RUnlock()
7 years ago
return len(w.acklist.numbers) > 0
}