From bb2cffe10a1f296337f3b72643dea5631ddc2789 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Fri, 30 Aug 2019 20:05:09 +0800 Subject: [PATCH] Change pool, change mux connection close ,change copy buffer --- lib/common/netpackager.go | 16 +++-- lib/{pool => common}/pool.go | 38 ++++++++++-- lib/common/util.go | 114 +++++++++++++++++++++++++++-------- lib/conn/conn.go | 11 ++-- lib/conn/snappy.go | 6 +- lib/mux/conn.go | 42 +++++++------ lib/mux/mux.go | 74 ++++++++++++++--------- lib/mux/mux_test.go | 65 ++++++++++++-------- lib/mux/queue.go | 4 +- server/proxy/p2p.go | 3 +- server/proxy/udp.go | 7 +-- 11 files changed, 257 insertions(+), 123 deletions(-) rename lib/{pool => common}/pool.go (71%) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 2d589aa..6d67f44 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -4,7 +4,8 @@ import ( "bytes" "encoding/binary" "encoding/json" - "github.com/cnlh/nps/lib/pool" + "errors" + "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "io" "strings" ) @@ -20,7 +21,6 @@ type BasePackager struct { } func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { - Self.Content = pool.CopyBuff.Get() Self.clean() for _, content := range contents { switch content.(type) { @@ -50,7 +50,8 @@ func (Self *BasePackager) appendByte(data []byte) (err error) { copy(Self.Content[m:n], data) return nil } else { - return bytes.ErrTooLarge + logs.Warn(len(data), len(Self.Content), cap(Self.Content)) + return errors.New("pack content too large") } } @@ -61,20 +62,22 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) { return } err = binary.Write(writer, binary.LittleEndian, Self.Content) - pool.CopyBuff.Put(Self.Content) return } //Unpack 会导致传入的数字类型转化成float64!! //主要原因是json unmarshal并未传入正确的数据类型 func (Self *BasePackager) UnPack(reader io.Reader) (err error) { - Self.Content = pool.CopyBuff.Get() Self.clean() err = binary.Read(reader, binary.LittleEndian, &Self.Length) if err != nil { return } - Self.Content = Self.Content[:Self.Length] + if int(Self.Length) > cap(Self.Content) { + logs.Warn("unpack", cap(Self.Content)) + err = errors.New("unpack err, content length too large") + } + Self.Content = Self.Content[:int(Self.Length)] //n, err := io.ReadFull(reader, Self.Content) //if n != int(Self.Length) { // err = io.ErrUnexpectedEOF @@ -177,6 +180,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { } func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { + Self.BasePackager.clean() // also clean the content err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return diff --git a/lib/pool/pool.go b/lib/common/pool.go similarity index 71% rename from lib/pool/pool.go rename to lib/common/pool.go index 0540a9d..205378d 100644 --- a/lib/pool/pool.go +++ b/lib/common/pool.go @@ -1,4 +1,4 @@ -package pool +package common import ( "bytes" @@ -73,12 +73,15 @@ func (Self *CopyBufferPool) New() { func (Self *CopyBufferPool) Get() []byte { buf := Self.pool.Get().([]byte) - return buf[:cap(buf)] // grow to capacity + return buf[:PoolSizeCopy] // just like make a new slice, but data may not be 0 } func (Self *CopyBufferPool) Put(x []byte) { - x = x[:0] - Self.pool.Put(x) + if len(x) == PoolSizeCopy { + Self.pool.Put(x) + } else { + x = nil // buf is not full, maybe truncated by gc in pool, not allowed + } } type BufferPool struct { @@ -102,13 +105,40 @@ func (Self *BufferPool) Put(x *bytes.Buffer) { Self.pool.Put(x) } +type MuxPackagerPool struct { + pool sync.Pool +} + +func (Self *MuxPackagerPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + pack := MuxPackager{} + return &pack + }, + } +} + +func (Self *MuxPackagerPool) Get() *MuxPackager { + pack := Self.pool.Get().(*MuxPackager) + buf := CopyBuff.Get() + pack.Content = buf + return pack +} + +func (Self *MuxPackagerPool) Put(pack *MuxPackager) { + CopyBuff.Put(pack.Content) + Self.pool.Put(pack) +} + var once = sync.Once{} var BuffPool = BufferPool{} var CopyBuff = CopyBufferPool{} +var MuxPack = MuxPackagerPool{} func newPool() { BuffPool.New() CopyBuff.New() + MuxPack.New() } func init() { diff --git a/lib/common/util.go b/lib/common/util.go index 25cfee2..3b4369f 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -4,8 +4,8 @@ import ( "bytes" "encoding/base64" "encoding/binary" + "errors" "github.com/cnlh/nps/lib/crypt" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "html/template" "io" @@ -264,34 +264,96 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader, connId int32) (written int64, err error) { - buf := pool.CopyBuff.Get() - defer pool.CopyBuff.Put(buf) - for { - nr, er := src.Read(buf) - if er != nil { - if er != io.EOF { - err = er - } - break +type ConnCopy struct { + dst net.Conn + src net.Conn + buf []byte + connId int32 +} + +func (Self *ConnCopy) New(dst net.Conn, src net.Conn, connId int32) { + Self.dst = dst + Self.src = src + Self.buf = CopyBuff.Get() + Self.connId = connId +} + +func (Self *ConnCopy) copyBufferOnce() (written int64, err error) { + nr, er := Self.src.Read(Self.buf) + if nr > 0 { + //logs.Warn("write", Self.connId, nr, string(buf[0:10])) + nw, ew := Self.dst.Write(Self.buf[0:nr]) + if nw > 0 { + written = int64(nw) } - if nr > 0 { - logs.Warn("write", connId, nr, string(buf[0:10])) - nw, ew := dst.Write(buf[0:nr]) - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } + if ew != nil { + //logs.Warn("write err ew id nw", ew, Self.connId, nw) + err = ew + return + } + if nr != nw { + err = io.ErrShortWrite + return + } + if nw == 0 { + err = errors.New("io: write on closed pipe") + //logs.Warn("write buffer", err) + return } } - return written, err + if nr == 0 && er == nil { + err = errors.New("io: read on closed pipe") + //logs.Warn("read buffer", err) + return + } + if er != nil { + err = er + return + } + return +} + +func (Self *ConnCopy) copyBuffer() (written int64, err error) { + var write int64 + write, err = Self.copyBufferOnce() // first copy, if written is zero and err is io.EOF + // means conn already closed, so need to close all the conn + written += write + if err == io.EOF && written == 0 { + err = errors.New("io: read on closed pipe") + return + } else if err == io.EOF && written > 0 { + err = nil + return + } + for { + write, err = Self.copyBufferOnce() + written += write + if err != nil { + if err == io.EOF { + err = nil + } + return + } + } +} + +func (Self *ConnCopy) CopyConn() (written int64, err error) { + defer CopyBuff.Put(Self.buf) + if Self.dst != nil && Self.src != nil { + written, err = Self.copyBuffer() + } else { + return 0, errors.New("copy conn nil src or dst") + } + if err != nil { // copyBuffer do not return io.EOF ,close all conn + logs.Warn("close by copy conn ", Self.connId, err) + if Self.dst != nil { + Self.dst.Close() + } + if Self.src != nil { + Self.src.Close() + } + } + return } //send this ip forget to get a local udp port diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 7b6e729..058b7d8 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -10,7 +10,6 @@ import ( "github.com/cnlh/nps/lib/crypt" "github.com/cnlh/nps/lib/file" "github.com/cnlh/nps/lib/mux" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/lib/rate" "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "io" @@ -158,8 +157,8 @@ func (s *Conn) SendHealthInfo(info, status string) (int, error) { //get health info from conn func (s *Conn) GetHealthInfo() (info string, status bool, err error) { var l int - buf := pool.BufPoolMax.Get().([]byte) - defer pool.PutBufPoolMax(buf) + buf := common.BufPoolMax.Get().([]byte) + defer common.PutBufPoolMax(buf) if l, err = s.GetLen(); err != nil { return } else if _, err = s.ReadLen(l, buf); err != nil { @@ -232,8 +231,8 @@ func (s *Conn) SendInfo(t interface{}, flag string) (int, error) { //get task info func (s *Conn) getInfo(t interface{}) (err error) { var l int - buf := pool.BufPoolMax.Get().([]byte) - defer pool.PutBufPoolMax(buf) + buf := common.BufPoolMax.Get().([]byte) + defer common.PutBufPoolMax(buf) if l, err = s.GetLen(); err != nil { return } else if _, err = s.ReadLen(l, buf); err != nil { @@ -373,7 +372,7 @@ func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Ra } //get crypt or snappy conn -func GetConn(conn net.Conn, cpt, snappy bool, rt *rate.Rate, isServer bool) (io.ReadWriteCloser) { +func GetConn(conn net.Conn, cpt, snappy bool, rt *rate.Rate, isServer bool) io.ReadWriteCloser { if cpt { if isServer { return rate.NewRateConn(crypt.NewTlsServerConn(conn), rt) diff --git a/lib/conn/snappy.go b/lib/conn/snappy.go index cfd33c4..cda20b5 100644 --- a/lib/conn/snappy.go +++ b/lib/conn/snappy.go @@ -1,7 +1,7 @@ package conn import ( - "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/vender/github.com/golang/snappy" "io" ) @@ -31,8 +31,8 @@ func (s *SnappyConn) Write(b []byte) (n int, err error) { //snappy压缩读 func (s *SnappyConn) Read(b []byte) (n int, err error) { - buf := pool.BufPool.Get().([]byte) - defer pool.BufPool.Put(buf) + buf := common.BufPool.Get().([]byte) + defer common.BufPool.Put(buf) if n, err = s.r.Read(buf); err != nil { return } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index d30bedc..7430454 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,7 +3,6 @@ package mux import ( "errors" "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "io" "net" @@ -27,7 +26,8 @@ type conn struct { connId int32 isClose bool readWait bool - sendClose bool + sendClose bool // MUX_CONN_CLOSE already send + writeClose bool // close conn Write hasWrite int mux *Mux } @@ -69,12 +69,14 @@ func (s *conn) Read(buf []byte) (n int, err error) { return 0, errors.New("the conn has closed") } if node, err := s.readQueue.Pop(); err != nil { + logs.Warn("conn close by read pop err", s.connId, err) s.Close() return 0, io.EOF } else { if node.val == nil { //close s.sendClose = true + logs.Warn("conn close by read ", s.connId) s.Close() return 0, io.EOF } else { @@ -90,7 +92,7 @@ func (s *conn) Read(buf []byte) (n int, err error) { } else { n = copy(buf, s.readBuffer[s.startRead:s.endRead]) s.startRead += n - pool.CopyBuff.Put(s.readBuffer) + common.CopyBuff.Put(s.readBuffer) } return } @@ -99,6 +101,12 @@ func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, errors.New("the conn has closed") } + if s.writeClose { + s.sendClose = true + logs.Warn("conn close by write ", s.connId) + s.Close() + return 0, errors.New("io: write on closed conn") + } ch := make(chan struct{}) go s.write(buf, ch) if t := s.writeTimeOut.Sub(time.Now()); t > 0 { @@ -112,19 +120,22 @@ func (s *conn) Write(buf []byte) (n int, err error) { } else { <-ch } - if s.isClose { - return 0, io.EOF - } + close(ch) + //if s.isClose { + // return 0, io.ErrClosedPipe + //} return len(buf), nil } func (s *conn) write(buf []byte, ch chan struct{}) { start := 0 l := len(buf) for { - if l-start > pool.PoolSizeCopy { - s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:start+pool.PoolSizeCopy]) - start += pool.PoolSizeCopy + if l-start > common.PoolSizeCopy { + logs.Warn("conn write > poolsizecopy") + s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:start+common.PoolSizeCopy]) + start += common.PoolSizeCopy } else { + logs.Warn("conn write <= poolsizecopy, start, len", start, l) s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:l]) break } @@ -137,21 +148,16 @@ func (s *conn) Close() (err error) { return errors.New("the conn has closed") } s.isClose = true - pool.CopyBuff.Put(s.readBuffer) + s.mux.connMap.Delete(s.connId) + common.CopyBuff.Put(s.readBuffer) if s.readWait { s.readCh <- struct{}{} } s.readQueue.Clear() - s.mux.connMap.Delete(s.connId) if !s.mux.IsClose { if !s.sendClose { - err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) - logs.Warn("send closing msg ok ", s.connId) - if err != nil { - logs.Warn(err) - return - } - } else { + logs.Warn("conn send close") + go s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) } } return diff --git a/lib/mux/mux.go b/lib/mux/mux.go index ad17cb0..c40427d 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "math" "net" @@ -42,7 +41,7 @@ func NewMux(c net.Conn, connType string) *Mux { go m.readSession() //ping go m.ping() - //go m.writeSession() + go m.writeSession() return m } @@ -53,9 +52,8 @@ func (s *Mux) NewConn() (*conn, error) { conn := NewConn(s.getId(), s) //it must be set before send s.connMap.Set(conn.connId, conn) - if err := s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil); err != nil { - return nil, err - } + s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) + logs.Warn("send mux new conn ", conn.connId) //set a timer timeout 30 second timer := time.NewTimer(time.Minute * 2) defer timer.Stop() @@ -83,34 +81,41 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { +func (s *Mux) sendInfo(flag uint8, id int32, content []byte) { + var err error if flag == common.MUX_NEW_MSG { + if len(content) == 0 { + logs.Warn("send info content is nil") + } } - buf := pool.BuffPool.Get() - pack := common.MuxPackager{} + buf := common.BuffPool.Get() + //defer pool.BuffPool.Put(buf) + pack := common.MuxPack.Get() err = pack.NewPac(flag, id, content) if err != nil { s.Close() logs.Warn("new pack err", err) + common.BuffPool.Put(buf) return } err = pack.Pack(buf) if err != nil { s.Close() logs.Warn("pack err", err) + common.BuffPool.Put(buf) return } - //s.writeQueue <- buf - _, err = buf.WriteTo(s.conn) - if err != nil { - s.Close() - logs.Warn("write err", err) - } - pool.BuffPool.Put(buf) - if flag == common.MUX_CONN_CLOSE { - } - if flag == common.MUX_NEW_MSG { - } + s.writeQueue <- buf + common.MuxPack.Put(pack) + //_, err = buf.WriteTo(s.conn) + //if err != nil { + // s.Close() + // logs.Warn("write err, close mux", err) + //} + //if flag == common.MUX_CONN_CLOSE { + //} + //if flag == common.MUX_NEW_MSG { + //} return } @@ -120,7 +125,7 @@ func (s *Mux) writeSession() { buf := <-s.writeQueue l := buf.Len() n, err := buf.WriteTo(s.conn) - pool.BuffPool.Put(buf) + common.BuffPool.Put(buf) if err != nil || int(n) != l { logs.Warn("close from write to ", err, n, l) s.Close() @@ -142,7 +147,9 @@ func (s *Mux) ping() { if (math.MaxInt32 - s.id) < 10000 { s.id = 0 } - if err := s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") { + //logs.Warn("send mux ping") + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil) + if s.pingOk > 10 && s.connType == "kcp" { s.Close() break } @@ -155,28 +162,30 @@ func (s *Mux) ping() { } func (s *Mux) readSession() { - var pack common.MuxPackager go func() { for { + pack := common.MuxPack.Get() if pack.UnPack(s.conn) != nil { break } if pack.Flag != 0 && pack.Flag != 7 { if pack.Length > 10 { - logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10])) + //logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10])) } } s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new conn - //logs.Warn("mux new conn", pack.Id) + logs.Warn("mux new conn", pack.Id) conn := NewConn(pack.Id, s) s.connMap.Set(pack.Id, conn) //it has been set before send ok s.newConnCh <- conn s.sendInfo(common.MUX_NEW_CONN_OK, pack.Id, nil) + logs.Warn("send mux new conn ok", pack.Id) continue case common.MUX_PING_FLAG: //ping - s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) + //logs.Warn("send mux ping return") + go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) continue case common.MUX_PING_RETURN: continue @@ -185,6 +194,7 @@ func (s *Mux) readSession() { switch pack.Flag { case common.MUX_NEW_MSG: //new msg from remote conn //insert wait queue + logs.Warn("mux new msg ", pack.Id) conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length))) //judge len if >xxx ,send stop if conn.readWait { @@ -192,21 +202,29 @@ func (s *Mux) readSession() { conn.readCh <- struct{}{} } case common.MUX_NEW_CONN_OK: //conn ok + logs.Warn("mux new conn ok ", pack.Id) conn.connStatusOkCh <- struct{}{} case common.MUX_NEW_CONN_Fail: + logs.Warn("mux new conn fail", pack.Id) conn.connStatusFailCh <- struct{}{} case common.MUX_CONN_CLOSE: //close the connection + logs.Warn("mux conn close", pack.Id) + s.connMap.Delete(pack.Id) + conn.writeClose = true conn.readQueue.Push(NewBufNode(nil, 0)) if conn.readWait { logs.Warn("close read wait", pack.Id) conn.readWait = false conn.readCh <- struct{}{} } - s.connMap.Delete(pack.Id) + logs.Warn("receive mux conn close, finish", conn.connId) } } else if pack.Flag == common.MUX_NEW_MSG { - pool.CopyBuff.Put(pack.Content) + common.CopyBuff.Put(pack.Content) + } else if pack.Flag == common.MUX_CONN_CLOSE { + logs.Warn("mux conn close no id ", pack.Id) } + common.MuxPack.Put(pack) } s.Close() }() @@ -228,7 +246,7 @@ func (s *Mux) Close() error { select { case s.closeChan <- struct{}{}: } - //s.closeChan <- struct{}{} + s.closeChan <- struct{}{} close(s.writeQueue) close(s.newConnCh) return s.conn.Close() diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 2d3d2d0..728dfa3 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -2,9 +2,7 @@ package mux import ( "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" - "log" "net" "net/http" _ "net/http/pprof" @@ -27,19 +25,29 @@ func TestNewMux(t *testing.T) { go func() { m2 := NewMux(conn2, "tcp") for { + logs.Warn("npc starting accept") c, err := m2.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) + continue } - c2, err := net.Dial("tcp", "127.0.0.1:8080") + logs.Warn("npc accept success ") + c2, err := net.Dial("tcp", "127.0.0.1:80") if err != nil { - log.Fatalln(err) + logs.Warn(err) + continue + } + var npcToServer common.ConnCopy + npcToServer.New(c2, c, 0) + go npcToServer.CopyConn() + var serverToNpc common.ConnCopy + serverToNpc.New(c, c2, 10000) + _, err = serverToNpc.CopyConn() + if err == nil { + logs.Warn("close npc") + c2.Close() + c.Close() } - go common.CopyBuffer(c2, c,0) - common.CopyBuffer(c, c2,0) - c2.Close() - c.Close() - logs.Warn("close npc") } }() @@ -47,24 +55,33 @@ func TestNewMux(t *testing.T) { m1 := NewMux(conn1, "tcp") l, err := net.Listen("tcp", "127.0.0.1:7777") if err != nil { - log.Fatalln(err) + logs.Warn(err) } for { + logs.Warn("nps starting accept") conn, err := l.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) + continue } - + logs.Warn("nps accept success starting new conn") tmpCpnn, err := m1.NewConn() if err != nil { - log.Fatalln(err) + logs.Warn("nps new conn err ", err) + continue + } + logs.Warn("nps new conn success ", tmpCpnn.connId) + var userToNps common.ConnCopy + userToNps.New(tmpCpnn, conn, tmpCpnn.connId) + go userToNps.CopyConn() + var npsToUser common.ConnCopy + npsToUser.New(conn, tmpCpnn, tmpCpnn.connId+10000) + _, err = npsToUser.CopyConn() + if err == nil { + logs.Warn("close from out nps ", tmpCpnn.connId) + conn.Close() + tmpCpnn.Close() } - go common.CopyBuffer(tmpCpnn, conn,tmpCpnn.connId) - _, err = common.CopyBuffer(conn, tmpCpnn,tmpCpnn.connId) - logs.Warn(err, tmpCpnn.connId) - conn.Close() - tmpCpnn.Close() - logs.Warn("close from out nps ", tmpCpnn.connId) } }() @@ -77,12 +94,12 @@ func server() { var err error l, err := net.Listen("tcp", "127.0.0.1:9999") if err != nil { - log.Fatalln(err) + logs.Warn(err) } go func() { conn1, err = l.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) } }() return @@ -92,12 +109,12 @@ func client() { var err error conn2, err = net.Dial("tcp", "127.0.0.1:9999") if err != nil { - log.Fatalln(err) + logs.Warn(err) } } func TestNewConn(t *testing.T) { - buf := pool.GetBufPoolCopy() + buf := common.GetBufPoolCopy() logs.Warn(len(buf), cap(buf)) //b := pool.GetBufPoolCopy() //b[0] = 1 diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 6a14a8d..4388fb6 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -2,7 +2,7 @@ package mux import ( "errors" - "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/lib/common" "sync" ) @@ -63,7 +63,7 @@ func (entry *sliceEntry) Clear() bool { return false } for i := 0; i < entry.Size(); i++ { - pool.CopyBuff.Put(entry.element[i].val) + common.CopyBuff.Put(entry.element[i].val) entry.element[i] = nil } entry.element = nil diff --git a/server/proxy/p2p.go b/server/proxy/p2p.go index 44cdea3..7c1d270 100644 --- a/server/proxy/p2p.go +++ b/server/proxy/p2p.go @@ -2,7 +2,6 @@ package proxy import ( "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "net" "strings" @@ -36,7 +35,7 @@ func (s *P2PServer) Start() error { return err } for { - buf := pool.BufPoolUdp.Get().([]byte) + buf := common.BufPoolUdp.Get().([]byte) n, addr, err := s.listener.ReadFromUDP(buf) if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 62358a4..2f88155 100755 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -5,7 +5,6 @@ import ( "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/file" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "net" "strings" @@ -33,7 +32,7 @@ func (s *UdpModeServer) Start() error { if err != nil { return err } - buf := pool.BufPoolUdp.Get().([]byte) + buf := common.BufPoolUdp.Get().([]byte) for { n, addr, err := s.listener.ReadFromUDP(buf) if err != nil { @@ -59,8 +58,8 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) { return } else { s.task.Flow.Add(int64(len(data)), 0) - buf := pool.BufPoolUdp.Get().([]byte) - defer pool.BufPoolUdp.Put(buf) + buf := common.BufPoolUdp.Get().([]byte) + defer common.BufPoolUdp.Put(buf) target.Write(data) s.task.Flow.Add(int64(len(data)), 0) if n, err := target.Read(buf); err != nil {