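// Source: v2ray-core/transport/internet/kcp/receiving.go (277 lines, 5.2 KiB, Go).
// This text appears to be captured from a web "history/blame" view: the bare
// "YYYY-MM-DD hh:mm:ss +00:00" lines interleaved below are commit timestamps,
// not Go source.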

2016-06-25 19:35:18 +00:00
package kcp
2016-06-30 12:51:49 +00:00
import (
"sync"
2016-12-09 10:35:27 +00:00
"v2ray.com/core/common/buf"
2016-06-30 12:51:49 +00:00
)
2016-06-25 19:35:18 +00:00
type ReceivingWindow struct {
start uint32
size uint32
2016-06-29 08:34:34 +00:00
list []*DataSegment
2016-06-25 19:35:18 +00:00
}
func NewReceivingWindow(size uint32) *ReceivingWindow {
return &ReceivingWindow{
start: 0,
size: size,
2016-06-29 08:34:34 +00:00
list: make([]*DataSegment, size),
2016-06-25 19:35:18 +00:00
}
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) Size() uint32 {
return v.size
2016-06-25 19:35:18 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) Position(idx uint32) uint32 {
return (idx + v.start) % v.size
2016-06-25 19:35:18 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
pos := v.Position(idx)
if v.list[pos] != nil {
2016-06-25 19:35:18 +00:00
return false
}
2016-11-27 20:39:09 +00:00
v.list[pos] = value
2016-06-25 19:35:18 +00:00
return true
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) Remove(idx uint32) *DataSegment {
pos := v.Position(idx)
e := v.list[pos]
v.list[pos] = nil
2016-06-25 19:35:18 +00:00
return e
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) RemoveFirst() *DataSegment {
return v.Remove(0)
2016-06-25 19:35:18 +00:00
}
2017-02-17 23:04:25 +00:00
// HasFirst reports whether the slot at the start of the window (offset 0)
// currently holds a segment.
func (v *ReceivingWindow) HasFirst() bool {
	first := v.list[v.Position(0)]
	return first != nil
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWindow) Advance() {
v.start++
if v.start == v.size {
v.start = 0
2016-06-25 19:35:18 +00:00
}
}
2016-06-27 20:34:46 +00:00
2016-07-02 09:33:34 +00:00
type AckList struct {
2016-07-02 19:26:50 +00:00
writer SegmentWriter
2016-06-27 20:34:46 +00:00
timestamps []uint32
numbers []uint32
2016-06-30 20:19:30 +00:00
nextFlush []uint32
2016-11-28 21:06:32 +00:00
flushCandidates []uint32
2016-12-02 15:49:33 +00:00
dirty bool
2016-06-30 20:19:30 +00:00
}
2016-07-02 21:18:12 +00:00
func NewAckList(writer SegmentWriter) *AckList {
2016-07-02 09:33:34 +00:00
return &AckList{
2016-11-28 21:06:32 +00:00
writer: writer,
2016-12-02 15:49:33 +00:00
timestamps: make([]uint32, 0, 128),
numbers: make([]uint32, 0, 128),
nextFlush: make([]uint32, 0, 128),
2016-11-28 21:06:32 +00:00
flushCandidates: make([]uint32, 0, 128),
2016-06-30 20:19:30 +00:00
}
2016-06-27 20:34:46 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *AckList) Add(number uint32, timestamp uint32) {
v.timestamps = append(v.timestamps, timestamp)
v.numbers = append(v.numbers, number)
v.nextFlush = append(v.nextFlush, 0)
2016-12-02 15:49:33 +00:00
v.dirty = true
2016-06-27 20:34:46 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *AckList) Clear(una uint32) {
2016-06-27 20:34:46 +00:00
count := 0
2016-11-27 20:39:09 +00:00
for i := 0; i < len(v.numbers); i++ {
if v.numbers[i] < una {
2016-11-18 15:19:13 +00:00
continue
2016-06-27 20:34:46 +00:00
}
2016-11-18 15:19:13 +00:00
if i != count {
2016-11-27 20:39:09 +00:00
v.numbers[count] = v.numbers[i]
v.timestamps[count] = v.timestamps[i]
v.nextFlush[count] = v.nextFlush[i]
2016-11-18 15:19:13 +00:00
}
count++
2016-06-27 20:34:46 +00:00
}
2016-11-27 20:39:09 +00:00
if count < len(v.numbers) {
v.numbers = v.numbers[:count]
v.timestamps = v.timestamps[:count]
v.nextFlush = v.nextFlush[:count]
2016-12-02 15:49:33 +00:00
v.dirty = true
2016-06-29 21:41:04 +00:00
}
2016-06-27 20:34:46 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *AckList) Flush(current uint32, rto uint32) {
2016-11-28 21:06:32 +00:00
v.flushCandidates = v.flushCandidates[:0]
2016-07-05 08:28:23 +00:00
seg := NewAckSegment()
2016-11-29 07:31:19 +00:00
for i := 0; i < len(v.numbers); i++ {
2016-11-27 20:39:09 +00:00
if v.nextFlush[i] > current {
2016-11-28 21:06:32 +00:00
if len(v.flushCandidates) < cap(v.flushCandidates) {
v.flushCandidates = append(v.flushCandidates, v.numbers[i])
}
2016-11-18 15:19:13 +00:00
continue
}
2016-11-27 20:39:09 +00:00
seg.PutNumber(v.numbers[i])
seg.PutTimestamp(v.timestamps[i])
2016-11-29 16:12:09 +00:00
timeout := rto / 2
2016-11-18 15:19:13 +00:00
if timeout < 20 {
timeout = 20
2016-06-30 20:19:30 +00:00
}
2016-11-27 20:39:09 +00:00
v.nextFlush[i] = current + timeout
2016-11-29 07:31:19 +00:00
if seg.IsFull() {
v.writer.Write(seg)
seg.Release()
seg = NewAckSegment()
2016-12-02 15:49:33 +00:00
v.dirty = false
2016-11-29 07:31:19 +00:00
}
2016-06-27 20:34:46 +00:00
}
2016-12-21 14:37:16 +00:00
if v.dirty || !seg.IsEmpty() {
2016-11-28 21:06:32 +00:00
for _, number := range v.flushCandidates {
if seg.IsFull() {
break
}
seg.PutNumber(number)
}
2016-11-27 20:39:09 +00:00
v.writer.Write(seg)
2016-07-05 12:08:08 +00:00
seg.Release()
2016-12-02 15:49:33 +00:00
v.dirty = false
2016-06-30 20:19:30 +00:00
}
2016-07-02 19:26:50 +00:00
}
type ReceivingWorker struct {
2016-07-12 15:56:36 +00:00
sync.RWMutex
2016-07-06 15:34:38 +00:00
conn *Connection
2016-12-09 10:35:27 +00:00
leftOver *buf.Buffer
2016-07-06 15:34:38 +00:00
window *ReceivingWindow
acklist *AckList
nextNumber uint32
windowSize uint32
2016-07-02 19:26:50 +00:00
}
2016-07-05 21:02:52 +00:00
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
2016-07-02 19:26:50 +00:00
worker := &ReceivingWorker{
2016-07-05 21:02:52 +00:00
conn: kcp,
2016-10-02 21:43:58 +00:00
window: NewReceivingWindow(kcp.Config.GetReceivingBufferSize()),
windowSize: kcp.Config.GetReceivingInFlightSize(),
2016-07-02 19:26:50 +00:00
}
2016-07-02 21:18:12 +00:00
worker.acklist = NewAckList(worker)
2016-07-02 19:26:50 +00:00
return worker
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) Release() {
2017-02-17 23:04:25 +00:00
v.Lock()
2016-11-27 20:39:09 +00:00
v.leftOver.Release()
2017-02-17 23:04:25 +00:00
v.Unlock()
2016-11-21 21:41:12 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) ProcessSendingNext(number uint32) {
v.Lock()
defer v.Unlock()
2016-07-06 15:34:38 +00:00
2016-11-27 20:39:09 +00:00
v.acklist.Clear(number)
2016-07-02 19:26:50 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) ProcessSegment(seg *DataSegment) {
v.Lock()
defer v.Unlock()
2016-07-06 15:34:38 +00:00
2016-07-02 19:26:50 +00:00
number := seg.Number
2016-11-27 20:39:09 +00:00
idx := number - v.nextNumber
if idx >= v.windowSize {
2016-07-02 19:26:50 +00:00
return
}
2016-11-27 20:39:09 +00:00
v.acklist.Clear(seg.SendingNext)
v.acklist.Add(number, seg.Timestamp)
2016-07-02 19:26:50 +00:00
2016-11-27 20:39:09 +00:00
if !v.window.Set(idx, seg) {
2016-07-02 19:26:50 +00:00
seg.Release()
}
2016-08-24 21:51:53 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) Read(b []byte) int {
v.Lock()
defer v.Unlock()
2016-08-24 21:51:53 +00:00
total := 0
2016-11-27 20:39:09 +00:00
if v.leftOver != nil {
2016-12-06 10:03:42 +00:00
nBytes := copy(b, v.leftOver.Bytes())
2016-11-27 20:39:09 +00:00
if nBytes < v.leftOver.Len() {
v.leftOver.SliceFrom(nBytes)
2016-08-24 21:51:53 +00:00
return nBytes
}
2016-11-27 20:39:09 +00:00
v.leftOver.Release()
v.leftOver = nil
2016-08-24 21:51:53 +00:00
total += nBytes
}
2016-07-02 19:26:50 +00:00
2016-08-24 21:51:53 +00:00
for total < len(b) {
2016-11-27 20:39:09 +00:00
seg := v.window.RemoveFirst()
2016-07-02 19:26:50 +00:00
if seg == nil {
break
}
2016-11-27 20:39:09 +00:00
v.window.Advance()
v.nextNumber++
2016-07-02 19:26:50 +00:00
2016-12-06 10:03:42 +00:00
nBytes := copy(b[total:], seg.Data.Bytes())
2016-08-24 21:51:53 +00:00
total += nBytes
if nBytes < seg.Data.Len() {
seg.Data.SliceFrom(nBytes)
2016-11-27 20:39:09 +00:00
v.leftOver = seg.Data
2016-08-24 21:51:53 +00:00
seg.Data = nil
seg.Release()
break
}
seg.Release()
}
return total
2016-07-02 19:26:50 +00:00
}
2017-02-17 23:04:25 +00:00
// IsDataAvailable reports, under the read lock, whether the segment at the
// start of the window has arrived. Data kept as leftover from an earlier Read
// is not taken into account.
func (v *ReceivingWorker) IsDataAvailable() bool {
	v.RLock()
	defer v.RUnlock()

	window := v.window
	return window.HasFirst()
}
// NextNumber returns, under the read lock, the number of the segment at the
// start of the window, i.e. the next one Read will consume.
func (v *ReceivingWorker) NextNumber() uint32 {
	v.RLock()
	defer v.RUnlock()

	return v.nextNumber
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) Flush(current uint32) {
v.Lock()
defer v.Unlock()
2016-07-06 15:34:38 +00:00
2016-11-27 20:39:09 +00:00
v.acklist.Flush(current, v.conn.roundTrip.Timeout())
2016-07-02 19:26:50 +00:00
}
2016-12-20 21:53:58 +00:00
func (v *ReceivingWorker) Write(seg Segment) error {
2016-07-02 20:17:41 +00:00
ackSeg := seg.(*AckSegment)
2016-11-27 20:39:09 +00:00
ackSeg.Conv = v.conn.conv
ackSeg.ReceivingNext = v.nextNumber
ackSeg.ReceivingWindow = v.nextNumber + v.windowSize
if v.conn.state == StateReadyToClose {
2016-07-14 20:52:00 +00:00
ackSeg.Option = SegmentOptionClose
2016-07-02 19:26:50 +00:00
}
2016-12-20 21:53:58 +00:00
return v.conn.output.Write(ackSeg)
2016-07-02 19:26:50 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) CloseRead() {
2016-07-02 19:26:50 +00:00
}
2016-11-27 20:39:09 +00:00
func (v *ReceivingWorker) UpdateNecessary() bool {
2017-02-17 23:04:25 +00:00
v.RLock()
defer v.RUnlock()
2016-11-27 20:39:09 +00:00
return len(v.acklist.numbers) > 0
}