v2ray-core/transport/internet/kcp/receiving.go

281 lines
5.6 KiB
Go
Raw Normal View History

2016-06-25 19:35:18 +00:00
package kcp
2016-06-30 12:51:49 +00:00
import (
"sync"
"github.com/v2ray/v2ray-core/common/alloc"
)
2016-06-25 19:35:18 +00:00
type ReceivingWindow struct {
start uint32
size uint32
2016-06-29 08:34:34 +00:00
list []*DataSegment
2016-06-25 19:35:18 +00:00
}
func NewReceivingWindow(size uint32) *ReceivingWindow {
return &ReceivingWindow{
start: 0,
size: size,
2016-06-29 08:34:34 +00:00
list: make([]*DataSegment, size),
2016-06-25 19:35:18 +00:00
}
}
// Size reports the number of slots in the window.
func (this *ReceivingWindow) Size() uint32 {
	slots := this.size
	return slots
}
// Position maps a window-relative index to its slot in the ring storage.
// The window size must be non-zero, since the result is taken modulo size.
func (this *ReceivingWindow) Position(idx uint32) uint32 {
	offset := this.start + idx
	return offset % this.size
}
2016-06-29 08:34:34 +00:00
func (this *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
2016-06-25 19:35:18 +00:00
pos := this.Position(idx)
if this.list[pos] != nil {
return false
}
this.list[pos] = value
return true
}
2016-06-29 08:34:34 +00:00
func (this *ReceivingWindow) Remove(idx uint32) *DataSegment {
2016-06-25 19:35:18 +00:00
pos := this.Position(idx)
e := this.list[pos]
this.list[pos] = nil
return e
}
2016-06-29 08:34:34 +00:00
func (this *ReceivingWindow) RemoveFirst() *DataSegment {
2016-06-25 19:35:18 +00:00
return this.Remove(0)
}
// Advance moves the start of the window forward by one slot, wrapping back
// to slot 0 once the end of the ring is reached.
func (this *ReceivingWindow) Advance() {
	next := this.start + 1
	if next == this.size {
		next = 0
	}
	this.start = next
}
2016-06-27 20:34:46 +00:00
2016-06-30 12:51:49 +00:00
type ReceivingQueue struct {
2016-07-06 15:34:38 +00:00
start uint32
cap uint32
len uint32
data []*alloc.Buffer
2016-06-30 12:51:49 +00:00
}
2016-07-02 21:37:51 +00:00
func NewReceivingQueue(size uint32) *ReceivingQueue {
2016-06-30 12:51:49 +00:00
return &ReceivingQueue{
2016-07-06 15:34:38 +00:00
cap: size,
data: make([]*alloc.Buffer, size),
2016-06-30 12:51:49 +00:00
}
}
2016-07-06 15:34:38 +00:00
// IsEmpty reports whether the queue currently holds no buffers.
func (this *ReceivingQueue) IsEmpty() bool {
	hasData := this.len != 0
	return !hasData
}
2016-07-02 21:37:51 +00:00
2016-07-06 15:34:38 +00:00
// IsFull reports whether every slot of the queue is occupied.
func (this *ReceivingQueue) IsFull() bool {
	capacity, queued := this.cap, this.len
	return queued == capacity
}
2016-06-30 12:51:49 +00:00
2016-07-06 15:34:38 +00:00
func (this *ReceivingQueue) Read(buf []byte) int {
if this.IsEmpty() {
return 0
}
2016-06-30 12:51:49 +00:00
2016-07-06 15:34:38 +00:00
totalBytes := 0
lenBuf := len(buf)
for !this.IsEmpty() && totalBytes < lenBuf {
payload := this.data[this.start]
nBytes, _ := payload.Read(buf)
buf = buf[nBytes:]
totalBytes += nBytes
if payload.IsEmpty() {
payload.Release()
this.data[this.start] = nil
this.start++
if this.start == this.cap {
this.start = 0
2016-06-30 12:51:49 +00:00
}
2016-07-06 15:34:38 +00:00
this.len--
if this.len == 0 {
this.start = 0
2016-07-05 12:08:08 +00:00
}
2016-06-30 12:51:49 +00:00
}
}
2016-07-06 15:34:38 +00:00
return totalBytes
2016-06-30 12:51:49 +00:00
}
2016-07-06 15:34:38 +00:00
func (this *ReceivingQueue) Put(payload *alloc.Buffer) {
this.data[(this.start+this.len)%this.cap] = payload
this.len++
2016-06-30 12:51:49 +00:00
}
func (this *ReceivingQueue) Close() {
2016-07-06 15:34:38 +00:00
for i := uint32(0); i < this.len; i++ {
this.data[(this.start+i)%this.cap].Release()
this.data[(this.start+i)%this.cap] = nil
2016-06-30 12:51:49 +00:00
}
}
2016-07-02 09:33:34 +00:00
type AckList struct {
2016-07-02 19:26:50 +00:00
writer SegmentWriter
2016-06-27 20:34:46 +00:00
timestamps []uint32
numbers []uint32
2016-06-30 20:19:30 +00:00
nextFlush []uint32
}
2016-07-02 21:18:12 +00:00
func NewAckList(writer SegmentWriter) *AckList {
2016-07-02 09:33:34 +00:00
return &AckList{
2016-07-02 19:26:50 +00:00
writer: writer,
2016-06-30 20:19:30 +00:00
timestamps: make([]uint32, 0, 32),
numbers: make([]uint32, 0, 32),
nextFlush: make([]uint32, 0, 32),
}
2016-06-27 20:34:46 +00:00
}
2016-07-02 09:33:34 +00:00
func (this *AckList) Add(number uint32, timestamp uint32) {
2016-06-27 20:34:46 +00:00
this.timestamps = append(this.timestamps, timestamp)
this.numbers = append(this.numbers, number)
2016-06-30 20:19:30 +00:00
this.nextFlush = append(this.nextFlush, 0)
2016-06-27 20:34:46 +00:00
}
2016-07-02 09:33:34 +00:00
func (this *AckList) Clear(una uint32) {
2016-06-27 20:34:46 +00:00
count := 0
for i := 0; i < len(this.numbers); i++ {
if this.numbers[i] >= una {
if i != count {
this.numbers[count] = this.numbers[i]
this.timestamps[count] = this.timestamps[i]
2016-06-30 20:19:30 +00:00
this.nextFlush[count] = this.nextFlush[i]
2016-06-27 20:34:46 +00:00
}
count++
}
}
2016-06-29 21:41:04 +00:00
if count < len(this.numbers) {
this.numbers = this.numbers[:count]
this.timestamps = this.timestamps[:count]
2016-06-30 20:19:30 +00:00
this.nextFlush = this.nextFlush[:count]
2016-06-29 21:41:04 +00:00
}
2016-06-27 20:34:46 +00:00
}
2016-07-02 20:28:28 +00:00
func (this *AckList) Flush(current uint32, rto uint32) {
2016-07-05 08:28:23 +00:00
seg := NewAckSegment()
2016-07-05 08:33:11 +00:00
for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ {
2016-06-30 20:19:30 +00:00
if this.nextFlush[i] <= current {
seg.PutNumber(this.numbers[i])
seg.PutTimestamp(this.timestamps[i])
2016-07-02 20:28:28 +00:00
this.nextFlush[i] = current + rto/2
2016-06-30 20:19:30 +00:00
}
2016-06-27 20:34:46 +00:00
}
2016-06-30 20:19:30 +00:00
if seg.Count > 0 {
2016-07-02 19:26:50 +00:00
this.writer.Write(seg)
2016-07-05 12:08:08 +00:00
seg.Release()
2016-06-30 20:19:30 +00:00
}
2016-07-02 19:26:50 +00:00
}
type ReceivingWorker struct {
2016-07-12 15:56:36 +00:00
sync.RWMutex
2016-07-06 15:34:38 +00:00
conn *Connection
queue *ReceivingQueue
window *ReceivingWindow
acklist *AckList
updated bool
nextNumber uint32
windowSize uint32
2016-07-02 19:26:50 +00:00
}
2016-07-05 21:02:52 +00:00
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
2016-07-02 19:26:50 +00:00
windowSize := effectiveConfig.GetReceivingWindowSize()
worker := &ReceivingWorker{
2016-07-05 21:02:52 +00:00
conn: kcp,
2016-07-02 21:37:51 +00:00
queue: NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),
2016-07-02 19:26:50 +00:00
window: NewReceivingWindow(windowSize),
windowSize: windowSize,
}
2016-07-02 21:18:12 +00:00
worker.acklist = NewAckList(worker)
2016-07-02 19:26:50 +00:00
return worker
}
func (this *ReceivingWorker) ProcessSendingNext(number uint32) {
2016-07-06 15:34:38 +00:00
this.Lock()
defer this.Unlock()
2016-07-02 19:26:50 +00:00
this.acklist.Clear(number)
}
func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {
2016-07-06 15:34:38 +00:00
this.Lock()
defer this.Unlock()
2016-07-02 19:26:50 +00:00
number := seg.Number
2016-07-02 22:38:33 +00:00
idx := number - this.nextNumber
if idx >= this.windowSize {
2016-07-02 19:26:50 +00:00
return
}
2016-07-06 15:34:38 +00:00
this.acklist.Clear(seg.SendingNext)
2016-07-02 19:26:50 +00:00
this.acklist.Add(number, seg.Timestamp)
if !this.window.Set(idx, seg) {
seg.Release()
}
2016-07-06 15:34:38 +00:00
for !this.queue.IsFull() {
2016-07-02 19:26:50 +00:00
seg := this.window.RemoveFirst()
if seg == nil {
break
}
2016-07-06 15:34:38 +00:00
this.queue.Put(seg.Data)
2016-07-02 19:26:50 +00:00
seg.Data = nil
2016-07-05 12:08:08 +00:00
seg.Release()
2016-07-02 19:26:50 +00:00
this.window.Advance()
this.nextNumber++
this.updated = true
}
}
2016-07-06 15:34:38 +00:00
func (this *ReceivingWorker) Read(b []byte) int {
this.Lock()
defer this.Unlock()
2016-07-02 19:26:50 +00:00
2016-07-06 15:34:38 +00:00
return this.queue.Read(b)
2016-07-02 19:26:50 +00:00
}
2016-07-05 21:02:52 +00:00
func (this *ReceivingWorker) Flush(current uint32) {
2016-07-06 15:34:38 +00:00
this.Lock()
defer this.Unlock()
2016-07-05 21:53:46 +00:00
this.acklist.Flush(current, this.conn.roundTrip.Timeout())
2016-07-02 19:26:50 +00:00
}
2016-07-04 12:17:42 +00:00
func (this *ReceivingWorker) Write(seg Segment) {
2016-07-02 20:17:41 +00:00
ackSeg := seg.(*AckSegment)
2016-07-05 21:02:52 +00:00
ackSeg.Conv = this.conn.conv
2016-07-02 19:26:50 +00:00
ackSeg.ReceivingNext = this.nextNumber
ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
2016-07-05 21:02:52 +00:00
if this.conn.state == StateReadyToClose {
2016-07-14 20:52:00 +00:00
ackSeg.Option = SegmentOptionClose
2016-07-02 19:26:50 +00:00
}
2016-07-05 21:02:52 +00:00
this.conn.output.Write(ackSeg)
2016-07-02 19:26:50 +00:00
this.updated = false
}
func (this *ReceivingWorker) CloseRead() {
2016-07-06 15:34:38 +00:00
this.Lock()
defer this.Unlock()
2016-07-02 19:26:50 +00:00
this.queue.Close()
}
func (this *ReceivingWorker) PingNecessary() bool {
2016-07-12 15:56:36 +00:00
this.RLock()
defer this.RUnlock()
2016-07-02 19:26:50 +00:00
return this.updated
2016-06-27 20:34:46 +00:00
}
2016-07-12 15:56:36 +00:00
// MarkPingNecessary sets the updated flag to b while holding the write lock.
func (this *ReceivingWorker) MarkPingNecessary(b bool) {
	this.Lock()
	this.updated = b
	this.Unlock()
}