From f362c96e1ee801e41703de2fea93c3239593122e Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sat, 9 Nov 2019 23:02:29 +0800 Subject: [PATCH] fine mux, add goroutine pool --- lib/common/pool.go | 67 ++++-------- lib/conn/conn.go | 37 ++++--- lib/goroutine/pool.go | 73 +++++++++++++ lib/mux/conn.go | 247 +++++++++++++++++++++++++----------------- lib/mux/mux.go | 2 +- lib/mux/mux_test.go | 31 +++--- lib/mux/queue.go | 123 +++++++++++---------- 7 files changed, 352 insertions(+), 228 deletions(-) create mode 100644 lib/goroutine/pool.go diff --git a/lib/common/pool.go b/lib/common/pool.go index 5010012..5d4da00 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -2,8 +2,6 @@ package common import ( "bytes" - "github.com/panjf2000/ants/v2" - "net" "sync" ) @@ -151,53 +149,34 @@ func (Self *muxPackagerPool) Put(pack *MuxPackager) { Self.pool.Put(pack) } -type connGroup struct { - src net.Conn - dst net.Conn - wg *sync.WaitGroup +type ListElement struct { + Buf []byte + L uint16 + Part bool } -func newConnGroup(src net.Conn, dst net.Conn, wg *sync.WaitGroup) connGroup { - return connGroup{ - src: src, - dst: dst, - wg: wg, +type listElementPool struct { + pool sync.Pool +} + +func (Self *listElementPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + element := ListElement{} + return &element + }, } } -func copyConnGroup(group interface{}) { - cg, ok := group.(connGroup) - if !ok { - return - } - _, err := CopyBuffer(cg.src, cg.dst) - if err != nil { - cg.src.Close() - cg.dst.Close() - //logs.Warn("close npc by copy from nps", err, c.connId) - } - cg.wg.Done() +func (Self *listElementPool) Get() *ListElement { + return Self.pool.Get().(*ListElement) } -type Conns struct { - conn1 net.Conn - conn2 net.Conn -} - -func NewConns(c1 net.Conn, c2 net.Conn) Conns { - return Conns{ - conn1: c1, - conn2: c2, - } -} - -func copyConns(group interface{}) { - conns := group.(Conns) - wg := new(sync.WaitGroup) - wg.Add(2) - _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg)) - _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg)) - wg.Wait() +func (Self *listElementPool) Put(element *ListElement) { + element.L = 0 + element.Buf = nil + element.Part = false + Self.pool.Put(element) } var once = sync.Once{} @@ -205,14 +184,14 @@ var BuffPool = bufferPool{} var CopyBuff = copyBufferPool{} var MuxPack = muxPackagerPool{} var WindowBuff = windowBufferPool{} -var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false)) -var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false)) +var ListElementPool = listElementPool{} func newPool() { BuffPool.New() CopyBuff.New() MuxPack.New() WindowBuff.New() + ListElementPool.New() } func init() { diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 7946c0d..9f0c397 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -6,13 +6,14 @@ import ( "encoding/binary" "encoding/json" "errors" + "github.com/astaxie/beego/logs" + "github.com/cnlh/nps/lib/goroutine" "io" "net" "net/http" "net/url" "strconv" "strings" - "sync" "time" "github.com/cnlh/nps/lib/common" @@ -350,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) { //conn1 mux conn func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) { - var in, out int64 - var wg sync.WaitGroup + //var in, out int64 + //var wg sync.WaitGroup connHandle := GetConn(conn1, crypt, snappy, rate, isServer) if rb != nil { connHandle.Write(rb) } - go 
func(in *int64) { - wg.Add(1) - *in, _ = common.CopyBuffer(connHandle, conn2) - connHandle.Close() - conn2.Close() - wg.Done() - }(&in) - out, _ = common.CopyBuffer(conn2, connHandle) - connHandle.Close() - conn2.Close() - wg.Wait() - if flow != nil { - flow.Add(in, out) + //go func(in *int64) { + // wg.Add(1) + // *in, _ = common.CopyBuffer(connHandle, conn2) + // connHandle.Close() + // conn2.Close() + // wg.Done() + //}(&in) + //out, _ = common.CopyBuffer(conn2, connHandle) + //connHandle.Close() + //conn2.Close() + //wg.Wait() + //if flow != nil { + // flow.Add(in, out) + //} + err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow)) + if err != nil { + logs.Error(err) } } diff --git a/lib/goroutine/pool.go b/lib/goroutine/pool.go new file mode 100644 index 0000000..287c711 --- /dev/null +++ b/lib/goroutine/pool.go @@ -0,0 +1,73 @@ +package goroutine + +import ( + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/file" + "github.com/panjf2000/ants/v2" + "io" + "net" + "sync" +) + +type connGroup struct { + src io.ReadWriteCloser + dst io.ReadWriteCloser + wg *sync.WaitGroup + n *int64 +} + +func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup { + return connGroup{ + src: src, + dst: dst, + wg: wg, + n: n, + } +} + +func copyConnGroup(group interface{}) { + cg, ok := group.(connGroup) + if !ok { + return + } + var err error + *cg.n, err = common.CopyBuffer(cg.dst, cg.src) + if err != nil { + cg.src.Close() + cg.dst.Close() + //logs.Warn("close npc by copy from nps", err, c.connId) + } + cg.wg.Done() +} + +type Conns struct { + conn1 io.ReadWriteCloser // mux connection + conn2 net.Conn // outside connection + flow *file.Flow +} + +func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns { + return Conns{ + conn1: c1, + conn2: c2, + flow: flow, + } +} + +func copyConns(group interface{}) { + conns := group.(Conns) + wg := new(sync.WaitGroup) + wg.Add(2) + var in, out int64 + _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in)) + // outside to mux : incoming + _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out)) + // mux to outside : outgoing + wg.Wait() + if conns.flow != nil { + conns.flow.Add(in, out) + } +} + +var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false)) +var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false)) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 0ba6f90..cb982e8 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -4,6 +4,7 @@ import ( "errors" "io" "net" + "runtime" "sync" "sync/atomic" "time" @@ -57,7 +58,12 @@ func (s *conn) Read(buf []byte) (n int, err error) { return 0, nil } // waiting for takeout from receive window finish or timeout + //now := time.Now() n, err = s.receiveWindow.Read(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + //logs.Warn("conn read long", n, t.Seconds()) + //} //var errstr string //if err == nil { // errstr = "err:nil" @@ -82,7 +88,12 @@ func (s *conn) Write(buf []byte) (n int, err error) { return 0, nil } //logs.Warn("write buf", len(buf)) + //now := time.Now() n, err = s.sendWindow.WriteFull(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + // logs.Warn("conn write long", n, t.Seconds()) + //} return } @@ -133,11 +144,25 @@ func (s *conn) SetWriteDeadline(t time.Time) error { } type window struct { - off uint32 - maxSize uint32 - closeOp bool - closeOpCh chan struct{} - mux *Mux 
+ off uint32 + maxSize uint32 + closeOp bool + closeOpCh chan struct{} + remainingWait uint64 + mux *Mux +} + +func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) { + const mask = 1<> dequeueBits) & mask) + wait = uint32(ptrs & mask) + return +} + +func (Self *window) pack(remaining, wait uint32) uint64 { + const mask = 1< 0 { + n = uint32(l) + } + return } func (Self *ReceiveWindow) calcSize() { @@ -194,8 +212,9 @@ func (Self *ReceiveWindow) calcSize() { if n < 8192 { n = 8192 } - if n < Self.bufQueue.Len() { - n = Self.bufQueue.Len() + bufLen := Self.bufQueue.Len() + if n < bufLen { + n = bufLen } // set the minimal size if n > 2*Self.maxSize { @@ -210,28 +229,39 @@ func (Self *ReceiveWindow) calcSize() { Self.count = -10 } Self.count += 1 + return } func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { if Self.closeOp { return errors.New("conn.receiveWindow: write on closed window") } - element := new(ListElement) - err = element.New(buf, l, part) + element, err := NewListElement(buf, l, part) //logs.Warn("push the buf", len(buf), l, (&element).l) if err != nil { return } - Self.bufQueue.Push(element) // must push data before allow read - //logs.Warn("read session calc size ", Self.maxSize) - // calculating the receive window size - Self.calcSize() - //logs.Warn("read session calc size finish", Self.maxSize) - if Self.remainingSize() == 0 { - atomic.StoreUint32(&Self.windowFull, 1) - //logs.Warn("window full true", Self.windowFull) + Self.calcSize() // calculate the max window size + var wait uint32 +start: + ptrs := atomic.LoadUint64(&Self.remainingWait) + _, wait = Self.unpack(ptrs) + newRemaining := Self.remainingSize(l) + // calculate the remaining window size now, plus the element we will push + if newRemaining == 0 { + //logs.Warn("window full true", remaining) + wait = 1 + } + if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) { + goto start + // another goroutine change the status, make sure shall we need wait + } + Self.bufQueue.Push(element) + // status check finish, now we can push the element into the queue + if wait == 0 { + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) + // send the remaining window size, not including zero size } - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize()) return nil } @@ -243,10 +273,10 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { l := 0 //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) copyData: - //Self.bw.StartRead() - if Self.off == uint32(Self.element.l) { + if Self.off == uint32(Self.element.L) { // on the first Read method invoked, Self.off and Self.element.l // both zero value + common.ListElementPool.Put(Self.element) Self.element, err = Self.bufQueue.Pop() // if the queue is empty, Pop method will wait until one element push // into the queue successful, or timeout. 
@@ -258,38 +288,44 @@ copyData: } //logs.Warn("pop element", Self.element.l, Self.element.part) } - l = copy(p[pOff:], Self.element.buf[Self.off:Self.element.l]) - //Self.bw.SetCopySize(l) + l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L]) pOff += l Self.off += uint32(l) - atomic.AddUint32(&Self.readLength, uint32(l)) //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) n += l l = 0 - //Self.bw.EndRead() - if Self.off == uint32(Self.element.l) { + if Self.off == uint32(Self.element.L) { //logs.Warn("put the element end ", string(Self.element.buf[:15])) - common.WindowBuff.Put(Self.element.buf) - Self.sendStatus(id) + common.WindowBuff.Put(Self.element.Buf) + Self.sendStatus(id, Self.element.L) + // check the window full status } - if pOff < len(p) && Self.element.part { + if pOff < len(p) && Self.element.Part { // element is a part of the segments, trying to fill up buf p goto copyData } return // buf p is full or all of segments in buf, return } -func (Self *ReceiveWindow) sendStatus(id int32) { - if Self.bufQueue.Len() == 0 { - // window is full before read or empty now - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize()) - // acknowledge other side, have empty some receive window space - //} +func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { + var remaining, wait uint32 + for { + ptrs := atomic.LoadUint64(&Self.remainingWait) + remaining, wait = Self.unpack(ptrs) + remaining += uint32(l) + if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) { + break + } + runtime.Gosched() + // another goroutine change remaining or wait status, make sure + // we need acknowledge other side } - if atomic.LoadUint32(&Self.windowFull) > 0 && Self.remainingSize() > 0 { - atomic.StoreUint32(&Self.windowFull, 0) - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize()) + // now we get the current window status success + if wait == 1 { + //logs.Warn("send the wait status", remaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) } + return } func (Self *ReceiveWindow) SetTimeOut(t time.Time) { @@ -308,17 +344,16 @@ func (Self *ReceiveWindow) CloseWindow() { } type SendWindow struct { - buf []byte - sentLength uint32 - setSizeCh chan struct{} - setSizeWait uint32 - timeout time.Time + buf []byte + setSizeCh chan struct{} + timeout time.Time window } func (Self *SendWindow) New(mux *Mux) { Self.setSizeCh = make(chan struct{}) Self.maxSize = 4096 + atomic.AddUint64(&Self.remainingWait, uint64(4096)< common.MAXIMUM_SEGMENT_SIZE { sendSize = common.MAXIMUM_SEGMENT_SIZE //logs.Warn("cut buf by mss") } else { sendSize = uint32(len(Self.buf[Self.off:])) } - if Self.RemainingSize() < sendSize { + if remaining < sendSize { // usable window size is small than // window MAXIMUM_SEGMENT_SIZE or send buf left - sendSize = Self.RemainingSize() + sendSize = remaining //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) } //logs.Warn("send size", sendSize) @@ -412,7 +464,7 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err } p = Self.buf[Self.off : sendSize+Self.off] Self.off += sendSize - atomic.AddUint32(&Self.sentLength, sendSize) + Self.sent(sendSize) return } @@ -439,6 +491,7 @@ func (Self *SendWindow) waitReceiveWindow() (err error) { func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) { Self.SetSendBuf(buf) // set the buf to send 
window + //logs.Warn("set the buf to send window") var bufSeg []byte var part bool var l uint32 diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 6a2d6b6..585c980 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -215,7 +215,7 @@ func (s *Mux) pingReturn() { if latency < 0.5 && latency > 0 { s.latency = latency } - logs.Warn("latency", s.latency) + //logs.Warn("latency", s.latency) common.WindowBuff.Put(data) } }() diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index d3061ab..151def1 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/goroutine" "io" "log" "net" @@ -49,7 +50,7 @@ func TestNewMux(t *testing.T) { } //c2.(*net.TCPConn).SetReadBuffer(0) //c2.(*net.TCPConn).SetReadBuffer(0) - _ = common.CopyConnsPool.Invoke(common.NewConns(c2, c)) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil)) //go func(c2 net.Conn, c *conn) { // wg := new(sync.WaitGroup) // wg.Add(2) @@ -102,7 +103,7 @@ func TestNewMux(t *testing.T) { continue } //logs.Warn("nps new conn success ", tmpCpnn.connId) - _ = common.CopyConnsPool.Invoke(common.NewConns(tmpCpnn, conns)) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil)) //go func(tmpCpnn *conn, conns net.Conn) { // wg := new(sync.WaitGroup) // wg.Add(2) @@ -131,9 +132,9 @@ func TestNewMux(t *testing.T) { //go NewLogServer() time.Sleep(time.Second * 5) - for i := 0; i < 1000; i++ { - go test_raw(i) - } + //for i := 0; i < 1; i++ { + // go test_raw(i) + //} //test_request() for { @@ -166,7 +167,7 @@ func client() { func test_request() { conn, _ := net.Dial("tcp", "127.0.0.1:7777") - for { + for i := 0; i < 1000; i++ { conn.Write([]byte(`GET / HTTP/1.1 Host: 127.0.0.1:7777 Connection: keep-alive @@ -185,19 +186,20 @@ Connection: keep-alive break } fmt.Println(string(b[:20]), err) - time.Sleep(time.Second) + //time.Sleep(time.Second) } + logs.Warn("finish") } func test_raw(k int) { - for i := 0; i < 1; i++ { + for i := 0; i < 1000; i++ { ti := time.Now() conn, err := net.Dial("tcp", "127.0.0.1:7777") if err != nil { logs.Warn("conn dial err", err) } tid := time.Now() - conn.Write([]byte(`GET / HTTP/1.1 + conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 Host: 127.0.0.1:7777 @@ -227,6 +229,7 @@ Host: 127.0.0.1:7777 logs.Warn("n loss", n, string(buf)) } } + logs.Warn("finish") } func TestNewConn(t *testing.T) { @@ -313,11 +316,12 @@ func TestFIFO(t *testing.T) { d.New() go func() { time.Sleep(time.Second) - for i := 0; i < 300100; i++ { + for i := 0; i < 1001; i++ { data, err := d.Pop() if err == nil { //fmt.Println(i, string(data.buf), err) - logs.Warn(i, string(data.buf), err) + logs.Warn(i, string(data.Buf), err) + common.ListElementPool.Put(data) } else { //fmt.Println("err", err) logs.Warn("err", err) @@ -328,10 +332,9 @@ func TestFIFO(t *testing.T) { }() go func() { time.Sleep(time.Second * 10) - for i := 0; i < 300000; i++ { - data := new(ListElement) + for i := 0; i < 1000; i++ { by := []byte("test " + strconv.Itoa(i) + " ") // - _ = data.New(by, uint16(len(by)), true) + data, _ := NewListElement(by, uint16(len(by)), true) //fmt.Println(string((*data).buf), data) //logs.Warn(string((*data).buf), data) d.Push(data) diff --git a/lib/mux/queue.go b/lib/mux/queue.go index e3c39a1..4790779 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -44,7 +44,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) { default: Self.lowestChain.pushHead(unsafe.Pointer(packager)) } - 
//atomic.AddUint32(&Self.count, 1) Self.cond.Signal() return } @@ -52,7 +51,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) { const maxStarving uint8 = 8 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { - // PriorityQueue is empty, notice Push method var iter bool for { packager = Self.pop() @@ -64,6 +62,7 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { } if iter { break + // trying to pop twice } iter = true runtime.Gosched() @@ -74,10 +73,12 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { if Self.stop { return } + //logs.Warn("queue into wait") Self.cond.Wait() + // wait for it with no more iter packager = Self.pop() + //logs.Warn("queue wait finish", packager) } - //atomic.AddUint32(&Self.count, ^uint32(0)) return } @@ -88,6 +89,7 @@ func (Self *PriorityQueue) pop() (packager *common.MuxPackager) { return } if Self.starving < maxStarving { + // not pop too much, lowestChain will wait too long ptr, ok = Self.middleChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) @@ -119,29 +121,27 @@ func (Self *PriorityQueue) Stop() { Self.cond.Broadcast() } -type ListElement struct { - buf []byte - l uint16 - part bool -} - -func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) { +func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) { if uint16(len(buf)) != l { - return errors.New("ListElement: buf length not match") + err = errors.New("ListElement: buf length not match") + return } - Self.buf = buf - Self.l = l - Self.part = part - return nil + //if l == 0 { + // logs.Warn("push zero") + //} + element = common.ListElementPool.Get() + element.Buf = buf + element.L = l + element.Part = part + return } type ReceiveWindowQueue struct { - chain *bufChain - length uint32 - stopOp chan struct{} - readOp chan struct{} - popWait uint32 - timeout time.Time + chain *bufChain + stopOp chan struct{} + readOp chan struct{} + lengthWait uint64 + timeout time.Time } func (Self *ReceiveWindowQueue) New() { @@ -151,45 +151,45 @@ func (Self *ReceiveWindowQueue) New() { Self.stopOp = make(chan struct{}, 2) } -func (Self *ReceiveWindowQueue) Push(element *ListElement) { +func (Self *ReceiveWindowQueue) Push(element *common.ListElement) { + var length, wait uint32 + for { + ptrs := atomic.LoadUint64(&Self.lengthWait) + length, wait = Self.chain.head.unpack(ptrs) + length += uint32(element.L) + if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) { + break + } + // another goroutine change the length or into wait, make sure + } + //logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf)) Self.chain.pushHead(unsafe.Pointer(element)) - atomic.AddUint32(&Self.length, uint32(element.l)) - Self.allowPop() - return -} - -func (Self *ReceiveWindowQueue) pop() (element *ListElement) { - ptr, ok := Self.chain.popTail() - if ok { - element = (*ListElement)(ptr) - atomic.AddUint32(&Self.length, ^uint32(element.l-1)) - return + //logs.Warn("window push", Self.Len()) + if wait == 1 { + Self.allowPop() } return } -func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) { - var iter bool +func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) { + var length uint32 startPop: - element = Self.pop() - if element != nil { - return - } - if !iter { - iter = true - runtime.Gosched() - goto startPop - } - if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { - iter = false + ptrs := 
atomic.LoadUint64(&Self.lengthWait) + length, _ = Self.chain.head.unpack(ptrs) + if length == 0 { + if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) { + goto startPop // another goroutine is pushing + } t := Self.timeout.Sub(time.Now()) if t <= 0 { t = time.Minute } timer := time.NewTimer(t) defer timer.Stop() + //logs.Warn("queue into wait") select { case <-Self.readOp: + //logs.Warn("queue wait finish") goto startPop case <-Self.stopOp: err = io.EOF @@ -199,23 +199,34 @@ startPop: return } } - goto startPop + // length is not zero, so try to pop + for { + ptr, ok := Self.chain.popTail() + if ok { + //logs.Warn("window pop before", Self.Len()) + element = (*common.ListElement)(ptr) + atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<
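
The new lib/goroutine/pool.go moves connection relaying onto ants pools: CopyConnsPool runs one task per proxied connection pair, and that task hands the two directional copies to connCopyPool, waits on a sync.WaitGroup, then adds the byte counts to the flow. Below is a minimal sketch of that pattern under stated assumptions: pair and proxy are illustrative names, proxy is the body the patch registers as the CopyConnsPool task but is called directly here, io.Copy stands in for common.CopyBuffer, and the totals are returned instead of being added to a file.Flow. Only ants calls the patch itself uses (NewPoolWithFunc, Invoke, WithNonblocking) appear.

package main

import (
	"fmt"
	"io"
	"net"
	"sync"

	"github.com/panjf2000/ants/v2"
)

// pair describes one copy direction; it plays the role of the patch's connGroup.
type pair struct {
	dst, src io.ReadWriteCloser
	wg       *sync.WaitGroup
	n        *int64
}

// copyPool plays the role of connCopyPool: each task copies a single direction
// and closes both ends if the copy fails.
var copyPool, _ = ants.NewPoolWithFunc(200000, func(i interface{}) {
	p := i.(pair)
	defer p.wg.Done()
	n, err := io.Copy(p.dst, p.src) // the patch uses common.CopyBuffer here
	*p.n = n
	if err != nil {
		p.src.Close()
		p.dst.Close()
	}
}, ants.WithNonblocking(false))

// proxy mirrors copyConns: fan both directions out to the pool, wait for them,
// and report how many bytes moved each way.
func proxy(c1, c2 io.ReadWriteCloser) (in, out int64) {
	// c1 plays Conns.conn1 (mux side), c2 plays Conns.conn2 (outside side)
	wg := new(sync.WaitGroup)
	wg.Add(2)
	_ = copyPool.Invoke(pair{dst: c1, src: c2, wg: wg, n: &in})  // outside to mux: incoming
	_ = copyPool.Invoke(pair{dst: c2, src: c1, wg: wg, n: &out}) // mux to outside: outgoing
	wg.Wait()
	return
}

func main() {
	out1, out2 := net.Pipe() // stand-in for the outside connection
	mux1, mux2 := net.Pipe() // stand-in for the mux stream
	go proxy(mux1, out2)

	go func() {
		out1.Write([]byte("hello")) // a client sends through the outside connection
		out1.Close()
	}()
	buf := make([]byte, 5)
	io.ReadFull(mux2, buf)   // the bytes arrive on the mux side
	fmt.Println(string(buf)) // hello
	mux2.Close()
}

With this in place, CopyWaitGroup in lib/conn/conn.go reduces to a single goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow)) call plus error logging, instead of an ad hoc goroutine per direction.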
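
In lib/mux/conn.go the separate readLength, windowFull and setSizeWait fields are replaced by a single remainingWait uint64 per window: the high bits carry the remaining window size, the low bits a wait flag, and window.pack/unpack plus compare-and-swap let both be read and updated as one unit. A short sketch of that layout follows; it assumes the 32-bit split implied by the dequeueBits constant the patch reuses from the queue code, and the state variable and the writer loop are illustrative rather than lifted from the patch.

package main

import (
	"fmt"
	"sync/atomic"
)

// dequeueBits is assumed to be 32: high 32 bits hold the remaining size,
// low 32 bits hold the wait flag.
const dequeueBits = 32

func pack(remaining, wait uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(remaining) << dequeueBits) | uint64(wait&mask)
}

func unpack(ptrs uint64) (remaining, wait uint32) {
	const mask = 1<<dequeueBits - 1
	remaining = uint32((ptrs >> dequeueBits) & mask)
	wait = uint32(ptrs & mask)
	return
}

func main() {
	var state uint64 // remaining = 0, wait = 0

	// A writer that finds no remaining space publishes the wait flag in the
	// same CAS that confirms the size, retrying if a reader raced with it.
	for {
		old := atomic.LoadUint64(&state)
		if remaining, _ := unpack(old); remaining > 0 {
			break // space available, nothing to wait for
		}
		if atomic.CompareAndSwapUint64(&state, old, pack(0, 1)) {
			break // size and wait flag updated together
		}
	}

	remaining, wait := unpack(atomic.LoadUint64(&state))
	fmt.Println(remaining, wait) // 0 1
}

Because size and flag live in one word, a reader that frees space (ReceiveWindow.sendStatus) and a writer that decides to wait can never observe each other half-updated, which the separate windowFull flag could not guarantee on its own.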
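
PriorityQueue.Pop drains three chains in order: whatever sits in the highest chain goes first, the middle chain is preferred next, and the starving counter (capped at maxStarving) keeps the default lowest chain, which carries ordinary data frames, from being postponed indefinitely. The following mutex-based simplification illustrates that policy; the slices stand in for the patch's lock-free bufChain, the counter-decay rule is illustrative, and the labels in the demo only suggest the kind of traffic each chain typically carries.

package main

import (
	"fmt"
	"sync"
)

// maxStarving mirrors the patch's cap on consecutive middle-chain wins.
const maxStarving uint8 = 8

// priorityQueue is a mutex-based stand-in for the patch's three lock-free
// chains (highestChain / middleChain / lowestChain).
type priorityQueue struct {
	mu       sync.Mutex
	highest  [][]byte
	middle   [][]byte
	lowest   [][]byte
	starving uint8
}

func (q *priorityQueue) push(priority int, pkg []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()
	switch priority {
	case 0:
		q.highest = append(q.highest, pkg)
	case 1:
		q.middle = append(q.middle, pkg)
	default:
		q.lowest = append(q.lowest, pkg) // data frames take the default chain
	}
}

func (q *priorityQueue) pop() (pkg []byte, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.highest) > 0 {
		pkg, q.highest = q.highest[0], q.highest[1:]
		return pkg, true
	}
	if q.starving < maxStarving && len(q.middle) > 0 {
		// stop favouring the middle chain once it has won too many rounds,
		// so the lowest chain does not wait forever
		pkg, q.middle = q.middle[0], q.middle[1:]
		q.starving++
		return pkg, true
	}
	if len(q.lowest) > 0 {
		pkg, q.lowest = q.lowest[0], q.lowest[1:]
		if q.starving > 0 {
			q.starving /= 2 // relax the pressure once data frames get through
		}
		return pkg, true
	}
	if len(q.middle) > 0 {
		// the middle chain was only skipped because of starvation protection
		pkg, q.middle = q.middle[0], q.middle[1:]
		q.starving++
		return pkg, true
	}
	return nil, false
}

func main() {
	q := &priorityQueue{}
	q.push(2, []byte("data frame"))
	q.push(1, []byte("new connection"))
	q.push(0, []byte("ping"))
	for {
		pkg, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(string(pkg)) // ping, new connection, data frame
	}
}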
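
ReceiveWindowQueue applies the same packed word to coordinate Push and Pop: lengthWait counts queued bytes in its high bits and records a parked reader in its low bit, Push publishes the new length and wakes the reader through readOp only when it saw the wait bit, and Pop publishes the wait bit with a CAS before blocking on readOp or its timeout. A condensed, runnable sketch of that handshake follows; the buffered wake channel, the plain channel standing in for bufChain, the single-reader assumption and the timeout handling are simplifications rather than the patch's exact code.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

const dequeueBits = 32 // assumed split: high bits = queued length, low bits = wait flag

func pack(length, wait uint32) uint64 { return uint64(length)<<dequeueBits | uint64(wait) }

func unpack(v uint64) (length, wait uint32) {
	return uint32(v >> dequeueBits), uint32(v & (1<<dequeueBits - 1))
}

// queue condenses ReceiveWindowQueue: lengthWait carries the byte count plus
// the "reader is waiting" flag, data stands in for the lock-free bufChain.
// A single reader is assumed, as with one queue per mux connection.
type queue struct {
	lengthWait uint64
	readOp     chan struct{}
	data       chan []byte
}

func newQueue() *queue {
	// readOp is buffered so a wakeup sent after the reader already timed out
	// cannot block the pusher (the patch handles that case via stopOp instead).
	return &queue{readOp: make(chan struct{}, 1), data: make(chan []byte, 1024)}
}

func (q *queue) Push(buf []byte) {
	var wait uint32
	for {
		old := atomic.LoadUint64(&q.lengthWait)
		length, _ := unpack(old)
		if atomic.CompareAndSwapUint64(&q.lengthWait, old, pack(length+uint32(len(buf)), 0)) {
			_, wait = unpack(old)
			break
		}
		// a concurrent Push or Pop touched the word first; retry
	}
	q.data <- buf
	if wait == 1 {
		q.readOp <- struct{}{} // a reader is parked; wake it
	}
}

func (q *queue) Pop(timeout time.Duration) ([]byte, error) {
start:
	old := atomic.LoadUint64(&q.lengthWait)
	if length, _ := unpack(old); length == 0 {
		if !atomic.CompareAndSwapUint64(&q.lengthWait, old, pack(0, 1)) {
			goto start // a concurrent Push slipped in; check the length again
		}
		select {
		case <-q.readOp:
			goto start
		case <-time.After(timeout):
			return nil, errors.New("pop timeout")
		}
	}
	buf := <-q.data
	// subtract the element length from the high bits without touching the flag
	atomic.AddUint64(&q.lengthWait, ^(uint64(len(buf))<<dequeueBits - 1))
	return buf, nil
}

func main() {
	q := newQueue()
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.Push([]byte("segment"))
	}()
	buf, err := q.Pop(time.Second)
	fmt.Println(string(buf), err) // segment <nil>
}

The point of the packed word is that the length check and the decision to park are a single atomic step, so Push can tell unambiguously whether a wakeup is needed.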