diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 8f9f3ec..be73e5c 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -17,22 +17,15 @@ type conn struct { connStatusFailCh chan struct{} readTimeOut time.Time writeTimeOut time.Time - //readBuffer []byte - //startRead int //now read position - //endRead int //now end read - //readFlag bool - //readCh chan struct{} - //readQueue *sliceEntry - //stopWrite bool - connId int32 - isClose bool - //readWait bool - closeFlag bool // close conn flag - hasWrite int - receiveWindow *window - sendWindow *window - mux *Mux - once sync.Once + connId int32 + isClose bool + closeFlag bool // close conn flag + receiveWindow *window + sendWindow *window + readCh waitingCh + writeCh waitingCh + mux *Mux + once sync.Once } func NewConn(connId int32, mux *Mux) *conn { @@ -48,34 +41,32 @@ func NewConn(connId int32, mux *Mux) *conn { } c.receiveWindow.NewReceive() c.sendWindow.NewSend() + c.readCh.new() + c.writeCh.new() return c } func (s *conn) Read(buf []byte) (n int, err error) { - logs.Warn("starting conn read", s.connId) + //logs.Warn("starting conn read", s.connId) if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } - nCh := make(chan int) - errCh := make(chan error) - defer close(nCh) - defer close(errCh) // waiting for takeout from receive window finish or timeout - go s.readWindow(buf, nCh, errCh) + go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh) if t := s.readTimeOut.Sub(time.Now()); t > 0 { timer := time.NewTimer(t) defer timer.Stop() select { case <-timer.C: return 0, errors.New("read timeout") - case n = <-nCh: - err = <-errCh + case n = <-s.readCh.nCh: + err = <-s.readCh.errCh } } else { - n = <-nCh - err = <-errCh + n = <-s.readCh.nCh + err = <-s.readCh.errCh } - logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15])) + //logs.Warn("read window finish conn read n err buf", n, err, string(buf[:15])) return } @@ -97,21 +88,19 @@ func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) { } func (s *conn) Write(buf []byte) (n int, err error) { + //logs.Warn("write starting", s.connId) + //defer logs.Warn("write end ", s.connId) if s.isClose { return 0, errors.New("the conn has closed") } if s.closeFlag { - logs.Warn("conn close by write ", s.connId) + //logs.Warn("conn close by write ", s.connId) //s.Close() return 0, errors.New("io: write on closed conn") } - - nCh := make(chan int) - errCh := make(chan error) - defer close(nCh) - defer close(errCh) s.sendWindow.SetSendBuf(buf) // set the buf to send window - go s.write(nCh, errCh) + //logs.Warn("write set send buf success") + go s.write(s.writeCh.nCh, s.writeCh.errCh) // waiting for send to other side or timeout if t := s.writeTimeOut.Sub(time.Now()); t > 0 { timer := time.NewTimer(t) @@ -119,12 +108,12 @@ func (s *conn) Write(buf []byte) (n int, err error) { select { case <-timer.C: return 0, errors.New("write timeout") - case n = <-nCh: - err = <-errCh + case n = <-s.writeCh.nCh: + err = <-s.writeCh.errCh } } else { - n = <-nCh - err = <-errCh + n = <-s.writeCh.nCh + err = <-s.writeCh.errCh } return } @@ -160,7 +149,7 @@ func (s *conn) closeProcess() { s.isClose = true s.mux.connMap.Delete(s.connId) if !s.mux.IsClose { - logs.Warn("conn send close", s.connId) + //logs.Warn("conn send close", s.connId) // if server or user close the conn while reading, will get a io.EOF // and this Close method will be invoke, send this signal to close other side s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) @@ -198,7 +187,6 @@ type window struct { windowBuff []byte off uint16 readOp chan struct{} - readWait bool WindowFull bool usableReceiveWindow chan uint16 WriteWg sync.WaitGroup @@ -213,13 +201,13 @@ func (Self *window) NewReceive() { Self.windowBuff = common.WindowBuff.Get() Self.readOp = make(chan struct{}) Self.WriteEndOp = make(chan struct{}) - Self.closeOpCh = make(chan struct{}, 2) + Self.closeOpCh = make(chan struct{}, 3) } func (Self *window) NewSend() { // initial a window for send Self.usableReceiveWindow = make(chan uint16) - Self.closeOpCh = make(chan struct{}, 2) + Self.closeOpCh = make(chan struct{}, 3) } func (Self *window) SetSendBuf(buf []byte) { @@ -269,38 +257,32 @@ func (Self *window) grow(n int) { func (Self *window) Write(p []byte) (n int, err error) { if Self.closeOp { - logs.Warn("window write closed len p", len(p)) return 0, errors.New("conn.receiveWindow: write on closed window") } if len(p) > Self.Size() { return 0, errors.New("conn.receiveWindow: write too large") } - if Self.readWait { - defer Self.allowRead() - } - //logs.Warn("window write p string", len(p), string(p[:15])) Self.mutex.Lock() // slide the offset if len(p) > Self.cap()-Self.len() { // not enough space, need to allocate Self.fullSlide() - //logs.Warn("window write full slide len cap", Self.len(), Self.cap()) } else { // have enough space, re slice Self.liteSlide() - //logs.Warn("window write lite slide len cap", Self.len(), Self.cap()) } length := Self.len() // length before grow Self.grow(len(p)) // grow for copy n = copy(Self.windowBuff[length:], p) // must copy data before allow Read - //logs.Warn("window write copy n len cap buf", n, Self.len(), Self.cap(), string(Self.windowBuff[Self.len()-n:Self.len()+15-n])) + if length == 0 { + // allow continue read + defer Self.allowRead() + } Self.mutex.Unlock() return n, nil } func (Self *window) allowRead() (closed bool) { - //logs.Warn("length 0 read op") - Self.readWait = false if Self.closeOp { close(Self.readOp) return true @@ -310,25 +292,25 @@ func (Self *window) allowRead() (closed bool) { close(Self.readOp) return true case Self.readOp <- struct{}{}: - //logs.Warn("length 0 read op finish") return false } } func (Self *window) Read(p []byte) (n int, err error) { - logs.Warn("starting window read method len ", Self.len()) if Self.closeOp { return 0, io.EOF // Write method receive close signal, returns eof } - if Self.len() == 0 { + Self.mutex.Lock() + length := Self.len() // protect the length data, it invokes + // before Write lock and after Write unlock + Self.mutex.Unlock() + if length == 0 { // window is empty, waiting for Write method send a success readOp signal // or get timeout or close - Self.readWait = true ticker := time.NewTicker(2 * time.Minute) defer ticker.Stop() select { case _, ok := <-Self.readOp: - //logs.Warn("read window read op len cap", Self.len(), Self.cap()) if !ok { return 0, errors.New("conn.receiveWindow: window closed") } @@ -341,19 +323,17 @@ func (Self *window) Read(p []byte) (n int, err error) { return 0, io.EOF // receive close signal, returns eof } } - //logs.Warn("window read start len window buff", Self.len(), string(Self.windowBuff[Self.off:Self.off+15])) Self.mutex.Lock() n = copy(p, Self.windowBuff[Self.off:]) Self.off += uint16(n) p = p[:n] - //logs.Warn("window read finish n len p p", n, len(p), string(p[:15])) Self.mutex.Unlock() return } func (Self *window) WriteTo() (p []byte, err error) { if Self.closeOp { - logs.Warn("window write to closed") + //logs.Warn("window write to closed") return nil, errors.New("conn.writeWindow: window closed") } if Self.len() == 0 { @@ -373,6 +353,8 @@ waiting: } case <-ticker.C: return nil, errors.New("conn.writeWindow: write to time out") + case <-Self.closeOpCh: + return nil, errors.New("conn.writeWindow: window closed") } if windowSize == 0 { goto waiting // waiting for another usable window size @@ -380,10 +362,8 @@ waiting: Self.mutex.Lock() if windowSize > uint16(Self.len()) { // usable window size is bigger than window buff size, send the full buff - //logs.Warn("window size overflow windowSize len()", windowSize, Self.len()) windowSize = uint16(Self.len()) } - //logs.Warn("window buff off windowSize", Self.off, windowSize) p = Self.windowBuff[Self.off : windowSize+Self.off] Self.off += windowSize Self.mutex.Unlock() @@ -413,6 +393,22 @@ func (Self *window) CloseWindow() { Self.closeOp = true Self.closeOpCh <- struct{}{} Self.closeOpCh <- struct{}{} + Self.closeOpCh <- struct{}{} close(Self.closeOpCh) return } + +type waitingCh struct { + nCh chan int + errCh chan error +} + +func (Self *waitingCh) new() { + Self.nCh = make(chan int) + Self.errCh = make(chan error) +} + +func (Self *waitingCh) close() { + close(Self.nCh) + close(Self.errCh) +} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 5ad43c4..dd3dee2 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -53,7 +53,7 @@ func (s *Mux) NewConn() (*conn, error) { //it must be set before send s.connMap.Set(conn.connId, conn) s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) - logs.Warn("send mux new conn ", conn.connId) + //logs.Warn("send mux new conn ", conn.connId) //set a timer timeout 30 second timer := time.NewTimer(time.Minute * 2) defer timer.Stop() @@ -85,7 +85,7 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { var err error if flag == common.MUX_NEW_MSG { if len(data.([]byte)) == 0 { - logs.Warn("send info content is nil") + //logs.Warn("send info content is nil") } } buf := common.BuffPool.Get() @@ -94,18 +94,18 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { defer common.MuxPack.Put(pack) err = pack.NewPac(flag, id, data) if err != nil { - logs.Warn("new pack err", err) + //logs.Warn("new pack err", err) common.BuffPool.Put(buf) return } err = pack.Pack(buf) if err != nil { - logs.Warn("pack err", err) + //logs.Warn("pack err", err) common.BuffPool.Put(buf) return } if pack.Flag == common.MUX_NEW_CONN { - logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id) + //logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id) } s.writeQueue <- buf //_, err = buf.WriteTo(s.conn) @@ -125,7 +125,7 @@ func (s *Mux) writeSession() { n, err := buf.WriteTo(s.conn) common.BuffPool.Put(buf) if err != nil || int(n) != l { - logs.Warn("close from write session fail ", err, n, l) + //logs.Warn("close from write session fail ", err, n, l) s.Close() break } @@ -173,12 +173,12 @@ func (s *Mux) readSession() { } } if pack.Flag == common.MUX_NEW_CONN { - logs.Warn("unpack mux new connection", pack.Id) + //logs.Warn("unpack mux new connection", pack.Id) } s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new connection - logs.Warn("rec mux new connection", pack.Id) + //logs.Warn("rec mux new connection", pack.Id) connection := NewConn(pack.Id, s) s.connMap.Set(pack.Id, connection) //it has been set before send ok go func(connection *conn) { @@ -186,7 +186,7 @@ func (s *Mux) readSession() { }(connection) s.newConnCh <- connection s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) - logs.Warn("send mux new connection ok", connection.connId) + //logs.Warn("send mux new connection ok", connection.connId) continue case common.MUX_PING_FLAG: //ping //logs.Warn("send mux ping return") @@ -204,61 +204,61 @@ func (s *Mux) readSession() { continue } connection.receiveWindow.WriteWg.Add(1) - logs.Warn("rec mux new msg ", connection.connId, string(pack.Content[0:15])) + //logs.Warn("rec mux new msg ", connection.connId, string(pack.Content[0:15])) go func(connection *conn, content []byte) { // do not block read session _, err := connection.receiveWindow.Write(content) if err != nil { logs.Warn("mux new msg err close", err) - s.Close() + connection.Close() } size := connection.receiveWindow.Size() if size == 0 { connection.receiveWindow.WindowFull = true } s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size) - logs.Warn("send mux new msg ok", connection.connId, size) + //logs.Warn("send mux new msg ok", connection.connId, size) connection.receiveWindow.WriteWg.Done() }(connection, pack.Content) continue case common.MUX_NEW_CONN_OK: //connection ok - logs.Warn("rec mux new connection ok ", pack.Id) + //logs.Warn("rec mux new connection ok ", pack.Id) connection.connStatusOkCh <- struct{}{} go connection.sendWindow.SetAllowSize(512) // set the initial receive window both side continue case common.MUX_NEW_CONN_Fail: - logs.Warn("rec mux new connection fail", pack.Id) + //logs.Warn("rec mux new connection fail", pack.Id) connection.connStatusFailCh <- struct{}{} continue case common.MUX_MSG_SEND_OK: if connection.isClose { - logs.Warn("rec mux msg send ok id window closed!", pack.Id, pack.Window) + //logs.Warn("rec mux msg send ok id window closed!", pack.Id, pack.Window) continue } - logs.Warn("rec mux msg send ok id window", pack.Id, pack.Window) + //logs.Warn("rec mux msg send ok id window", pack.Id, pack.Window) go connection.sendWindow.SetAllowSize(pack.Window) continue case common.MUX_CONN_CLOSE: //close the connection - logs.Warn("rec mux connection close", pack.Id) + //logs.Warn("rec mux connection close", pack.Id) s.connMap.Delete(pack.Id) connection.closeFlag = true go func(connection *conn) { - logs.Warn("receive mux connection close, wg waiting", connection.connId) + //logs.Warn("receive mux connection close, wg waiting", connection.connId) connection.receiveWindow.WriteWg.Wait() - logs.Warn("receive mux connection close, wg waited", connection.connId) + //logs.Warn("receive mux connection close, wg waited", connection.connId) connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window - logs.Warn("receive mux connection close, finish", connection.connId) + //logs.Warn("receive mux connection close, finish", connection.connId) }(connection) continue } } else if pack.Flag == common.MUX_CONN_CLOSE { - logs.Warn("rec mux connection close no id ", pack.Id) + //logs.Warn("rec mux connection close no id ", pack.Id) continue } common.MuxPack.Put(pack) } common.MuxPack.Put(pack) - logs.Warn("read session put pack ", pack.Id) + //logs.Warn("read session put pack ", pack.Id) s.Close() }() select {