// Repository page notice (not part of the source): You cannot select more
// than 25 topics. Topics must start with a letter or number, can include
// dashes ('-'), and can be up to 35 characters long.
//
// File: v2ray-core/transport/internet/kcp/receiving.go (310 lines, 6.1 KiB)

package kcp
import (
"io"
"sync"
"time"
"github.com/v2ray/v2ray-core/common/alloc"
)
// ReceivingWindow is a fixed-size ring of data segments. Its methods address
// slots by a logical index that is counted from the current start of the ring.
type ReceivingWindow struct {
	// start is the physical slot that logical index 0 maps to; size is the
	// number of slots in the ring.
	start, size uint32

	// list holds the slots. A nil entry marks an empty slot.
	list []*DataSegment
}
// NewReceivingWindow returns a window with size slots, all of them empty, whose
// logical index 0 maps to physical slot 0.
func NewReceivingWindow(size uint32) *ReceivingWindow {
	window := new(ReceivingWindow)
	window.size = size
	window.list = make([]*DataSegment, size)
	return window
}
// Size reports how many slots the window has.
func (this *ReceivingWindow) Size() uint32 {
	return this.size
}
// Position maps a logical index, counted from the current start of the
// window, to the physical slot index inside the ring. The window size must be
// non-zero, because the mapping is a modulo by size.
func (this *ReceivingWindow) Position(idx uint32) uint32 {
	offset := this.start + idx
	return offset % this.size
}
// Set stores value in the slot for logical index idx. It returns false and
// leaves the window untouched when that slot already holds a segment, and
// true once the value has been stored.
func (this *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
	slot := &this.list[this.Position(idx)]
	if *slot != nil {
		return false
	}
	*slot = value
	return true
}
// Remove empties the slot for logical index idx and returns whatever it held,
// which is nil when the slot was already empty.
func (this *ReceivingWindow) Remove(idx uint32) *DataSegment {
	slot := &this.list[this.Position(idx)]
	removed := *slot
	*slot = nil
	return removed
}
// RemoveFirst empties the slot at the start of the window (logical index 0)
// and returns its previous content, or nil when it was empty.
func (this *ReceivingWindow) RemoveFirst() *DataSegment {
	const first = 0
	return this.Remove(first)
}
// Advance moves the start of the window forward by one slot, wrapping back to
// physical slot 0 after the last one. It does not touch the slot contents.
func (this *ReceivingWindow) Advance() {
	next := this.start + 1
	if next == this.size {
		next = 0
	}
	this.start = next
}
9 years ago
9 years ago
type ReceivingQueue struct {
9 years ago
sync.Mutex
9 years ago
closed bool
cache *alloc.Buffer
queue chan *alloc.Buffer
timeout time.Time
}
func NewReceivingQueue(size uint32) *ReceivingQueue {
9 years ago
return &ReceivingQueue{
queue: make(chan *alloc.Buffer, size),
9 years ago
}
}
func (this *ReceivingQueue) Read(buf []byte) (int, error) {
if this.closed {
return 0, io.EOF
}
9 years ago
if this.cache.Len() > 0 {
nBytes, err := this.cache.Read(buf)
if this.cache.IsEmpty() {
this.cache.Release()
this.cache = nil
}
return nBytes, err
}
var totalBytes int
L:
for totalBytes < len(buf) {
timeToSleep := time.Millisecond
select {
case payload, open := <-this.queue:
if !open {
return totalBytes, io.EOF
}
nBytes, err := payload.Read(buf)
totalBytes += nBytes
if err != nil {
return totalBytes, err
}
if !payload.IsEmpty() {
this.cache = payload
}
buf = buf[nBytes:]
case <-time.After(timeToSleep):
if totalBytes > 0 {
break L
}
if !this.timeout.IsZero() && this.timeout.Before(time.Now()) {
return totalBytes, errTimeout
}
timeToSleep += 500 * time.Millisecond
if timeToSleep > 5*time.Second {
timeToSleep = 5 * time.Second
}
9 years ago
}
}
return totalBytes, nil
}
9 years ago
func (this *ReceivingQueue) Put(payload *alloc.Buffer) bool {
if this.closed {
payload.Release()
9 years ago
return false
}
9 years ago
select {
case this.queue <- payload:
return true
default:
return false
}
9 years ago
}
// SetReadDeadline records t as the deadline that Read checks while it waits
// for data. A zero t disables the deadline. The returned error is always nil.
func (this *ReceivingQueue) SetReadDeadline(t time.Time) error {
	this.timeout = t
	return nil
}
func (this *ReceivingQueue) Close() {
9 years ago
this.Lock()
defer this.Unlock()
9 years ago
if this.closed {
return
}
this.closed = true
close(this.queue)
}
9 years ago
type AckList struct {
9 years ago
sync.Mutex
9 years ago
writer SegmentWriter
9 years ago
timestamps []uint32
numbers []uint32
nextFlush []uint32
}
func NewAckList(writer SegmentWriter) *AckList {
9 years ago
return &AckList{
9 years ago
writer: writer,
timestamps: make([]uint32, 0, 32),
numbers: make([]uint32, 0, 32),
nextFlush: make([]uint32, 0, 32),
}
9 years ago
}
9 years ago
func (this *AckList) Add(number uint32, timestamp uint32) {
9 years ago
this.Lock()
defer this.Unlock()
9 years ago
this.timestamps = append(this.timestamps, timestamp)
this.numbers = append(this.numbers, number)
this.nextFlush = append(this.nextFlush, 0)
9 years ago
}
9 years ago
func (this *AckList) Clear(una uint32) {
9 years ago
this.Lock()
defer this.Unlock()
9 years ago
count := 0
for i := 0; i < len(this.numbers); i++ {
if this.numbers[i] >= una {
if i != count {
this.numbers[count] = this.numbers[i]
this.timestamps[count] = this.timestamps[i]
this.nextFlush[count] = this.nextFlush[i]
9 years ago
}
count++
}
}
if count < len(this.numbers) {
this.numbers = this.numbers[:count]
this.timestamps = this.timestamps[:count]
this.nextFlush = this.nextFlush[:count]
}
9 years ago
}
9 years ago
func (this *AckList) Flush(current uint32, rto uint32) {
seg := NewAckSegment()
9 years ago
this.Lock()
for i := 0; i < len(this.numbers) && !seg.IsFull(); i++ {
if this.nextFlush[i] <= current {
seg.PutNumber(this.numbers[i], this.timestamps[i])
9 years ago
this.nextFlush[i] = current + rto/2
}
9 years ago
}
9 years ago
this.Unlock()
if seg.Count > 0 {
9 years ago
this.writer.Write(seg)
seg.Release()
}
9 years ago
}
type ReceivingWorker struct {
conn *Connection
9 years ago
queue *ReceivingQueue
window *ReceivingWindow
windowMutex sync.Mutex
acklist *AckList
updated bool
nextNumber uint32
windowSize uint32
9 years ago
}
func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
9 years ago
windowSize := effectiveConfig.GetReceivingWindowSize()
worker := &ReceivingWorker{
conn: kcp,
queue: NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),
9 years ago
window: NewReceivingWindow(windowSize),
windowSize: windowSize,
}
worker.acklist = NewAckList(worker)
9 years ago
return worker
}
// ProcessSendingNext passes number to the ack list's Clear, which discards the
// pending acknowledgements for segments that come before it.
func (this *ReceivingWorker) ProcessSendingNext(number uint32) {
	pending := this.acklist
	pending.Clear(number)
}
func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {
number := seg.Number
9 years ago
idx := number - this.nextNumber
if idx >= this.windowSize {
9 years ago
return
}
this.ProcessSendingNext(seg.SendingNext)
this.acklist.Add(number, seg.Timestamp)
9 years ago
this.windowMutex.Lock()
9 years ago
defer this.windowMutex.Unlock()
9 years ago
if !this.window.Set(idx, seg) {
seg.Release()
}
for {
seg := this.window.RemoveFirst()
if seg == nil {
break
}
if !this.queue.Put(seg.Data) {
this.window.Set(0, seg)
break
}
seg.Data = nil
seg.Release()
9 years ago
this.window.Advance()
this.nextNumber++
this.updated = true
}
}
// Read fills b from the worker's receiving queue and returns that queue's
// byte count and error unchanged.
func (this *ReceivingWorker) Read(b []byte) (int, error) {
	nBytes, err := this.queue.Read(b)
	return nBytes, err
}
// SetReadDeadline forwards t to the receiving queue as its read deadline. The
// queue's error result is discarded.
func (this *ReceivingWorker) SetReadDeadline(t time.Time) {
	_ = this.queue.SetReadDeadline(t)
}
func (this *ReceivingWorker) Flush(current uint32) {
this.acklist.Flush(current, this.conn.roundTrip.Timeout())
9 years ago
}
func (this *ReceivingWorker) Write(seg Segment) {
ackSeg := seg.(*AckSegment)
ackSeg.Conv = this.conn.conv
9 years ago
ackSeg.ReceivingNext = this.nextNumber
ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
if this.conn.state == StateReadyToClose {
9 years ago
ackSeg.Opt = SegmentOptionClose
}
this.conn.output.Write(ackSeg)
9 years ago
this.updated = false
}
// CloseRead closes the worker's receiving queue.
func (this *ReceivingWorker) CloseRead() {
	incoming := this.queue
	incoming.Close()
}
func (this *ReceivingWorker) PingNecessary() bool {
return this.updated
9 years ago
}