diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 8e91de8..1252710 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -160,6 +160,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { } func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { + Self.Length=0 err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return diff --git a/lib/common/util.go b/lib/common/util.go index ce2b896..812c543 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -264,15 +264,14 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { +func CopyBuffer(dst io.Writer, src io.Reader,connId int32) (written int64, err error) { buf := pool.GetBufPoolCopy() defer pool.PutBufPoolCopy(buf) for { nr, er := src.Read(buf) - logs.Warn("read finish", nr, er) if nr > 0 { + logs.Warn("write",connId, nr, string(buf[0:10])) nw, ew := dst.Write(buf[0:nr]) - logs.Warn("write finish", nw, ew) if nw > 0 { written += int64(nw) } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index cbaf935..6c6ab95 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -46,7 +46,6 @@ func NewConn(connId int32, mux *Mux) *conn { } func (s *conn) Read(buf []byte) (n int, err error) { - logs.Warn("starting read ", s.connId) if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } @@ -73,18 +72,16 @@ func (s *conn) Read(buf []byte) (n int, err error) { s.Close() return 0, io.EOF } else { - pool.PutBufPoolCopy(s.readBuffer) + //pool.PutBufPoolCopy(s.readBuffer) if node.val == nil { //close s.sendClose = true s.Close() - logs.Warn("close from read msg ", s.connId) return 0, io.EOF } else { s.readBuffer = node.val s.endRead = node.l s.startRead = 0 - logs.Warn("get a new data buffer ", s.connId) } } } @@ -95,12 +92,10 @@ func (s *conn) Read(buf []byte) (n int, err error) { n = copy(buf, s.readBuffer[s.startRead:s.endRead]) s.startRead += n } - logs.Warn("end read ", s.connId) return } func (s *conn) Write(buf []byte) (n int, err error) { - logs.Warn("trying write", s.connId) if s.isClose { return 0, errors.New("the conn has closed") } @@ -120,7 +115,6 @@ func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, io.EOF } - logs.Warn("write success ", s.connId) return len(buf), nil } func (s *conn) write(buf []byte, ch chan struct{}) { @@ -139,9 +133,7 @@ func (s *conn) write(buf []byte, ch chan struct{}) { } func (s *conn) Close() (err error) { - logs.Warn("start closing ", s.connId) if s.isClose { - logs.Warn("already closed", s.connId) return errors.New("the conn has closed") } s.isClose = true @@ -153,7 +145,6 @@ func (s *conn) Close() (err error) { s.mux.connMap.Delete(s.connId) if !s.mux.IsClose { if !s.sendClose { - logs.Warn("start send closing msg", s.connId) err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) logs.Warn("send closing msg ok ", s.connId) if err != nil { @@ -161,7 +152,6 @@ func (s *conn) Close() (err error) { return } } else { - logs.Warn("send mux conn close pass ", s.connId) } } return diff --git a/lib/mux/mux.go b/lib/mux/mux.go index c099d51..e6daae7 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -85,7 +85,6 @@ func (s *Mux) Addr() net.Addr { func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { if flag == common.MUX_NEW_MSG { - logs.Warn("trying write to mux new msg", id) } buf := pool.BuffPool.Get() pack := common.MuxPackager{} @@ -109,10 +108,8 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { } pool.BuffPool.Put(buf) if flag == common.MUX_CONN_CLOSE { - logs.Warn("write to mux conn close success", id) } if flag == common.MUX_NEW_MSG { - logs.Warn("write to mux new msg success", id) } return } @@ -164,6 +161,11 @@ func (s *Mux) readSession() { if pack.UnPack(s.conn) != nil { break } + if pack.Flag != 0 && pack.Flag != 7 { + if pack.Length>10 { + logs.Warn(pack.Flag, pack.Id, pack.Length,string(pack.Content[:10])) + } + } s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new conn @@ -180,7 +182,6 @@ func (s *Mux) readSession() { continue } if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose { - logs.Warn("read session flag id", pack.Flag, pack.Id) switch pack.Flag { case common.MUX_NEW_MSG: //new msg from remote conn //insert wait queue @@ -190,7 +191,6 @@ func (s *Mux) readSession() { conn.readWait = false conn.readCh <- struct{}{} } - logs.Warn("push a read buffer ", conn.connId, pack.Id) case common.MUX_NEW_CONN_OK: //conn ok conn.connStatusOkCh <- struct{}{} case common.MUX_NEW_CONN_Fail: @@ -203,7 +203,6 @@ func (s *Mux) readSession() { conn.readCh <- struct{}{} } s.connMap.Delete(pack.Id) - logs.Warn("read session mux conn close finish", pack.Id) } } else if pack.Flag == common.MUX_NEW_MSG { pool.PutBufPoolCopy(pack.Content) diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 5b571d0..2d3d2d0 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -26,25 +26,20 @@ func TestNewMux(t *testing.T) { time.Sleep(time.Second * 3) go func() { m2 := NewMux(conn2, "tcp") - connCh := make(chan bool, 1) for { c, err := m2.Accept() if err != nil { log.Fatalln(err) } - connCh <- true - go func(c net.Conn, ch chan bool) { - c2, err := net.Dial("tcp", "127.0.0.1:80") - if err != nil { - log.Fatalln(err) - } - go common.CopyBuffer(c2, c) - common.CopyBuffer(c, c2) - c2.Close() - c.Close() - logs.Warn("close npc") - <-ch - }(c, connCh) + c2, err := net.Dial("tcp", "127.0.0.1:8080") + if err != nil { + log.Fatalln(err) + } + go common.CopyBuffer(c2, c,0) + common.CopyBuffer(c, c2,0) + c2.Close() + c.Close() + logs.Warn("close npc") } }() @@ -54,25 +49,22 @@ func TestNewMux(t *testing.T) { if err != nil { log.Fatalln(err) } - connCh := make(chan bool, 1) for { conn, err := l.Accept() if err != nil { log.Fatalln(err) } - connCh <- true - go func(conn net.Conn, ch chan bool) { - tmpCpnn, err := m1.NewConn() - if err != nil { - log.Fatalln(err) - } - go common.CopyBuffer(tmpCpnn, conn) - common.CopyBuffer(conn, tmpCpnn) - conn.Close() - //tmpCpnn.Close() - logs.Warn("close from out nps ", tmpCpnn.connId) - <-ch - }(conn, connCh) + + tmpCpnn, err := m1.NewConn() + if err != nil { + log.Fatalln(err) + } + go common.CopyBuffer(tmpCpnn, conn,tmpCpnn.connId) + _, err = common.CopyBuffer(conn, tmpCpnn,tmpCpnn.connId) + logs.Warn(err, tmpCpnn.connId) + conn.Close() + tmpCpnn.Close() + logs.Warn("close from out nps ", tmpCpnn.connId) } }() diff --git a/lib/pool/pool.go b/lib/pool/pool.go index fb337a2..e491a5e 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -33,8 +33,7 @@ var BufPoolSmall = sync.Pool{ } var BufPoolCopy = sync.Pool{ New: func() interface{} { - buf := make([]byte, PoolSizeCopy) - return &buf + return make([]byte, PoolSizeCopy) }, } @@ -46,12 +45,12 @@ func PutBufPoolUdp(buf []byte) { func PutBufPoolCopy(buf []byte) { if cap(buf) == PoolSizeCopy { - BufPoolCopy.Put(&buf) + BufPoolCopy.Put(buf[:PoolSizeCopy]) } } func GetBufPoolCopy() []byte { - return (*BufPoolCopy.Get().(*[]byte))[:PoolSizeCopy] + return (BufPoolCopy.Get().([]byte))[:PoolSizeCopy] } func PutBufPoolMax(buf []byte) {