From f5d5f633660d7ffb1325dbcb3a18bd2797431ea2 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sun, 13 Oct 2019 22:45:40 +0800 Subject: [PATCH] change slide window bandwidth calculation --- lib/common/netpackager.go | 16 ++-- lib/mux/conn.go | 160 +++++++++++++++++++++----------------- lib/mux/map.go | 4 + lib/mux/mux.go | 67 ++++++++++++++-- 4 files changed, 163 insertions(+), 84 deletions(-) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 567a48f..91eeb98 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -65,8 +65,9 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) { //Unpack 会导致传入的数字类型转化成float64!! //主要原因是json unmarshal并未传入正确的数据类型 -func (Self *BasePackager) UnPack(reader io.Reader) (err error) { +func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { Self.clean() + n += 2 // uint16 err = binary.Read(reader, binary.LittleEndian, &Self.Length) if err != nil { return @@ -80,6 +81,7 @@ func (Self *BasePackager) UnPack(reader io.Reader) (err error) { // err = io.ErrUnexpectedEOF //} err = binary.Read(reader, binary.LittleEndian, Self.Content) + n += Self.Length return } @@ -137,12 +139,13 @@ func (Self *ConnPackager) Pack(writer io.Writer) (err error) { return } -func (Self *ConnPackager) UnPack(reader io.Reader) (err error) { +func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) if err != nil && err != io.EOF { return } - err = Self.BasePackager.UnPack(reader) + n, err = Self.BasePackager.UnPack(reader) + n += 2 return } @@ -203,7 +206,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { return } -func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { +func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return @@ -216,14 +219,17 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: Self.Content = WindowBuff.Get() // need get a window buf from pool Self.BasePackager.clean() // also clean the content - err = Self.BasePackager.UnPack(reader) + n, err = Self.BasePackager.UnPack(reader) //logs.Warn("unpack", Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: err = binary.Read(reader, binary.LittleEndian, &Self.Window) if err != nil { return } + n += 4 // uint32 err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) + n += 4 // uint32 } + n += 5 //uint8 int32 return } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index c4b47f3..2d519cb 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,7 +3,6 @@ package mux import ( "errors" "io" - "math" "net" "sync" "time" @@ -137,8 +136,8 @@ type ReceiveWindow struct { readWait bool windowFull bool count int8 - bw *bandwidth - once sync.Once + //bw *bandwidth + once sync.Once window } @@ -146,7 +145,7 @@ func (Self *ReceiveWindow) New(mux *Mux) { // initial a window for receive Self.readOp = make(chan struct{}) Self.bufQueue.New() - Self.bw = new(bandwidth) + //Self.bw = new(bandwidth) Self.element = new(ListElement) Self.maxSize = 8192 Self.mux = mux @@ -175,7 +174,7 @@ func (Self *ReceiveWindow) CalcSize() { // calculating maximum receive window size if Self.count == 0 { //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) - n := uint32(2 * Self.mux.latency * Self.bw.Get()) + n := uint32(2 * Self.mux.latency * Self.mux.bw.Get() * 1.5 / float64(Self.mux.connMap.Size())) if n < 8192 { n = 8192 } @@ -183,13 +182,16 @@ func (Self *ReceiveWindow) CalcSize() { n = Self.bufQueue.Len() } // set the minimal size + if n > 2*Self.maxSize { + n = 2 * Self.maxSize + } if n > common.MAXIMUM_WINDOW_SIZE { n = common.MAXIMUM_WINDOW_SIZE } // set the maximum size //logs.Warn("n", n) Self.maxSize = n - Self.count = -5 + Self.count = -10 } Self.count += 1 } @@ -225,7 +227,7 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { l := 0 //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) copyData: - Self.bw.StartRead() + //Self.bw.StartRead() if Self.off == uint32(Self.element.l) { // on the first Read method invoked, Self.off and Self.element.l // both zero value @@ -241,7 +243,7 @@ copyData: //logs.Warn("pop element", Self.element.l, Self.element.part) } l = copy(p[pOff:], Self.element.buf[Self.off:]) - Self.bw.SetCopySize(l) + //Self.bw.SetCopySize(l) pOff += l Self.off += uint32(l) Self.bufQueue.mutex.Lock() @@ -250,7 +252,7 @@ copyData: Self.bufQueue.mutex.Unlock() n += l l = 0 - Self.bw.EndRead() + //Self.bw.EndRead() Self.sendStatus(id) if Self.off == uint32(Self.element.l) { //logs.Warn("put the element end ", string(Self.element.buf[:15])) @@ -469,65 +471,81 @@ func (Self *SendWindow) SetTimeOut(t time.Time) { Self.timeout = t } -type bandwidth struct { - lastReadStart time.Time - readStart time.Time - readEnd time.Time - bufLength int - lastBufLength int - count int8 - readBW float64 - writeBW float64 -} - -func (Self *bandwidth) StartRead() { - Self.lastReadStart, Self.readStart = Self.readStart, time.Now() -} - -func (Self *bandwidth) EndRead() { - if !Self.lastReadStart.IsZero() { - if Self.count == 0 { - Self.calcWriteBandwidth() - } - } - Self.readEnd = time.Now() - if Self.count == 0 { - Self.calcReadBandwidth() - Self.count = -3 - } - Self.count += 1 -} - -func (Self *bandwidth) SetCopySize(n int) { - // must be invoke between StartRead and EndRead - Self.lastBufLength, Self.bufLength = Self.bufLength, n -} - -func (Self *bandwidth) calcReadBandwidth() { - // Bandwidth between nps and npc - readTime := Self.readEnd.Sub(Self.readStart) - Self.readBW = float64(Self.bufLength) / readTime.Seconds() - //logs.Warn("calc read bw", Self.bufLength, readTime.Seconds()) -} - -func (Self *bandwidth) calcWriteBandwidth() { - // Bandwidth between nps and user, npc and application - //logs.Warn("calc write bw") - writeTime := Self.readEnd.Sub(Self.lastReadStart) - Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() -} - -func (Self *bandwidth) Get() (bw float64) { - // The zero value, 0 for numeric types - if Self.writeBW == 0 && Self.readBW == 0 { - //logs.Warn("bw both 0") - return 100 - } - if Self.writeBW == 0 && Self.readBW != 0 { - return Self.readBW - } - if Self.readBW == 0 && Self.writeBW != 0 { - return Self.writeBW - } - return math.Min(Self.readBW, Self.writeBW) -} +//type bandwidth struct { +// readStart time.Time +// lastReadStart time.Time +// readEnd time.Time +// lastReadEnd time.Time +// bufLength int +// lastBufLength int +// count int8 +// readBW float64 +// writeBW float64 +// readBandwidth float64 +//} +// +//func (Self *bandwidth) StartRead() { +// Self.lastReadStart, Self.readStart = Self.readStart, time.Now() +// if !Self.lastReadStart.IsZero() { +// if Self.count == -5 { +// Self.calcBandWidth() +// } +// } +//} +// +//func (Self *bandwidth) EndRead() { +// Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now() +// if Self.count == -5 { +// Self.calcWriteBandwidth() +// } +// if Self.count == 0 { +// Self.calcReadBandwidth() +// Self.count = -6 +// } +// Self.count += 1 +//} +// +//func (Self *bandwidth) SetCopySize(n int) { +// // must be invoke between StartRead and EndRead +// Self.lastBufLength, Self.bufLength = Self.bufLength, n +//} +//// calculating +//// start end start end +//// read read +//// write +// +//func (Self *bandwidth) calcBandWidth() { +// t := Self.readStart.Sub(Self.lastReadStart) +// if Self.lastBufLength >= 32768 { +// Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds() +// } +//} +// +//func (Self *bandwidth) calcReadBandwidth() { +// // Bandwidth between nps and npc +// readTime := Self.readEnd.Sub(Self.readStart) +// Self.readBW = float64(Self.bufLength) / readTime.Seconds() +// //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds()) +//} +// +//func (Self *bandwidth) calcWriteBandwidth() { +// // Bandwidth between nps and user, npc and application +// writeTime := Self.readStart.Sub(Self.lastReadEnd) +// Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() +// //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds()) +//} +// +//func (Self *bandwidth) Get() (bw float64) { +// // The zero value, 0 for numeric types +// if Self.writeBW == 0 && Self.readBW == 0 { +// //logs.Warn("bw both 0") +// return 100 +// } +// if Self.writeBW == 0 && Self.readBW != 0 { +// return Self.readBW +// } +// if Self.readBW == 0 && Self.writeBW != 0 { +// return Self.writeBW +// } +// return Self.readBandwidth +//} diff --git a/lib/mux/map.go b/lib/mux/map.go index 0801201..8f07dee 100644 --- a/lib/mux/map.go +++ b/lib/mux/map.go @@ -20,6 +20,10 @@ func NewConnMap() *connMap { return connMap } +func (s *connMap) Size() (n int) { + return len(s.connMap) +} + func (s *connMap) Get(id int32) (*conn, bool) { s.Lock() defer s.Unlock() diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 529b7dc..42aa8c8 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -24,6 +24,7 @@ type Mux struct { IsClose bool pingOk int latency float64 + bw *bandwidth pingCh chan []byte pingTimer *time.Timer connType string @@ -37,8 +38,9 @@ func NewMux(c net.Conn, connType string) *Mux { conn: c, connMap: NewConnMap(), id: 0, - closeChan: make(chan struct{}), + closeChan: make(chan struct{}, 3), newConnCh: make(chan *conn), + bw: new(bandwidth), IsClose: false, connType: connType, bufCh: make(chan *bytes.Buffer), @@ -91,6 +93,9 @@ func (s *Mux) Addr() net.Addr { } func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { + if s.IsClose { + return + } var err error pack := common.MuxPack.Get() err = pack.NewPac(flag, id, data...) @@ -160,6 +165,9 @@ func (s *Mux) ping() { for { if s.IsClose { ticker.Stop() + if !s.pingTimer.Stop() { + <-s.pingTimer.C + } break } select { @@ -189,6 +197,9 @@ func (s *Mux) pingReturn() { var now time.Time var data []byte for { + if s.IsClose { + break + } select { case data = <-s.pingCh: case <-s.closeChan: @@ -199,12 +210,12 @@ func (s *Mux) pingReturn() { break } _ = now.UnmarshalText(data) - s.latency = time.Now().UTC().Sub(now).Seconds() / 2 - logs.Warn("latency", s.latency) - common.WindowBuff.Put(data) - if s.latency <= 0 { - logs.Warn("latency err", s.latency) + latency := time.Now().UTC().Sub(now).Seconds() / 2 + if latency < 0.5 && latency > 0 { + s.latency = latency } + //logs.Warn("latency", s.latency) + common.WindowBuff.Put(data) } }() } @@ -212,14 +223,18 @@ func (s *Mux) pingReturn() { func (s *Mux) readSession() { go func() { pack := common.MuxPack.Get() + var l uint16 + var err error for { if s.IsClose { break } pack = common.MuxPack.Get() - if pack.UnPack(s.conn) != nil { + s.bw.StartRead() + if l, err = pack.UnPack(s.conn); err != nil { break } + s.bw.SetCopySize(l) s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new connection @@ -239,7 +254,7 @@ func (s *Mux) readSession() { if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { switch pack.Flag { case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection - err := s.newMsg(connection, pack) + err = s.newMsg(connection, pack) if err != nil { connection.Close() } @@ -299,6 +314,7 @@ func (s *Mux) Close() error { s.connMap.Close() s.closeChan <- struct{}{} s.closeChan <- struct{}{} + s.closeChan <- struct{}{} close(s.newConnCh) return s.conn.Close() } @@ -311,3 +327,38 @@ func (s *Mux) getId() (id int32) { } return } + +type bandwidth struct { + readStart time.Time + lastReadStart time.Time + bufLength uint16 + readBandwidth float64 +} + +func (Self *bandwidth) StartRead() { + if Self.readStart.IsZero() { + Self.readStart = time.Now() + } + if Self.bufLength >= 16384 { + Self.lastReadStart, Self.readStart = Self.readStart, time.Now() + Self.calcBandWidth() + } +} + +func (Self *bandwidth) SetCopySize(n uint16) { + Self.bufLength += n +} + +func (Self *bandwidth) calcBandWidth() { + t := Self.readStart.Sub(Self.lastReadStart) + Self.readBandwidth = float64(Self.bufLength) / t.Seconds() + Self.bufLength = 0 +} + +func (Self *bandwidth) Get() (bw float64) { + // The zero value, 0 for numeric types + if Self.readBandwidth <= 0 { + Self.readBandwidth = 100 + } + return Self.readBandwidth +}