receiving worker

pull/215/head
v2ray 2016-07-02 21:26:50 +02:00
parent 3e84e4cb44
commit 080f0abee9
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
4 changed files with 162 additions and 107 deletions

View File

@ -85,7 +85,7 @@ func (this *Connection) Read(b []byte) (int, error) {
if this == nil || this.kcp.state == StateTerminating || this.kcp.state == StateTerminated { if this == nil || this.kcp.state == StateTerminating || this.kcp.state == StateTerminated {
return 0, io.EOF return 0, io.EOF
} }
return this.kcp.rcv_queue.Read(b) return this.kcp.receivingWorker.Read(b)
} }
// Write implements the Conn Write method. // Write implements the Conn Write method.
@ -177,7 +177,7 @@ func (this *Connection) SetReadDeadline(t time.Time) error {
} }
this.kcpAccess.Lock() this.kcpAccess.Lock()
defer this.kcpAccess.Unlock() defer this.kcpAccess.Unlock()
this.kcp.rcv_queue.SetReadDeadline(t) this.kcp.receivingWorker.SetReadDeadline(t)
return nil return nil
} }

View File

@ -32,25 +32,21 @@ type KCP struct {
lastIncomingTime uint32 lastIncomingTime uint32
lastPayloadTime uint32 lastPayloadTime uint32
sendingUpdated bool sendingUpdated bool
receivingUpdated bool
lastPingTime uint32 lastPingTime uint32
mss uint32 mss uint32
snd_una, snd_nxt, rcv_nxt uint32 snd_una, snd_nxt uint32
rx_rttvar, rx_srtt, rx_rto uint32 rx_rttvar, rx_srtt, rx_rto uint32
snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32 snd_wnd, rmt_wnd, cwnd uint32
current, interval uint32 current, interval uint32
snd_queue *SendingQueue snd_queue *SendingQueue
rcv_queue *ReceivingQueue
snd_buf *SendingWindow snd_buf *SendingWindow
rcv_buf *ReceivingWindow receivingWorker *ReceivingWorker
acklist *AckList
fastresend int32 fastresend int32
congestionControl bool congestionControl bool
output *SegmentWriter output *BufferedSegmentWriter
} }
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
@ -60,18 +56,15 @@ func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
kcp := new(KCP) kcp := new(KCP)
kcp.conv = conv kcp.conv = conv
kcp.snd_wnd = effectiveConfig.GetSendingWindowSize() kcp.snd_wnd = effectiveConfig.GetSendingWindowSize()
kcp.rcv_wnd = effectiveConfig.GetReceivingWindowSize()
kcp.rmt_wnd = 32 kcp.rmt_wnd = 32
kcp.mss = output.Mtu() - DataSegmentOverhead kcp.mss = output.Mtu() - DataSegmentOverhead
kcp.rx_rto = 100 kcp.rx_rto = 100
kcp.interval = effectiveConfig.Tti kcp.interval = effectiveConfig.Tti
kcp.output = NewSegmentWriter(output) kcp.output = NewSegmentWriter(output)
kcp.rcv_buf = NewReceivingWindow(effectiveConfig.GetReceivingWindowSize())
kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize()) kcp.snd_queue = NewSendingQueue(effectiveConfig.GetSendingQueueSize())
kcp.rcv_queue = NewReceivingQueue()
kcp.acklist = NewACKList(kcp)
kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize()) kcp.snd_buf = NewSendingWindow(kcp, effectiveConfig.GetSendingWindowSize())
kcp.cwnd = kcp.snd_wnd kcp.cwnd = kcp.snd_wnd
kcp.receivingWorker = NewReceivingWorker(kcp)
return kcp return kcp
} }
@ -81,13 +74,13 @@ func (kcp *KCP) SetState(state State) {
switch state { switch state {
case StateReadyToClose: case StateReadyToClose:
kcp.rcv_queue.Close() kcp.receivingWorker.CloseRead()
case StatePeerClosed: case StatePeerClosed:
kcp.ClearSendQueue() kcp.ClearSendQueue()
case StateTerminating: case StateTerminating:
kcp.rcv_queue.Close() kcp.receivingWorker.CloseRead()
case StateTerminated: case StateTerminated:
kcp.rcv_queue.Close() kcp.receivingWorker.CloseRead()
} }
} }
@ -115,23 +108,6 @@ func (kcp *KCP) OnClose() {
} }
} }
// DumpReceivingBuf moves available data from rcv_buf -> rcv_queue
// @Private
func (kcp *KCP) DumpReceivingBuf() {
for {
seg := kcp.rcv_buf.RemoveFirst()
if seg == nil {
break
}
kcp.rcv_queue.Put(seg.Data)
seg.Data = nil
kcp.rcv_buf.Advance()
kcp.rcv_nxt++
kcp.receivingUpdated = true
}
}
// Send is user/upper level send, returns below zero for error // Send is user/upper level send, returns below zero for error
func (kcp *KCP) Send(buffer []byte) int { func (kcp *KCP) Send(buffer []byte) int {
nBytes := 0 nBytes := 0
@ -214,25 +190,6 @@ func (kcp *KCP) HandleReceivingNext(receivingNext uint32) {
kcp.snd_buf.Clear(receivingNext) kcp.snd_buf.Clear(receivingNext)
} }
func (kcp *KCP) HandleSendingNext(sendingNext uint32) {
kcp.acklist.Clear(sendingNext)
}
func (kcp *KCP) parse_data(newseg *DataSegment) {
sn := newseg.Number
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
_itimediff(sn, kcp.rcv_nxt) < 0 {
return
}
idx := sn - kcp.rcv_nxt
if !kcp.rcv_buf.Set(idx, newseg) {
newseg.Release()
}
kcp.DumpReceivingBuf()
}
// Input when you received a low level packet (eg. UDP packet), call it // Input when you received a low level packet (eg. UDP packet), call it
func (kcp *KCP) Input(data []byte) int { func (kcp *KCP) Input(data []byte) int {
kcp.lastIncomingTime = kcp.current kcp.lastIncomingTime = kcp.current
@ -249,10 +206,7 @@ func (kcp *KCP) Input(data []byte) int {
switch seg := seg.(type) { switch seg := seg.(type) {
case *DataSegment: case *DataSegment:
kcp.HandleOption(seg.Opt) kcp.HandleOption(seg.Opt)
kcp.HandleSendingNext(seg.SendingNext) kcp.receivingWorker.ProcessSegment(seg)
kcp.acklist.Add(seg.Number, seg.Timestamp)
kcp.receivingUpdated = true
kcp.parse_data(seg)
kcp.lastPayloadTime = kcp.current kcp.lastPayloadTime = kcp.current
case *ACKSegment: case *ACKSegment:
kcp.HandleOption(seg.Opt) kcp.HandleOption(seg.Opt)
@ -288,7 +242,7 @@ func (kcp *KCP) Input(data []byte) int {
} }
} }
kcp.HandleReceivingNext(seg.ReceivinNext) kcp.HandleReceivingNext(seg.ReceivinNext)
kcp.HandleSendingNext(seg.SendingNext) kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
kcp.shrink_buf() kcp.shrink_buf()
default: default:
} }
@ -330,9 +284,7 @@ func (kcp *KCP) flush() {
current := kcp.current current := kcp.current
// flush acknowledges // flush acknowledges
if kcp.acklist.Flush() { kcp.receivingWorker.Flush()
kcp.receivingUpdated = false
}
// calculate window size // calculate window size
cwnd := kcp.snd_una + kcp.snd_wnd cwnd := kcp.snd_una + kcp.snd_wnd
@ -359,11 +311,11 @@ func (kcp *KCP) flush() {
kcp.sendingUpdated = false kcp.sendingUpdated = false
} }
if kcp.sendingUpdated || kcp.receivingUpdated || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 { if kcp.sendingUpdated || kcp.receivingWorker.PingNecessary() || _itimediff(kcp.current, kcp.lastPingTime) >= 5000 {
seg := &CmdOnlySegment{ seg := &CmdOnlySegment{
Conv: kcp.conv, Conv: kcp.conv,
Cmd: SegmentCommandPing, Cmd: SegmentCommandPing,
ReceivinNext: kcp.rcv_nxt, ReceivinNext: kcp.receivingWorker.nextNumber,
SendingNext: kcp.snd_una, SendingNext: kcp.snd_una,
} }
if kcp.state == StateReadyToClose { if kcp.state == StateReadyToClose {
@ -372,7 +324,6 @@ func (kcp *KCP) flush() {
kcp.output.Write(seg) kcp.output.Write(seg)
kcp.lastPingTime = kcp.current kcp.lastPingTime = kcp.current
kcp.sendingUpdated = false kcp.sendingUpdated = false
kcp.receivingUpdated = false
} }
// flash remain segments // flash remain segments

View File

@ -8,21 +8,25 @@ import (
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
) )
type SegmentWriter struct { type SegmentWriter interface {
Write(seg ISegment)
}
type BufferedSegmentWriter struct {
sync.Mutex sync.Mutex
mtu uint32 mtu uint32
buffer *alloc.Buffer buffer *alloc.Buffer
writer v2io.Writer writer v2io.Writer
} }
func NewSegmentWriter(writer *AuthenticationWriter) *SegmentWriter { func NewSegmentWriter(writer *AuthenticationWriter) *BufferedSegmentWriter {
return &SegmentWriter{ return &BufferedSegmentWriter{
mtu: writer.Mtu(), mtu: writer.Mtu(),
writer: writer, writer: writer,
} }
} }
func (this *SegmentWriter) Write(seg ISegment) { func (this *BufferedSegmentWriter) Write(seg ISegment) {
this.Lock() this.Lock()
defer this.Unlock() defer this.Unlock()
@ -38,12 +42,12 @@ func (this *SegmentWriter) Write(seg ISegment) {
this.buffer.Append(seg.Bytes(nil)) this.buffer.Append(seg.Bytes(nil))
} }
func (this *SegmentWriter) FlushWithoutLock() { func (this *BufferedSegmentWriter) FlushWithoutLock() {
this.writer.Write(this.buffer) this.writer.Write(this.buffer)
this.buffer = nil this.buffer = nil
} }
func (this *SegmentWriter) Flush() { func (this *BufferedSegmentWriter) Flush() {
this.Lock() this.Lock()
defer this.Unlock() defer this.Unlock()

View File

@ -58,7 +58,6 @@ func (this *ReceivingWindow) Advance() {
} }
type ReceivingQueue struct { type ReceivingQueue struct {
sync.RWMutex
closed bool closed bool
cache *alloc.Buffer cache *alloc.Buffer
queue chan *alloc.Buffer queue chan *alloc.Buffer
@ -104,12 +103,9 @@ L:
if totalBytes > 0 { if totalBytes > 0 {
break L break L
} }
this.RLock()
if !this.timeout.IsZero() && this.timeout.Before(time.Now()) { if !this.timeout.IsZero() && this.timeout.Before(time.Now()) {
this.RUnlock()
return totalBytes, errTimeout return totalBytes, errTimeout
} }
this.RUnlock()
timeToSleep += 500 * time.Millisecond timeToSleep += 500 * time.Millisecond
} }
} }
@ -117,30 +113,26 @@ L:
return totalBytes, nil return totalBytes, nil
} }
func (this *ReceivingQueue) Put(payload *alloc.Buffer) { func (this *ReceivingQueue) Put(payload *alloc.Buffer) bool {
this.RLock()
defer this.RUnlock()
if this.closed { if this.closed {
payload.Release() payload.Release()
return return false
} }
this.queue <- payload select {
case this.queue <- payload:
return true
default:
return false
}
} }
func (this *ReceivingQueue) SetReadDeadline(t time.Time) error { func (this *ReceivingQueue) SetReadDeadline(t time.Time) error {
this.Lock()
defer this.Unlock()
this.timeout = t this.timeout = t
return nil return nil
} }
func (this *ReceivingQueue) Close() { func (this *ReceivingQueue) Close() {
this.Lock()
defer this.Unlock()
if this.closed { if this.closed {
return return
} }
@ -149,15 +141,15 @@ func (this *ReceivingQueue) Close() {
} }
type AckList struct { type AckList struct {
kcp *KCP writer SegmentWriter
timestamps []uint32 timestamps []uint32
numbers []uint32 numbers []uint32
nextFlush []uint32 nextFlush []uint32
} }
func NewACKList(kcp *KCP) *AckList { func NewACKList(writer SegmentWriter) *AckList {
return &AckList{ return &AckList{
kcp: kcp, writer: writer,
timestamps: make([]uint32, 0, 32), timestamps: make([]uint32, 0, 32),
numbers: make([]uint32, 0, 32), numbers: make([]uint32, 0, 32),
nextFlush: make([]uint32, 0, 32), nextFlush: make([]uint32, 0, 32),
@ -189,16 +181,8 @@ func (this *AckList) Clear(una uint32) {
} }
} }
func (this *AckList) Flush() bool { func (this *AckList) Flush(current uint32) {
seg := &ACKSegment{ seg := new(ACKSegment)
Conv: this.kcp.conv,
ReceivingNext: this.kcp.rcv_nxt,
ReceivingWindow: this.kcp.rcv_nxt + this.kcp.rcv_wnd,
}
if this.kcp.state == StateReadyToClose {
seg.Opt = SegmentOptionClose
}
current := this.kcp.current
for i := 0; i < len(this.numbers); i++ { for i := 0; i < len(this.numbers); i++ {
if this.nextFlush[i] <= current { if this.nextFlush[i] <= current {
seg.Count++ seg.Count++
@ -211,8 +195,124 @@ func (this *AckList) Flush() bool {
} }
} }
if seg.Count > 0 { if seg.Count > 0 {
this.kcp.output.Write(seg) this.writer.Write(seg)
return true
} }
return false }
type ReceivingWorker struct {
sync.Mutex
kcp *KCP
queue *ReceivingQueue
window *ReceivingWindow
acklist *AckList
updated bool
nextNumber uint32
windowSize uint32
}
func NewReceivingWorker(kcp *KCP) *ReceivingWorker {
windowSize := effectiveConfig.GetReceivingWindowSize()
worker := &ReceivingWorker{
kcp: kcp,
queue: NewReceivingQueue(),
window: NewReceivingWindow(windowSize),
windowSize: windowSize,
}
worker.acklist = NewACKList(worker)
return worker
}
func (this *ReceivingWorker) ProcessSendingNext(number uint32) {
this.Lock()
defer this.Unlock()
this.acklist.Clear(number)
}
func (this *ReceivingWorker) ProcessSegment(seg *DataSegment) {
number := seg.Number
if _itimediff(number, this.nextNumber+this.windowSize) >= 0 || _itimediff(number, this.nextNumber) < 0 {
return
}
this.ProcessSendingNext(seg.SendingNext)
this.Lock()
this.acklist.Add(number, seg.Timestamp)
idx := number - this.nextNumber
if !this.window.Set(idx, seg) {
seg.Release()
}
this.Unlock()
this.DumpWindow()
}
// @Private
func (this *ReceivingWorker) DumpWindow() {
this.Lock()
defer this.Unlock()
for {
seg := this.window.RemoveFirst()
if seg == nil {
break
}
if !this.queue.Put(seg.Data) {
this.window.Set(0, seg)
break
}
seg.Data = nil
this.window.Advance()
this.nextNumber++
this.updated = true
}
}
func (this *ReceivingWorker) Read(b []byte) (int, error) {
this.Lock()
defer this.Unlock()
return this.queue.Read(b)
}
func (this *ReceivingWorker) SetReadDeadline(t time.Time) {
this.Lock()
defer this.Unlock()
this.queue.SetReadDeadline(t)
}
func (this *ReceivingWorker) Flush() {
this.Lock()
defer this.Unlock()
this.acklist.Flush(this.kcp.current)
}
func (this *ReceivingWorker) Write(seg ISegment) {
ackSeg := seg.(*ACKSegment)
ackSeg.Conv = this.kcp.conv
ackSeg.ReceivingNext = this.nextNumber
ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
if this.kcp.state == StateReadyToClose {
ackSeg.Opt = SegmentOptionClose
}
this.kcp.output.Write(ackSeg)
this.updated = false
}
func (this *ReceivingWorker) CloseRead() {
this.Lock()
defer this.Unlock()
this.queue.Close()
}
func (this *ReceivingWorker) PingNecessary() bool {
return this.updated
} }