cleanup kcp connection

pull/1712/head
Darien Raymond 7 years ago
parent bc9267846c
commit 0032760fdc
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -38,7 +38,7 @@ const (
StatePeerClosed State = 2 // Connection is closed on remote StatePeerClosed State = 2 // Connection is closed on remote
StateTerminating State = 3 // Connection is ready to be destroyed locally StateTerminating State = 3 // Connection is ready to be destroyed locally
StatePeerTerminating State = 4 // Connection is ready to be destroyed on remote StatePeerTerminating State = 4 // Connection is ready to be destroyed on remote
StateTerminated State = 5 // Connection is detroyed. StateTerminated State = 5 // Connection is destroyed.
) )
func nowMillisec() int64 { func nowMillisec() int64 {
@ -137,21 +137,21 @@ func NewUpdater(interval uint32, shouldContinue predicate.Predicate, shouldTermi
return u return u
} }
func (v *Updater) WakeUp() { func (u *Updater) WakeUp() {
select { select {
case v.notifier <- true: case u.notifier <- true:
default: default:
} }
} }
func (v *Updater) Run() { func (u *Updater) Run() {
for <-v.notifier { for <-u.notifier {
if v.shouldTerminate() { if u.shouldTerminate() {
return return
} }
interval := v.Interval() interval := u.Interval()
for v.shouldContinue() { for u.shouldContinue() {
v.updateFunc() u.updateFunc()
time.Sleep(interval) time.Sleep(interval)
} }
} }
@ -280,11 +280,22 @@ func (v *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) {
return nil, io.EOF return nil, io.EOF
} }
if err := v.waitForDataInput(); err != nil {
return nil, err
}
}
}
func (v *Connection) waitForDataInput() error {
if v.State() == StatePeerTerminating {
return io.EOF
}
duration := time.Minute duration := time.Minute
if !v.rd.IsZero() { if !v.rd.IsZero() {
duration = time.Until(v.rd) duration = time.Until(v.rd)
if duration < 0 { if duration < 0 {
return nil, ErrIOTimeout return ErrIOTimeout
} }
} }
@ -292,10 +303,11 @@ func (v *Connection) ReadMultiBuffer() (buf.MultiBuffer, error) {
case <-v.dataInput: case <-v.dataInput:
case <-time.After(duration): case <-time.After(duration):
if !v.rd.IsZero() && v.rd.Before(time.Now()) { if !v.rd.IsZero() && v.rd.Before(time.Now()) {
return nil, ErrIOTimeout return ErrIOTimeout
}
} }
} }
return nil
} }
// Read implements the Conn Read method. // Read implements the Conn Read method.
@ -313,26 +325,30 @@ func (v *Connection) Read(b []byte) (int, error) {
return nBytes, nil return nBytes, nil
} }
if v.State() == StatePeerTerminating { if err := v.waitForDataInput(); err != nil {
return 0, io.EOF return 0, err
}
}
} }
func (v *Connection) waitForDataOutput() error {
duration := time.Minute duration := time.Minute
if !v.rd.IsZero() { if !v.wd.IsZero() {
duration = time.Until(v.rd) duration = time.Until(v.wd)
if duration < 0 { if duration < 0 {
return 0, ErrIOTimeout return ErrIOTimeout
} }
} }
select { select {
case <-v.dataInput: case <-v.dataOutput:
case <-time.After(duration): case <-time.After(duration):
if !v.rd.IsZero() && v.rd.Before(time.Now()) { if !v.wd.IsZero() && v.wd.Before(time.Now()) {
return 0, ErrIOTimeout return ErrIOTimeout
}
} }
} }
return nil
} }
// Write implements io.Writer. // Write implements io.Writer.
@ -359,20 +375,8 @@ func (v *Connection) Write(b []byte) (int, error) {
} }
} }
duration := time.Minute if err := v.waitForDataOutput(); err != nil {
if !v.wd.IsZero() { return totalWritten, err
duration = time.Until(v.wd)
if duration < 0 {
return totalWritten, ErrIOTimeout
}
}
select {
case <-v.dataOutput:
case <-time.After(duration):
if !v.wd.IsZero() && v.wd.Before(time.Now()) {
return totalWritten, ErrIOTimeout
}
} }
} }
} }
@ -400,20 +404,8 @@ func (v *Connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
} }
} }
duration := time.Minute if err := v.waitForDataOutput(); err != nil {
if !v.wd.IsZero() { return err
duration = time.Until(v.wd)
if duration < 0 {
return ErrIOTimeout
}
}
select {
case <-v.dataOutput:
case <-time.After(duration):
if !v.wd.IsZero() && v.wd.Before(time.Now()) {
return ErrIOTimeout
}
} }
} }
} }

Loading…
Cancel
Save