From aaf79b21f03f04309b1bc10580e4943f2b6b32a0 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Tue, 19 Nov 2019 23:43:52 +0800 Subject: [PATCH] fine mux connection ping calculation, increase connection waiting time --- README.md | 13 +++++ lib/mux/conn.go | 6 ++- lib/mux/mux.go | 130 +++++++++++++++++++++++++++++++++++++++-------- lib/mux/queue.go | 2 +- 4 files changed, 127 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 4a9ddd4..8988d5a 100644 --- a/README.md +++ b/README.md @@ -824,6 +824,19 @@ nps支持对客户端的隧道数量进行限制,该功能默认是关闭的 nps主要通信默认基于多路复用,无需开启。 +多路复用基于TCP滑动窗口原理设计,动态计算延迟以及带宽来算出应该往网络管道中打入的流量。 +由于主要通信大多采用TCP协议,无法探测其实时丢包情况,因此对于产生丢包重传的情况,采用较大的宽容度, +即5分钟的等待时间,超时将会关闭当前隧道连接并重新建立,这将会抛弃当前所有的连接。 +在Linux上,可以通过调节内核参数来适应不同应用场景。 + +对于需要大带宽又有一定丢包的场景,可以保持默认参数不变,尽可能少抛弃连接; +高并发下可根据[Linux系统限制](#Linux系统限制)调整。 + +对于延迟敏感而又有一定丢包的场景,可以适当调整TCP重传次数: +`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2`; +高并发同上。 +nps会在系统主动关闭连接的时候拿到报错,进而重新建立隧道连接。 + ### 环境变量渲染 npc支持环境变量渲染以适应在某些特殊场景下的要求。 diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 23bc5d5..7bb88ae 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,6 +3,7 @@ package mux import ( "errors" "io" + "math" "net" "runtime" "sync" @@ -208,7 +209,8 @@ func (Self *ReceiveWindow) calcSize() { // calculating maximum receive window size if Self.count == 0 { //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) - n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size())) + n := uint32(2 * math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * + Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size())) if n < 8192 { n = 8192 } @@ -471,7 +473,7 @@ start: func (Self *SendWindow) waitReceiveWindow() (err error) { t := Self.timeout.Sub(time.Now()) if t < 0 { - t = time.Minute + t = time.Minute * 5 } timer := time.NewTimer(t) defer timer.Stop() diff --git a/lib/mux/mux.go b/lib/mux/mux.go index b64243f..3872f7e 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -13,21 +13,22 @@ import ( ) 
type Mux struct { + latency uint64 // float64 latency stored as its bit pattern so it can be accessed atomically net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk int - latency float64 - bw *bandwidth - pingCh chan []byte - pingCheck bool - connType string - writeQueue PriorityQueue - newConnQueue ConnQueue + conn net.Conn + connMap *connMap + newConnCh chan *conn + id int32 + closeChan chan struct{} + IsClose bool + pingOk int + counter *latencyCounter + bw *bandwidth + pingCh chan []byte + pingCheckTime uint32 + connType string + writeQueue PriorityQueue + newConnQueue ConnQueue } func NewMux(c net.Conn, connType string) *Mux { @@ -43,6 +44,7 @@ func NewMux(c net.Conn, connType string) *Mux { IsClose: false, connType: connType, pingCh: make(chan []byte), + counter: newLatencyCounter(), } m.writeQueue.New() m.newConnQueue.New() @@ -175,16 +177,16 @@ func (s *Mux) ping() { select { case <-ticker.C: } - if s.pingCheck { + if atomic.LoadUint32(&s.pingCheckTime) >= 60 { logs.Error("mux: ping time out") s.Close() - // more than 5 seconds not receive the ping return package, + // no ping reply has been received for more than 5 minutes, // mux conn is damaged, maybe a packet drop, close it break } now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) - s.pingCheck = true + atomic.AddUint32(&s.pingCheckTime, 1) if s.pingOk > 10 && s.connType == "kcp" { logs.Error("mux: kcp ping err") s.Close() @@ -205,16 +207,17 @@ func (s *Mux) pingReturn() { } select { case data = <-s.pingCh: - s.pingCheck = false + atomic.StoreUint32(&s.pingCheckTime, 0) case <-s.closeChan: break } _ = now.UnmarshalText(data) latency := time.Now().UTC().Sub(now).Seconds() / 2 - if latency < 0.5 && latency > 0 { - s.latency = latency + if latency > 0 { + atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency))) + // convert the float64 to its bits and store it atomically } - //logs.Warn("latency", s.latency) 
+ //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency))) common.WindowBuff.Put(data) } }() @@ -379,3 +382,88 @@ func (Self *bandwidth) Get() (bw float64) { } return Self.readBandwidth } + +const counterBits = 4 +const counterMask = 1<> counterBits) & counterMask) + // head is 4 bits wide + min = uint8(idxs & counterMask) + return +} + +func (Self *latencyCounter) pack(head, min uint8) uint8 { + return uint8(head< value { + min = head + } + head++ + Self.headMin = Self.pack(head, min) +} + +func (Self *latencyCounter) minimal() (min uint8) { + var val float64 + var i uint8 + for i = 0; i < counterMask; i++ { + if Self.buf[i] > 0 { + if val > Self.buf[i] { + val = Self.buf[i] + min = i + } + } + } + return +} + +func (Self *latencyCounter) Latency(value float64) (latency float64) { + Self.add(value) + _, min := Self.unpack(Self.headMin) + latency = Self.buf[min] * Self.countSuccess() + return +} + +const lossRatio = 1.6 + +func (Self *latencyCounter) countSuccess() (successRate float64) { + var success, loss, i uint8 + _, min := Self.unpack(Self.headMin) + for i = 0; i < counterMask; i++ { + if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 { + loss++ + } + if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 { + success++ + } + } + // count the non-zero samples in the ring buffer + successRate = float64(success) / float64(loss+success) + return +} diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 0bfea18..2fe8a44 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -288,7 +288,7 @@ func (Self *ReceiveWindowQueue) waitPush() (err error) { //defer logs.Warn("wait push finish") t := Self.timeout.Sub(time.Now()) if t <= 0 { - t = time.Second * 10 + t = time.Minute * 5 } timer := time.NewTimer(t) defer timer.Stop()