diff --git a/lib/mux/conn.go b/lib/mux/conn.go index be73e5c..81f876f 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -187,6 +187,7 @@ type window struct { windowBuff []byte off uint16 readOp chan struct{} + readWait bool WindowFull bool usableReceiveWindow chan uint16 WriteWg sync.WaitGroup @@ -274,7 +275,11 @@ func (Self *window) Write(p []byte) (n int, err error) { length := Self.len() // length before grow Self.grow(len(p)) // grow for copy n = copy(Self.windowBuff[length:], p) // must copy data before allow Read - if length == 0 { + if Self.readWait { + // if there condition is length == 0 and + // Read method just take away all the windowBuff, + // this method will block until windowBuff is empty again + // allow continue read defer Self.allowRead() } @@ -287,6 +292,9 @@ func (Self *window) allowRead() (closed bool) { close(Self.readOp) return true } + Self.mutex.Lock() + Self.readWait = false + Self.mutex.Unlock() select { case <-Self.closeOpCh: close(Self.readOp) @@ -303,10 +311,11 @@ func (Self *window) Read(p []byte) (n int, err error) { Self.mutex.Lock() length := Self.len() // protect the length data, it invokes // before Write lock and after Write unlock - Self.mutex.Unlock() if length == 0 { // window is empty, waiting for Write method send a success readOp signal // or get timeout or close + Self.readWait = true + Self.mutex.Unlock() ticker := time.NewTicker(2 * time.Minute) defer ticker.Stop() select { @@ -322,6 +331,8 @@ func (Self *window) Read(p []byte) (n int, err error) { close(Self.readOp) return 0, io.EOF // receive close signal, returns eof } + } else { + Self.mutex.Unlock() } Self.mutex.Lock() n = copy(p, Self.windowBuff[Self.off:]) diff --git a/lib/mux/mux.go b/lib/mux/mux.go index dd3dee2..f4bc6fa 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -22,21 +22,23 @@ type Mux struct { IsClose bool pingOk int connType string - writeQueue chan *bytes.Buffer + writeQueue Queue + bufCh chan *bytes.Buffer sync.Mutex } func NewMux(c net.Conn, connType string) *Mux { m := &Mux{ - conn: c, - connMap: NewConnMap(), - id: 0, - closeChan: make(chan struct{}), - newConnCh: make(chan *conn), - IsClose: false, - connType: connType, - writeQueue: make(chan *bytes.Buffer, 20), + conn: c, + connMap: NewConnMap(), + id: 0, + closeChan: make(chan struct{}), + newConnCh: make(chan *conn), + IsClose: false, + connType: connType, + bufCh: make(chan *bytes.Buffer), } + m.writeQueue.New() //read session by flag go m.readSession() //ping @@ -88,26 +90,27 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { //logs.Warn("send info content is nil") } } - buf := common.BuffPool.Get() + //buf := common.BuffPool.Get() //defer pool.BuffPool.Put(buf) pack := common.MuxPack.Get() - defer common.MuxPack.Put(pack) + err = pack.NewPac(flag, id, data) if err != nil { //logs.Warn("new pack err", err) - common.BuffPool.Put(buf) + common.MuxPack.Put(pack) return } - err = pack.Pack(buf) - if err != nil { - //logs.Warn("pack err", err) - common.BuffPool.Put(buf) - return - } - if pack.Flag == common.MUX_NEW_CONN { - //logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id) - } - s.writeQueue <- buf + s.writeQueue.Push(pack) + //err = pack.Pack(buf) + //if err != nil { + // //logs.Warn("pack err", err) + // common.BuffPool.Put(buf) + // return + //} + //if pack.Flag == common.MUX_NEW_CONN { + // //logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id) + //} + //s.writeQueue <- buf //_, err = buf.WriteTo(s.conn) //if err != nil { // s.Close() @@ -118,20 +121,47 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { } func (s *Mux) writeSession() { - go func() { - for { - buf := <-s.writeQueue - l := buf.Len() - n, err := buf.WriteTo(s.conn) - common.BuffPool.Put(buf) + go s.packBuf() + go s.writeBuf() + <-s.closeChan +} + +func (s *Mux) packBuf() { + for { + pack := s.writeQueue.Pop() + buffer := common.BuffPool.Get() + err := pack.Pack(buffer) + common.MuxPack.Put(pack) + if err != nil { + logs.Warn("pack err", err) + common.BuffPool.Put(buffer) + break + } + select { + case s.bufCh <- buffer: + case <-s.closeChan: + break + } + + } +} + +func (s *Mux) writeBuf() { + for { + select { + case buffer := <-s.bufCh: + l := buffer.Len() + n, err := buffer.WriteTo(s.conn) + common.BuffPool.Put(buffer) if err != nil || int(n) != l { - //logs.Warn("close from write session fail ", err, n, l) + logs.Warn("close from write session fail ", err, n, l) s.Close() break } + case <-s.closeChan: + break } - }() - <-s.closeChan + } } func (s *Mux) ping() { @@ -273,14 +303,11 @@ func (s *Mux) Close() error { } s.IsClose = true s.connMap.Close() - select { - case s.closeChan <- struct{}{}: - } - select { - case s.closeChan <- struct{}{}: - } s.closeChan <- struct{}{} - close(s.writeQueue) + s.closeChan <- struct{}{} + s.closeChan <- struct{}{} + s.closeChan <- struct{}{} + s.closeChan <- struct{}{} close(s.newConnCh) return s.conn.Close() } diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 4388fb6..081b2c9 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -1,82 +1,95 @@ package mux import ( - "errors" + "container/list" "github.com/cnlh/nps/lib/common" "sync" ) -type Element *bufNode - -type bufNode struct { - val []byte //buf value - l int //length +type Queue struct { + list *list.List + readOp chan struct{} + cleanOp chan struct{} + popWait bool + mutex sync.Mutex } -func NewBufNode(buf []byte, l int) *bufNode { - return &bufNode{ - val: buf, - l: l, +func (Self *Queue) New() { + Self.list = list.New() + Self.readOp = make(chan struct{}) + Self.cleanOp = make(chan struct{}, 2) +} + +func (Self *Queue) Push(packager *common.MuxPackager) { + Self.mutex.Lock() + if Self.popWait { + defer Self.allowPop() } -} - -type Queue interface { - Push(e Element) //向队列中添加元素 - Pop() Element //移除队列中最前面的元素 - Clear() bool //清空队列 - Size() int //获取队列的元素个数 - IsEmpty() bool //判断队列是否是空 -} - -type sliceEntry struct { - element []Element - sync.Mutex -} - -func NewQueue() *sliceEntry { - return &sliceEntry{} -} - -//向队列中添加元素 -func (entry *sliceEntry) Push(e Element) { - entry.Lock() - defer entry.Unlock() - entry.element = append(entry.element, e) -} - -//移除队列中最前面的额元素 -func (entry *sliceEntry) Pop() (Element, error) { - if entry.IsEmpty() { - return nil, errors.New("queue is empty!") + if packager.Flag == common.MUX_CONN_CLOSE { + Self.insert(packager) // the close package may need priority, + // prevent wait too long to close + } else { + Self.list.PushBack(packager) } - entry.Lock() - defer entry.Unlock() - firstElement := entry.element[0] - entry.element = entry.element[1:] - return firstElement, nil + Self.mutex.Unlock() + return } -func (entry *sliceEntry) Clear() bool { - entry.Lock() - defer entry.Unlock() - if entry.IsEmpty() { +func (Self *Queue) allowPop() (closed bool) { + Self.mutex.Lock() + Self.popWait = false + Self.mutex.Unlock() + select { + case Self.readOp <- struct{}{}: return false - } - for i := 0; i < entry.Size(); i++ { - common.CopyBuff.Put(entry.element[i].val) - entry.element[i] = nil - } - entry.element = nil - return true -} - -func (entry *sliceEntry) Size() int { - return len(entry.element) -} - -func (entry *sliceEntry) IsEmpty() bool { - if len(entry.element) == 0 { + case <-Self.cleanOp: return true } - return false +} + +func (Self *Queue) insert(packager *common.MuxPackager) { + element := Self.list.Back() + for { + if element == nil { // Queue dose not have any of msg package with this close package id + Self.list.PushFront(packager) // insert close package to first + break + } + if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG && + element.Value.(*common.MuxPackager).Id == packager.Id { + Self.list.InsertAfter(packager, element) // Queue has some msg package + // with this close package id, insert close package after last msg package + break + } + element = element.Prev() + } +} + +func (Self *Queue) Pop() (packager *common.MuxPackager) { + Self.mutex.Lock() + element := Self.list.Front() + if element != nil { + packager = element.Value.(*common.MuxPackager) + Self.list.Remove(element) + Self.mutex.Unlock() + return + } + Self.popWait = true // Queue is empty, notice Push method + Self.mutex.Unlock() + select { + case <-Self.readOp: + return Self.Pop() + case <-Self.cleanOp: + return nil + } +} + +func (Self *Queue) Len() (n int) { + n = Self.list.Len() + return +} + +func (Self *Queue) Clean() { + Self.cleanOp <- struct{}{} + Self.cleanOp <- struct{}{} + close(Self.cleanOp) }