kcp cleanup

pull/861/head
Darien Raymond 2018-01-17 17:36:14 +01:00
parent a6c0ef11ba
commit 630a76d06a
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
6 changed files with 236 additions and 237 deletions

View File

@ -572,7 +572,7 @@ func (c *Connection) Input(segments []Segment) {
c.dataInput.Signal() c.dataInput.Signal()
c.dataOutput.Signal() c.dataOutput.Signal()
} }
c.sendingWorker.ProcessReceivingNext(seg.ReceivinNext) c.sendingWorker.ProcessReceivingNext(seg.ReceivingNext)
c.receivingWorker.ProcessSendingNext(seg.SendingNext) c.receivingWorker.ProcessSendingNext(seg.SendingNext)
c.roundTrip.UpdatePeerRTO(seg.PeerRTO, current) c.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)
seg.Release() seg.Release()
@ -628,7 +628,7 @@ func (c *Connection) Ping(current uint32, cmd Command) {
seg := NewCmdOnlySegment() seg := NewCmdOnlySegment()
seg.Conv = c.meta.Conversation seg.Conv = c.meta.Conversation
seg.Cmd = cmd seg.Cmd = cmd
seg.ReceivinNext = c.receivingWorker.NextNumber() seg.ReceivingNext = c.receivingWorker.NextNumber()
seg.SendingNext = c.sendingWorker.FirstUnacknowledged() seg.SendingNext = c.sendingWorker.FirstUnacknowledged()
seg.PeerRTO = c.roundTrip.Timeout() seg.PeerRTO = c.roundTrip.Timeout()
if c.State() == StateReadyToClose { if c.State() == StateReadyToClose {

View File

@ -74,25 +74,25 @@ func NewListener(ctx context.Context, address net.Address, port net.Port, addCon
return l, nil return l, nil
} }
func (v *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalDest net.Destination) { func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalDest net.Destination) {
defer payload.Release() defer payload.Release()
segments := v.reader.Read(payload.Bytes()) segments := l.reader.Read(payload.Bytes())
if len(segments) == 0 { if len(segments) == 0 {
newError("discarding invalid payload from ", src).WriteToLog() newError("discarding invalid payload from ", src).WriteToLog()
return return
} }
v.Lock() l.Lock()
defer v.Unlock() defer l.Unlock()
select { select {
case <-v.ctx.Done(): case <-l.ctx.Done():
return return
default: default:
} }
if v.hub == nil { if l.hub == nil {
return return
} }
@ -104,7 +104,7 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalD
Port: src.Port, Port: src.Port,
Conv: conv, Conv: conv,
} }
conn, found := v.sessions[id] conn, found := l.sessions[id]
if !found { if !found {
if cmd == CommandTerminate { if cmd == CommandTerminate {
@ -112,73 +112,73 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src net.Destination, originalD
} }
writer := &Writer{ writer := &Writer{
id: id, id: id,
hub: v.hub, hub: l.hub,
dest: src, dest: src,
listener: v, listener: l,
} }
remoteAddr := &net.UDPAddr{ remoteAddr := &net.UDPAddr{
IP: src.Address.IP(), IP: src.Address.IP(),
Port: int(src.Port), Port: int(src.Port),
} }
localAddr := v.hub.Addr() localAddr := l.hub.Addr()
conn = NewConnection(ConnMetadata{ conn = NewConnection(ConnMetadata{
LocalAddr: localAddr, LocalAddr: localAddr,
RemoteAddr: remoteAddr, RemoteAddr: remoteAddr,
Conversation: conv, Conversation: conv,
}, &KCPPacketWriter{ }, &KCPPacketWriter{
Header: v.header, Header: l.header,
Security: v.security, Security: l.security,
Writer: writer, Writer: writer,
}, writer, v.config) }, writer, l.config)
var netConn internet.Connection = conn var netConn internet.Connection = conn
if v.tlsConfig != nil { if l.tlsConfig != nil {
tlsConn := tls.Server(conn, v.tlsConfig) tlsConn := tls.Server(conn, l.tlsConfig)
netConn = tlsConn netConn = tlsConn
} }
if !v.addConn(context.Background(), netConn) { if !l.addConn(context.Background(), netConn) {
return return
} }
v.sessions[id] = conn l.sessions[id] = conn
} }
conn.Input(segments) conn.Input(segments)
} }
func (v *Listener) Remove(id ConnectionID) { func (l *Listener) Remove(id ConnectionID) {
select { select {
case <-v.ctx.Done(): case <-l.ctx.Done():
return return
default: default:
v.Lock() l.Lock()
delete(v.sessions, id) delete(l.sessions, id)
v.Unlock() l.Unlock()
} }
} }
// Close stops listening on the UDP address. Already Accepted connections are not closed. // Close stops listening on the UDP address. Already Accepted connections are not closed.
func (v *Listener) Close() error { func (l *Listener) Close() error {
v.hub.Close() l.hub.Close()
v.Lock() l.Lock()
defer v.Unlock() defer l.Unlock()
for _, conn := range v.sessions { for _, conn := range l.sessions {
go conn.Terminate() go conn.Terminate()
} }
return nil return nil
} }
func (v *Listener) ActiveConnections() int { func (l *Listener) ActiveConnections() int {
v.Lock() l.Lock()
defer v.Unlock() defer l.Unlock()
return len(v.sessions) return len(l.sessions)
} }
// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it. // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
func (v *Listener) Addr() net.Addr { func (l *Listener) Addr() net.Addr {
return v.hub.Addr() return l.hub.Addr()
} }
type Writer struct { type Writer struct {
@ -188,12 +188,12 @@ type Writer struct {
listener *Listener listener *Listener
} }
func (v *Writer) Write(payload []byte) (int, error) { func (w *Writer) Write(payload []byte) (int, error) {
return v.hub.WriteTo(payload, v.dest) return w.hub.WriteTo(payload, w.dest)
} }
func (v *Writer) Close() error { func (w *Writer) Close() error {
v.listener.Remove(v.id) w.listener.Remove(w.id)
return nil return nil
} }

View File

@ -20,42 +20,42 @@ func NewReceivingWindow(size uint32) *ReceivingWindow {
} }
} }
func (v *ReceivingWindow) Size() uint32 { func (w *ReceivingWindow) Size() uint32 {
return v.size return w.size
} }
func (v *ReceivingWindow) Position(idx uint32) uint32 { func (w *ReceivingWindow) Position(idx uint32) uint32 {
return (idx + v.start) % v.size return (idx + w.start) % w.size
} }
func (v *ReceivingWindow) Set(idx uint32, value *DataSegment) bool { func (w *ReceivingWindow) Set(idx uint32, value *DataSegment) bool {
pos := v.Position(idx) pos := w.Position(idx)
if v.list[pos] != nil { if w.list[pos] != nil {
return false return false
} }
v.list[pos] = value w.list[pos] = value
return true return true
} }
func (v *ReceivingWindow) Remove(idx uint32) *DataSegment { func (w *ReceivingWindow) Remove(idx uint32) *DataSegment {
pos := v.Position(idx) pos := w.Position(idx)
e := v.list[pos] e := w.list[pos]
v.list[pos] = nil w.list[pos] = nil
return e return e
} }
func (v *ReceivingWindow) RemoveFirst() *DataSegment { func (w *ReceivingWindow) RemoveFirst() *DataSegment {
return v.Remove(0) return w.Remove(0)
} }
func (w *ReceivingWindow) HasFirst() bool { func (w *ReceivingWindow) HasFirst() bool {
return w.list[w.Position(0)] != nil return w.list[w.Position(0)] != nil
} }
func (v *ReceivingWindow) Advance() { func (w *ReceivingWindow) Advance() {
v.start++ w.start++
if v.start == v.size { if w.start == w.size {
v.start = 0 w.start = 0
} }
} }
@ -79,70 +79,70 @@ func NewAckList(writer SegmentWriter) *AckList {
} }
} }
func (v *AckList) Add(number uint32, timestamp uint32) { func (l *AckList) Add(number uint32, timestamp uint32) {
v.timestamps = append(v.timestamps, timestamp) l.timestamps = append(l.timestamps, timestamp)
v.numbers = append(v.numbers, number) l.numbers = append(l.numbers, number)
v.nextFlush = append(v.nextFlush, 0) l.nextFlush = append(l.nextFlush, 0)
v.dirty = true l.dirty = true
} }
func (v *AckList) Clear(una uint32) { func (l *AckList) Clear(una uint32) {
count := 0 count := 0
for i := 0; i < len(v.numbers); i++ { for i := 0; i < len(l.numbers); i++ {
if v.numbers[i] < una { if l.numbers[i] < una {
continue continue
} }
if i != count { if i != count {
v.numbers[count] = v.numbers[i] l.numbers[count] = l.numbers[i]
v.timestamps[count] = v.timestamps[i] l.timestamps[count] = l.timestamps[i]
v.nextFlush[count] = v.nextFlush[i] l.nextFlush[count] = l.nextFlush[i]
} }
count++ count++
} }
if count < len(v.numbers) { if count < len(l.numbers) {
v.numbers = v.numbers[:count] l.numbers = l.numbers[:count]
v.timestamps = v.timestamps[:count] l.timestamps = l.timestamps[:count]
v.nextFlush = v.nextFlush[:count] l.nextFlush = l.nextFlush[:count]
v.dirty = true l.dirty = true
} }
} }
func (v *AckList) Flush(current uint32, rto uint32) { func (l *AckList) Flush(current uint32, rto uint32) {
v.flushCandidates = v.flushCandidates[:0] l.flushCandidates = l.flushCandidates[:0]
seg := NewAckSegment() seg := NewAckSegment()
for i := 0; i < len(v.numbers); i++ { for i := 0; i < len(l.numbers); i++ {
if v.nextFlush[i] > current { if l.nextFlush[i] > current {
if len(v.flushCandidates) < cap(v.flushCandidates) { if len(l.flushCandidates) < cap(l.flushCandidates) {
v.flushCandidates = append(v.flushCandidates, v.numbers[i]) l.flushCandidates = append(l.flushCandidates, l.numbers[i])
} }
continue continue
} }
seg.PutNumber(v.numbers[i]) seg.PutNumber(l.numbers[i])
seg.PutTimestamp(v.timestamps[i]) seg.PutTimestamp(l.timestamps[i])
timeout := rto / 2 timeout := rto / 2
if timeout < 20 { if timeout < 20 {
timeout = 20 timeout = 20
} }
v.nextFlush[i] = current + timeout l.nextFlush[i] = current + timeout
if seg.IsFull() { if seg.IsFull() {
v.writer.Write(seg) l.writer.Write(seg)
seg.Release() seg.Release()
seg = NewAckSegment() seg = NewAckSegment()
v.dirty = false l.dirty = false
} }
} }
if v.dirty || !seg.IsEmpty() { if l.dirty || !seg.IsEmpty() {
for _, number := range v.flushCandidates { for _, number := range l.flushCandidates {
if seg.IsFull() { if seg.IsFull() {
break break
} }
seg.PutNumber(number) seg.PutNumber(number)
} }
v.writer.Write(seg) l.writer.Write(seg)
seg.Release() seg.Release()
v.dirty = false l.dirty = false
} }
} }

