mirror of https://github.com/v2ray/v2ray-core
				
				
				
			merge kcp into connection
							parent
							
								
									344e48e1bb
								
							
						
					
					
						commit
						e44b374e66
					
				| 
						 | 
					@ -18,6 +18,16 @@ var (
 | 
				
			||||||
	errClosedConnection = errors.New("Connection closed.")
 | 
						errClosedConnection = errors.New("Connection closed.")
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type State int
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						StateActive       State = 0
 | 
				
			||||||
 | 
						StateReadyToClose State = 1
 | 
				
			||||||
 | 
						StatePeerClosed   State = 2
 | 
				
			||||||
 | 
						StateTerminating  State = 3
 | 
				
			||||||
 | 
						StateTerminated   State = 4
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	headerSize uint32 = 2
 | 
						headerSize uint32 = 2
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -30,17 +40,36 @@ func nowMillisec() int64 {
 | 
				
			||||||
// Connection is a KCP connection over UDP.
 | 
					// Connection is a KCP connection over UDP.
 | 
				
			||||||
type Connection struct {
 | 
					type Connection struct {
 | 
				
			||||||
	sync.RWMutex
 | 
						sync.RWMutex
 | 
				
			||||||
	kcp           *KCP // the core ARQ
 | 
					 | 
				
			||||||
	kcpAccess     sync.Mutex
 | 
					 | 
				
			||||||
	block         Authenticator
 | 
						block         Authenticator
 | 
				
			||||||
	local, remote net.Addr
 | 
						local, remote net.Addr
 | 
				
			||||||
	wd            time.Time // write deadline
 | 
						wd            time.Time // write deadline
 | 
				
			||||||
	writer        io.WriteCloser
 | 
						writer        io.WriteCloser
 | 
				
			||||||
	since         int64
 | 
						since         int64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						conv             uint16
 | 
				
			||||||
 | 
						state            State
 | 
				
			||||||
 | 
						stateBeginTime   uint32
 | 
				
			||||||
 | 
						lastIncomingTime uint32
 | 
				
			||||||
 | 
						lastPayloadTime  uint32
 | 
				
			||||||
 | 
						sendingUpdated   bool
 | 
				
			||||||
 | 
						lastPingTime     uint32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mss                        uint32
 | 
				
			||||||
 | 
						rx_rttvar, rx_srtt, rx_rto uint32
 | 
				
			||||||
 | 
						interval                   uint32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						receivingWorker *ReceivingWorker
 | 
				
			||||||
 | 
						sendingWorker   *SendingWorker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fastresend        uint32
 | 
				
			||||||
 | 
						congestionControl bool
 | 
				
			||||||
 | 
						output            *BufferedSegmentWriter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewConnection create a new KCP connection between local and remote.
 | 
					// NewConnection create a new KCP connection between local and remote.
 | 
				
			||||||
func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *Connection {
 | 
					func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr, remote *net.UDPAddr, block Authenticator) *Connection {
 | 
				
			||||||
 | 
						log.Debug("KCP|Connection: creating connection ", conv)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	conn := new(Connection)
 | 
						conn := new(Connection)
 | 
				
			||||||
	conn.local = local
 | 
						conn.local = local
 | 
				
			||||||
	conn.remote = remote
 | 
						conn.remote = remote
 | 
				
			||||||
| 
						 | 
					@ -52,8 +81,16 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr,
 | 
				
			||||||
		Authenticator: block,
 | 
							Authenticator: block,
 | 
				
			||||||
		Writer:        writerCloser,
 | 
							Writer:        writerCloser,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	conn.kcp = NewKCP(conv, authWriter)
 | 
						conn.conv = conv
 | 
				
			||||||
	conn.kcp.current = conn.Elapsed()
 | 
						conn.output = NewSegmentWriter(authWriter)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						conn.mss = authWriter.Mtu() - DataSegmentOverhead
 | 
				
			||||||
 | 
						conn.rx_rto = 100
 | 
				
			||||||
 | 
						conn.interval = effectiveConfig.Tti
 | 
				
			||||||
 | 
						conn.receivingWorker = NewReceivingWorker(conn)
 | 
				
			||||||
 | 
						conn.fastresend = 2
 | 
				
			||||||
 | 
						conn.congestionControl = effectiveConfig.Congestion
 | 
				
			||||||
 | 
						conn.sendingWorker = NewSendingWorker(conn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go conn.updateTask()
 | 
						go conn.updateTask()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -66,39 +103,37 @@ func (this *Connection) Elapsed() uint32 {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Read implements the Conn Read method.
 | 
					// Read implements the Conn Read method.
 | 
				
			||||||
func (this *Connection) Read(b []byte) (int, error) {
 | 
					func (this *Connection) Read(b []byte) (int, error) {
 | 
				
			||||||
	if this == nil || this.kcp.state == StateTerminating || this.kcp.state == StateTerminated {
 | 
						if this == nil {
 | 
				
			||||||
		return 0, io.EOF
 | 
							return 0, io.EOF
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return this.kcp.receivingWorker.Read(b)
 | 
					
 | 
				
			||||||
 | 
						state := this.State()
 | 
				
			||||||
 | 
						if state == StateTerminating || state == StateTerminated {
 | 
				
			||||||
 | 
							return 0, io.EOF
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return this.receivingWorker.Read(b)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Write implements the Conn Write method.
 | 
					// Write implements the Conn Write method.
 | 
				
			||||||
func (this *Connection) Write(b []byte) (int, error) {
 | 
					func (this *Connection) Write(b []byte) (int, error) {
 | 
				
			||||||
	if this == nil || this.kcp.state != StateActive {
 | 
						if this == nil || this.State() != StateActive {
 | 
				
			||||||
		return 0, io.ErrClosedPipe
 | 
							return 0, io.ErrClosedPipe
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	totalWritten := 0
 | 
						totalWritten := 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		this.RLock()
 | 
							if this == nil || this.State() != StateActive {
 | 
				
			||||||
		if this == nil || this.kcp.state != StateActive {
 | 
					 | 
				
			||||||
			this.RUnlock()
 | 
					 | 
				
			||||||
			return totalWritten, io.ErrClosedPipe
 | 
								return totalWritten, io.ErrClosedPipe
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		this.RUnlock()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		this.kcpAccess.Lock()
 | 
							nBytes := this.sendingWorker.Push(b[totalWritten:])
 | 
				
			||||||
		nBytes := this.kcp.sendingWorker.Push(b[totalWritten:])
 | 
					 | 
				
			||||||
		if nBytes > 0 {
 | 
							if nBytes > 0 {
 | 
				
			||||||
			totalWritten += nBytes
 | 
								totalWritten += nBytes
 | 
				
			||||||
			if totalWritten == len(b) {
 | 
								if totalWritten == len(b) {
 | 
				
			||||||
				this.kcpAccess.Unlock()
 | 
					 | 
				
			||||||
				return totalWritten, nil
 | 
									return totalWritten, nil
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		this.kcpAccess.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if !this.wd.IsZero() && this.wd.Before(time.Now()) {
 | 
							if !this.wd.IsZero() && this.wd.Before(time.Now()) {
 | 
				
			||||||
			return totalWritten, errTimeout
 | 
								return totalWritten, errTimeout
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -108,21 +143,46 @@ func (this *Connection) Write(b []byte) (int, error) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Connection) SetState(state State) {
 | 
				
			||||||
 | 
						this.Lock()
 | 
				
			||||||
 | 
						this.state = state
 | 
				
			||||||
 | 
						this.stateBeginTime = this.Elapsed()
 | 
				
			||||||
 | 
						this.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						switch state {
 | 
				
			||||||
 | 
						case StateReadyToClose:
 | 
				
			||||||
 | 
							this.receivingWorker.CloseRead()
 | 
				
			||||||
 | 
						case StatePeerClosed:
 | 
				
			||||||
 | 
							this.sendingWorker.CloseWrite()
 | 
				
			||||||
 | 
						case StateTerminating:
 | 
				
			||||||
 | 
							this.receivingWorker.CloseRead()
 | 
				
			||||||
 | 
							this.sendingWorker.CloseWrite()
 | 
				
			||||||
 | 
						case StateTerminated:
 | 
				
			||||||
 | 
							this.receivingWorker.CloseRead()
 | 
				
			||||||
 | 
							this.sendingWorker.CloseWrite()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Close closes the connection.
 | 
					// Close closes the connection.
 | 
				
			||||||
func (this *Connection) Close() error {
 | 
					func (this *Connection) Close() error {
 | 
				
			||||||
	if this == nil ||
 | 
						if this == nil {
 | 
				
			||||||
		this.kcp.state == StateReadyToClose ||
 | 
							return errClosedConnection
 | 
				
			||||||
		this.kcp.state == StateTerminating ||
 | 
						}
 | 
				
			||||||
		this.kcp.state == StateTerminated {
 | 
					
 | 
				
			||||||
 | 
						state := this.State()
 | 
				
			||||||
 | 
						if state == StateReadyToClose ||
 | 
				
			||||||
 | 
							state == StateTerminating ||
 | 
				
			||||||
 | 
							state == StateTerminated {
 | 
				
			||||||
		return errClosedConnection
 | 
							return errClosedConnection
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	log.Debug("KCP|Connection: Closing connection to ", this.remote)
 | 
						log.Debug("KCP|Connection: Closing connection to ", this.remote)
 | 
				
			||||||
	this.Lock()
 | 
					 | 
				
			||||||
	defer this.Unlock()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	this.kcpAccess.Lock()
 | 
						if state == StateActive {
 | 
				
			||||||
	this.kcp.OnClose()
 | 
							this.SetState(StateReadyToClose)
 | 
				
			||||||
	this.kcpAccess.Unlock()
 | 
						}
 | 
				
			||||||
 | 
						if state == StatePeerClosed {
 | 
				
			||||||
 | 
							this.SetState(StateTerminating)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -156,36 +216,29 @@ func (this *Connection) SetDeadline(t time.Time) error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SetReadDeadline implements the Conn SetReadDeadline method.
 | 
					// SetReadDeadline implements the Conn SetReadDeadline method.
 | 
				
			||||||
func (this *Connection) SetReadDeadline(t time.Time) error {
 | 
					func (this *Connection) SetReadDeadline(t time.Time) error {
 | 
				
			||||||
	if this == nil || this.kcp.state != StateActive {
 | 
						if this == nil || this.State() != StateActive {
 | 
				
			||||||
		return errClosedConnection
 | 
							return errClosedConnection
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	this.kcpAccess.Lock()
 | 
						this.receivingWorker.SetReadDeadline(t)
 | 
				
			||||||
	defer this.kcpAccess.Unlock()
 | 
					 | 
				
			||||||
	this.kcp.receivingWorker.SetReadDeadline(t)
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SetWriteDeadline implements the Conn SetWriteDeadline method.
 | 
					// SetWriteDeadline implements the Conn SetWriteDeadline method.
 | 
				
			||||||
func (this *Connection) SetWriteDeadline(t time.Time) error {
 | 
					func (this *Connection) SetWriteDeadline(t time.Time) error {
 | 
				
			||||||
	if this == nil || this.kcp.state != StateActive {
 | 
						if this == nil || this.State() != StateActive {
 | 
				
			||||||
		return errClosedConnection
 | 
							return errClosedConnection
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	this.Lock()
 | 
					 | 
				
			||||||
	defer this.Unlock()
 | 
					 | 
				
			||||||
	this.wd = t
 | 
						this.wd = t
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// kcp update, input loop
 | 
					// kcp update, input loop
 | 
				
			||||||
func (this *Connection) updateTask() {
 | 
					func (this *Connection) updateTask() {
 | 
				
			||||||
	for this.kcp.state != StateTerminated {
 | 
						for this.State() != StateTerminated {
 | 
				
			||||||
		current := this.Elapsed()
 | 
							this.flush()
 | 
				
			||||||
		this.kcpAccess.Lock()
 | 
					 | 
				
			||||||
		this.kcp.Update(current)
 | 
					 | 
				
			||||||
		this.kcpAccess.Unlock()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		interval := time.Duration(effectiveConfig.Tti) * time.Millisecond
 | 
							interval := time.Duration(effectiveConfig.Tti) * time.Millisecond
 | 
				
			||||||
		if this.kcp.state == StateTerminating {
 | 
							if this.State() == StateTerminating {
 | 
				
			||||||
			interval = time.Second
 | 
								interval = time.Second
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		time.Sleep(interval)
 | 
							time.Sleep(interval)
 | 
				
			||||||
| 
						 | 
					@ -193,13 +246,6 @@ func (this *Connection) updateTask() {
 | 
				
			||||||
	this.Terminate()
 | 
						this.Terminate()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *Connection) kcpInput(data []byte) {
 | 
					 | 
				
			||||||
	this.kcpAccess.Lock()
 | 
					 | 
				
			||||||
	this.kcp.current = this.Elapsed()
 | 
					 | 
				
			||||||
	this.kcp.Input(data)
 | 
					 | 
				
			||||||
	this.kcpAccess.Unlock()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *Connection) FetchInputFrom(conn net.Conn) {
 | 
					func (this *Connection) FetchInputFrom(conn net.Conn) {
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		for {
 | 
							for {
 | 
				
			||||||
| 
						 | 
					@ -211,7 +257,7 @@ func (this *Connection) FetchInputFrom(conn net.Conn) {
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			payload.Slice(0, nBytes)
 | 
								payload.Slice(0, nBytes)
 | 
				
			||||||
			if this.block.Open(payload) {
 | 
								if this.block.Open(payload) {
 | 
				
			||||||
				this.kcpInput(payload.Value)
 | 
									this.Input(payload.Value)
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
				log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
 | 
									log.Info("KCP|Connection: Invalid response from ", conn.RemoteAddr())
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
| 
						 | 
					@ -234,3 +280,151 @@ func (this *Connection) Terminate() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	this.writer.Close()
 | 
						this.writer.Close()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Connection) HandleOption(opt SegmentOption) {
 | 
				
			||||||
 | 
						if (opt & SegmentOptionClose) == SegmentOptionClose {
 | 
				
			||||||
 | 
							this.OnPeerClosed()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Connection) OnPeerClosed() {
 | 
				
			||||||
 | 
						state := this.State()
 | 
				
			||||||
 | 
						if state == StateReadyToClose {
 | 
				
			||||||
 | 
							this.SetState(StateTerminating)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if state == StateActive {
 | 
				
			||||||
 | 
							this.SetState(StatePeerClosed)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// https://tools.ietf.org/html/rfc6298
 | 
				
			||||||
 | 
					func (this *Connection) update_ack(rtt int32) {
 | 
				
			||||||
 | 
						this.Lock()
 | 
				
			||||||
 | 
						defer this.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if this.rx_srtt == 0 {
 | 
				
			||||||
 | 
							this.rx_srtt = uint32(rtt)
 | 
				
			||||||
 | 
							this.rx_rttvar = uint32(rtt) / 2
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							delta := rtt - int32(this.rx_srtt)
 | 
				
			||||||
 | 
							if delta < 0 {
 | 
				
			||||||
 | 
								delta = -delta
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							this.rx_rttvar = (3*this.rx_rttvar + uint32(delta)) / 4
 | 
				
			||||||
 | 
							this.rx_srtt = (7*this.rx_srtt + uint32(rtt)) / 8
 | 
				
			||||||
 | 
							if this.rx_srtt < this.interval {
 | 
				
			||||||
 | 
								this.rx_srtt = this.interval
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var rto uint32
 | 
				
			||||||
 | 
						if this.interval < 4*this.rx_rttvar {
 | 
				
			||||||
 | 
							rto = this.rx_srtt + 4*this.rx_rttvar
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							rto = this.rx_srtt + this.interval
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if rto > 10000 {
 | 
				
			||||||
 | 
							rto = 10000
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						this.rx_rto = rto * 3 / 2
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Input when you received a low level packet (eg. UDP packet), call it
 | 
				
			||||||
 | 
					func (kcp *Connection) Input(data []byte) int {
 | 
				
			||||||
 | 
						current := kcp.Elapsed()
 | 
				
			||||||
 | 
						kcp.lastIncomingTime = current
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var seg Segment
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							seg, data = ReadSegment(data)
 | 
				
			||||||
 | 
							if seg == nil {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							switch seg := seg.(type) {
 | 
				
			||||||
 | 
							case *DataSegment:
 | 
				
			||||||
 | 
								kcp.HandleOption(seg.Opt)
 | 
				
			||||||
 | 
								kcp.receivingWorker.ProcessSegment(seg)
 | 
				
			||||||
 | 
								kcp.lastPayloadTime = current
 | 
				
			||||||
 | 
							case *AckSegment:
 | 
				
			||||||
 | 
								kcp.HandleOption(seg.Opt)
 | 
				
			||||||
 | 
								kcp.sendingWorker.ProcessSegment(current, seg)
 | 
				
			||||||
 | 
								kcp.lastPayloadTime = current
 | 
				
			||||||
 | 
							case *CmdOnlySegment:
 | 
				
			||||||
 | 
								kcp.HandleOption(seg.Opt)
 | 
				
			||||||
 | 
								if seg.Cmd == SegmentCommandTerminated {
 | 
				
			||||||
 | 
									if kcp.state == StateActive ||
 | 
				
			||||||
 | 
										kcp.state == StateReadyToClose ||
 | 
				
			||||||
 | 
										kcp.state == StatePeerClosed {
 | 
				
			||||||
 | 
										kcp.SetState(StateTerminating)
 | 
				
			||||||
 | 
									} else if kcp.state == StateTerminating {
 | 
				
			||||||
 | 
										kcp.SetState(StateTerminated)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
 | 
				
			||||||
 | 
								kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Connection) flush() {
 | 
				
			||||||
 | 
						current := this.Elapsed()
 | 
				
			||||||
 | 
						state := this.State()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if state == StateTerminated {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if state == StateActive && current-this.lastPayloadTime >= 30000 {
 | 
				
			||||||
 | 
							this.Close()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if state == StateTerminating {
 | 
				
			||||||
 | 
							this.output.Write(&CmdOnlySegment{
 | 
				
			||||||
 | 
								Conv: this.conv,
 | 
				
			||||||
 | 
								Cmd:  SegmentCommandTerminated,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							this.output.Flush()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if current-this.stateBeginTime > 8000 {
 | 
				
			||||||
 | 
								this.SetState(StateTerminated)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if state == StateReadyToClose && current-this.stateBeginTime > 15000 {
 | 
				
			||||||
 | 
							this.SetState(StateTerminating)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// flush acknowledges
 | 
				
			||||||
 | 
						this.receivingWorker.Flush(current)
 | 
				
			||||||
 | 
						this.sendingWorker.Flush(current)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if this.sendingWorker.PingNecessary() || this.receivingWorker.PingNecessary() || current-this.lastPingTime >= 5000 {
 | 
				
			||||||
 | 
							seg := NewCmdOnlySegment()
 | 
				
			||||||
 | 
							seg.Conv = this.conv
 | 
				
			||||||
 | 
							seg.Cmd = SegmentCommandPing
 | 
				
			||||||
 | 
							seg.ReceivinNext = this.receivingWorker.nextNumber
 | 
				
			||||||
 | 
							seg.SendingNext = this.sendingWorker.firstUnacknowledged
 | 
				
			||||||
 | 
							if state == StateReadyToClose {
 | 
				
			||||||
 | 
								seg.Opt = SegmentOptionClose
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							this.output.Write(seg)
 | 
				
			||||||
 | 
							this.lastPingTime = current
 | 
				
			||||||
 | 
							this.sendingUpdated = false
 | 
				
			||||||
 | 
							seg.Release()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// flash remain segments
 | 
				
			||||||
 | 
						this.output.Flush()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Connection) State() State {
 | 
				
			||||||
 | 
						this.RLock()
 | 
				
			||||||
 | 
						defer this.RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return this.state
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,225 +4,3 @@
 | 
				
			||||||
//    skywind3000@github for inventing the KCP protocol
 | 
					//    skywind3000@github for inventing the KCP protocol
 | 
				
			||||||
//    xtaci@github for translating to Golang
 | 
					//    xtaci@github for translating to Golang
 | 
				
			||||||
package kcp
 | 
					package kcp
 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"github.com/v2ray/v2ray-core/common/log"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type State int
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	StateActive       State = 0
 | 
					 | 
				
			||||||
	StateReadyToClose State = 1
 | 
					 | 
				
			||||||
	StatePeerClosed   State = 2
 | 
					 | 
				
			||||||
	StateTerminating  State = 3
 | 
					 | 
				
			||||||
	StateTerminated   State = 4
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// KCP defines a single KCP connection
 | 
					 | 
				
			||||||
type KCP struct {
 | 
					 | 
				
			||||||
	conv             uint16
 | 
					 | 
				
			||||||
	state            State
 | 
					 | 
				
			||||||
	stateBeginTime   uint32
 | 
					 | 
				
			||||||
	lastIncomingTime uint32
 | 
					 | 
				
			||||||
	lastPayloadTime  uint32
 | 
					 | 
				
			||||||
	sendingUpdated   bool
 | 
					 | 
				
			||||||
	lastPingTime     uint32
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mss                        uint32
 | 
					 | 
				
			||||||
	rx_rttvar, rx_srtt, rx_rto uint32
 | 
					 | 
				
			||||||
	current, interval          uint32
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	receivingWorker *ReceivingWorker
 | 
					 | 
				
			||||||
	sendingWorker   *SendingWorker
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fastresend        uint32
 | 
					 | 
				
			||||||
	congestionControl bool
 | 
					 | 
				
			||||||
	output            *BufferedSegmentWriter
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewKCP create a new kcp control object, 'conv' must equal in two endpoint
 | 
					 | 
				
			||||||
// from the same connection.
 | 
					 | 
				
			||||||
func NewKCP(conv uint16, output *AuthenticationWriter) *KCP {
 | 
					 | 
				
			||||||
	log.Debug("KCP|Core: creating KCP ", conv)
 | 
					 | 
				
			||||||
	kcp := new(KCP)
 | 
					 | 
				
			||||||
	kcp.conv = conv
 | 
					 | 
				
			||||||
	kcp.mss = output.Mtu() - DataSegmentOverhead
 | 
					 | 
				
			||||||
	kcp.rx_rto = 100
 | 
					 | 
				
			||||||
	kcp.interval = effectiveConfig.Tti
 | 
					 | 
				
			||||||
	kcp.output = NewSegmentWriter(output)
 | 
					 | 
				
			||||||
	kcp.receivingWorker = NewReceivingWorker(kcp)
 | 
					 | 
				
			||||||
	kcp.fastresend = 2
 | 
					 | 
				
			||||||
	kcp.congestionControl = effectiveConfig.Congestion
 | 
					 | 
				
			||||||
	kcp.sendingWorker = NewSendingWorker(kcp)
 | 
					 | 
				
			||||||
	return kcp
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (kcp *KCP) SetState(state State) {
 | 
					 | 
				
			||||||
	kcp.state = state
 | 
					 | 
				
			||||||
	kcp.stateBeginTime = kcp.current
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	switch state {
 | 
					 | 
				
			||||||
	case StateReadyToClose:
 | 
					 | 
				
			||||||
		kcp.receivingWorker.CloseRead()
 | 
					 | 
				
			||||||
	case StatePeerClosed:
 | 
					 | 
				
			||||||
		kcp.sendingWorker.CloseWrite()
 | 
					 | 
				
			||||||
	case StateTerminating:
 | 
					 | 
				
			||||||
		kcp.receivingWorker.CloseRead()
 | 
					 | 
				
			||||||
		kcp.sendingWorker.CloseWrite()
 | 
					 | 
				
			||||||
	case StateTerminated:
 | 
					 | 
				
			||||||
		kcp.receivingWorker.CloseRead()
 | 
					 | 
				
			||||||
		kcp.sendingWorker.CloseWrite()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (kcp *KCP) HandleOption(opt SegmentOption) {
 | 
					 | 
				
			||||||
	if (opt & SegmentOptionClose) == SegmentOptionClose {
 | 
					 | 
				
			||||||
		kcp.OnPeerClosed()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (kcp *KCP) OnPeerClosed() {
 | 
					 | 
				
			||||||
	if kcp.state == StateReadyToClose {
 | 
					 | 
				
			||||||
		kcp.SetState(StateTerminating)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if kcp.state == StateActive {
 | 
					 | 
				
			||||||
		kcp.SetState(StatePeerClosed)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (kcp *KCP) OnClose() {
 | 
					 | 
				
			||||||
	if kcp.state == StateActive {
 | 
					 | 
				
			||||||
		kcp.SetState(StateReadyToClose)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if kcp.state == StatePeerClosed {
 | 
					 | 
				
			||||||
		kcp.SetState(StateTerminating)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// https://tools.ietf.org/html/rfc6298
 | 
					 | 
				
			||||||
func (kcp *KCP) update_ack(rtt int32) {
 | 
					 | 
				
			||||||
	if kcp.rx_srtt == 0 {
 | 
					 | 
				
			||||||
		kcp.rx_srtt = uint32(rtt)
 | 
					 | 
				
			||||||
		kcp.rx_rttvar = uint32(rtt) / 2
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		delta := rtt - int32(kcp.rx_srtt)
 | 
					 | 
				
			||||||
		if delta < 0 {
 | 
					 | 
				
			||||||
			delta = -delta
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		kcp.rx_rttvar = (3*kcp.rx_rttvar + uint32(delta)) / 4
 | 
					 | 
				
			||||||
		kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
 | 
					 | 
				
			||||||
		if kcp.rx_srtt < kcp.interval {
 | 
					 | 
				
			||||||
			kcp.rx_srtt = kcp.interval
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	var rto uint32
 | 
					 | 
				
			||||||
	if kcp.interval < 4*kcp.rx_rttvar {
 | 
					 | 
				
			||||||
		rto = kcp.rx_srtt + 4*kcp.rx_rttvar
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		rto = kcp.rx_srtt + kcp.interval
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if rto > 10000 {
 | 
					 | 
				
			||||||
		rto = 10000
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	kcp.rx_rto = rto * 3 / 2
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Input when you received a low level packet (eg. UDP packet), call it
 | 
					 | 
				
			||||||
func (kcp *KCP) Input(data []byte) int {
 | 
					 | 
				
			||||||
	kcp.lastIncomingTime = kcp.current
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var seg Segment
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		seg, data = ReadSegment(data)
 | 
					 | 
				
			||||||
		if seg == nil {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		switch seg := seg.(type) {
 | 
					 | 
				
			||||||
		case *DataSegment:
 | 
					 | 
				
			||||||
			kcp.HandleOption(seg.Opt)
 | 
					 | 
				
			||||||
			kcp.receivingWorker.ProcessSegment(seg)
 | 
					 | 
				
			||||||
			kcp.lastPayloadTime = kcp.current
 | 
					 | 
				
			||||||
		case *AckSegment:
 | 
					 | 
				
			||||||
			kcp.HandleOption(seg.Opt)
 | 
					 | 
				
			||||||
			kcp.sendingWorker.ProcessSegment(seg)
 | 
					 | 
				
			||||||
			kcp.lastPayloadTime = kcp.current
 | 
					 | 
				
			||||||
		case *CmdOnlySegment:
 | 
					 | 
				
			||||||
			kcp.HandleOption(seg.Opt)
 | 
					 | 
				
			||||||
			if seg.Cmd == SegmentCommandTerminated {
 | 
					 | 
				
			||||||
				if kcp.state == StateActive ||
 | 
					 | 
				
			||||||
					kcp.state == StateReadyToClose ||
 | 
					 | 
				
			||||||
					kcp.state == StatePeerClosed {
 | 
					 | 
				
			||||||
					kcp.SetState(StateTerminating)
 | 
					 | 
				
			||||||
				} else if kcp.state == StateTerminating {
 | 
					 | 
				
			||||||
					kcp.SetState(StateTerminated)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			kcp.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
 | 
					 | 
				
			||||||
			kcp.receivingWorker.ProcessSendingNext(seg.SendingNext)
 | 
					 | 
				
			||||||
		default:
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return 0
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flush pending data
 | 
					 | 
				
			||||||
func (kcp *KCP) flush() {
 | 
					 | 
				
			||||||
	if kcp.state == StateTerminated {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if kcp.state == StateActive && kcp.current-kcp.lastPayloadTime >= 30000 {
 | 
					 | 
				
			||||||
		kcp.OnClose()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if kcp.state == StateTerminating {
 | 
					 | 
				
			||||||
		kcp.output.Write(&CmdOnlySegment{
 | 
					 | 
				
			||||||
			Conv: kcp.conv,
 | 
					 | 
				
			||||||
			Cmd:  SegmentCommandTerminated,
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
		kcp.output.Flush()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if kcp.current-kcp.stateBeginTime > 8000 {
 | 
					 | 
				
			||||||
			kcp.SetState(StateTerminated)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if kcp.state == StateReadyToClose && kcp.current-kcp.stateBeginTime > 15000 {
 | 
					 | 
				
			||||||
		kcp.SetState(StateTerminating)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// flush acknowledges
 | 
					 | 
				
			||||||
	kcp.receivingWorker.Flush()
 | 
					 | 
				
			||||||
	kcp.sendingWorker.Flush()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if kcp.sendingWorker.PingNecessary() || kcp.receivingWorker.PingNecessary() || kcp.current-kcp.lastPingTime >= 5000 {
 | 
					 | 
				
			||||||
		seg := NewCmdOnlySegment()
 | 
					 | 
				
			||||||
		seg.Conv = kcp.conv
 | 
					 | 
				
			||||||
		seg.Cmd = SegmentCommandPing
 | 
					 | 
				
			||||||
		seg.ReceivinNext = kcp.receivingWorker.nextNumber
 | 
					 | 
				
			||||||
		seg.SendingNext = kcp.sendingWorker.firstUnacknowledged
 | 
					 | 
				
			||||||
		if kcp.state == StateReadyToClose {
 | 
					 | 
				
			||||||
			seg.Opt = SegmentOptionClose
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		kcp.output.Write(seg)
 | 
					 | 
				
			||||||
		kcp.lastPingTime = kcp.current
 | 
					 | 
				
			||||||
		kcp.sendingUpdated = false
 | 
					 | 
				
			||||||
		seg.Release()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// flash remain segments
 | 
					 | 
				
			||||||
	kcp.output.Flush()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
 | 
					 | 
				
			||||||
// ikcp_check when to call it again (without ikcp_input/_send calling).
 | 
					 | 
				
			||||||
// 'current' - current timestamp in millisec.
 | 
					 | 
				
			||||||
func (kcp *KCP) Update(current uint32) {
 | 
					 | 
				
			||||||
	kcp.current = current
 | 
					 | 
				
			||||||
	kcp.flush()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -81,7 +81,7 @@ func (this *Listener) OnReceive(payload *alloc.Buffer, src v2net.Destination) {
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		this.sessions[srcAddrStr] = conn
 | 
							this.sessions[srcAddrStr] = conn
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	conn.kcpInput(payload.Value)
 | 
						conn.Input(payload.Value)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *Listener) Remove(dest string) {
 | 
					func (this *Listener) Remove(dest string) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -216,7 +216,7 @@ func (this *AckList) Flush(current uint32, rto uint32) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ReceivingWorker struct {
 | 
					type ReceivingWorker struct {
 | 
				
			||||||
	kcp         *KCP
 | 
						conn        *Connection
 | 
				
			||||||
	queue       *ReceivingQueue
 | 
						queue       *ReceivingQueue
 | 
				
			||||||
	window      *ReceivingWindow
 | 
						window      *ReceivingWindow
 | 
				
			||||||
	windowMutex sync.Mutex
 | 
						windowMutex sync.Mutex
 | 
				
			||||||
| 
						 | 
					@ -226,10 +226,10 @@ type ReceivingWorker struct {
 | 
				
			||||||
	windowSize  uint32
 | 
						windowSize  uint32
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewReceivingWorker(kcp *KCP) *ReceivingWorker {
 | 
					func NewReceivingWorker(kcp *Connection) *ReceivingWorker {
 | 
				
			||||||
	windowSize := effectiveConfig.GetReceivingWindowSize()
 | 
						windowSize := effectiveConfig.GetReceivingWindowSize()
 | 
				
			||||||
	worker := &ReceivingWorker{
 | 
						worker := &ReceivingWorker{
 | 
				
			||||||
		kcp:        kcp,
 | 
							conn:       kcp,
 | 
				
			||||||
		queue:      NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),
 | 
							queue:      NewReceivingQueue(effectiveConfig.GetReceivingQueueSize()),
 | 
				
			||||||
		window:     NewReceivingWindow(windowSize),
 | 
							window:     NewReceivingWindow(windowSize),
 | 
				
			||||||
		windowSize: windowSize,
 | 
							windowSize: windowSize,
 | 
				
			||||||
| 
						 | 
					@ -284,19 +284,19 @@ func (this *ReceivingWorker) SetReadDeadline(t time.Time) {
 | 
				
			||||||
	this.queue.SetReadDeadline(t)
 | 
						this.queue.SetReadDeadline(t)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *ReceivingWorker) Flush() {
 | 
					func (this *ReceivingWorker) Flush(current uint32) {
 | 
				
			||||||
	this.acklist.Flush(this.kcp.current, this.kcp.rx_rto)
 | 
						this.acklist.Flush(current, this.conn.rx_rto)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *ReceivingWorker) Write(seg Segment) {
 | 
					func (this *ReceivingWorker) Write(seg Segment) {
 | 
				
			||||||
	ackSeg := seg.(*AckSegment)
 | 
						ackSeg := seg.(*AckSegment)
 | 
				
			||||||
	ackSeg.Conv = this.kcp.conv
 | 
						ackSeg.Conv = this.conn.conv
 | 
				
			||||||
	ackSeg.ReceivingNext = this.nextNumber
 | 
						ackSeg.ReceivingNext = this.nextNumber
 | 
				
			||||||
	ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
 | 
						ackSeg.ReceivingWindow = this.nextNumber + this.windowSize
 | 
				
			||||||
	if this.kcp.state == StateReadyToClose {
 | 
						if this.conn.state == StateReadyToClose {
 | 
				
			||||||
		ackSeg.Opt = SegmentOptionClose
 | 
							ackSeg.Opt = SegmentOptionClose
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	this.kcp.output.Write(ackSeg)
 | 
						this.conn.output.Write(ackSeg)
 | 
				
			||||||
	this.updated = false
 | 
						this.updated = false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -149,6 +149,7 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if needsend {
 | 
							if needsend {
 | 
				
			||||||
 | 
								segment.Timestamp = current
 | 
				
			||||||
			this.writer.Write(segment)
 | 
								this.writer.Write(segment)
 | 
				
			||||||
			inFlightSize++
 | 
								inFlightSize++
 | 
				
			||||||
			if inFlightSize >= maxInFlightSize {
 | 
								if inFlightSize >= maxInFlightSize {
 | 
				
			||||||
| 
						 | 
					@ -227,7 +228,7 @@ func (this *SendingQueue) Len() uint32 {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type SendingWorker struct {
 | 
					type SendingWorker struct {
 | 
				
			||||||
	sync.Mutex
 | 
						sync.Mutex
 | 
				
			||||||
	kcp                 *KCP
 | 
						conn                *Connection
 | 
				
			||||||
	window              *SendingWindow
 | 
						window              *SendingWindow
 | 
				
			||||||
	queue               *SendingQueue
 | 
						queue               *SendingQueue
 | 
				
			||||||
	firstUnacknowledged uint32
 | 
						firstUnacknowledged uint32
 | 
				
			||||||
| 
						 | 
					@ -238,9 +239,9 @@ type SendingWorker struct {
 | 
				
			||||||
	updated             bool
 | 
						updated             bool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewSendingWorker(kcp *KCP) *SendingWorker {
 | 
					func NewSendingWorker(kcp *Connection) *SendingWorker {
 | 
				
			||||||
	worker := &SendingWorker{
 | 
						worker := &SendingWorker{
 | 
				
			||||||
		kcp:              kcp,
 | 
							conn:             kcp,
 | 
				
			||||||
		queue:            NewSendingQueue(effectiveConfig.GetSendingQueueSize()),
 | 
							queue:            NewSendingQueue(effectiveConfig.GetSendingQueueSize()),
 | 
				
			||||||
		fastResend:       2,
 | 
							fastResend:       2,
 | 
				
			||||||
		remoteNextNumber: 32,
 | 
							remoteNextNumber: 32,
 | 
				
			||||||
| 
						 | 
					@ -282,7 +283,7 @@ func (this *SendingWorker) ProcessAck(number uint32) {
 | 
				
			||||||
	this.FindFirstUnacknowledged()
 | 
						this.FindFirstUnacknowledged()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *SendingWorker) ProcessSegment(seg *AckSegment) {
 | 
					func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {
 | 
				
			||||||
	if this.remoteNextNumber < seg.ReceivingWindow {
 | 
						if this.remoteNextNumber < seg.ReceivingWindow {
 | 
				
			||||||
		this.remoteNextNumber = seg.ReceivingWindow
 | 
							this.remoteNextNumber = seg.ReceivingWindow
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -291,8 +292,8 @@ func (this *SendingWorker) ProcessSegment(seg *AckSegment) {
 | 
				
			||||||
	for i := 0; i < int(seg.Count); i++ {
 | 
						for i := 0; i < int(seg.Count); i++ {
 | 
				
			||||||
		timestamp := seg.TimestampList[i]
 | 
							timestamp := seg.TimestampList[i]
 | 
				
			||||||
		number := seg.NumberList[i]
 | 
							number := seg.NumberList[i]
 | 
				
			||||||
		if this.kcp.current-timestamp < 10000 {
 | 
							if current-timestamp < 10000 {
 | 
				
			||||||
			this.kcp.update_ack(int32(this.kcp.current - timestamp))
 | 
								this.conn.update_ack(int32(current - timestamp))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		this.ProcessAck(number)
 | 
							this.ProcessAck(number)
 | 
				
			||||||
		if maxack < number {
 | 
							if maxack < number {
 | 
				
			||||||
| 
						 | 
					@ -308,8 +309,8 @@ func (this *SendingWorker) Push(b []byte) int {
 | 
				
			||||||
	nBytes := 0
 | 
						nBytes := 0
 | 
				
			||||||
	for len(b) > 0 && !this.queue.IsFull() {
 | 
						for len(b) > 0 && !this.queue.IsFull() {
 | 
				
			||||||
		var size int
 | 
							var size int
 | 
				
			||||||
		if len(b) > int(this.kcp.mss) {
 | 
							if len(b) > int(this.conn.mss) {
 | 
				
			||||||
			size = int(this.kcp.mss)
 | 
								size = int(this.conn.mss)
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			size = len(b)
 | 
								size = len(b)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -327,15 +328,14 @@ func (this *SendingWorker) Push(b []byte) int {
 | 
				
			||||||
func (this *SendingWorker) Write(seg Segment) {
 | 
					func (this *SendingWorker) Write(seg Segment) {
 | 
				
			||||||
	dataSeg := seg.(*DataSegment)
 | 
						dataSeg := seg.(*DataSegment)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dataSeg.Conv = this.kcp.conv
 | 
						dataSeg.Conv = this.conn.conv
 | 
				
			||||||
	dataSeg.Timestamp = this.kcp.current
 | 
					 | 
				
			||||||
	dataSeg.SendingNext = this.firstUnacknowledged
 | 
						dataSeg.SendingNext = this.firstUnacknowledged
 | 
				
			||||||
	dataSeg.Opt = 0
 | 
						dataSeg.Opt = 0
 | 
				
			||||||
	if this.kcp.state == StateReadyToClose {
 | 
						if this.conn.state == StateReadyToClose {
 | 
				
			||||||
		dataSeg.Opt = SegmentOptionClose
 | 
							dataSeg.Opt = SegmentOptionClose
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	this.kcp.output.Write(dataSeg)
 | 
						this.conn.output.Write(dataSeg)
 | 
				
			||||||
	this.updated = false
 | 
						this.updated = false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -344,7 +344,7 @@ func (this *SendingWorker) PingNecessary() bool {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *SendingWorker) OnPacketLoss(lossRate uint32) {
 | 
					func (this *SendingWorker) OnPacketLoss(lossRate uint32) {
 | 
				
			||||||
	if !effectiveConfig.Congestion || this.kcp.rx_srtt == 0 {
 | 
						if !effectiveConfig.Congestion || this.conn.rx_srtt == 0 {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -361,7 +361,7 @@ func (this *SendingWorker) OnPacketLoss(lossRate uint32) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *SendingWorker) Flush() {
 | 
					func (this *SendingWorker) Flush(current uint32) {
 | 
				
			||||||
	this.Lock()
 | 
						this.Lock()
 | 
				
			||||||
	defer this.Unlock()
 | 
						defer this.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -376,14 +376,14 @@ func (this *SendingWorker) Flush() {
 | 
				
			||||||
	for !this.queue.IsEmpty() && !this.window.IsFull() {
 | 
						for !this.queue.IsEmpty() && !this.window.IsFull() {
 | 
				
			||||||
		seg := this.queue.Pop()
 | 
							seg := this.queue.Pop()
 | 
				
			||||||
		seg.Number = this.nextNumber
 | 
							seg.Number = this.nextNumber
 | 
				
			||||||
		seg.timeout = this.kcp.current
 | 
							seg.timeout = current
 | 
				
			||||||
		seg.ackSkipped = 0
 | 
							seg.ackSkipped = 0
 | 
				
			||||||
		seg.transmit = 0
 | 
							seg.transmit = 0
 | 
				
			||||||
		this.window.Push(seg)
 | 
							this.window.Push(seg)
 | 
				
			||||||
		this.nextNumber++
 | 
							this.nextNumber++
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	this.window.Flush(this.kcp.current, this.kcp.fastresend, this.kcp.rx_rto, cwnd)
 | 
						this.window.Flush(current, this.conn.fastresend, this.conn.rx_rto, cwnd)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *SendingWorker) CloseWrite() {
 | 
					func (this *SendingWorker) CloseWrite() {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue