diff --git a/lib/common/const.go b/lib/common/const.go index f57ce4f..2fd5bb6 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -49,5 +49,6 @@ const ( MUX_PING_RETURN MUX_PING int32 = -1 MAXIMUM_SEGMENT_SIZE = PoolSizeWindow - MAXIMUM_WINDOW_SIZE = 1<<31 - 1 + MAXIMUM_WINDOW_SIZE = 1 << 25 // 1<<31-1 TCP slide window size is very large, + // we use 32M, reduce memory usage ) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 7bb88ae..c94ab4d 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -210,7 +210,7 @@ func (Self *ReceiveWindow) calcSize() { if Self.count == 0 { //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) n := uint32(2 * math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * - Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size())) + Self.mux.bw.Get() / float64(Self.mux.connMap.Size())) if n < 8192 { n = 8192 } @@ -279,6 +279,9 @@ copyData: // on the first Read method invoked, Self.off and Self.element.l // both zero value common.ListElementPool.Put(Self.element) + if Self.closeOp { + return 0, io.EOF + } Self.element, err = Self.bufQueue.Pop() // if the queue is empty, Pop method will wait until one element push // into the queue successful, or timeout. @@ -343,6 +346,26 @@ func (Self *ReceiveWindow) Stop() { func (Self *ReceiveWindow) CloseWindow() { Self.window.CloseWindow() Self.Stop() + Self.release() +} + +func (Self *ReceiveWindow) release() { + //if Self.element != nil { + // if Self.element.Buf != nil { + // common.WindowBuff.Put(Self.element.Buf) + // } + // common.ListElementPool.Put(Self.element) + //} + for { + Self.element = Self.bufQueue.TryPop() + if Self.element == nil { + return + } + if Self.element.Buf != nil { + common.WindowBuff.Put(Self.element.Buf) + } + common.ListElementPool.Put(Self.element) + } // release resource } type SendWindow struct { diff --git a/lib/mux/map.go b/lib/mux/map.go index 8f07dee..86d09b5 100644 --- a/lib/mux/map.go +++ b/lib/mux/map.go @@ -2,32 +2,35 @@ package mux import ( "sync" - "time" ) type connMap struct { connMap map[int32]*conn - closeCh chan struct{} + //closeCh chan struct{} sync.RWMutex } func NewConnMap() *connMap { connMap := &connMap{ connMap: make(map[int32]*conn), - closeCh: make(chan struct{}), + //closeCh: make(chan struct{}), } - go connMap.clean() + //go connMap.clean() return connMap } func (s *connMap) Size() (n int) { - return len(s.connMap) + s.Lock() + n = len(s.connMap) + s.Unlock() + return } func (s *connMap) Get(id int32) (*conn, bool) { s.Lock() - defer s.Unlock() - if v, ok := s.connMap[id]; ok && v != nil { + v, ok := s.connMap[id] + s.Unlock() + if ok && v != nil { return v, true } return nil, false @@ -35,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) { func (s *connMap) Set(id int32, v *conn) { s.Lock() - defer s.Unlock() s.connMap[id] = v + s.Unlock() } func (s *connMap) Close() { - s.Lock() - defer s.Unlock() + //s.closeCh <- struct{}{} // stop the clean goroutine first for _, v := range s.connMap { - v.isClose = true + v.Close() // close all the connections in the mux } - s.closeCh <- struct{}{} } func (s *connMap) Delete(id int32) { s.Lock() - defer s.Unlock() delete(s.connMap, id) + s.Unlock() } -func (s *connMap) clean() { - ticker := time.NewTimer(time.Minute * 1) - for { - select { - case <-ticker.C: - s.Lock() - for _, v := range s.connMap { - if v.isClose { - delete(s.connMap, v.connId) - } - } - s.Unlock() - case <-s.closeCh: - ticker.Stop() - return - } - } -} +//func (s *connMap) clean() { +// ticker := time.NewTimer(time.Minute * 1) +// for { +// select { +// case <-ticker.C: +// s.Lock() +// for _, v := range s.connMap { +// if v.isClose { +// delete(s.connMap, v.connId) +// } +// } +// s.Unlock() +// case <-s.closeCh: +// ticker.Stop() +// return +// } +// } +//} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 3872f7e..8c4febb 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -122,6 +122,9 @@ func (s *Mux) packBuf() { } buffer.Reset() pack := s.writeQueue.Pop() + if s.IsClose { + break + } //buffer := common.BuffPool.Get() err := pack.Pack(buffer) common.MuxPack.Put(pack) @@ -218,7 +221,9 @@ func (s *Mux) pingReturn() { // convert float64 to bits, store it atomic } //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency))) - common.WindowBuff.Put(data) + if cap(data) > 0 { + common.WindowBuff.Put(data) + } } }() } @@ -227,7 +232,13 @@ func (s *Mux) readSession() { go func() { var connection *conn for { + if s.IsClose { + break + } connection = s.newConnQueue.Pop() + if s.IsClose { + break // make sure that is closed + } s.connMap.Set(connection.connId, connection) //it has been set before send ok s.newConnCh <- connection s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) @@ -287,9 +298,9 @@ func (s *Mux) readSession() { connection.sendWindow.SetSize(pack.Window, pack.ReadLength) continue case common.MUX_CONN_CLOSE: //close the connection - s.connMap.Delete(pack.Id) - //go func(connection *conn) { connection.closeFlag = true + //s.connMap.Delete(pack.Id) + //go func(connection *conn) { connection.receiveWindow.Stop() // close signal to receive window //}(connection) continue @@ -322,17 +333,42 @@ func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) { return } -func (s *Mux) Close() error { +func (s *Mux) Close() (err error) { logs.Warn("close mux") if s.IsClose { return errors.New("the mux has closed") } s.IsClose = true s.connMap.Close() + s.connMap = nil //s.bufQueue.Stop() s.closeChan <- struct{}{} close(s.newConnCh) - return s.conn.Close() + err = s.conn.Close() + s.release() + return +} + +func (s *Mux) release() { + for { + pack := s.writeQueue.TryPop() + if pack == nil { + break + } + if pack.BasePackager.Content != nil { + common.WindowBuff.Put(pack.BasePackager.Content) + } + common.MuxPack.Put(pack) + } + for { + connection := s.newConnQueue.TryPop() + if connection == nil { + break + } + connection = nil + } + s.writeQueue.Stop() + s.newConnQueue.Stop() } //get new connId as unique flag @@ -352,7 +388,7 @@ type bandwidth struct { readStart time.Time lastReadStart time.Time bufLength uint16 - readBandwidth float64 + readBandwidth uint64 // store in bits, but it's float64 } func (Self *bandwidth) StartRead() { @@ -371,16 +407,17 @@ func (Self *bandwidth) SetCopySize(n uint16) { func (Self *bandwidth) calcBandWidth() { t := Self.readStart.Sub(Self.lastReadStart) - Self.readBandwidth = float64(Self.bufLength) / t.Seconds() + atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) Self.bufLength = 0 } func (Self *bandwidth) Get() (bw float64) { // The zero value, 0 for numeric types - if Self.readBandwidth <= 0 { - Self.readBandwidth = 100 + bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth)) + if bw <= 0 { + bw = 100 } - return Self.readBandwidth + return } const counterBits = 4 diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 2fe8a44..2288e40 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -59,7 +59,7 @@ const maxStarving uint8 = 8 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { var iter bool for { - packager = Self.pop() + packager = Self.TryPop() if packager != nil { return } @@ -75,20 +75,20 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { } Self.cond.L.Lock() defer Self.cond.L.Unlock() - for packager = Self.pop(); packager == nil; { + for packager = Self.TryPop(); packager == nil; { if Self.stop { return } //logs.Warn("queue into wait") Self.cond.Wait() // wait for it with no more iter - packager = Self.pop() + packager = Self.TryPop() //logs.Warn("queue wait finish", packager) } return } -func (Self *PriorityQueue) pop() (packager *common.MuxPackager) { +func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) { ptr, ok := Self.highestChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) @@ -150,7 +150,7 @@ func (Self *ConnQueue) Push(connection *conn) { func (Self *ConnQueue) Pop() (connection *conn) { var iter bool for { - connection = Self.pop() + connection = Self.TryPop() if connection != nil { return } @@ -166,20 +166,20 @@ func (Self *ConnQueue) Pop() (connection *conn) { } Self.cond.L.Lock() defer Self.cond.L.Unlock() - for connection = Self.pop(); connection == nil; { + for connection = Self.TryPop(); connection == nil; { if Self.stop { return } //logs.Warn("queue into wait") Self.cond.Wait() // wait for it with no more iter - connection = Self.pop() + connection = Self.TryPop() //logs.Warn("queue wait finish", packager) } return } -func (Self *ConnQueue) pop() (connection *conn) { +func (Self *ConnQueue) TryPop() (connection *conn) { ptr, ok := Self.chain.popTail() if ok { connection = (*conn)(ptr) @@ -261,18 +261,26 @@ startPop: } // length is not zero, so try to pop for { - ptr, ok := Self.chain.popTail() - if ok { - //logs.Warn("window pop before", Self.Len()) - element = (*common.ListElement)(ptr) - atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<