refine mux, add goroutine pool

pull/265/head
ffdfgdfg 2019-11-09 23:02:29 +08:00
parent 5f58c34c8b
commit f362c96e1e
7 changed files with 352 additions and 228 deletions

View File

@ -2,8 +2,6 @@ package common
import (
"bytes"
"github.com/panjf2000/ants/v2"
"net"
"sync"
)
@ -151,53 +149,34 @@ func (Self *muxPackagerPool) Put(pack *MuxPackager) {
Self.pool.Put(pack)
}
type connGroup struct {
src net.Conn
dst net.Conn
wg *sync.WaitGroup
type ListElement struct {
Buf []byte
L uint16
Part bool
}
func newConnGroup(src net.Conn, dst net.Conn, wg *sync.WaitGroup) connGroup {
return connGroup{
src: src,
dst: dst,
wg: wg,
type listElementPool struct {
pool sync.Pool
}
func (Self *listElementPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
element := ListElement{}
return &element
},
}
}
func copyConnGroup(group interface{}) {
cg, ok := group.(connGroup)
if !ok {
return
}
_, err := CopyBuffer(cg.src, cg.dst)
if err != nil {
cg.src.Close()
cg.dst.Close()
//logs.Warn("close npc by copy from nps", err, c.connId)
}
cg.wg.Done()
func (Self *listElementPool) Get() *ListElement {
return Self.pool.Get().(*ListElement)
}
type Conns struct {
conn1 net.Conn
conn2 net.Conn
}
func NewConns(c1 net.Conn, c2 net.Conn) Conns {
return Conns{
conn1: c1,
conn2: c2,
}
}
func copyConns(group interface{}) {
conns := group.(Conns)
wg := new(sync.WaitGroup)
wg.Add(2)
_ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg))
_ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg))
wg.Wait()
func (Self *listElementPool) Put(element *ListElement) {
element.L = 0
element.Buf = nil
element.Part = false
Self.pool.Put(element)
}
var once = sync.Once{}
@ -205,14 +184,14 @@ var BuffPool = bufferPool{}
var CopyBuff = copyBufferPool{}
var MuxPack = muxPackagerPool{}
var WindowBuff = windowBufferPool{}
var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))
var ListElementPool = listElementPool{}
func newPool() {
BuffPool.New()
CopyBuff.New()
MuxPack.New()
WindowBuff.New()
ListElementPool.New()
}
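For context, a minimal sketch of the intended ListElement pool lifecycle; the payload bytes and the surrounding flow are illustrative only, assuming the nps module is importable, and Put clears Buf, L and Part so a pooled element never carries a stale buffer back out of Get:

package main

import "github.com/cnlh/nps/lib/common"

func main() {
	// take an element from the pool, fill it, then hand it back
	element := common.ListElementPool.Get()
	element.Buf = []byte("payload") // illustrative data only
	element.L = uint16(len(element.Buf))
	element.Part = false
	// ... push the element into a receive window queue and read it back ...
	common.ListElementPool.Put(element) // Put resets Buf, L and Part before reuse
}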
func init() {

View File

@ -6,13 +6,14 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"github.com/astaxie/beego/logs"
"github.com/cnlh/nps/lib/goroutine"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/cnlh/nps/lib/common"
@ -350,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) {
//conn1 mux conn
func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) {
var in, out int64
var wg sync.WaitGroup
//var in, out int64
//var wg sync.WaitGroup
connHandle := GetConn(conn1, crypt, snappy, rate, isServer)
if rb != nil {
connHandle.Write(rb)
}
go func(in *int64) {
wg.Add(1)
*in, _ = common.CopyBuffer(connHandle, conn2)
connHandle.Close()
conn2.Close()
wg.Done()
}(&in)
out, _ = common.CopyBuffer(conn2, connHandle)
connHandle.Close()
conn2.Close()
wg.Wait()
if flow != nil {
flow.Add(in, out)
//go func(in *int64) {
// wg.Add(1)
// *in, _ = common.CopyBuffer(connHandle, conn2)
// connHandle.Close()
// conn2.Close()
// wg.Done()
//}(&in)
//out, _ = common.CopyBuffer(conn2, connHandle)
//connHandle.Close()
//conn2.Close()
//wg.Wait()
//if flow != nil {
// flow.Add(in, out)
//}
err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow))
if err != nil {
logs.Error(err)
}
}

lib/goroutine/pool.go (new file, 73 lines)
View File

@ -0,0 +1,73 @@
package goroutine
import (
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/file"
"github.com/panjf2000/ants/v2"
"io"
"net"
"sync"
)
type connGroup struct {
src io.ReadWriteCloser
dst io.ReadWriteCloser
wg *sync.WaitGroup
n *int64
}
func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup {
return connGroup{
src: src,
dst: dst,
wg: wg,
n: n,
}
}
func copyConnGroup(group interface{}) {
cg, ok := group.(connGroup)
if !ok {
return
}
var err error
*cg.n, err = common.CopyBuffer(cg.dst, cg.src)
if err != nil {
cg.src.Close()
cg.dst.Close()
//logs.Warn("close npc by copy from nps", err, c.connId)
}
cg.wg.Done()
}
type Conns struct {
conn1 io.ReadWriteCloser // mux connection
conn2 net.Conn // outside connection
flow *file.Flow
}
func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns {
return Conns{
conn1: c1,
conn2: c2,
flow: flow,
}
}
func copyConns(group interface{}) {
conns := group.(Conns)
wg := new(sync.WaitGroup)
wg.Add(2)
var in, out int64
_ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in))
// outside to mux : incoming
_ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out))
// mux to outside : outgoing
wg.Wait()
if conns.flow != nil {
conns.flow.Add(in, out)
}
}
var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))
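A rough usage sketch, mirroring the call added to CopyWaitGroup above: CopyConnsPool runs one copyConns task per proxied pair, and copyConns submits one connGroup per direction to connCopyPool, which is why the inner pool's capacity (200000) is twice the outer's (100000). The helper and connection names below are hypothetical; flow may be nil when traffic accounting is not needed.

package proxyexample // hypothetical package

import (
	"net"

	"github.com/astaxie/beego/logs"
	"github.com/cnlh/nps/lib/goroutine"
)

// proxyPair hands both ends of a proxied connection to the pool and returns;
// the pool workers do the copying in both directions.
func proxyPair(muxConn, outConn net.Conn) {
	// Invoke blocks for a free worker when the pool is saturated,
	// since the pool is created with ants.WithNonblocking(false).
	if err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(muxConn, outConn, nil)); err != nil {
		logs.Error(err)
	}
}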

View File

@ -4,6 +4,7 @@ import (
"errors"
"io"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
@ -57,7 +58,12 @@ func (s *conn) Read(buf []byte) (n int, err error) {
return 0, nil
}
// waiting for takeout from receive window finish or timeout
//now := time.Now()
n, err = s.receiveWindow.Read(buf, s.connId)
//t := time.Now().Sub(now)
//if t.Seconds() > 0.5 {
//logs.Warn("conn read long", n, t.Seconds())
//}
//var errstr string
//if err == nil {
// errstr = "err:nil"
@ -82,7 +88,12 @@ func (s *conn) Write(buf []byte) (n int, err error) {
return 0, nil
}
//logs.Warn("write buf", len(buf))
//now := time.Now()
n, err = s.sendWindow.WriteFull(buf, s.connId)
//t := time.Now().Sub(now)
//if t.Seconds() > 0.5 {
// logs.Warn("conn write long", n, t.Seconds())
//}
return
}
@ -133,11 +144,25 @@ func (s *conn) SetWriteDeadline(t time.Time) error {
}
type window struct {
off uint32
maxSize uint32
closeOp bool
closeOpCh chan struct{}
mux *Mux
off uint32
maxSize uint32
closeOp bool
closeOpCh chan struct{}
remainingWait uint64
mux *Mux
}
func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
const mask = 1<<dequeueBits - 1
remaining = uint32((ptrs >> dequeueBits) & mask)
wait = uint32(ptrs & mask)
return
}
func (Self *window) pack(remaining, wait uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(remaining) << dequeueBits) |
uint64(wait&mask)
}
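To make the packing concrete, a standalone sketch of the same scheme (assuming dequeueBits is 32, the constant used by the poolDequeue-style bufChain in this package): the remaining window size sits in the high 32 bits and the wait flag in the low 32 bits, so both can be read, compared and swapped with a single atomic uint64 instead of two separate fields.

package main

import "fmt"

const dequeueBits = 32 // assumption: matches the constant used by the mux queue

func pack(remaining, wait uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return uint64(remaining)<<dequeueBits | uint64(wait&mask)
}

func unpack(ptrs uint64) (remaining, wait uint32) {
	const mask = 1<<dequeueBits - 1
	remaining = uint32((ptrs >> dequeueBits) & mask)
	wait = uint32(ptrs & mask)
	return
}

func main() {
	ptrs := pack(4096, 1)
	remaining, wait := unpack(ptrs)
	fmt.Println(remaining, wait) // 4096 1
}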
func (Self *window) New() {
@ -153,37 +178,30 @@ func (Self *window) CloseWindow() {
}
type ReceiveWindow struct {
bufQueue ReceiveWindowQueue
element *ListElement
readLength uint32
//readOp chan struct{}
readWait bool
windowFull uint32
count int8
//bw *bandwidth
once sync.Once
bufQueue ReceiveWindowQueue
element *common.ListElement
count int8
once sync.Once
window
}
func (Self *ReceiveWindow) New(mux *Mux) {
// initialize a window for receiving
//Self.readOp = make(chan struct{})
Self.bufQueue.New()
//Self.bw = new(bandwidth)
Self.element = new(ListElement)
Self.maxSize = 8192
Self.element = common.ListElementPool.Get()
Self.maxSize = 4096
Self.mux = mux
Self.window.New()
}
func (Self *ReceiveWindow) remainingSize() (n uint32) {
func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
// receive window remaining
return atomic.LoadUint32(&Self.maxSize) - Self.bufQueue.Len()
}
func (Self *ReceiveWindow) readSize() (n uint32) {
// acknowledge the size already read
return atomic.SwapUint32(&Self.readLength, 0)
l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
l -= int64(delta)
if l > 0 {
n = uint32(l)
}
return
}
func (Self *ReceiveWindow) calcSize() {
@ -194,8 +212,9 @@ func (Self *ReceiveWindow) calcSize() {
if n < 8192 {
n = 8192
}
if n < Self.bufQueue.Len() {
n = Self.bufQueue.Len()
bufLen := Self.bufQueue.Len()
if n < bufLen {
n = bufLen
}
// set the minimal size
if n > 2*Self.maxSize {
@ -210,28 +229,39 @@ func (Self *ReceiveWindow) calcSize() {
Self.count = -10
}
Self.count += 1
return
}
func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
if Self.closeOp {
return errors.New("conn.receiveWindow: write on closed window")
}
element := new(ListElement)
err = element.New(buf, l, part)
element, err := NewListElement(buf, l, part)
//logs.Warn("push the buf", len(buf), l, (&element).l)
if err != nil {
return
}
Self.bufQueue.Push(element) // must push data before allow read
//logs.Warn("read session calc size ", Self.maxSize)
// calculating the receive window size
Self.calcSize()
//logs.Warn("read session calc size finish", Self.maxSize)
if Self.remainingSize() == 0 {
atomic.StoreUint32(&Self.windowFull, 1)
//logs.Warn("window full true", Self.windowFull)
Self.calcSize() // calculate the max window size
var wait uint32
start:
ptrs := atomic.LoadUint64(&Self.remainingWait)
_, wait = Self.unpack(ptrs)
newRemaining := Self.remainingSize(l)
// calculate the remaining window size now, counting the element we are about to push
if newRemaining == 0 {
//logs.Warn("window full true", remaining)
wait = 1
}
if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
goto start
// another goroutine changed the status; re-check whether we need to wait
}
Self.bufQueue.Push(element)
// status check finished, now we can push the element into the queue
if wait == 0 {
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
// send the remaining window size, unless it is zero
}
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
return nil
}
@ -243,10 +273,10 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
l := 0
//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
copyData:
//Self.bw.StartRead()
if Self.off == uint32(Self.element.l) {
if Self.off == uint32(Self.element.L) {
// on the first Read call, Self.off and Self.element.L
// are both zero
common.ListElementPool.Put(Self.element)
Self.element, err = Self.bufQueue.Pop()
// if the queue is empty, Pop will wait until an element is pushed
// into the queue, or until timeout.
@ -258,38 +288,44 @@ copyData:
}
//logs.Warn("pop element", Self.element.l, Self.element.part)
}
l = copy(p[pOff:], Self.element.buf[Self.off:Self.element.l])
//Self.bw.SetCopySize(l)
l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
pOff += l
Self.off += uint32(l)
atomic.AddUint32(&Self.readLength, uint32(l))
//logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
n += l
l = 0
//Self.bw.EndRead()
if Self.off == uint32(Self.element.l) {
if Self.off == uint32(Self.element.L) {
//logs.Warn("put the element end ", string(Self.element.buf[:15]))
common.WindowBuff.Put(Self.element.buf)
Self.sendStatus(id)
common.WindowBuff.Put(Self.element.Buf)
Self.sendStatus(id, Self.element.L)
// check the window full status
}
if pOff < len(p) && Self.element.part {
if pOff < len(p) && Self.element.Part {
// element is one part of a segmented write, keep trying to fill buf p
goto copyData
}
return // buf p is full or all segments have been copied
}
func (Self *ReceiveWindow) sendStatus(id int32) {
if Self.bufQueue.Len() == 0 {
// window is full before read or empty now
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize())
// acknowledge other side, have empty some receive window space
//}
func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
var remaining, wait uint32
for {
ptrs := atomic.LoadUint64(&Self.remainingWait)
remaining, wait = Self.unpack(ptrs)
remaining += uint32(l)
if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
break
}
runtime.Gosched()
// another goroutine changed the remaining or wait status; retry so
// we know whether to acknowledge the other side
}
if atomic.LoadUint32(&Self.windowFull) > 0 && Self.remainingSize() > 0 {
atomic.StoreUint32(&Self.windowFull, 0)
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize())
// now we have a consistent view of the window status
if wait == 1 {
//logs.Warn("send the wait status", remaining)
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
}
return
}
func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
@ -308,17 +344,16 @@ func (Self *ReceiveWindow) CloseWindow() {
}
type SendWindow struct {
buf []byte
sentLength uint32
setSizeCh chan struct{}
setSizeWait uint32
timeout time.Time
buf []byte
setSizeCh chan struct{}
timeout time.Time
window
}
func (Self *SendWindow) New(mux *Mux) {
Self.setSizeCh = make(chan struct{})
Self.maxSize = 4096
atomic.AddUint64(&Self.remainingWait, uint64(4096)<<dequeueBits)
Self.mux = mux
Self.window.New()
}
@ -329,11 +364,8 @@ func (Self *SendWindow) SetSendBuf(buf []byte) {
Self.off = 0
}
func (Self *SendWindow) RemainingSize() (n uint32) {
return atomic.LoadUint32(&Self.maxSize) - atomic.LoadUint32(&Self.sentLength)
}
func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
// set the window size from receive window
defer func() {
if recover() != nil {
closed = true
@ -343,37 +375,46 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
close(Self.setSizeCh)
return true
}
if readLength == 0 && atomic.LoadUint32(&Self.maxSize) == windowSize {
//logs.Warn("waiting for another window size")
return false // waiting for receive another usable window size
}
//logs.Warn("set send window size to ", windowSize, readLength)
Self.slide(windowSize, readLength)
if Self.RemainingSize() == 0 {
//logs.Warn("waiting for another window size after slide")
// keep the wait status
//atomic.StoreUint32(&Self.setSizeWait, 1)
return false
}
if atomic.CompareAndSwapUint32(&Self.setSizeWait, 1, 0) {
// send window into the wait status, need notice the channel
select {
case Self.setSizeCh <- struct{}{}:
//logs.Warn("send window remaining size is 0 finish")
return false
case <-Self.closeOpCh:
close(Self.setSizeCh)
return true
//logs.Warn("set send window size to ", windowSize, newRemaining)
var remaining, wait, newWait uint32
for {
ptrs := atomic.LoadUint64(&Self.remainingWait)
remaining, wait = Self.unpack(ptrs)
if remaining == newRemaining {
//logs.Warn("waiting for another window size")
return false // wait to receive another usable window size
}
if newRemaining == 0 && wait == 1 {
newWait = 1 // keep the wait status;
// if newRemaining is not zero, wait is cleared to 0 instead
}
if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
break
}
// another goroutine changed the wait status or window size
}
if wait == 1 {
// send window was in the wait status, need to notify via the channel
//logs.Warn("send window remaining size is 0")
Self.allow()
}
// send window was not in the wait status; the CAS above already slid the window
return false
}
func (Self *SendWindow) slide(windowSize, readLength uint32) {
atomic.AddUint32(&Self.sentLength, ^readLength-1)
atomic.StoreUint32(&Self.maxSize, windowSize)
func (Self *SendWindow) allow() {
select {
case Self.setSizeCh <- struct{}{}:
//logs.Warn("send window remaining size is 0 finish")
return
case <-Self.closeOpCh:
close(Self.setSizeCh)
return
}
}
func (Self *SendWindow) sent(sentSize uint32) {
atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
}
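A note on the sent helper above: adding ^(x - 1), the two's complement of x, is the usual trick for subtracting x with a single atomic add, so the line decrements only the high (remaining) half of the packed word while leaving the wait flag untouched, as long as remaining never underflows. A standalone sketch, under the same dequeueBits = 32 assumption as earlier:

package main

import (
	"fmt"
	"sync/atomic"
)

const dequeueBits = 32 // assumption: matches the constant used by the mux queue

func main() {
	var remainingWait uint64 = 4096 << dequeueBits // remaining = 4096, wait = 0
	sent := uint32(1024)
	// equivalent to remainingWait -= uint64(sent) << dequeueBits, done atomically
	atomic.AddUint64(&remainingWait, ^(uint64(sent)<<dequeueBits - 1))
	fmt.Println(remainingWait >> dequeueBits) // 3072
}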
func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
@ -386,24 +427,35 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err
return nil, 0, false, io.EOF
// send window buf is drained, return EOF so the caller can set another one
}
if Self.RemainingSize() == 0 {
atomic.StoreUint32(&Self.setSizeWait, 1)
var remaining uint32
start:
ptrs := atomic.LoadUint64(&Self.remainingWait)
remaining, _ = Self.unpack(ptrs)
if remaining == 0 {
if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
goto start // another goroutine changed the window, try again
}
// into the wait status
//logs.Warn("send window into wait status")
err = Self.waitReceiveWindow()
if err != nil {
return nil, 0, false, err
}
//logs.Warn("rem into wait finish")
goto start
}
// there is still remaining window space
//logs.Warn("rem", remaining)
if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
sendSize = common.MAXIMUM_SEGMENT_SIZE
//logs.Warn("cut buf by mss")
} else {
sendSize = uint32(len(Self.buf[Self.off:]))
}
if Self.RemainingSize() < sendSize {
if remaining < sendSize {
// usable window size is smaller than
// MAXIMUM_SEGMENT_SIZE or the remaining send buf
sendSize = Self.RemainingSize()
sendSize = remaining
//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
}
//logs.Warn("send size", sendSize)
@ -412,7 +464,7 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err
}
p = Self.buf[Self.off : sendSize+Self.off]
Self.off += sendSize
atomic.AddUint32(&Self.sentLength, sendSize)
Self.sent(sendSize)
return
}
@ -439,6 +491,7 @@ func (Self *SendWindow) waitReceiveWindow() (err error) {
func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
Self.SetSendBuf(buf) // set the buf to send window
//logs.Warn("set the buf to send window")
var bufSeg []byte
var part bool
var l uint32

View File

@ -215,7 +215,7 @@ func (s *Mux) pingReturn() {
if latency < 0.5 && latency > 0 {
s.latency = latency
}
logs.Warn("latency", s.latency)
//logs.Warn("latency", s.latency)
common.WindowBuff.Put(data)
}
}()

View File

@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/goroutine"
"io"
"log"
"net"
@ -49,7 +50,7 @@ func TestNewMux(t *testing.T) {
}
//c2.(*net.TCPConn).SetReadBuffer(0)
//c2.(*net.TCPConn).SetReadBuffer(0)
_ = common.CopyConnsPool.Invoke(common.NewConns(c2, c))
_ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil))
//go func(c2 net.Conn, c *conn) {
// wg := new(sync.WaitGroup)
// wg.Add(2)
@ -102,7 +103,7 @@ func TestNewMux(t *testing.T) {
continue
}
//logs.Warn("nps new conn success ", tmpCpnn.connId)
_ = common.CopyConnsPool.Invoke(common.NewConns(tmpCpnn, conns))
_ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil))
//go func(tmpCpnn *conn, conns net.Conn) {
// wg := new(sync.WaitGroup)
// wg.Add(2)
@ -131,9 +132,9 @@ func TestNewMux(t *testing.T) {
//go NewLogServer()
time.Sleep(time.Second * 5)
for i := 0; i < 1000; i++ {
go test_raw(i)
}
//for i := 0; i < 1; i++ {
// go test_raw(i)
//}
//test_request()
for {
@ -166,7 +167,7 @@ func client() {
func test_request() {
conn, _ := net.Dial("tcp", "127.0.0.1:7777")
for {
for i := 0; i < 1000; i++ {
conn.Write([]byte(`GET / HTTP/1.1
Host: 127.0.0.1:7777
Connection: keep-alive
@ -185,19 +186,20 @@ Connection: keep-alive
break
}
fmt.Println(string(b[:20]), err)
time.Sleep(time.Second)
//time.Sleep(time.Second)
}
logs.Warn("finish")
}
func test_raw(k int) {
for i := 0; i < 1; i++ {
for i := 0; i < 1000; i++ {
ti := time.Now()
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
logs.Warn("conn dial err", err)
}
tid := time.Now()
conn.Write([]byte(`GET / HTTP/1.1
conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
Host: 127.0.0.1:7777
@ -227,6 +229,7 @@ Host: 127.0.0.1:7777
logs.Warn("n loss", n, string(buf))
}
}
logs.Warn("finish")
}
func TestNewConn(t *testing.T) {
@ -313,11 +316,12 @@ func TestFIFO(t *testing.T) {
d.New()
go func() {
time.Sleep(time.Second)
for i := 0; i < 300100; i++ {
for i := 0; i < 1001; i++ {
data, err := d.Pop()
if err == nil {
//fmt.Println(i, string(data.buf), err)
logs.Warn(i, string(data.buf), err)
logs.Warn(i, string(data.Buf), err)
common.ListElementPool.Put(data)
} else {
//fmt.Println("err", err)
logs.Warn("err", err)
@ -328,10 +332,9 @@ func TestFIFO(t *testing.T) {
}()
go func() {
time.Sleep(time.Second * 10)
for i := 0; i < 300000; i++ {
data := new(ListElement)
for i := 0; i < 1000; i++ {
by := []byte("test " + strconv.Itoa(i) + " ") //
_ = data.New(by, uint16(len(by)), true)
data, _ := NewListElement(by, uint16(len(by)), true)
//fmt.Println(string((*data).buf), data)
//logs.Warn(string((*data).buf), data)
d.Push(data)

View File

@ -44,7 +44,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
default:
Self.lowestChain.pushHead(unsafe.Pointer(packager))
}
//atomic.AddUint32(&Self.count, 1)
Self.cond.Signal()
return
}
@ -52,7 +51,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
const maxStarving uint8 = 8
func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
// PriorityQueue is empty, notice Push method
var iter bool
for {
packager = Self.pop()
@ -64,6 +62,7 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
}
if iter {
break
// already tried to pop twice
}
iter = true
runtime.Gosched()
@ -74,10 +73,12 @@ func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
if Self.stop {
return
}
//logs.Warn("queue into wait")
Self.cond.Wait()
// wait for a signal instead of iterating further
packager = Self.pop()
//logs.Warn("queue wait finish", packager)
}
//atomic.AddUint32(&Self.count, ^uint32(0))
return
}
@ -88,6 +89,7 @@ func (Self *PriorityQueue) pop() (packager *common.MuxPackager) {
return
}
if Self.starving < maxStarving {
// don't pop from here too often, or lowestChain would wait too long
ptr, ok = Self.middleChain.popTail()
if ok {
packager = (*common.MuxPackager)(ptr)
@ -119,29 +121,27 @@ func (Self *PriorityQueue) Stop() {
Self.cond.Broadcast()
}
type ListElement struct {
buf []byte
l uint16
part bool
}
func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) {
if uint16(len(buf)) != l {
return errors.New("ListElement: buf length not match")
err = errors.New("ListElement: buf length not match")
return
}
Self.buf = buf
Self.l = l
Self.part = part
return nil
//if l == 0 {
// logs.Warn("push zero")
//}
element = common.ListElementPool.Get()
element.Buf = buf
element.L = l
element.Part = part
return
}
type ReceiveWindowQueue struct {
chain *bufChain
length uint32
stopOp chan struct{}
readOp chan struct{}
popWait uint32
timeout time.Time
chain *bufChain
stopOp chan struct{}
readOp chan struct{}
lengthWait uint64
timeout time.Time
}
func (Self *ReceiveWindowQueue) New() {
@ -151,45 +151,45 @@ func (Self *ReceiveWindowQueue) New() {
Self.stopOp = make(chan struct{}, 2)
}
func (Self *ReceiveWindowQueue) Push(element *ListElement) {
func (Self *ReceiveWindowQueue) Push(element *common.ListElement) {
var length, wait uint32
for {
ptrs := atomic.LoadUint64(&Self.lengthWait)
length, wait = Self.chain.head.unpack(ptrs)
length += uint32(element.L)
if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) {
break
}
// another goroutine changed the length or entered the wait status; retry
}
//logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf))
Self.chain.pushHead(unsafe.Pointer(element))
atomic.AddUint32(&Self.length, uint32(element.l))
Self.allowPop()
return
}
func (Self *ReceiveWindowQueue) pop() (element *ListElement) {
ptr, ok := Self.chain.popTail()
if ok {
element = (*ListElement)(ptr)
atomic.AddUint32(&Self.length, ^uint32(element.l-1))
return
//logs.Warn("window push", Self.Len())
if wait == 1 {
Self.allowPop()
}
return
}
func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) {
var iter bool
func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) {
var length uint32
startPop:
element = Self.pop()
if element != nil {
return
}
if !iter {
iter = true
runtime.Gosched()
goto startPop
}
if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) {
iter = false
ptrs := atomic.LoadUint64(&Self.lengthWait)
length, _ = Self.chain.head.unpack(ptrs)
if length == 0 {
if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) {
goto startPop // another goroutine is pushing
}
t := Self.timeout.Sub(time.Now())
if t <= 0 {
t = time.Minute
}
timer := time.NewTimer(t)
defer timer.Stop()
//logs.Warn("queue into wait")
select {
case <-Self.readOp:
//logs.Warn("queue wait finish")
goto startPop
case <-Self.stopOp:
err = io.EOF
@ -199,23 +199,34 @@ startPop:
return
}
}
goto startPop
// length is not zero, so try to pop
for {
ptr, ok := Self.chain.popTail()
if ok {
//logs.Warn("window pop before", Self.Len())
element = (*common.ListElement)(ptr)
atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
//logs.Warn("window pop", Self.Len(), uint32(element.l))
return
}
runtime.Gosched() // another goroutine is still pushing
}
}
func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) {
select {
case Self.readOp <- struct{}{}:
return false
case <-Self.stopOp:
return true
}
//logs.Warn("allow pop", Self.Len())
select {
case Self.readOp <- struct{}{}:
return false
case <-Self.stopOp:
return true
}
return
}
func (Self *ReceiveWindowQueue) Len() (n uint32) {
return atomic.LoadUint32(&Self.length)
ptrs := atomic.LoadUint64(&Self.lengthWait)
n, _ = Self.chain.head.unpack(ptrs)
return
}
func (Self *ReceiveWindowQueue) Stop() {