View File

@ -53,47 +53,47 @@ func NewDataSegment() *DataSegment {
return new(DataSegment) return new(DataSegment)
} }
func (v *DataSegment) Conversation() uint16 { func (s *DataSegment) Conversation() uint16 {
return v.Conv return s.Conv
} }
func (v *DataSegment) Command() Command { func (*DataSegment) Command() Command {
return CommandData return CommandData
} }
func (v *DataSegment) Detach() *buf.Buffer { func (s *DataSegment) Detach() *buf.Buffer {
r := v.payload r := s.payload
v.payload = nil s.payload = nil
return r return r
} }
func (v *DataSegment) Data() *buf.Buffer { func (s *DataSegment) Data() *buf.Buffer {
if v.payload == nil { if s.payload == nil {
v.payload = buf.New() s.payload = buf.New()
} }
return v.payload return s.payload
} }
func (v *DataSegment) Bytes() buf.Supplier { func (s *DataSegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(s.Conv, b[:0])
b = append(b, byte(CommandData), byte(v.Option)) b = append(b, byte(CommandData), byte(s.Option))
b = serial.Uint32ToBytes(v.Timestamp, b) b = serial.Uint32ToBytes(s.Timestamp, b)
b = serial.Uint32ToBytes(v.Number, b) b = serial.Uint32ToBytes(s.Number, b)
b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint32ToBytes(s.SendingNext, b)
b = serial.Uint16ToBytes(uint16(v.payload.Len()), b) b = serial.Uint16ToBytes(uint16(s.payload.Len()), b)
b = append(b, v.payload.Bytes()...) b = append(b, s.payload.Bytes()...)
return len(b), nil return len(b), nil
} }
} }
func (v *DataSegment) ByteSize() int { func (s *DataSegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4 + 2 + v.payload.Len() return 2 + 1 + 1 + 4 + 4 + 4 + 2 + s.payload.Len()
} }
func (v *DataSegment) Release() { func (s *DataSegment) Release() {
v.payload.Release() s.payload.Release()
v.payload = nil s.payload = nil
} }
type AckSegment struct { type AckSegment struct {
@ -113,94 +113,93 @@ func NewAckSegment() *AckSegment {
} }
} }
func (v *AckSegment) Conversation() uint16 { func (s *AckSegment) Conversation() uint16 {
return v.Conv return s.Conv
} }
func (v *AckSegment) Command() Command { func (*AckSegment) Command() Command {
return CommandACK return CommandACK
} }
func (v *AckSegment) PutTimestamp(timestamp uint32) { func (s *AckSegment) PutTimestamp(timestamp uint32) {
if timestamp-v.Timestamp < 0x7FFFFFFF { if timestamp-s.Timestamp < 0x7FFFFFFF {
v.Timestamp = timestamp s.Timestamp = timestamp
} }
} }
func (v *AckSegment) PutNumber(number uint32) { func (s *AckSegment) PutNumber(number uint32) {
v.NumberList = append(v.NumberList, number) s.NumberList = append(s.NumberList, number)
} }
func (v *AckSegment) IsFull() bool { func (s *AckSegment) IsFull() bool {
return len(v.NumberList) == ackNumberLimit return len(s.NumberList) == ackNumberLimit
} }
func (v *AckSegment) IsEmpty() bool { func (s *AckSegment) IsEmpty() bool {
return len(v.NumberList) == 0 return len(s.NumberList) == 0
} }
func (v *AckSegment) ByteSize() int { func (s *AckSegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4 + 1 + len(v.NumberList)*4 return 2 + 1 + 1 + 4 + 4 + 4 + 1 + len(s.NumberList)*4
} }
func (v *AckSegment) Bytes() buf.Supplier { func (s *AckSegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(s.Conv, b[:0])
b = append(b, byte(CommandACK), byte(v.Option)) b = append(b, byte(CommandACK), byte(s.Option))
b = serial.Uint32ToBytes(v.ReceivingWindow, b) b = serial.Uint32ToBytes(s.ReceivingWindow, b)
b = serial.Uint32ToBytes(v.ReceivingNext, b) b = serial.Uint32ToBytes(s.ReceivingNext, b)
b = serial.Uint32ToBytes(v.Timestamp, b) b = serial.Uint32ToBytes(s.Timestamp, b)
count := byte(len(v.NumberList)) count := byte(len(s.NumberList))
b = append(b, count) b = append(b, count)
for _, number := range v.NumberList { for _, number := range s.NumberList {
b = serial.Uint32ToBytes(number, b) b = serial.Uint32ToBytes(number, b)
} }
return v.ByteSize(), nil return s.ByteSize(), nil
} }
} }
func (v *AckSegment) Release() { func (s *AckSegment) Release() {
v.NumberList = nil s.NumberList = nil
} }
type CmdOnlySegment struct { type CmdOnlySegment struct {
Conv uint16 Conv uint16
Cmd Command Cmd Command
Option SegmentOption Option SegmentOption
SendingNext uint32 SendingNext uint32
ReceivinNext uint32 ReceivingNext uint32
PeerRTO uint32 PeerRTO uint32
} }
func NewCmdOnlySegment() *CmdOnlySegment { func NewCmdOnlySegment() *CmdOnlySegment {
return new(CmdOnlySegment) return new(CmdOnlySegment)
} }
func (v *CmdOnlySegment) Conversation() uint16 { func (s *CmdOnlySegment) Conversation() uint16 {
return v.Conv return s.Conv
} }
func (v *CmdOnlySegment) Command() Command { func (s *CmdOnlySegment) Command() Command {
return v.Cmd return s.Cmd
} }
func (v *CmdOnlySegment) ByteSize() int { func (*CmdOnlySegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4 return 2 + 1 + 1 + 4 + 4 + 4
} }
func (v *CmdOnlySegment) Bytes() buf.Supplier { func (s *CmdOnlySegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(s.Conv, b[:0])
b = append(b, byte(v.Cmd), byte(v.Option)) b = append(b, byte(s.Cmd), byte(s.Option))
b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint32ToBytes(s.SendingNext, b)
b = serial.Uint32ToBytes(v.ReceivinNext, b) b = serial.Uint32ToBytes(s.ReceivingNext, b)
b = serial.Uint32ToBytes(v.PeerRTO, b) b = serial.Uint32ToBytes(s.PeerRTO, b)
return len(b), nil return len(b), nil
} }
} }
func (v *CmdOnlySegment) Release() { func (*CmdOnlySegment) Release() {}
}
func ReadSegment(buf []byte) (Segment, []byte) { func ReadSegment(buf []byte) (Segment, []byte) {
if len(buf) < 4 { if len(buf) < 4 {
@ -286,7 +285,7 @@ func ReadSegment(buf []byte) (Segment, []byte) {
seg.SendingNext = serial.BytesToUint32(buf) seg.SendingNext = serial.BytesToUint32(buf)
buf = buf[4:] buf = buf[4:]
seg.ReceivinNext = serial.BytesToUint32(buf) seg.ReceivingNext = serial.BytesToUint32(buf)
buf = buf[4:] buf = buf[4:]
seg.PeerRTO = serial.BytesToUint32(buf) seg.PeerRTO = serial.BytesToUint32(buf)

View File

@ -100,12 +100,12 @@ func TestCmdSegment(t *testing.T) {
assert := With(t) assert := With(t)
seg := &CmdOnlySegment{ seg := &CmdOnlySegment{
Conv: 1, Conv: 1,
Cmd: CommandPing, Cmd: CommandPing,
Option: SegmentOptionClose, Option: SegmentOptionClose,
SendingNext: 11, SendingNext: 11,
ReceivinNext: 13, ReceivingNext: 13,
PeerRTO: 15, PeerRTO: 15,
} }
nBytes := seg.ByteSize() nBytes := seg.ByteSize()
@ -120,6 +120,6 @@ func TestCmdSegment(t *testing.T) {
assert(byte(seg2.Command()), Equals, byte(seg.Command())) assert(byte(seg2.Command()), Equals, byte(seg.Command()))
assert(byte(seg2.Option), Equals, byte(seg.Option)) assert(byte(seg2.Option), Equals, byte(seg.Option))
assert(seg2.SendingNext, Equals, seg.SendingNext) assert(seg2.SendingNext, Equals, seg.SendingNext)
assert(seg2.ReceivinNext, Equals, seg.ReceivinNext) assert(seg2.ReceivingNext, Equals, seg.ReceivingNext)
assert(seg2.PeerRTO, Equals, seg.PeerRTO) assert(seg2.PeerRTO, Equals, seg.PeerRTO)
} }

View File

@ -209,59 +209,59 @@ func NewSendingWorker(kcp *Connection) *SendingWorker {
return worker return worker
} }
func (v *SendingWorker) Release() { func (w *SendingWorker) Release() {
v.Lock() w.Lock()
v.window.Release() w.window.Release()
v.Unlock() w.Unlock()
} }
func (v *SendingWorker) ProcessReceivingNext(nextNumber uint32) { func (w *SendingWorker) ProcessReceivingNext(nextNumber uint32) {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
v.ProcessReceivingNextWithoutLock(nextNumber) w.ProcessReceivingNextWithoutLock(nextNumber)
} }
func (v *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) { func (w *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) {
v.window.Clear(nextNumber) w.window.Clear(nextNumber)
v.FindFirstUnacknowledged() w.FindFirstUnacknowledged()
} }
func (v *SendingWorker) FindFirstUnacknowledged() { func (w *SendingWorker) FindFirstUnacknowledged() {
first := v.firstUnacknowledged first := w.firstUnacknowledged
if !v.window.IsEmpty() { if !w.window.IsEmpty() {
v.firstUnacknowledged = v.window.FirstNumber() w.firstUnacknowledged = w.window.FirstNumber()
} else { } else {
v.firstUnacknowledged = v.nextNumber w.firstUnacknowledged = w.nextNumber
} }
if first != v.firstUnacknowledged { if first != w.firstUnacknowledged {
v.firstUnacknowledgedUpdated = true w.firstUnacknowledgedUpdated = true
} }
} }
func (v *SendingWorker) processAck(number uint32) bool { func (w *SendingWorker) processAck(number uint32) bool {
// number < v.firstUnacknowledged || number >= v.nextNumber // number < v.firstUnacknowledged || number >= v.nextNumber
if number-v.firstUnacknowledged > 0x7FFFFFFF || number-v.nextNumber < 0x7FFFFFFF { if number-w.firstUnacknowledged > 0x7FFFFFFF || number-w.nextNumber < 0x7FFFFFFF {
return false return false
} }
removed := v.window.Remove(number - v.firstUnacknowledged) removed := w.window.Remove(number - w.firstUnacknowledged)
if removed { if removed {
v.FindFirstUnacknowledged() w.FindFirstUnacknowledged()
} }
return removed return removed
} }
func (v *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) { func (w *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) {
defer seg.Release() defer seg.Release()
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
if v.remoteNextNumber < seg.ReceivingWindow { if w.remoteNextNumber < seg.ReceivingWindow {
v.remoteNextNumber = seg.ReceivingWindow w.remoteNextNumber = seg.ReceivingWindow
} }
v.ProcessReceivingNextWithoutLock(seg.ReceivingNext) w.ProcessReceivingNextWithoutLock(seg.ReceivingNext)
if seg.IsEmpty() { if seg.IsEmpty() {
return return
@ -270,7 +270,7 @@ func (v *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint
var maxack uint32 var maxack uint32
var maxackRemoved bool var maxackRemoved bool
for _, number := range seg.NumberList { for _, number := range seg.NumberList {
removed := v.processAck(number) removed := w.processAck(number)
if maxack < number { if maxack < number {
maxack = number maxack = number
maxackRemoved = removed maxackRemoved = removed
@ -278,100 +278,100 @@ func (v *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint
} }
if maxackRemoved { if maxackRemoved {
v.window.HandleFastAck(maxack, rto) w.window.HandleFastAck(maxack, rto)
if current-seg.Timestamp < 10000 { if current-seg.Timestamp < 10000 {
v.conn.roundTrip.Update(current-seg.Timestamp, current) w.conn.roundTrip.Update(current-seg.Timestamp, current)
} }
} }
} }
func (v *SendingWorker) Push(f buf.Supplier) bool { func (w *SendingWorker) Push(f buf.Supplier) bool {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
if v.window.IsFull() { if w.window.IsFull() {
return false return false
} }
b := v.window.Push(v.nextNumber) b := w.window.Push(w.nextNumber)
v.nextNumber++ w.nextNumber++
common.Must(b.Reset(f)) common.Must(b.Reset(f))
return true return true
} }
func (v *SendingWorker) Write(seg Segment) error { func (w *SendingWorker) Write(seg Segment) error {
dataSeg := seg.(*DataSegment) dataSeg := seg.(*DataSegment)
dataSeg.Conv = v.conn.meta.Conversation dataSeg.Conv = w.conn.meta.Conversation
dataSeg.SendingNext = v.firstUnacknowledged dataSeg.SendingNext = w.firstUnacknowledged
dataSeg.Option = 0 dataSeg.Option = 0
if v.conn.State() == StateReadyToClose { if w.conn.State() == StateReadyToClose {
dataSeg.Option = SegmentOptionClose dataSeg.Option = SegmentOptionClose
} }
return v.conn.output.Write(dataSeg) return w.conn.output.Write(dataSeg)
} }
func (v *SendingWorker) OnPacketLoss(lossRate uint32) { func (w *SendingWorker) OnPacketLoss(lossRate uint32) {
if !v.conn.Config.Congestion || v.conn.roundTrip.Timeout() == 0 { if !w.conn.Config.Congestion || w.conn.roundTrip.Timeout() == 0 {
return return
} }
if lossRate >= 15 { if lossRate >= 15 {
v.controlWindow = 3 * v.controlWindow / 4 w.controlWindow = 3 * w.controlWindow / 4
} else if lossRate <= 5 { } else if lossRate <= 5 {
v.controlWindow += v.controlWindow / 4 w.controlWindow += w.controlWindow / 4
} }
if v.controlWindow < 16 { if w.controlWindow < 16 {
v.controlWindow = 16 w.controlWindow = 16
} }
if v.controlWindow > 2*v.conn.Config.GetSendingInFlightSize() { if w.controlWindow > 2*w.conn.Config.GetSendingInFlightSize() {
v.controlWindow = 2 * v.conn.Config.GetSendingInFlightSize() w.controlWindow = 2 * w.conn.Config.GetSendingInFlightSize()
} }
} }
func (v *SendingWorker) Flush(current uint32) { func (w *SendingWorker) Flush(current uint32) {
v.Lock() w.Lock()
cwnd := v.firstUnacknowledged + v.conn.Config.GetSendingInFlightSize() cwnd := w.firstUnacknowledged + w.conn.Config.GetSendingInFlightSize()
if cwnd > v.remoteNextNumber { if cwnd > w.remoteNextNumber {
cwnd = v.remoteNextNumber cwnd = w.remoteNextNumber
} }
if v.conn.Config.Congestion && cwnd > v.firstUnacknowledged+v.controlWindow { if w.conn.Config.Congestion && cwnd > w.firstUnacknowledged+w.controlWindow {
cwnd = v.firstUnacknowledged + v.controlWindow cwnd = w.firstUnacknowledged + w.controlWindow
} }
if !v.window.IsEmpty() { if !w.window.IsEmpty() {
v.window.Flush(current, v.conn.roundTrip.Timeout(), cwnd) w.window.Flush(current, w.conn.roundTrip.Timeout(), cwnd)
v.firstUnacknowledgedUpdated = false w.firstUnacknowledgedUpdated = false
} }
updated := v.firstUnacknowledgedUpdated updated := w.firstUnacknowledgedUpdated
v.firstUnacknowledgedUpdated = false w.firstUnacknowledgedUpdated = false
v.Unlock() w.Unlock()
if updated { if updated {
v.conn.Ping(current, CommandPing) w.conn.Ping(current, CommandPing)
} }
} }
func (v *SendingWorker) CloseWrite() { func (w *SendingWorker) CloseWrite() {
v.Lock() w.Lock()
defer v.Unlock() defer w.Unlock()
v.window.Clear(0xFFFFFFFF) w.window.Clear(0xFFFFFFFF)
} }
func (v *SendingWorker) IsEmpty() bool { func (w *SendingWorker) IsEmpty() bool {
v.RLock() w.RLock()
defer v.RUnlock() defer w.RUnlock()
return v.window.IsEmpty() return w.window.IsEmpty()
} }
func (v *SendingWorker) UpdateNecessary() bool { func (w *SendingWorker) UpdateNecessary() bool {
return !v.IsEmpty() return !w.IsEmpty()
} }
func (w *SendingWorker) FirstUnacknowledged() uint32 { func (w *SendingWorker) FirstUnacknowledged() uint32 {