diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 315f645..8e91de8 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -45,13 +45,13 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) { return } err = binary.Write(writer, binary.LittleEndian, Self.Content) - //logs.Warn(Self.Length, string(Self.Content)) return } //Unpack 会导致传入的数字类型转化成float64!! //主要原因是json unmarshal并未传入正确的数据类型 func (Self *BasePackager) UnPack(reader io.Reader) (err error) { + Self.clean() err = binary.Read(reader, binary.LittleEndian, &Self.Length) if err != nil { return diff --git a/lib/common/util.go b/lib/common/util.go index ce4381d..ce2b896 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "github.com/cnlh/nps/lib/crypt" "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "html/template" "io" "io/ioutil" @@ -268,8 +269,10 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { defer pool.PutBufPoolCopy(buf) for { nr, er := src.Read(buf) + logs.Warn("read finish", nr, er) if nr > 0 { nw, ew := dst.Write(buf[0:nr]) + logs.Warn("write finish", nw, ew) if nw > 0 { written += int64(nw) } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index a14e98d..cbaf935 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -22,11 +22,12 @@ type conn struct { endRead int //now end read readFlag bool readCh chan struct{} - waitQueue *sliceEntry + readQueue *sliceEntry stopWrite bool connId int32 isClose bool readWait bool + sendClose bool hasWrite int mux *Mux } @@ -37,7 +38,7 @@ func NewConn(connId int32, mux *Mux) *conn { getStatusCh: make(chan struct{}), connStatusOkCh: make(chan struct{}), connStatusFailCh: make(chan struct{}), - waitQueue: NewQueue(), + readQueue: NewQueue(), connId: connId, mux: mux, } @@ -45,11 +46,12 @@ func NewConn(connId int32, mux *Mux) *conn { } func (s *conn) Read(buf []byte) (n int, err error) { + logs.Warn("starting read ", s.connId) if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } if s.endRead-s.startRead == 0 { //read finish or start - if s.waitQueue.Size() == 0 { + if s.readQueue.Size() == 0 { s.readWait = true if t := s.readTimeOut.Sub(time.Now()); t > 0 { timer := time.NewTimer(t) @@ -67,19 +69,22 @@ func (s *conn) Read(buf []byte) (n int, err error) { if s.isClose { //If the connection is closed instead of continuing command return 0, errors.New("the conn has closed") } - if node, err := s.waitQueue.Pop(); err != nil { + if node, err := s.readQueue.Pop(); err != nil { s.Close() return 0, io.EOF } else { pool.PutBufPoolCopy(s.readBuffer) if node.val == nil { //close + s.sendClose = true s.Close() logs.Warn("close from read msg ", s.connId) + return 0, io.EOF } else { s.readBuffer = node.val s.endRead = node.l s.startRead = 0 + logs.Warn("get a new data buffer ", s.connId) } } } @@ -90,10 +95,12 @@ func (s *conn) Read(buf []byte) (n int, err error) { n = copy(buf, s.readBuffer[s.startRead:s.endRead]) s.startRead += n } + logs.Warn("end read ", s.connId) return } func (s *conn) Write(buf []byte) (n int, err error) { + logs.Warn("trying write", s.connId) if s.isClose { return 0, errors.New("the conn has closed") } @@ -113,6 +120,7 @@ func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, io.EOF } + logs.Warn("write success ", s.connId) return len(buf), nil } func (s *conn) write(buf []byte, ch chan struct{}) { @@ -130,7 +138,8 @@ func (s *conn) write(buf []byte, ch chan struct{}) { ch <- struct{}{} } -func (s *conn) Close() error { +func (s *conn) Close() (err error) { + logs.Warn("start closing ", s.connId) if s.isClose { logs.Warn("already closed", s.connId) return errors.New("the conn has closed") @@ -140,12 +149,22 @@ func (s *conn) Close() error { if s.readWait { s.readCh <- struct{}{} } - s.waitQueue.Clear() + s.readQueue.Clear() s.mux.connMap.Delete(s.connId) if !s.mux.IsClose { - s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) + if !s.sendClose { + logs.Warn("start send closing msg", s.connId) + err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) + logs.Warn("send closing msg ok ", s.connId) + if err != nil { + logs.Warn(err) + return + } + } else { + logs.Warn("send mux conn close pass ", s.connId) + } } - return nil + return } func (s *conn) LocalAddr() net.Addr { diff --git a/lib/mux/mux.go b/lib/mux/mux.go index bfd82ff..c099d51 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -1,6 +1,7 @@ package mux import ( + "bytes" "errors" "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/pool" @@ -22,7 +23,7 @@ type Mux struct { IsClose bool pingOk int connType string - writeQueue *sliceEntry + writeQueue chan *bytes.Buffer sync.Mutex } @@ -35,12 +36,13 @@ func NewMux(c net.Conn, connType string) *Mux { newConnCh: make(chan *conn), IsClose: false, connType: connType, - writeQueue: NewQueue(), + writeQueue: make(chan *bytes.Buffer, 20), } //read session by flag go m.readSession() //ping go m.ping() + //go m.writeSession() return m } @@ -82,8 +84,10 @@ func (s *Mux) Addr() net.Addr { } func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { + if flag == common.MUX_NEW_MSG { + logs.Warn("trying write to mux new msg", id) + } buf := pool.BuffPool.Get() - defer pool.BuffPool.Put(buf) pack := common.MuxPackager{} err = pack.NewPac(flag, id, content) if err != nil { @@ -97,14 +101,39 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { logs.Warn("pack err", err) return } + //s.writeQueue <- buf _, err = buf.WriteTo(s.conn) if err != nil { s.Close() logs.Warn("write err", err) } + pool.BuffPool.Put(buf) + if flag == common.MUX_CONN_CLOSE { + logs.Warn("write to mux conn close success", id) + } + if flag == common.MUX_NEW_MSG { + logs.Warn("write to mux new msg success", id) + } return } +func (s *Mux) writeSession() { + go func() { + for { + buf := <-s.writeQueue + l := buf.Len() + n, err := buf.WriteTo(s.conn) + pool.BuffPool.Put(buf) + if err != nil || int(n) != l { + logs.Warn("close from write to ", err, n, l) + s.Close() + break + } + } + }() + <-s.closeChan +} + func (s *Mux) ping() { go func() { ticker := time.NewTicker(time.Second * 1) @@ -138,7 +167,7 @@ func (s *Mux) readSession() { s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new conn - logs.Warn("mux new conn", pack.Id) + //logs.Warn("mux new conn", pack.Id) conn := NewConn(pack.Id, s) s.connMap.Set(pack.Id, conn) //it has been set before send ok s.newConnCh <- conn @@ -151,22 +180,30 @@ func (s *Mux) readSession() { continue } if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose { + logs.Warn("read session flag id", pack.Flag, pack.Id) switch pack.Flag { case common.MUX_NEW_MSG: //new msg from remote conn //insert wait queue - conn.waitQueue.Push(NewBufNode(pack.Content, int(pack.Length))) + conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length))) //judge len if >xxx ,send stop if conn.readWait { conn.readWait = false conn.readCh <- struct{}{} } + logs.Warn("push a read buffer ", conn.connId, pack.Id) case common.MUX_NEW_CONN_OK: //conn ok conn.connStatusOkCh <- struct{}{} case common.MUX_NEW_CONN_Fail: conn.connStatusFailCh <- struct{}{} case common.MUX_CONN_CLOSE: //close the connection - conn.waitQueue.Push(NewBufNode(nil, 0)) + conn.readQueue.Push(NewBufNode(nil, 0)) + if conn.readWait { + logs.Warn("close read wait", pack.Id) + conn.readWait = false + conn.readCh <- struct{}{} + } s.connMap.Delete(pack.Id) + logs.Warn("read session mux conn close finish", pack.Id) } } else if pack.Flag == common.MUX_NEW_MSG { pool.PutBufPoolCopy(pack.Content) @@ -192,6 +229,8 @@ func (s *Mux) Close() error { select { case s.closeChan <- struct{}{}: } + s.closeChan <- struct{}{} + close(s.writeQueue) close(s.newConnCh) return s.conn.Close() } diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 067e939..5b571d0 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -26,22 +26,25 @@ func TestNewMux(t *testing.T) { time.Sleep(time.Second * 3) go func() { m2 := NewMux(conn2, "tcp") + connCh := make(chan bool, 1) for { c, err := m2.Accept() if err != nil { log.Fatalln(err) } - go func(c net.Conn) { + connCh <- true + go func(c net.Conn, ch chan bool) { c2, err := net.Dial("tcp", "127.0.0.1:80") if err != nil { log.Fatalln(err) } go common.CopyBuffer(c2, c) common.CopyBuffer(c, c2) - c.Close() - //logs.Warn("close from out npc ") c2.Close() - }(c) + c.Close() + logs.Warn("close npc") + <-ch + }(c, connCh) } }() @@ -51,12 +54,14 @@ func TestNewMux(t *testing.T) { if err != nil { log.Fatalln(err) } + connCh := make(chan bool, 1) for { conn, err := l.Accept() if err != nil { log.Fatalln(err) } - go func(conn net.Conn) { + connCh <- true + go func(conn net.Conn, ch chan bool) { tmpCpnn, err := m1.NewConn() if err != nil { log.Fatalln(err) @@ -64,9 +69,10 @@ func TestNewMux(t *testing.T) { go common.CopyBuffer(tmpCpnn, conn) common.CopyBuffer(conn, tmpCpnn) conn.Close() - tmpCpnn.Close() + //tmpCpnn.Close() logs.Warn("close from out nps ", tmpCpnn.connId) - }(conn) + <-ch + }(conn, connCh) } }()