diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 08b0db0..8f9f3ec 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -52,6 +52,7 @@ func NewConn(connId int32, mux *Mux) *conn { } func (s *conn) Read(buf []byte) (n int, err error) { + logs.Warn("starting conn read", s.connId) if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } @@ -315,7 +316,7 @@ func (Self *window) allowRead() (closed bool) { } func (Self *window) Read(p []byte) (n int, err error) { - //logs.Warn("starting window read method len ", Self.len()) + logs.Warn("starting window read method len ", Self.len()) if Self.closeOp { return 0, io.EOF // Write method receive close signal, returns eof } diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 9310c64..5ad43c4 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -179,12 +179,14 @@ func (s *Mux) readSession() { switch pack.Flag { case common.MUX_NEW_CONN: //new connection logs.Warn("rec mux new connection", pack.Id) - conn := NewConn(pack.Id, s) - s.connMap.Set(pack.Id, conn) //it has been set before send ok - s.newConnCh <- conn - go conn.sendWindow.SetAllowSize(512) // set the initial receive window - s.sendInfo(common.MUX_NEW_CONN_OK, pack.Id, nil) - logs.Warn("send mux new connection ok", pack.Id) + connection := NewConn(pack.Id, s) + s.connMap.Set(pack.Id, connection) //it has been set before send ok + go func(connection *conn) { + connection.sendWindow.SetAllowSize(512) // set the initial receive window + }(connection) + s.newConnCh <- connection + s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) + logs.Warn("send mux new connection ok", connection.connId) continue case common.MUX_PING_FLAG: //ping //logs.Warn("send mux ping return") @@ -202,7 +204,7 @@ func (s *Mux) readSession() { continue } connection.receiveWindow.WriteWg.Add(1) - logs.Warn("rec mux new msg ", pack.Id, string(pack.Content[0:15])) + logs.Warn("rec mux new msg ", connection.connId, string(pack.Content[0:15])) go func(connection *conn, content []byte) { // do not block read session _, err := connection.receiveWindow.Write(content) if err != nil { @@ -214,7 +216,7 @@ func (s *Mux) readSession() { connection.receiveWindow.WindowFull = true } s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size) - logs.Warn("send mux new msg ok", pack.Id, size) + logs.Warn("send mux new msg ok", connection.connId, size) connection.receiveWindow.WriteWg.Done() }(connection, pack.Content) continue @@ -241,7 +243,9 @@ func (s *Mux) readSession() { s.connMap.Delete(pack.Id) connection.closeFlag = true go func(connection *conn) { + logs.Warn("receive mux connection close, wg waiting", connection.connId) connection.receiveWindow.WriteWg.Wait() + logs.Warn("receive mux connection close, wg waited", connection.connId) connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window logs.Warn("receive mux connection close, finish", connection.connId) }(connection) diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 7c75c10..91e0fc6 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -6,6 +6,7 @@ import ( "net" "net/http" _ "net/http/pprof" + "sync" "testing" "time" ) @@ -37,20 +38,29 @@ func TestNewMux(t *testing.T) { logs.Warn(err) continue } + wg := sync.WaitGroup{} + wg.Add(1) go func() { _, err = common.CopyBuffer(c2, c) if err != nil { - logs.Warn("close npc by copy from nps", err) c2.Close() c.Close() + logs.Warn("close npc by copy from nps", err) } + wg.Done() }() - _, err = common.CopyBuffer(c, c2) - if err != nil { - logs.Warn("close npc by copy from server", err) - c2.Close() - c.Close() - } + wg.Add(1) + go func() { + _, err = common.CopyBuffer(c, c2) + if err != nil { + c2.Close() + c.Close() + logs.Warn("close npc by copy from server", err) + } + wg.Done() + }() + logs.Warn("npc wait") + wg.Wait() } }() @@ -77,17 +87,17 @@ func TestNewMux(t *testing.T) { go func() { _, err := common.CopyBuffer(tmpCpnn, conn) if err != nil { - logs.Warn("close nps by copy from user", tmpCpnn.connId) conn.Close() tmpCpnn.Close() + logs.Warn("close nps by copy from user", tmpCpnn.connId) } }() //time.Sleep(time.Second) _, err = common.CopyBuffer(conn, tmpCpnn) if err != nil { - logs.Warn("close nps by copy from npc ", tmpCpnn.connId) conn.Close() tmpCpnn.Close() + logs.Warn("close nps by copy from npc ", tmpCpnn.connId) } } }()