From b3ed822c72c9ec31f873151edd4d16333046651a Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Mon, 7 Oct 2019 23:04:54 +0800 Subject: [PATCH] change slide window design --- lib/common/const.go | 4 +- lib/common/netpackager.go | 37 +- lib/mux/conn.go | 703 ++++++++++++++++++++++---------------- lib/mux/mux.go | 118 ++++--- lib/mux/queue.go | 148 ++++++-- 5 files changed, 623 insertions(+), 387 deletions(-) diff --git a/lib/common/const.go b/lib/common/const.go index d77f16b..95364a2 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -42,9 +42,11 @@ const ( MUX_NEW_CONN_OK MUX_NEW_CONN_Fail MUX_NEW_MSG + MUX_NEW_MSG_PART MUX_MSG_SEND_OK MUX_NEW_CONN MUX_CONN_CLOSE MUX_PING_RETURN - MUX_PING int32 = -1 + MUX_PING int32 = -1 + MAXIMUM_SEGMENT_SIZE = 4096 - 16 - 32 - 32 - 8 ) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 1129940..ec2cb69 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -15,7 +15,7 @@ type NetPackager interface { } type BasePackager struct { - Length uint32 + Length uint16 Content []byte } @@ -101,7 +101,7 @@ func (Self *BasePackager) Unmarshal(content interface{}) (err error) { } func (Self *BasePackager) setLength() { - Self.Length = uint32(len(Self.Content)) + Self.Length = uint16(len(Self.Content)) return } @@ -147,25 +147,32 @@ func (Self *ConnPackager) UnPack(reader io.Reader) (err error) { } type MuxPackager struct { - Flag uint8 - Id int32 - Window uint16 + Flag uint8 + Id int32 + Window uint32 + ReadLength uint32 BasePackager } func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { Self.Flag = flag Self.Id = id - if flag == MUX_NEW_MSG { + if flag == MUX_NEW_MSG || flag == MUX_NEW_MSG_PART || flag == MUX_PING_FLAG { err = Self.BasePackager.NewPac(content...) } if flag == MUX_MSG_SEND_OK { // MUX_MSG_SEND_OK only allows one data switch content[0].(type) { case int: - Self.Window = uint16(content[0].(int)) - case uint16: - Self.Window = content[0].(uint16) + Self.Window = uint32(content[0].(int)) + case uint32: + Self.Window = content[0].(uint32) + } + switch content[1].(type) { + case int: + Self.ReadLength = uint32(content[1].(int)) + case uint32: + Self.ReadLength = content[1].(uint32) } } return @@ -180,11 +187,15 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG { + if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG { err = Self.BasePackager.Pack(writer) } if Self.Flag == MUX_MSG_SEND_OK { err = binary.Write(writer, binary.LittleEndian, Self.Window) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) } return } @@ -199,11 +210,15 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG { + if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG { err = Self.BasePackager.UnPack(reader) } if Self.Flag == MUX_MSG_SEND_OK { err = binary.Read(reader, binary.LittleEndian, &Self.Window) + if err != nil { + return + } + err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) } return } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index bf9e0d6..f4d5396 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -2,7 +2,9 @@ package mux import ( "errors" + "github.com/astaxie/beego/logs" "io" + "math" "net" "sync" "time" @@ -15,16 +17,11 @@ type conn struct { getStatusCh chan struct{} connStatusOkCh chan struct{} connStatusFailCh chan struct{} - readTimeOut time.Time - writeTimeOut time.Time connId int32 isClose bool closeFlag bool // close conn flag - receiveWindow *window - sendWindow *window - readCh waitingCh - writeCh waitingCh - mux *Mux + receiveWindow *ReceiveWindow + sendWindow *SendWindow once sync.Once } @@ -34,15 +31,12 @@ func NewConn(connId int32, mux *Mux) *conn { connStatusOkCh: make(chan struct{}), connStatusFailCh: make(chan struct{}), connId: connId, - receiveWindow: new(window), - sendWindow: new(window), - mux: mux, + receiveWindow: new(ReceiveWindow), + sendWindow: new(SendWindow), once: sync.Once{}, } - c.receiveWindow.NewReceive() - c.sendWindow.NewSend() - c.readCh.new() - c.writeCh.new() + c.receiveWindow.New(mux) + c.sendWindow.New(mux) return c } @@ -50,39 +44,14 @@ func (s *conn) Read(buf []byte) (n int, err error) { if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } + if len(buf) == 0 { + return 0, nil + } // waiting for takeout from receive window finish or timeout - go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh) - if t := s.readTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - return 0, errors.New("read timeout") - case n = <-s.readCh.nCh: - err = <-s.readCh.errCh - } - } else { - n = <-s.readCh.nCh - err = <-s.readCh.errCh - } + n, err = s.receiveWindow.Read(buf, s.connId) return } -func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) { - n, err := s.receiveWindow.Read(buf) - if s.receiveWindow.WindowFull { - if s.receiveWindow.Size() > 0 { - // window.Read may be invoked before window.Write, and WindowFull flag change to true - // so make sure that receiveWindow is free some space - s.receiveWindow.WindowFull = false - s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size()) - // acknowledge other side, have empty some receive window space - } - } - nCh <- n - errCh <- err -} - func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, errors.New("the conn has closed") @@ -91,45 +60,13 @@ func (s *conn) Write(buf []byte) (n int, err error) { //s.Close() return 0, errors.New("io: write on closed conn") } - s.sendWindow.SetSendBuf(buf) // set the buf to send window - go s.write(s.writeCh.nCh, s.writeCh.errCh) - // waiting for send to other side or timeout - if t := s.writeTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - return 0, errors.New("write timeout") - case n = <-s.writeCh.nCh: - err = <-s.writeCh.errCh - } - } else { - n = <-s.writeCh.nCh - err = <-s.writeCh.errCh + if len(buf) == 0 { + return 0, nil } + //logs.Warn("write buf", len(buf)) + n, err = s.sendWindow.WriteFull(buf, s.connId) return } -func (s *conn) write(nCh chan int, errCh chan error) { - var n int - var err error - for { - buf, err := s.sendWindow.WriteTo() - // get the usable window size buf from send window - if buf == nil && err == io.EOF { - // send window is drain, break the loop - err = nil - break - } - if err != nil { - break - } - n += len(buf) - s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf) - // send to other side, not send nil data to other side - } - nCh <- n - errCh <- err -} func (s *conn) Close() (err error) { s.once.Do(s.closeProcess) @@ -138,11 +75,11 @@ func (s *conn) Close() (err error) { func (s *conn) closeProcess() { s.isClose = true - s.mux.connMap.Delete(s.connId) - if !s.mux.IsClose { + s.receiveWindow.mux.connMap.Delete(s.connId) + if !s.receiveWindow.mux.IsClose { // if server or user close the conn while reading, will get a io.EOF // and this Close method will be invoke, send this signal to close other side - s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) + s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) } s.sendWindow.CloseWindow() s.receiveWindow.CloseWindow() @@ -150,276 +87,440 @@ func (s *conn) closeProcess() { } func (s *conn) LocalAddr() net.Addr { - return s.mux.conn.LocalAddr() + return s.receiveWindow.mux.conn.LocalAddr() } func (s *conn) RemoteAddr() net.Addr { - return s.mux.conn.RemoteAddr() + return s.receiveWindow.mux.conn.RemoteAddr() } func (s *conn) SetDeadline(t time.Time) error { - s.readTimeOut = t - s.writeTimeOut = t + _ = s.SetReadDeadline(t) + _ = s.SetWriteDeadline(t) return nil } func (s *conn) SetReadDeadline(t time.Time) error { - s.readTimeOut = t + s.receiveWindow.SetTimeOut(t) return nil } func (s *conn) SetWriteDeadline(t time.Time) error { - s.writeTimeOut = t + s.sendWindow.SetTimeOut(t) return nil } type window struct { - windowBuff []byte - off uint16 - readOp chan struct{} - readWait bool - WindowFull bool - usableReceiveWindow chan uint16 - WriteWg sync.WaitGroup - closeOp bool - closeOpCh chan struct{} - WriteEndOp chan struct{} - mutex sync.Mutex + off uint32 + maxSize uint32 + closeOp bool + closeOpCh chan struct{} + mux *Mux } -func (Self *window) NewReceive() { +func (Self *window) New() { + Self.closeOpCh = make(chan struct{}, 2) +} + +func (Self *window) CloseWindow() { + if !Self.closeOp { + Self.closeOp = true + Self.closeOpCh <- struct{}{} + Self.closeOpCh <- struct{}{} + } +} + +type ReceiveWindow struct { + bufQueue FIFOQueue + element *ListElement + readLength uint32 + readOp chan struct{} + readWait bool + windowFull bool + count int8 + bw *bandwidth + once sync.Once + window +} + +func (Self *ReceiveWindow) New(mux *Mux) { // initial a window for receive - Self.windowBuff = common.WindowBuff.Get() Self.readOp = make(chan struct{}) - Self.WriteEndOp = make(chan struct{}) - Self.closeOpCh = make(chan struct{}, 3) + Self.bufQueue.New() + Self.bw = new(bandwidth) + Self.element = new(ListElement) + Self.maxSize = 8192 + Self.mux = mux + Self.window.New() } -func (Self *window) NewSend() { - // initial a window for send - Self.usableReceiveWindow = make(chan uint16) - Self.closeOpCh = make(chan struct{}, 3) +func (Self *ReceiveWindow) RemainingSize() (n uint32) { + // receive window remaining + if Self.maxSize >= Self.bufQueue.Len() { + n = Self.maxSize - Self.bufQueue.Len() + } + // if maxSize is small than bufQueue length, return 0 + return } -func (Self *window) SetSendBuf(buf []byte) { +func (Self *ReceiveWindow) ReadSize() (n uint32) { + // acknowledge the size already read + Self.bufQueue.mutex.Lock() + n = Self.readLength + Self.readLength = 0 + Self.bufQueue.mutex.Unlock() + Self.count += 1 + return +} + +func (Self *ReceiveWindow) CalcSize() { + // calculating maximum receive window size + if Self.count == 0 { + logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) + n := uint32(2 * Self.mux.latency * Self.bw.Get()) + if n < 8192 { + n = 8192 + } + if n < Self.bufQueue.Len() { + n = Self.bufQueue.Len() + } + // set the minimal size + logs.Warn("n", n) + Self.maxSize = n + Self.count = -5 + } +} + +func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { + if Self.closeOp { + return errors.New("conn.receiveWindow: write on closed window") + } + element := ListElement{} + err = element.New(buf, l, part) + //logs.Warn("push the buf", len(buf), l, (&element).l) + if err != nil { + return + } + Self.bufQueue.Push(&element) // must push data before allow read + //logs.Warn("read session calc size ", Self.maxSize) + // calculating the receive window size + Self.CalcSize() + logs.Warn("read session calc size finish", Self.maxSize) + if Self.RemainingSize() == 0 { + Self.windowFull = true + //logs.Warn("window full true", Self.windowFull) + } + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize()) + return nil +} + +func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { + if Self.closeOp { + return 0, io.EOF // receive close signal, returns eof + } + pOff := 0 + l := 0 + //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) +copyData: + Self.bw.StartRead() + if Self.off == uint32(Self.element.l) { + // on the first Read method invoked, Self.off and Self.element.l + // both zero value + Self.element, err = Self.bufQueue.Pop() + // if the queue is empty, Pop method will wait until one element push + // into the queue successful, or timeout. + // timer start on timeout parameter is set up , + // reset to 60s if timeout and data still available + Self.off = 0 + if err != nil { + return // queue receive stop or time out, break the loop and return + } + //logs.Warn("pop element", Self.element.l, Self.element.part) + } + l = copy(p[pOff:], Self.element.buf[Self.off:]) + Self.bw.SetCopySize(l) + pOff += l + Self.off += uint32(l) + Self.bufQueue.mutex.Lock() + Self.readLength += uint32(l) + //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) + Self.bufQueue.mutex.Unlock() + n += l + l = 0 + Self.bw.EndRead() + Self.sendStatus(id) + if pOff < len(p) && Self.element.part { + // element is a part of the segments, trying to fill up buf p + goto copyData + } + return // buf p is full or all of segments in buf, return +} + +func (Self *ReceiveWindow) sendStatus(id int32) { + if Self.windowFull || Self.bufQueue.Len() == 0 { + // window is full before read or empty now + Self.windowFull = false + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.ReadSize()) + // acknowledge other side, have empty some receive window space + //} + } +} + +func (Self *ReceiveWindow) SetTimeOut(t time.Time) { + // waiting for FIFO queue Pop method + Self.bufQueue.SetTimeOut(t) +} + +func (Self *ReceiveWindow) Stop() { + // queue has no more data to push, so unblock pop method + Self.once.Do(Self.bufQueue.Stop) +} + +func (Self *ReceiveWindow) CloseWindow() { + Self.window.CloseWindow() + Self.Stop() +} + +type SendWindow struct { + buf []byte + sentLength uint32 + setSizeCh chan struct{} + setSizeWait bool + unSlide uint32 + timeout time.Time + window + mutex sync.Mutex +} + +func (Self *SendWindow) New(mux *Mux) { + Self.setSizeCh = make(chan struct{}) + Self.maxSize = 4096 + Self.mux = mux + Self.window.New() +} + +func (Self *SendWindow) SetSendBuf(buf []byte) { // send window buff from conn write method, set it to send window Self.mutex.Lock() - Self.windowBuff = buf + Self.buf = buf Self.off = 0 Self.mutex.Unlock() } -func (Self *window) fullSlide() { - // slide by allocate - newBuf := common.WindowBuff.Get() - Self.liteSlide() - n := copy(newBuf[:Self.len()], Self.windowBuff) - common.WindowBuff.Put(Self.windowBuff) - Self.windowBuff = newBuf[:n] +func (Self *SendWindow) RemainingSize() (n uint32) { + if Self.maxSize >= Self.sentLength { + n = Self.maxSize - Self.sentLength + } return } -func (Self *window) liteSlide() { - // slide by re slice - Self.windowBuff = Self.windowBuff[Self.off:] - Self.off = 0 - return -} - -func (Self *window) Size() (n int) { - // receive Window remaining - n = common.PoolSizeWindow - Self.len() - return -} - -func (Self *window) len() (n int) { - n = len(Self.windowBuff[Self.off:]) - return -} - -func (Self *window) cap() (n int) { - n = cap(Self.windowBuff[Self.off:]) - return -} - -func (Self *window) grow(n int) { - Self.windowBuff = Self.windowBuff[:Self.len()+n] -} - -func (Self *window) Write(p []byte) (n int, err error) { - if Self.closeOp { - return 0, errors.New("conn.receiveWindow: write on closed window") - } - if len(p) > Self.Size() { - return 0, errors.New("conn.receiveWindow: write too large") - } - Self.mutex.Lock() - // slide the offset - if len(p) > Self.cap()-Self.len() { - // not enough space, need to allocate - Self.fullSlide() - } else { - // have enough space, re slice - Self.liteSlide() - } - length := Self.len() // length before grow - Self.grow(len(p)) // grow for copy - n = copy(Self.windowBuff[length:], p) // must copy data before allow Read - if Self.readWait { - // if there condition is length == 0 and - // Read method just take away all the windowBuff, - // this method will block until windowBuff is empty again - - // allow continue read - defer Self.allowRead() - } - Self.mutex.Unlock() - return n, nil -} - -func (Self *window) allowRead() (closed bool) { - if Self.closeOp { - close(Self.readOp) - return true - } - Self.mutex.Lock() - Self.readWait = false - Self.mutex.Unlock() - select { - case <-Self.closeOpCh: - close(Self.readOp) - return true - case Self.readOp <- struct{}{}: - return false - } -} - -func (Self *window) Read(p []byte) (n int, err error) { - if Self.closeOp { - return 0, io.EOF // Write method receive close signal, returns eof - } - Self.mutex.Lock() - length := Self.len() // protect the length data, it invokes - // before Write lock and after Write unlock - if length == 0 { - // window is empty, waiting for Write method send a success readOp signal - // or get timeout or close - Self.readWait = true - Self.mutex.Unlock() - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - select { - case _, ok := <-Self.readOp: - if !ok { - return 0, errors.New("conn.receiveWindow: window closed") - } - case <-Self.WriteEndOp: - return 0, io.EOF // receive eof signal, returns eof - case <-ticker.C: - return 0, errors.New("conn.receiveWindow: read time out") - case <-Self.closeOpCh: - close(Self.readOp) - return 0, io.EOF // receive close signal, returns eof - } - } else { - Self.mutex.Unlock() - } - minCopy := 512 - for { - Self.mutex.Lock() - if len(p) == n || Self.len() == 0 { - Self.mutex.Unlock() - break - } - if n+minCopy > len(p) { - minCopy = len(p) - n - } - i := copy(p[n:n+minCopy], Self.windowBuff[Self.off:]) - Self.off += uint16(i) - n += i - Self.mutex.Unlock() - } - p = p[:n] - return -} - -func (Self *window) WriteTo() (p []byte, err error) { - if Self.closeOp { - return nil, errors.New("conn.writeWindow: window closed") - } - if Self.len() == 0 { - return nil, io.EOF - // send window buff is drain, return eof and get another one - } - var windowSize uint16 - var ok bool -waiting: - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - // waiting for receive usable window size, or timeout - select { - case windowSize, ok = <-Self.usableReceiveWindow: - if !ok { - return nil, errors.New("conn.writeWindow: window closed") - } - case <-ticker.C: - return nil, errors.New("conn.writeWindow: write to time out") - case <-Self.closeOpCh: - return nil, errors.New("conn.writeWindow: window closed") - } - if windowSize == 0 { - goto waiting // waiting for another usable window size - } - Self.mutex.Lock() - if windowSize > uint16(Self.len()) { - // usable window size is bigger than window buff size, send the full buff - windowSize = uint16(Self.len()) - } - p = Self.windowBuff[Self.off : windowSize+Self.off] - Self.off += windowSize - Self.mutex.Unlock() - return -} - -func (Self *window) SetAllowSize(value uint16) (closed bool) { +func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) { defer func() { if recover() != nil { closed = true } }() if Self.closeOp { - close(Self.usableReceiveWindow) + close(Self.setSizeCh) return true } - select { - case Self.usableReceiveWindow <- value: - return false - case <-Self.closeOpCh: - close(Self.usableReceiveWindow) - return true + if readLength == 0 && Self.maxSize == windowSize { + logs.Warn("waiting for another window size") + return false // waiting for receive another usable window size } + logs.Warn("set send window size to ", windowSize, readLength) + Self.mutex.Lock() + Self.slide(windowSize, readLength) + if Self.setSizeWait { + // send window into the wait status, need notice the channel + //logs.Warn("send window remaining size is 0 , wait") + if Self.RemainingSize() == 0 { + //logs.Warn("waiting for another window size after slide") + // keep the wait status + Self.mutex.Unlock() + return false + } + Self.setSizeWait = false + Self.mutex.Unlock() + //logs.Warn("send window remaining size is 0 starting wait") + select { + case Self.setSizeCh <- struct{}{}: + //logs.Warn("send window remaining size is 0 finish") + return false + case <-Self.closeOpCh: + close(Self.setSizeCh) + return true + } + } + // send window not into the wait status, so just do slide + Self.mutex.Unlock() + return false } -func (Self *window) CloseWindow() { - Self.closeOp = true - Self.closeOpCh <- struct{}{} - Self.closeOpCh <- struct{}{} - Self.closeOpCh <- struct{}{} - close(Self.closeOpCh) +func (Self *SendWindow) slide(windowSize, readLength uint32) { + Self.sentLength -= readLength + Self.maxSize = windowSize +} + +func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) { + // returns buf segments, return only one segments, need a loop outside + // until err = io.EOF + if Self.closeOp { + return nil, false, errors.New("conn.writeWindow: window closed") + } + if Self.off == uint32(len(Self.buf)) { + return nil, false, io.EOF + // send window buff is drain, return eof and get another one + } + Self.mutex.Lock() + if Self.RemainingSize() == 0 { + Self.setSizeWait = true + Self.mutex.Unlock() + // into the wait status + err = Self.waitReceiveWindow() + if err != nil { + return nil, false, err + } + } else { + Self.mutex.Unlock() + } + Self.mutex.Lock() + var sendSize uint32 + if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE { + sendSize = common.MAXIMUM_SEGMENT_SIZE + part = true + } else { + sendSize = uint32(len(Self.buf[Self.off:])) + part = false + } + if Self.RemainingSize() < sendSize { + // usable window size is small than + // window MAXIMUM_SEGMENT_SIZE or send buf left + sendSize = Self.RemainingSize() + part = true + } + //logs.Warn("send size", sendSize) + p = Self.buf[Self.off : sendSize+Self.off] + Self.off += sendSize + Self.sentLength += sendSize + Self.mutex.Unlock() return } -type waitingCh struct { - nCh chan int - errCh chan error +func (Self *SendWindow) waitReceiveWindow() (err error) { + t := Self.timeout.Sub(time.Now()) + if t < 0 { + t = time.Minute + } + timer := time.NewTimer(t) + defer timer.Stop() + // waiting for receive usable window size, or timeout + select { + case _, ok := <-Self.setSizeCh: + if !ok { + return errors.New("conn.writeWindow: window closed") + } + return nil + case <-timer.C: + return errors.New("conn.writeWindow: write to time out") + case <-Self.closeOpCh: + return errors.New("conn.writeWindow: window closed") + } } -func (Self *waitingCh) new() { - Self.nCh = make(chan int) - Self.errCh = make(chan error) +func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) { + Self.SetSendBuf(buf) // set the buf to send window + var bufSeg []byte + var part bool + for { + bufSeg, part, err = Self.WriteTo() + //logs.Warn("buf seg", len(bufSeg), part, err) + // get the buf segments from send window + if bufSeg == nil && part == false && err == io.EOF { + // send window is drain, break the loop + err = nil + break + } + if err != nil { + break + } + n += len(bufSeg) + if part { + Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg) + } else { + Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg) + //logs.Warn("buf seg sent", len(bufSeg), part, err) + } + // send to other side, not send nil data to other side + } + //logs.Warn("buf seg write success") + return } -func (Self *waitingCh) close() { - close(Self.nCh) - close(Self.errCh) +func (Self *SendWindow) SetTimeOut(t time.Time) { + // waiting for receive a receive window size + Self.timeout = t +} + +type bandwidth struct { + lastReadStart time.Time + readStart time.Time + readEnd time.Time + bufLength int + lastBufLength int + count int8 + readBW float64 + writeBW float64 +} + +func (Self *bandwidth) StartRead() { + Self.lastReadStart, Self.readStart = Self.readStart, time.Now() +} + +func (Self *bandwidth) EndRead() { + if !Self.lastReadStart.IsZero() { + if Self.count == 0 { + Self.calcWriteBandwidth() + } + } + Self.readEnd = time.Now() + if Self.count == 0 { + Self.calcReadBandwidth() + Self.count = -3 + } + Self.count += 1 +} + +func (Self *bandwidth) SetCopySize(n int) { + // must be invoke between StartRead and EndRead + Self.lastBufLength, Self.bufLength = Self.bufLength, n +} + +func (Self *bandwidth) calcReadBandwidth() { + // Bandwidth between nps and npc + readTime := Self.readEnd.Sub(Self.readStart) + Self.readBW = float64(Self.bufLength) / readTime.Seconds() + //logs.Warn("calc read bw", Self.bufLength, readTime.Seconds()) +} + +func (Self *bandwidth) calcWriteBandwidth() { + // Bandwidth between nps and user, npc and application + //logs.Warn("calc write bw") + writeTime := Self.readEnd.Sub(Self.lastReadStart) + Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() +} + +func (Self *bandwidth) Get() (bw float64) { + // The zero value, 0 for numeric types + if Self.writeBW == 0 && Self.readBW == 0 { + logs.Warn("bw both 0") + return 100 + } + if Self.writeBW == 0 && Self.readBW != 0 { + return Self.readBW + } + if Self.readBW == 0 && Self.writeBW != 0 { + return Self.writeBW + } + return math.Min(Self.readBW, Self.writeBW) } diff --git a/lib/mux/mux.go b/lib/mux/mux.go index a662ad0..e6a9e67 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -3,6 +3,7 @@ package mux import ( "bytes" "errors" + "io" "math" "net" "sync" @@ -22,8 +23,10 @@ type Mux struct { closeChan chan struct{} IsClose bool pingOk int + latency float64 + pingCh chan []byte connType string - writeQueue Queue + writeQueue PriorityQueue bufCh chan *bytes.Buffer sync.Mutex } @@ -38,13 +41,15 @@ func NewMux(c net.Conn, connType string) *Mux { IsClose: false, connType: connType, bufCh: make(chan *bytes.Buffer), + pingCh: make(chan []byte), } m.writeQueue.New() //read session by flag - go m.readSession() + m.readSession() //ping - go m.ping() - go m.writeSession() + m.ping() + m.pingReturn() + m.writeSession() return m } @@ -83,10 +88,10 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { +func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { var err error pack := common.MuxPack.Get() - err = pack.NewPac(flag, id, data) + err = pack.NewPac(flag, id, data...) if err != nil { common.MuxPack.Put(pack) return @@ -98,11 +103,13 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { func (s *Mux) writeSession() { go s.packBuf() go s.writeBuf() - <-s.closeChan } func (s *Mux) packBuf() { for { + if s.IsClose { + break + } pack := s.writeQueue.Pop() buffer := common.BuffPool.Get() err := pack.Pack(buffer) @@ -117,12 +124,14 @@ func (s *Mux) packBuf() { case <-s.closeChan: break } - } } func (s *Mux) writeBuf() { for { + if s.IsClose { + break + } select { case buffer := <-s.bufCh: l := buffer.Len() @@ -141,8 +150,15 @@ func (s *Mux) writeBuf() { func (s *Mux) ping() { go func() { - ticker := time.NewTicker(time.Second * 1) + now, _ := time.Now().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + // send the ping flag and get the latency first + ticker := time.NewTicker(time.Second * 15) for { + if s.IsClose { + ticker.Stop() + break + } select { case <-ticker.C: } @@ -150,7 +166,8 @@ func (s *Mux) ping() { if (math.MaxInt32 - s.id) < 10000 { s.id = 0 } - s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil) + now, _ := time.Now().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) if s.pingOk > 10 && s.connType == "kcp" { s.Close() break @@ -158,15 +175,32 @@ func (s *Mux) ping() { s.pingOk++ } }() - select { - case <-s.closeChan: - } +} + +func (s *Mux) pingReturn() { + go func() { + var now time.Time + var data []byte + for { + select { + case data = <-s.pingCh: + case <-s.closeChan: + break + } + _ = now.UnmarshalText(data) + s.latency = time.Since(now).Seconds() + s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) + } + }() } func (s *Mux) readSession() { go func() { pack := common.MuxPack.Get() for { + if s.IsClose { + break + } pack = common.MuxPack.Get() if pack.UnPack(s.conn) != nil { break @@ -176,44 +210,25 @@ func (s *Mux) readSession() { case common.MUX_NEW_CONN: //new connection connection := NewConn(pack.Id, s) s.connMap.Set(pack.Id, connection) //it has been set before send ok - go func(connection *conn) { - connection.sendWindow.SetAllowSize(512) // set the initial receive window - }(connection) s.newConnCh <- connection s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) continue case common.MUX_PING_FLAG: //ping - go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) + s.pingCh <- pack.Content continue case common.MUX_PING_RETURN: continue } if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { switch pack.Flag { - case common.MUX_NEW_MSG: //new msg from remote connection - //insert wait queue - if connection.isClose { - continue + case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection + err := s.newMsg(connection, pack) + if err != nil { + connection.Close() } - connection.receiveWindow.WriteWg.Add(1) - go func(connection *conn, content []byte) { // do not block read session - _, err := connection.receiveWindow.Write(content) - if err != nil { - logs.Warn("mux new msg err close", err) - connection.Close() - } - size := connection.receiveWindow.Size() - if size == 0 { - connection.receiveWindow.WindowFull = true - } - s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size) - connection.receiveWindow.WriteWg.Done() - }(connection, pack.Content) continue case common.MUX_NEW_CONN_OK: //connection ok connection.connStatusOkCh <- struct{}{} - go connection.sendWindow.SetAllowSize(512) - // set the initial receive window both side continue case common.MUX_NEW_CONN_Fail: connection.connStatusFailCh <- struct{}{} @@ -222,15 +237,12 @@ func (s *Mux) readSession() { if connection.isClose { continue } - go connection.sendWindow.SetAllowSize(pack.Window) + connection.sendWindow.SetSize(pack.Window, pack.ReadLength) continue case common.MUX_CONN_CLOSE: //close the connection s.connMap.Delete(pack.Id) connection.closeFlag = true - go func(connection *conn) { - connection.receiveWindow.WriteWg.Wait() - connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window - }(connection) + connection.receiveWindow.Stop() // close signal to receive window continue } } else if pack.Flag == common.MUX_CONN_CLOSE { @@ -241,9 +253,24 @@ func (s *Mux) readSession() { common.MuxPack.Put(pack) s.Close() }() - select { - case <-s.closeChan: +} + +func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) { + if connection.isClose { + err = io.ErrClosedPipe + return } + //logs.Warn("read session receive new msg", pack.Length) + //go func(connection *conn, pack *common.MuxPackager) { // do not block read session + //insert into queue + if pack.Flag == common.MUX_NEW_MSG_PART { + err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id) + } + if pack.Flag == common.MUX_NEW_MSG { + err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id) + } + //logs.Warn("read session write success", pack.Length) + return } func (s *Mux) Close() error { @@ -255,9 +282,6 @@ func (s *Mux) Close() error { s.connMap.Close() s.closeChan <- struct{}{} s.closeChan <- struct{}{} - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} close(s.newConnCh) return s.conn.Close() } diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 081b2c9..5a57151 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -2,25 +2,54 @@ package mux import ( "container/list" + "errors" "github.com/cnlh/nps/lib/common" + "io" "sync" + "time" ) -type Queue struct { - list *list.List +type QueueOp struct { readOp chan struct{} cleanOp chan struct{} popWait bool mutex sync.Mutex } -func (Self *Queue) New() { - Self.list = list.New() +func (Self *QueueOp) New() { Self.readOp = make(chan struct{}) Self.cleanOp = make(chan struct{}, 2) } -func (Self *Queue) Push(packager *common.MuxPackager) { +func (Self *QueueOp) allowPop() (closed bool) { + Self.mutex.Lock() + Self.popWait = false + Self.mutex.Unlock() + select { + case Self.readOp <- struct{}{}: + return false + case <-Self.cleanOp: + return true + } +} + +func (Self *QueueOp) Clean() { + Self.cleanOp <- struct{}{} + Self.cleanOp <- struct{}{} + close(Self.cleanOp) +} + +type PriorityQueue struct { + list *list.List + QueueOp +} + +func (Self *PriorityQueue) New() { + Self.list = list.New() + Self.QueueOp.New() +} + +func (Self *PriorityQueue) Push(packager *common.MuxPackager) { Self.mutex.Lock() if Self.popWait { defer Self.allowPop() @@ -35,28 +64,16 @@ func (Self *Queue) Push(packager *common.MuxPackager) { return } -func (Self *Queue) allowPop() (closed bool) { - Self.mutex.Lock() - Self.popWait = false - Self.mutex.Unlock() - select { - case Self.readOp <- struct{}{}: - return false - case <-Self.cleanOp: - return true - } -} - -func (Self *Queue) insert(packager *common.MuxPackager) { +func (Self *PriorityQueue) insert(packager *common.MuxPackager) { element := Self.list.Back() for { - if element == nil { // Queue dose not have any of msg package with this close package id + if element == nil { // PriorityQueue dose not have any of msg package with this close package id Self.list.PushFront(packager) // insert close package to first break } if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG && element.Value.(*common.MuxPackager).Id == packager.Id { - Self.list.InsertAfter(packager, element) // Queue has some msg package + Self.list.InsertAfter(packager, element) // PriorityQueue has some msg package // with this close package id, insert close package after last msg package break } @@ -64,7 +81,7 @@ func (Self *Queue) insert(packager *common.MuxPackager) { } } -func (Self *Queue) Pop() (packager *common.MuxPackager) { +func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { Self.mutex.Lock() element := Self.list.Front() if element != nil { @@ -73,7 +90,7 @@ func (Self *Queue) Pop() (packager *common.MuxPackager) { Self.mutex.Unlock() return } - Self.popWait = true // Queue is empty, notice Push method + Self.popWait = true // PriorityQueue is empty, notice Push method Self.mutex.Unlock() select { case <-Self.readOp: @@ -83,13 +100,90 @@ func (Self *Queue) Pop() (packager *common.MuxPackager) { } } -func (Self *Queue) Len() (n int) { +func (Self *PriorityQueue) Len() (n int) { n = Self.list.Len() return } -func (Self *Queue) Clean() { - Self.cleanOp <- struct{}{} - Self.cleanOp <- struct{}{} - close(Self.cleanOp) +type ListElement struct { + buf []byte + l uint16 + part bool +} + +func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) { + if uint16(len(buf)) != l { + return errors.New("ListElement: buf length not match") + } + Self.buf = buf + Self.l = l + Self.part = part + return nil +} + +type FIFOQueue struct { + list []*ListElement + length uint32 + stopOp chan struct{} + timeout time.Time + QueueOp +} + +func (Self *FIFOQueue) New() { + Self.QueueOp.New() + Self.stopOp = make(chan struct{}, 1) +} + +func (Self *FIFOQueue) Push(element *ListElement) { + Self.mutex.Lock() + if Self.popWait { + defer Self.allowPop() + } + Self.list = append(Self.list, element) + Self.length += uint32(element.l) + Self.mutex.Unlock() + return +} + +func (Self *FIFOQueue) Pop() (element *ListElement, err error) { + Self.mutex.Lock() + if len(Self.list) == 0 { + Self.popWait = true + Self.mutex.Unlock() + t := Self.timeout.Sub(time.Now()) + if t <= 0 { + t = time.Minute + } + timer := time.NewTimer(t) + defer timer.Stop() + select { + case <-Self.readOp: + Self.mutex.Lock() + case <-Self.cleanOp: + return + case <-Self.stopOp: + err = io.EOF + return + case <-timer.C: + err = errors.New("mux.queue: read time out") + return + } + } + element = Self.list[0] + Self.list = Self.list[1:] + Self.length -= uint32(element.l) + Self.mutex.Unlock() + return +} + +func (Self *FIFOQueue) Len() (n uint32) { + return Self.length +} + +func (Self *FIFOQueue) Stop() { + Self.stopOp <- struct{}{} +} + +func (Self *FIFOQueue) SetTimeOut(t time.Time) { + Self.timeout = t }