mirror of https://github.com/XTLS/Xray-core
				
				
				
			
		
			
				
	
	
		
			259 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
package kcp
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/xtls/xray-core/common/buf"
 | 
						|
)
 | 
						|
 | 
						|
type ReceivingWindow struct {
 | 
						|
	cache map[uint32]*DataSegment
 | 
						|
}
 | 
						|
 | 
						|
func NewReceivingWindow() *ReceivingWindow {
 | 
						|
	return &ReceivingWindow{
 | 
						|
		cache: make(map[uint32]*DataSegment),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool {
 | 
						|
	_, f := w.cache[id]
 | 
						|
	if f {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	w.cache[id] = value
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWindow) Has(id uint32) bool {
 | 
						|
	_, f := w.cache[id]
 | 
						|
	return f
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWindow) Remove(id uint32) *DataSegment {
 | 
						|
	v, f := w.cache[id]
 | 
						|
	if !f {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	delete(w.cache, id)
 | 
						|
	return v
 | 
						|
}
 | 
						|
 | 
						|
type AckList struct {
 | 
						|
	writer     SegmentWriter
 | 
						|
	timestamps []uint32
 | 
						|
	numbers    []uint32
 | 
						|
	nextFlush  []uint32
 | 
						|
 | 
						|
	flushCandidates []uint32
 | 
						|
	dirty           bool
 | 
						|
}
 | 
						|
 | 
						|
func NewAckList(writer SegmentWriter) *AckList {
 | 
						|
	return &AckList{
 | 
						|
		writer:          writer,
 | 
						|
		timestamps:      make([]uint32, 0, 128),
 | 
						|
		numbers:         make([]uint32, 0, 128),
 | 
						|
		nextFlush:       make([]uint32, 0, 128),
 | 
						|
		flushCandidates: make([]uint32, 0, 128),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l *AckList) Add(number uint32, timestamp uint32) {
 | 
						|
	l.timestamps = append(l.timestamps, timestamp)
 | 
						|
	l.numbers = append(l.numbers, number)
 | 
						|
	l.nextFlush = append(l.nextFlush, 0)
 | 
						|
	l.dirty = true
 | 
						|
}
 | 
						|
 | 
						|
func (l *AckList) Clear(una uint32) {
 | 
						|
	count := 0
 | 
						|
	for i := 0; i < len(l.numbers); i++ {
 | 
						|
		if l.numbers[i] < una {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if i != count {
 | 
						|
			l.numbers[count] = l.numbers[i]
 | 
						|
			l.timestamps[count] = l.timestamps[i]
 | 
						|
			l.nextFlush[count] = l.nextFlush[i]
 | 
						|
		}
 | 
						|
		count++
 | 
						|
	}
 | 
						|
	if count < len(l.numbers) {
 | 
						|
		l.numbers = l.numbers[:count]
 | 
						|
		l.timestamps = l.timestamps[:count]
 | 
						|
		l.nextFlush = l.nextFlush[:count]
 | 
						|
		l.dirty = true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l *AckList) Flush(current uint32, rto uint32) {
 | 
						|
	l.flushCandidates = l.flushCandidates[:0]
 | 
						|
 | 
						|
	seg := NewAckSegment()
 | 
						|
	for i := 0; i < len(l.numbers); i++ {
 | 
						|
		if l.nextFlush[i] > current {
 | 
						|
			if len(l.flushCandidates) < cap(l.flushCandidates) {
 | 
						|
				l.flushCandidates = append(l.flushCandidates, l.numbers[i])
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		seg.PutNumber(l.numbers[i])
 | 
						|
		seg.PutTimestamp(l.timestamps[i])
 | 
						|
		timeout := rto / 2
 | 
						|
		if timeout < 20 {
 | 
						|
			timeout = 20
 | 
						|
		}
 | 
						|
		l.nextFlush[i] = current + timeout
 | 
						|
 | 
						|
		if seg.IsFull() {
 | 
						|
			l.writer.Write(seg)
 | 
						|
			seg.Release()
 | 
						|
			seg = NewAckSegment()
 | 
						|
			l.dirty = false
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if l.dirty || !seg.IsEmpty() {
 | 
						|
		for _, number := range l.flushCandidates {
 | 
						|
			if seg.IsFull() {
 | 
						|
				break
 | 
						|
			}
 | 
						|
			seg.PutNumber(number)
 | 
						|
		}
 | 
						|
		l.writer.Write(seg)
 | 
						|
		l.dirty = false
 | 
						|
	}
 | 
						|
 | 
						|
	seg.Release()
 | 
						|
}
 | 
						|
 | 
						|
type ReceivingWorker struct {
 | 
						|
	sync.RWMutex
 | 
						|
	conn       *Connection
 | 
						|
	leftOver   buf.MultiBuffer
 | 
						|
	window     *ReceivingWindow
 | 
						|
	acklist    *AckList
 | 
						|
	nextNumber uint32
 | 
						|
	windowSize uint32
 | 
						|
}
 | 
						|
 | 
						|
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
 | 
						|
	worker := &ReceivingWorker{
 | 
						|
		conn:       kcp,
 | 
						|
		window:     NewReceivingWindow(),
 | 
						|
		windowSize: kcp.Config.GetReceivingInFlightSize(),
 | 
						|
	}
 | 
						|
	worker.acklist = NewAckList(worker)
 | 
						|
	return worker
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) Release() {
 | 
						|
	w.Lock()
 | 
						|
	buf.ReleaseMulti(w.leftOver)
 | 
						|
	w.leftOver = nil
 | 
						|
	w.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) ProcessSendingNext(number uint32) {
 | 
						|
	w.Lock()
 | 
						|
	defer w.Unlock()
 | 
						|
 | 
						|
	w.acklist.Clear(number)
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) {
 | 
						|
	w.Lock()
 | 
						|
	defer w.Unlock()
 | 
						|
 | 
						|
	number := seg.Number
 | 
						|
	idx := number - w.nextNumber
 | 
						|
	if idx >= w.windowSize {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	w.acklist.Clear(seg.SendingNext)
 | 
						|
	w.acklist.Add(number, seg.Timestamp)
 | 
						|
 | 
						|
	if !w.window.Set(seg.Number, seg) {
 | 
						|
		seg.Release()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) ReadMultiBuffer() buf.MultiBuffer {
 | 
						|
	if w.leftOver != nil {
 | 
						|
		mb := w.leftOver
 | 
						|
		w.leftOver = nil
 | 
						|
		return mb
 | 
						|
	}
 | 
						|
 | 
						|
	mb := make(buf.MultiBuffer, 0, 32)
 | 
						|
 | 
						|
	w.Lock()
 | 
						|
	defer w.Unlock()
 | 
						|
	for {
 | 
						|
		seg := w.window.Remove(w.nextNumber)
 | 
						|
		if seg == nil {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		w.nextNumber++
 | 
						|
		mb = append(mb, seg.Detach())
 | 
						|
		seg.Release()
 | 
						|
	}
 | 
						|
 | 
						|
	return mb
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) Read(b []byte) int {
 | 
						|
	mb := w.ReadMultiBuffer()
 | 
						|
	if mb.IsEmpty() {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	mb, nBytes := buf.SplitBytes(mb, b)
 | 
						|
	if !mb.IsEmpty() {
 | 
						|
		w.leftOver = mb
 | 
						|
	}
 | 
						|
	return nBytes
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) IsDataAvailable() bool {
 | 
						|
	w.RLock()
 | 
						|
	defer w.RUnlock()
 | 
						|
	return w.window.Has(w.nextNumber)
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) NextNumber() uint32 {
 | 
						|
	w.RLock()
 | 
						|
	defer w.RUnlock()
 | 
						|
 | 
						|
	return w.nextNumber
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) Flush(current uint32) {
 | 
						|
	w.Lock()
 | 
						|
	defer w.Unlock()
 | 
						|
 | 
						|
	w.acklist.Flush(current, w.conn.roundTrip.Timeout())
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) Write(seg Segment) error {
 | 
						|
	ackSeg := seg.(*AckSegment)
 | 
						|
	ackSeg.Conv = w.conn.meta.Conversation
 | 
						|
	ackSeg.ReceivingNext = w.nextNumber
 | 
						|
	ackSeg.ReceivingWindow = w.nextNumber + w.windowSize
 | 
						|
	ackSeg.Option = 0
 | 
						|
	if w.conn.State() == StateReadyToClose {
 | 
						|
		ackSeg.Option = SegmentOptionClose
 | 
						|
	}
 | 
						|
	return w.conn.output.Write(ackSeg)
 | 
						|
}
 | 
						|
 | 
						|
func (*ReceivingWorker) CloseRead() {
 | 
						|
}
 | 
						|
 | 
						|
func (w *ReceivingWorker) UpdateNecessary() bool {
 | 
						|
	w.RLock()
 | 
						|
	defer w.RUnlock()
 | 
						|
 | 
						|
	return len(w.acklist.numbers) > 0
 | 
						|
}
 |