From ae28d41231c68c226a0a0c82f3341c42cef6c620 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Mon, 16 Dec 2019 23:10:44 +0800 Subject: [PATCH] change mux data struct, fix #250 --- lib/common/netpackager.go | 132 ++++++++++++++++++-------------------- lib/mux/conn.go | 6 +- lib/mux/mux.go | 10 +-- lib/version/version.go | 2 +- 4 files changed, 71 insertions(+), 79 deletions(-) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 96ebaec..e3e0919 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -22,24 +22,34 @@ type BasePackager struct { Content []byte } -func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { +func (Self *BasePackager) NewPac(content []byte) (err error) { Self.clean() - for _, content := range contents { - switch content.(type) { - case nil: - Self.Content = Self.Content[:0] - case []byte: - err = Self.appendByte(content.([]byte)) - case string: - err = Self.appendByte([]byte(content.(string))) - if err != nil { - return - } - err = Self.appendByte([]byte(CONN_DATA_SEQ)) - default: - err = Self.marshal(content) + if content != nil { + n := len(content) + if n > MAXIMUM_SEGMENT_SIZE { + err = errors.New("mux:packer: newpack content segment too large") } + Self.Content = Self.Content[:n] + copy(Self.Content, content) + } else { + Self.Content = Self.Content[:0] } + //for _, content := range contents { + // switch content.(type) { + // case nil: + // Self.Content = Self.Content[:0] + // case []byte: + // err = Self.appendByte(content.([]byte)) + // case string: + // err = Self.appendByte([]byte(content.(string))) + // if err != nil { + // return + // } + // err = Self.appendByte([]byte(CONN_DATA_SEQ)) + // default: + // err = Self.marshal(content) + // } + //} Self.setLength() return } @@ -78,6 +88,9 @@ func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { if int(Self.Length) > cap(Self.Content) { err = errors.New("unpack err, content length too large") } + if Self.Length > MAXIMUM_SEGMENT_SIZE { + err = errors.New("mux:packer: unpack content segment too large") + } Self.Content = Self.Content[:int(Self.Length)] //n, err := io.ReadFull(reader, Self.Content) //if n != int(Self.Length) { @@ -128,61 +141,49 @@ type ConnPackager struct { BasePackager } -func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { - Self.ConnType = connType - err = Self.BasePackager.NewPac(content...) - return -} - -func (Self *ConnPackager) Pack(writer io.Writer) (err error) { - err = binary.Write(writer, binary.LittleEndian, Self.ConnType) - if err != nil { - return - } - err = Self.BasePackager.Pack(writer) - return -} - -func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { - err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) - if err != nil && err != io.EOF { - return - } - n, err = Self.BasePackager.UnPack(reader) - n += 2 - return -} +//func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { +// Self.ConnType = connType +// err = Self.BasePackager.NewPac(content...) +// return +//} +// +//func (Self *ConnPackager) Pack(writer io.Writer) (err error) { +// err = binary.Write(writer, binary.LittleEndian, Self.ConnType) +// if err != nil { +// return +// } +// err = Self.BasePackager.Pack(writer) +// return +//} +// +//func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { +// err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) +// if err != nil && err != io.EOF { +// return +// } +// n, err = Self.BasePackager.UnPack(reader) +// n += 2 +// return +//} type MuxPackager struct { - Flag uint8 - Id int32 - Window uint32 - ReadLength uint32 + Flag uint8 + Id int32 + RemainLength uint32 BasePackager } -func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { +func (Self *MuxPackager) NewPac(flag uint8, id int32, content interface{}) (err error) { Self.Flag = flag Self.Id = id switch flag { case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: Self.Content = WindowBuff.Get() - err = Self.BasePackager.NewPac(content...) + err = Self.BasePackager.NewPac(content.([]byte)) //logs.Warn(Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: // MUX_MSG_SEND_OK contains two data - switch content[0].(type) { - case int: - Self.Window = uint32(content[0].(int)) - case uint32: - Self.Window = content[0].(uint32) - } - switch content[1].(type) { - case int: - Self.ReadLength = uint32(content[1].(int)) - case uint32: - Self.ReadLength = content[1].(uint32) - } + Self.RemainLength = content.(uint32) } return } @@ -201,11 +202,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { err = Self.BasePackager.Pack(writer) WindowBuff.Put(Self.Content) case MUX_MSG_SEND_OK: - err = binary.Write(writer, binary.LittleEndian, Self.Window) - if err != nil { - return - } - err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) + err = binary.Write(writer, binary.LittleEndian, Self.RemainLength) } return } @@ -226,12 +223,7 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { n, err = Self.BasePackager.UnPack(reader) //logs.Warn("unpack", Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: - err = binary.Read(reader, binary.LittleEndian, &Self.Window) - if err != nil { - return - } - n += 4 // uint32 - err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) + err = binary.Read(reader, binary.LittleEndian, &Self.RemainLength) n += 4 // uint32 } n += 5 //uint8 int32 @@ -273,10 +265,10 @@ func (addr *Addr) Decode(b []byte) error { pos := 1 switch addr.Type { case ipV4: - addr.Host = net.IP(b[pos:pos+net.IPv4len]).String() + addr.Host = net.IP(b[pos : pos+net.IPv4len]).String() pos += net.IPv4len case ipV6: - addr.Host = net.IP(b[pos:pos+net.IPv6len]).String() + addr.Host = net.IP(b[pos : pos+net.IPv6len]).String() pos += net.IPv6len case domainName: addrlen := int(b[pos]) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 28ec1de..96bb1a9 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -265,7 +265,7 @@ start: Self.bufQueue.Push(element) // status check finish, now we can push the element into the queue if wait == 0 { - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, newRemaining) // send the remaining window size, not including zero size } return nil @@ -333,7 +333,7 @@ func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { // now we get the current window status success if wait == 1 { //logs.Warn("send the wait status", remaining) - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, remaining) } return } @@ -394,7 +394,7 @@ func (Self *SendWindow) SetSendBuf(buf []byte) { Self.off = 0 } -func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) { +func (Self *SendWindow) SetSize(newRemaining uint32) (closed bool) { // set the window size from receive window defer func() { if recover() != nil { diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 02b017a..eb75182 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -92,13 +92,13 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { +func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { if s.IsClose { return } var err error pack := common.MuxPack.Get() - err = pack.NewPac(flag, id, data...) + err = pack.NewPac(flag, id, data) if err != nil { common.MuxPack.Put(pack) logs.Error("mux: new pack err", err) @@ -173,7 +173,7 @@ func (s *Mux) ping() { s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) // send the ping flag and get the latency first ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() + defer ticker.Stop() for { if s.IsClose { break @@ -198,7 +198,7 @@ func (s *Mux) ping() { } atomic.AddUint32(&s.pingOk, 1) } - return + return }() } @@ -297,7 +297,7 @@ func (s *Mux) readSession() { if connection.isClose { continue } - connection.sendWindow.SetSize(pack.Window, pack.ReadLength) + connection.sendWindow.SetSize(pack.RemainLength) continue case common.MUX_CONN_CLOSE: //close the connection connection.closeFlag = true diff --git a/lib/version/version.go b/lib/version/version.go index 5c80ede..5206a72 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -4,5 +4,5 @@ const VERSION = "0.25.2" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { - return "0.25.0" + return "0.25.2" }