mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
258 lines
4.8 KiB
258 lines
4.8 KiB
package kcp |
|
|
|
import ( |
|
"sync" |
|
|
|
"github.com/xtls/xray-core/common/buf" |
|
) |
|
|
|
type ReceivingWindow struct { |
|
cache map[uint32]*DataSegment |
|
} |
|
|
|
func NewReceivingWindow() *ReceivingWindow { |
|
return &ReceivingWindow{ |
|
cache: make(map[uint32]*DataSegment), |
|
} |
|
} |
|
|
|
func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool { |
|
_, f := w.cache[id] |
|
if f { |
|
return false |
|
} |
|
w.cache[id] = value |
|
return true |
|
} |
|
|
|
func (w *ReceivingWindow) Has(id uint32) bool { |
|
_, f := w.cache[id] |
|
return f |
|
} |
|
|
|
func (w *ReceivingWindow) Remove(id uint32) *DataSegment { |
|
v, f := w.cache[id] |
|
if !f { |
|
return nil |
|
} |
|
delete(w.cache, id) |
|
return v |
|
} |
|
|
|
type AckList struct { |
|
writer SegmentWriter |
|
timestamps []uint32 |
|
numbers []uint32 |
|
nextFlush []uint32 |
|
|
|
flushCandidates []uint32 |
|
dirty bool |
|
} |
|
|
|
func NewAckList(writer SegmentWriter) *AckList { |
|
return &AckList{ |
|
writer: writer, |
|
timestamps: make([]uint32, 0, 128), |
|
numbers: make([]uint32, 0, 128), |
|
nextFlush: make([]uint32, 0, 128), |
|
flushCandidates: make([]uint32, 0, 128), |
|
} |
|
} |
|
|
|
func (l *AckList) Add(number uint32, timestamp uint32) { |
|
l.timestamps = append(l.timestamps, timestamp) |
|
l.numbers = append(l.numbers, number) |
|
l.nextFlush = append(l.nextFlush, 0) |
|
l.dirty = true |
|
} |
|
|
|
func (l *AckList) Clear(una uint32) { |
|
count := 0 |
|
for i := 0; i < len(l.numbers); i++ { |
|
if l.numbers[i] < una { |
|
continue |
|
} |
|
if i != count { |
|
l.numbers[count] = l.numbers[i] |
|
l.timestamps[count] = l.timestamps[i] |
|
l.nextFlush[count] = l.nextFlush[i] |
|
} |
|
count++ |
|
} |
|
if count < len(l.numbers) { |
|
l.numbers = l.numbers[:count] |
|
l.timestamps = l.timestamps[:count] |
|
l.nextFlush = l.nextFlush[:count] |
|
l.dirty = true |
|
} |
|
} |
|
|
|
func (l *AckList) Flush(current uint32, rto uint32) { |
|
l.flushCandidates = l.flushCandidates[:0] |
|
|
|
seg := NewAckSegment() |
|
for i := 0; i < len(l.numbers); i++ { |
|
if l.nextFlush[i] > current { |
|
if len(l.flushCandidates) < cap(l.flushCandidates) { |
|
l.flushCandidates = append(l.flushCandidates, l.numbers[i]) |
|
} |
|
continue |
|
} |
|
seg.PutNumber(l.numbers[i]) |
|
seg.PutTimestamp(l.timestamps[i]) |
|
timeout := rto / 2 |
|
if timeout < 20 { |
|
timeout = 20 |
|
} |
|
l.nextFlush[i] = current + timeout |
|
|
|
if seg.IsFull() { |
|
l.writer.Write(seg) |
|
seg.Release() |
|
seg = NewAckSegment() |
|
l.dirty = false |
|
} |
|
} |
|
|
|
if l.dirty || !seg.IsEmpty() { |
|
for _, number := range l.flushCandidates { |
|
if seg.IsFull() { |
|
break |
|
} |
|
seg.PutNumber(number) |
|
} |
|
l.writer.Write(seg) |
|
l.dirty = false |
|
} |
|
|
|
seg.Release() |
|
} |
|
|
|
type ReceivingWorker struct { |
|
sync.RWMutex |
|
conn *Connection |
|
leftOver buf.MultiBuffer |
|
window *ReceivingWindow |
|
acklist *AckList |
|
nextNumber uint32 |
|
windowSize uint32 |
|
} |
|
|
|
func NewReceivingWorker(kcp *Connection) *ReceivingWorker { |
|
worker := &ReceivingWorker{ |
|
conn: kcp, |
|
window: NewReceivingWindow(), |
|
windowSize: kcp.Config.GetReceivingInFlightSize(), |
|
} |
|
worker.acklist = NewAckList(worker) |
|
return worker |
|
} |
|
|
|
func (w *ReceivingWorker) Release() { |
|
w.Lock() |
|
buf.ReleaseMulti(w.leftOver) |
|
w.leftOver = nil |
|
w.Unlock() |
|
} |
|
|
|
func (w *ReceivingWorker) ProcessSendingNext(number uint32) { |
|
w.Lock() |
|
defer w.Unlock() |
|
|
|
w.acklist.Clear(number) |
|
} |
|
|
|
func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) { |
|
w.Lock() |
|
defer w.Unlock() |
|
|
|
number := seg.Number |
|
idx := number - w.nextNumber |
|
if idx >= w.windowSize { |
|
return |
|
} |
|
w.acklist.Clear(seg.SendingNext) |
|
w.acklist.Add(number, seg.Timestamp) |
|
|
|
if !w.window.Set(seg.Number, seg) { |
|
seg.Release() |
|
} |
|
} |
|
|
|
func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer { |
|
if w.leftOver != nil { |
|
mb := w.leftOver |
|
w.leftOver = nil |
|
return mb |
|
} |
|
|
|
mb := make(buf.MultiBuffer, 0, 32) |
|
|
|
w.Lock() |
|
defer w.Unlock() |
|
for { |
|
seg := w.window.Remove(w.nextNumber) |
|
if seg == nil { |
|
break |
|
} |
|
w.nextNumber++ |
|
mb = append(mb, seg.Detach()) |
|
seg.Release() |
|
} |
|
|
|
return mb |
|
} |
|
|
|
func (w *ReceivingWorker) Read(b []byte) int { |
|
mb := w.ReadMultiBuffer() |
|
if mb.IsEmpty() { |
|
return 0 |
|
} |
|
mb, nBytes := buf.SplitBytes(mb, b) |
|
if !mb.IsEmpty() { |
|
w.leftOver = mb |
|
} |
|
return nBytes |
|
} |
|
|
|
func (w *ReceivingWorker) IsDataAvailable() bool { |
|
w.RLock() |
|
defer w.RUnlock() |
|
return w.window.Has(w.nextNumber) |
|
} |
|
|
|
func (w *ReceivingWorker) NextNumber() uint32 { |
|
w.RLock() |
|
defer w.RUnlock() |
|
|
|
return w.nextNumber |
|
} |
|
|
|
func (w *ReceivingWorker) Flush(current uint32) { |
|
w.Lock() |
|
defer w.Unlock() |
|
|
|
w.acklist.Flush(current, w.conn.roundTrip.Timeout()) |
|
} |
|
|
|
func (w *ReceivingWorker) Write(seg Segment) error { |
|
ackSeg := seg.(*AckSegment) |
|
ackSeg.Conv = w.conn.meta.Conversation |
|
ackSeg.ReceivingNext = w.nextNumber |
|
ackSeg.ReceivingWindow = w.nextNumber + w.windowSize |
|
ackSeg.Option = 0 |
|
if w.conn.State() == StateReadyToClose { |
|
ackSeg.Option = SegmentOptionClose |
|
} |
|
return w.conn.output.Write(ackSeg) |
|
} |
|
|
|
func (*ReceivingWorker) CloseRead() { |
|
} |
|
|
|
func (w *ReceivingWorker) UpdateNecessary() bool { |
|
w.RLock() |
|
defer w.RUnlock() |
|
|
|
return len(w.acklist.numbers) > 0 |
|
}
|
|
|