diff --git a/transport/internet/kcp/config.go b/transport/internet/kcp/config.go index 0476b7ae..9dfd1b65 100644 --- a/transport/internet/kcp/config.go +++ b/transport/internet/kcp/config.go @@ -7,6 +7,7 @@ type Config struct { DownlinkCapacity uint32 Congestion bool WriteBuffer uint32 + ReadBuffer uint32 } func (this *Config) Apply() { @@ -37,6 +38,7 @@ func DefaultConfig() Config { DownlinkCapacity: 20, Congestion: false, WriteBuffer: 8 * 1024 * 1024, + ReadBuffer: 8 * 1024 * 1024, } } diff --git a/transport/internet/kcp/connection.go b/transport/internet/kcp/connection.go index ad130fb5..214e62cc 100644 --- a/transport/internet/kcp/connection.go +++ b/transport/internet/kcp/connection.go @@ -46,7 +46,6 @@ type Connection struct { block Authenticator needUpdate bool local, remote net.Addr - rd time.Time // read deadline wd time.Time // write deadline chReadEvent chan struct{} writer io.WriteCloser @@ -86,40 +85,10 @@ func (this *Connection) Elapsed() uint32 { // Read implements the Conn Read method. func (this *Connection) Read(b []byte) (int, error) { - if this == nil || - this.kcp.state == StateReadyToClose || - this.kcp.state == StateTerminating || - this.kcp.state == StateTerminated { + if this == nil || this.kcp.state == StateTerminating || this.kcp.state == StateTerminated { return 0, io.EOF } - - for { - this.RLock() - if this == nil || - this.kcp.state == StateReadyToClose || - this.kcp.state == StateTerminating || - this.kcp.state == StateTerminated { - this.RUnlock() - return 0, io.EOF - } - - if !this.rd.IsZero() && this.rd.Before(time.Now()) { - this.RUnlock() - return 0, errTimeout - } - this.RUnlock() - - this.kcpAccess.Lock() - nBytes := this.kcp.Recv(b) - this.kcpAccess.Unlock() - if nBytes > 0 { - return nBytes, nil - } - select { - case <-this.chReadEvent: - case <-time.After(time.Second): - } - } + return this.kcp.rcv_queue.Read(b) } // Write implements the Conn Write method. @@ -195,13 +164,12 @@ func (this *Connection) RemoteAddr() net.Addr { // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. func (this *Connection) SetDeadline(t time.Time) error { - if this == nil || this.kcp.state != StateActive { - return errClosedConnection + if err := this.SetReadDeadline(t); err != nil { + return err + } + if err := this.SetWriteDeadline(t); err != nil { + return err } - this.Lock() - defer this.Unlock() - this.rd = t - this.wd = t return nil } @@ -210,9 +178,9 @@ func (this *Connection) SetReadDeadline(t time.Time) error { if this == nil || this.kcp.state != StateActive { return errClosedConnection } - this.Lock() - defer this.Unlock() - this.rd = t + this.kcpAccess.Lock() + defer this.kcpAccess.Unlock() + this.kcp.rcv_queue.SetReadDeadline(t) return nil } diff --git a/transport/internet/kcp/kcp.go b/transport/internet/kcp/kcp.go index f4bc6e74..4e23f972 100644 --- a/transport/internet/kcp/kcp.go +++ b/transport/internet/kcp/kcp.go @@ -12,17 +12,13 @@ import ( ) const ( - IKCP_RTO_NDL = 30 // no delay min rto - IKCP_RTO_MIN = 100 // normal min rto - IKCP_RTO_DEF = 200 - IKCP_RTO_MAX = 60000 - IKCP_WND_SND = 32 - IKCP_WND_RCV = 32 - IKCP_MTU_DEF = 1350 - IKCP_ACK_FAST = 3 - IKCP_INTERVAL = 100 - IKCP_THRESH_INIT = 2 - IKCP_THRESH_MIN = 2 + IKCP_RTO_NDL = 30 // no delay min rto + IKCP_RTO_MIN = 100 // normal min rto + IKCP_RTO_DEF = 200 + IKCP_RTO_MAX = 60000 + IKCP_WND_SND = 32 + IKCP_WND_RCV = 32 + IKCP_INTERVAL = 100 ) func _itimediff(later, earlier uint32) int32 { @@ -50,15 +46,14 @@ type KCP struct { receivingUpdated bool lastPingTime uint32 - mtu, mss uint32 - snd_una, snd_nxt, rcv_nxt uint32 - rx_rttvar, rx_srtt, rx_rto uint32 - snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32 - current, interval, ts_flush, xmit uint32 - updated bool + mtu, mss uint32 + snd_una, snd_nxt, rcv_nxt uint32 + rx_rttvar, rx_srtt, rx_rto uint32 + snd_wnd, rcv_wnd, rmt_wnd, cwnd uint32 + current, interval uint32 snd_queue *SendingQueue - rcv_queue []*DataSegment + rcv_queue *ReceivingQueue snd_buf []*DataSegment rcv_buf *ReceivingWindow @@ -82,15 +77,31 @@ func NewKCP(conv uint16, mtu uint32, sendingWindowSize uint32, receivingWindowSi kcp.mss = kcp.mtu - DataSegmentOverhead kcp.rx_rto = IKCP_RTO_DEF kcp.interval = IKCP_INTERVAL - kcp.ts_flush = IKCP_INTERVAL kcp.output = NewSegmentWriter(mtu, output) kcp.rcv_buf = NewReceivingWindow(receivingWindowSize) kcp.snd_queue = NewSendingQueue(sendingQueueSize) + kcp.rcv_queue = NewReceivingQueue() kcp.acklist = new(ACKList) kcp.cwnd = kcp.snd_wnd return kcp } +func (kcp *KCP) SetState(state State) { + kcp.state = state + kcp.stateBeginTime = kcp.current + + switch state { + case StateReadyToClose: + kcp.rcv_queue.Close() + case StatePeerClosed: + kcp.ClearSendQueue() + case StateTerminating: + kcp.rcv_queue.Close() + case StateTerminated: + kcp.rcv_queue.Close() + } +} + func (kcp *KCP) HandleOption(opt SegmentOption) { if (opt & SegmentOptionClose) == SegmentOptionClose { kcp.OnPeerClosed() @@ -99,52 +110,22 @@ func (kcp *KCP) HandleOption(opt SegmentOption) { func (kcp *KCP) OnPeerClosed() { if kcp.state == StateReadyToClose { - kcp.state = StateTerminating - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminating) } if kcp.state == StateActive { - kcp.ClearSendQueue() - kcp.state = StatePeerClosed - kcp.stateBeginTime = kcp.current + kcp.SetState(StatePeerClosed) } } func (kcp *KCP) OnClose() { if kcp.state == StateActive { - kcp.state = StateReadyToClose - kcp.stateBeginTime = kcp.current + kcp.SetState(StateReadyToClose) } if kcp.state == StatePeerClosed { - kcp.state = StateTerminating - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminating) } } -// Recv is user/upper level recv: returns size, returns below zero for EAGAIN -func (kcp *KCP) Recv(buffer []byte) (n int) { - if len(kcp.rcv_queue) == 0 { - return -1 - } - - // merge fragment - count := 0 - for _, seg := range kcp.rcv_queue { - dataLen := seg.Data.Len() - if dataLen > len(buffer) { - break - } - copy(buffer, seg.Data.Value) - seg.Release() - buffer = buffer[dataLen:] - n += dataLen - count++ - } - kcp.rcv_queue = kcp.rcv_queue[count:] - - kcp.DumpReceivingBuf() - return -} - // DumpReceivingBuf moves available data from rcv_buf -> rcv_queue // @Private func (kcp *KCP) DumpReceivingBuf() { @@ -153,7 +134,9 @@ func (kcp *KCP) DumpReceivingBuf() { if seg == nil { break } - kcp.rcv_queue = append(kcp.rcv_queue, seg) + kcp.rcv_queue.Put(seg.Data) + seg.Data = nil + kcp.rcv_buf.Advance() kcp.rcv_nxt++ kcp.receivingUpdated = true @@ -335,11 +318,9 @@ func (kcp *KCP) Input(data []byte) int { if kcp.state == StateActive || kcp.state == StateReadyToClose || kcp.state == StatePeerClosed { - kcp.state = StateTerminating - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminating) } else if kcp.state == StateTerminating { - kcp.state = StateTerminated - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminated) } } kcp.HandleReceivingNext(seg.ReceivinNext) @@ -373,15 +354,13 @@ func (kcp *KCP) flush() { kcp.output.Flush() if _itimediff(kcp.current, kcp.stateBeginTime) > 8000 { - kcp.state = StateTerminated - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminated) } return } if kcp.state == StateReadyToClose && _itimediff(kcp.current, kcp.stateBeginTime) > 15000 { - kcp.state = StateTerminating - kcp.stateBeginTime = kcp.current + kcp.SetState(StateTerminating) } current := kcp.current @@ -435,7 +414,6 @@ func (kcp *KCP) flush() { } else if _itimediff(current, segment.timeout) >= 0 { needsend = true segment.transmit++ - kcp.xmit++ segment.timeout = current + kcp.rx_rto lost = true } else if segment.ackSkipped >= resent { @@ -497,29 +475,8 @@ func (kcp *KCP) flush() { // ikcp_check when to call it again (without ikcp_input/_send calling). // 'current' - current timestamp in millisec. func (kcp *KCP) Update(current uint32) { - var slap int32 - kcp.current = current - - if !kcp.updated { - kcp.updated = true - kcp.ts_flush = kcp.current - } - - slap = _itimediff(kcp.current, kcp.ts_flush) - - if slap >= 10000 || slap < -10000 { - kcp.ts_flush = kcp.current - slap = 0 - } - - if slap >= 0 { - kcp.ts_flush += kcp.interval - if _itimediff(kcp.current, kcp.ts_flush) >= 0 { - kcp.ts_flush = kcp.current + kcp.interval - } - kcp.flush() - } + kcp.flush() } // NoDelay options diff --git a/transport/internet/kcp/receiving.go b/transport/internet/kcp/receiving.go index 16358592..e017fda6 100644 --- a/transport/internet/kcp/receiving.go +++ b/transport/internet/kcp/receiving.go @@ -1,5 +1,13 @@ package kcp +import ( + "io" + "sync" + "time" + + "github.com/v2ray/v2ray-core/common/alloc" +) + type ReceivingWindow struct { start uint32 size uint32 @@ -49,6 +57,89 @@ func (this *ReceivingWindow) Advance() { } } +type ReceivingQueue struct { + sync.RWMutex + closed bool + cache *alloc.Buffer + queue chan *alloc.Buffer + timeout time.Time +} + +func NewReceivingQueue() *ReceivingQueue { + return &ReceivingQueue{ + queue: make(chan *alloc.Buffer, effectiveConfig.ReadBuffer/effectiveConfig.Mtu), + } +} + +func (this *ReceivingQueue) Read(buf []byte) (int, error) { + if this.cache.Len() > 0 { + nBytes, err := this.cache.Read(buf) + if this.cache.IsEmpty() { + this.cache.Release() + this.cache = nil + } + return nBytes, err + } + + var totalBytes int + +L: + for totalBytes < len(buf) { + timeToSleep := time.Millisecond + select { + case payload, open := <-this.queue: + if !open { + return totalBytes, io.EOF + } + nBytes, err := payload.Read(buf) + totalBytes += nBytes + if err != nil { + return totalBytes, err + } + if !payload.IsEmpty() { + this.cache = payload + } + buf = buf[nBytes:] + case <-time.After(timeToSleep): + if totalBytes > 0 { + break L + } + this.RLock() + if !this.timeout.IsZero() && this.timeout.Before(time.Now()) { + this.RUnlock() + return totalBytes, errTimeout + } + this.RUnlock() + timeToSleep += 500 * time.Millisecond + } + } + + return totalBytes, nil +} + +func (this *ReceivingQueue) Put(payload *alloc.Buffer) { + this.queue <- payload +} + +func (this *ReceivingQueue) SetReadDeadline(t time.Time) error { + this.Lock() + defer this.Unlock() + + this.timeout = t + return nil +} + +func (this *ReceivingQueue) Close() { + this.Lock() + defer this.Unlock() + + if this.closed { + return + } + this.closed = true + close(this.queue) +} + type ACKList struct { timestamps []uint32 numbers []uint32