simplify segment writer

pull/467/head
Darien Raymond 2016-12-20 22:53:58 +01:00
parent bccf11c12c
commit cdcccb4590
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
4 changed files with 13 additions and 37 deletions

View File

@ -188,7 +188,7 @@ type Connection struct {
receivingWorker *ReceivingWorker receivingWorker *ReceivingWorker
sendingWorker *SendingWorker sendingWorker *SendingWorker
output *BufferedSegmentWriter output SegmentWriter
dataUpdater *Updater dataUpdater *Updater
pingUpdater *Updater pingUpdater *Updater
@ -208,7 +208,7 @@ func NewConnection(conv uint16, sysConn SystemConnection, recycler internal.Conn
dataInput: make(chan bool, 1), dataInput: make(chan bool, 1),
dataOutput: make(chan bool, 1), dataOutput: make(chan bool, 1),
Config: config, Config: config,
output: NewSegmentWriter(sysConn, config.GetMtu().GetValue()-uint32(sysConn.Overhead())), output: NewSegmentWriter(sysConn),
mss: config.GetMtu().GetValue() - uint32(sysConn.Overhead()) - DataSegmentOverhead, mss: config.GetMtu().GetValue() - uint32(sysConn.Overhead()) - DataSegmentOverhead,
roundTrip: &RoundTripInfo{ roundTrip: &RoundTripInfo{
rto: 100, rto: 100,
@ -542,7 +542,6 @@ func (v *Connection) flush() {
if v.State() == StateTerminating { if v.State() == StateTerminating {
log.Debug("KCP|Connection: #", v.conv, " sending terminating cmd.") log.Debug("KCP|Connection: #", v.conv, " sending terminating cmd.")
v.Ping(current, CommandTerminate) v.Ping(current, CommandTerminate)
v.output.Flush()
if current-atomic.LoadUint32(&v.stateBeginTime) > 8000 { if current-atomic.LoadUint32(&v.stateBeginTime) > 8000 {
v.SetState(StateTerminated) v.SetState(StateTerminated)
@ -564,9 +563,6 @@ func (v *Connection) flush() {
if current-atomic.LoadUint32(&v.lastPingTime) >= 3000 { if current-atomic.LoadUint32(&v.lastPingTime) >= 3000 {
v.Ping(current, CommandPing) v.Ping(current, CommandPing)
} }
// flash remain segments
v.output.Flush()
} }
func (v *Connection) State() State { func (v *Connection) State() State {

View File

@ -8,48 +8,28 @@ import (
) )
type SegmentWriter interface { type SegmentWriter interface {
Write(seg Segment) Write(seg Segment) error
} }
type BufferedSegmentWriter struct { type SimpleSegmentWriter struct {
sync.Mutex sync.Mutex
mtu uint32
buffer *buf.Buffer buffer *buf.Buffer
writer io.Writer writer io.Writer
} }
func NewSegmentWriter(writer io.Writer, mtu uint32) *BufferedSegmentWriter { func NewSegmentWriter(writer io.Writer) SegmentWriter {
return &BufferedSegmentWriter{ return &SimpleSegmentWriter{
mtu: mtu,
writer: writer, writer: writer,
buffer: buf.NewSmall(), buffer: buf.NewSmall(),
} }
} }
func (v *BufferedSegmentWriter) Write(seg Segment) { func (v *SimpleSegmentWriter) Write(seg Segment) error {
v.Lock() v.Lock()
defer v.Unlock() defer v.Unlock()
nBytes := seg.ByteSize()
if uint32(v.buffer.Len()+nBytes) > v.mtu {
v.FlushWithoutLock()
}
v.buffer.AppendSupplier(seg.Bytes()) v.buffer.AppendSupplier(seg.Bytes())
} _, err := v.writer.Write(v.buffer.Bytes())
func (v *BufferedSegmentWriter) FlushWithoutLock() {
v.writer.Write(v.buffer.Bytes())
v.buffer.Clear() v.buffer.Clear()
} return err
func (v *BufferedSegmentWriter) Flush() {
v.Lock()
defer v.Unlock()
if v.buffer.IsEmpty() {
return
}
v.FlushWithoutLock()
} }

View File

@ -235,7 +235,7 @@ func (v *ReceivingWorker) Flush(current uint32) {
v.acklist.Flush(current, v.conn.roundTrip.Timeout()) v.acklist.Flush(current, v.conn.roundTrip.Timeout())
} }
func (v *ReceivingWorker) Write(seg Segment) { func (v *ReceivingWorker) Write(seg Segment) error {
ackSeg := seg.(*AckSegment) ackSeg := seg.(*AckSegment)
ackSeg.Conv = v.conn.conv ackSeg.Conv = v.conn.conv
ackSeg.ReceivingNext = v.nextNumber ackSeg.ReceivingNext = v.nextNumber
@ -243,7 +243,7 @@ func (v *ReceivingWorker) Write(seg Segment) {
if v.conn.state == StateReadyToClose { if v.conn.state == StateReadyToClose {
ackSeg.Option = SegmentOptionClose ackSeg.Option = SegmentOptionClose
} }
v.conn.output.Write(ackSeg) return v.conn.output.Write(ackSeg)
} }
func (v *ReceivingWorker) CloseRead() { func (v *ReceivingWorker) CloseRead() {

View File

@ -305,7 +305,7 @@ func (v *SendingWorker) Push(b []byte) int {
} }
// Private: Visible for testing. // Private: Visible for testing.
func (v *SendingWorker) Write(seg Segment) { func (v *SendingWorker) Write(seg Segment) error {
dataSeg := seg.(*DataSegment) dataSeg := seg.(*DataSegment)
dataSeg.Conv = v.conn.conv dataSeg.Conv = v.conn.conv
@ -315,7 +315,7 @@ func (v *SendingWorker) Write(seg Segment) {
dataSeg.Option = SegmentOptionClose dataSeg.Option = SegmentOptionClose
} }
v.conn.output.Write(dataSeg) return v.conn.output.Write(dataSeg)
} }
func (v *SendingWorker) OnPacketLoss(lossRate uint32) { func (v *SendingWorker) OnPacketLoss(lossRate uint32) {