simplify fast resend

pull/314/head
Darien Raymond 2016-10-11 13:17:57 +02:00
parent f8ad1f4a3e
commit b5910dccae
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
3 changed files with 20 additions and 27 deletions

View File

@ -102,7 +102,7 @@ func (this *RoundTripInfo) Update(rtt uint32, current uint32) {
if rto > 10000 { if rto > 10000 {
rto = 10000 rto = 10000
} }
this.rto = rto * 3 / 2 this.rto = rto * 5 / 4
this.updatedTimestamp = current this.updatedTimestamp = current
} }
@ -184,7 +184,6 @@ type Connection struct {
receivingWorker *ReceivingWorker receivingWorker *ReceivingWorker
sendingWorker *SendingWorker sendingWorker *SendingWorker
fastresend uint32
congestionControl bool congestionControl bool
output *BufferedSegmentWriter output *BufferedSegmentWriter
@ -221,7 +220,6 @@ func NewConnection(conv uint16, writerCloser io.WriteCloser, local *net.UDPAddr,
} }
conn.interval = config.Tti.GetValue() conn.interval = config.Tti.GetValue()
conn.receivingWorker = NewReceivingWorker(conn) conn.receivingWorker = NewReceivingWorker(conn)
conn.fastresend = 2
conn.congestionControl = config.Congestion conn.congestionControl = config.Congestion
conn.sendingWorker = NewSendingWorker(conn) conn.sendingWorker = NewSendingWorker(conn)
@ -507,7 +505,7 @@ func (this *Connection) Input(data []byte) int {
this.dataUpdater.WakeUp() this.dataUpdater.WakeUp()
case *AckSegment: case *AckSegment:
this.HandleOption(seg.Option) this.HandleOption(seg.Option)
this.sendingWorker.ProcessSegment(current, seg) this.sendingWorker.ProcessSegment(current, seg, this.roundTrip.Timeout())
this.dataOutputCond.Signal() this.dataOutputCond.Signal()
this.dataUpdater.WakeUp() this.dataUpdater.WakeUp()
case *CmdOnlySegment: case *CmdOnlySegment:

View File

@ -41,7 +41,6 @@ type DataSegment struct {
Data *alloc.Buffer Data *alloc.Buffer
timeout uint32 timeout uint32
ackSkipped uint32
transmit uint32 transmit uint32
} }

View File

@ -103,7 +103,7 @@ func (this *SendingWindow) Remove(idx uint32) {
} }
} }
func (this *SendingWindow) HandleFastAck(number uint32) { func (this *SendingWindow) HandleFastAck(number uint32, rto uint32) {
if this.len == 0 { if this.len == 0 {
return return
} }
@ -114,7 +114,9 @@ func (this *SendingWindow) HandleFastAck(number uint32) {
break break
} }
if number != seg.Number { if number != seg.Number {
seg.ackSkipped++ if seg.transmit > 0 && seg.timeout > rto/3 {
seg.timeout -= rto / 3
}
} }
if i == this.last { if i == this.last {
break break
@ -122,7 +124,7 @@ func (this *SendingWindow) HandleFastAck(number uint32) {
} }
} }
func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxInFlightSize uint32) { func (this *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) {
if this.IsEmpty() { if this.IsEmpty() {
return return
} }
@ -133,25 +135,20 @@ func (this *SendingWindow) Flush(current uint32, resend uint32, rto uint32, maxI
for i := this.start; ; i = this.next[i] { for i := this.start; ; i = this.next[i] {
segment := this.data[i] segment := this.data[i]
needsend := false needsend := false
if current-segment.timeout < 0x7FFFFFFF {
if segment.transmit == 0 { if segment.transmit == 0 {
needsend = true // First time
segment.transmit++
segment.timeout = current + rto
this.totalInFlightSize++ this.totalInFlightSize++
} else if current-segment.timeout < 0x7FFFFFFF { } else {
needsend = true
segment.transmit++
segment.timeout = current + rto
lost++ lost++
} else if segment.ackSkipped >= resend { }
needsend = true needsend = true
segment.transmit++
segment.ackSkipped = 0
segment.timeout = current + rto segment.timeout = current + rto
} }
if needsend { if needsend {
segment.Timestamp = current segment.Timestamp = current
segment.transmit++
this.writer.Write(segment) this.writer.Write(segment)
inFlightSize++ inFlightSize++
if inFlightSize >= maxInFlightSize { if inFlightSize >= maxInFlightSize {
@ -228,7 +225,7 @@ func (this *SendingWorker) ProcessAck(number uint32) {
this.FindFirstUnacknowledged() this.FindFirstUnacknowledged()
} }
func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) { func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) {
defer seg.Release() defer seg.Release()
this.Lock() this.Lock()
@ -252,7 +249,7 @@ func (this *SendingWorker) ProcessSegment(current uint32, seg *AckSegment) {
} }
} }
this.window.HandleFastAck(maxack) this.window.HandleFastAck(maxack, rto)
} }
func (this *SendingWorker) Push(b []byte) int { func (this *SendingWorker) Push(b []byte) int {
@ -271,7 +268,6 @@ func (this *SendingWorker) Push(b []byte) int {
seg.Data = AllocateBuffer().Clear().Append(b[:size]) seg.Data = AllocateBuffer().Clear().Append(b[:size])
seg.Number = this.nextNumber seg.Number = this.nextNumber
seg.timeout = 0 seg.timeout = 0
seg.ackSkipped = 0
seg.transmit = 0 seg.transmit = 0
this.window.Push(seg) this.window.Push(seg)
this.nextNumber++ this.nextNumber++
@ -326,7 +322,7 @@ func (this *SendingWorker) Flush(current uint32) {
} }
if !this.window.IsEmpty() { if !this.window.IsEmpty() {
this.window.Flush(current, this.conn.fastresend, this.conn.roundTrip.Timeout(), cwnd) this.window.Flush(current, this.conn.roundTrip.Timeout(), cwnd)
} else if this.firstUnacknowledgedUpdated { } else if this.firstUnacknowledgedUpdated {
this.conn.Ping(current, CommandPing) this.conn.Ping(current, CommandPing)
} }