Updates hashicorp/yamux.

pull/2258/head
James Phillips 2016-08-09 17:07:31 -07:00
parent a07938dc45
commit f1fbfb9423
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 45 additions and 37 deletions

View File

@ -1,23 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test

View File

@ -46,8 +46,11 @@ type Session struct {
pingID uint32
pingLock sync.Mutex
// streams maps a stream id to a stream
// streams maps a stream id to a stream, and inflight has an entry
// for any outgoing stream that has not yet been established. Both are
// protected by streamLock.
streams map[uint32]*Stream
inflight map[uint32]struct{}
streamLock sync.Mutex
// synCh acts like a semaphore. It is sized to the AcceptBacklog which
@ -90,6 +93,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
bufRead: bufio.NewReader(conn),
pings: make(map[uint32]chan struct{}),
streams: make(map[uint32]*Stream),
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan sendReady, 64),
@ -153,7 +157,7 @@ func (s *Session) OpenStream() (*Stream, error) {
}
GET_ID:
// Get and ID, and check for stream exhaustion
// Get an ID, and check for stream exhaustion
id := atomic.LoadUint32(&s.nextStreamID)
if id >= math.MaxUint32-1 {
return nil, ErrStreamsExhausted
@ -166,10 +170,16 @@ GET_ID:
stream := newStream(s, id, streamInit)
s.streamLock.Lock()
s.streams[id] = stream
s.inflight[id] = struct{}{}
s.streamLock.Unlock()
// Send the window update to create
if err := stream.sendWindowUpdate(); err != nil {
select {
case <-s.synCh:
default:
s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
}
return nil, err
}
return stream, nil
@ -580,19 +590,34 @@ func (s *Session) incomingStream(id uint32) error {
}
// closeStream is used to close a stream once both sides have
// issued a close.
// issued a close. If there was an in-flight SYN and the stream
// was not yet established, then this will give the credit back.
func (s *Session) closeStream(id uint32) {
s.streamLock.Lock()
if _, ok := s.inflight[id]; ok {
select {
case <-s.synCh:
default:
s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
}
}
delete(s.streams, id)
s.streamLock.Unlock()
}
// establishStream is used to mark a stream that was in the
// SYN Sent state as established.
func (s *Session) establishStream() {
func (s *Session) establishStream(id uint32) {
s.streamLock.Lock()
if _, ok := s.inflight[id]; ok {
delete(s.inflight, id)
} else {
s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
}
select {
case <-s.synCh:
default:
panic("established stream without inflight syn")
s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
}
s.streamLock.Unlock()
}

View File

@ -22,7 +22,7 @@ Each field is described below:
## Version Field
The version field is used for future backwards compatibily. At the
The version field is used for future backward compatibility. At the
current time, the field is always set to 0, to indicate the initial
version.
@ -96,7 +96,7 @@ Because we are relying on the reliable stream underneath, a connection
can begin sending data once the SYN flag is sent. The corresponding
ACK does not need to be received. This is particularly well suited
for an RPC system where a client wants to open a stream and immediately
fire a request without wiating for the RTT of the ACK.
fire a request without waiting for the RTT of the ACK.
This does introduce the possibility of a connection being rejected
after data has been sent already. This is a slight semantic difference
@ -138,4 +138,3 @@ provide an error code:
* 0x0 Normal termination
* 0x1 Protocol error
* 0x2 Internal error

View File

@ -91,10 +91,13 @@ START:
case streamRemoteClose:
fallthrough
case streamClosed:
s.recvLock.Lock()
if s.recvBuf == nil || s.recvBuf.Len() == 0 {
s.recvLock.Unlock()
s.stateLock.Unlock()
return 0, io.EOF
}
s.recvLock.Unlock()
case streamReset:
s.stateLock.Unlock()
return 0, ErrConnectionReset
@ -118,12 +121,17 @@ START:
WAIT:
var timeout <-chan time.Time
var timer *time.Timer
if !s.readDeadline.IsZero() {
delay := s.readDeadline.Sub(time.Now())
timeout = time.After(delay)
timer = time.NewTimer(delay)
timeout = timer.C
}
select {
case <-s.recvNotifyCh:
if timer != nil {
timer.Stop()
}
goto START
case <-timeout:
return 0, ErrTimeout
@ -327,7 +335,7 @@ func (s *Stream) processFlags(flags uint16) error {
if s.state == streamSYNSent {
s.state = streamEstablished
}
s.session.establishStream()
s.session.establishStream(s.id)
}
if flags&flagFIN == flagFIN {
switch s.state {
@ -348,9 +356,6 @@ func (s *Stream) processFlags(flags uint16) error {
}
}
if flags&flagRST == flagRST {
if s.state == streamSYNSent {
s.session.establishStream()
}
s.state = streamReset
closeStream = true
s.notifyWaiting()

4
vendor/vendor.json vendored
View File

@ -394,8 +394,10 @@
"revisionTime": "2016-08-09T01:42:04Z"
},
{
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",
"path": "github.com/hashicorp/yamux",
"revision": "df949784da9ed028ee76df44652e42d37a09d7e4"
"revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd",
"revisionTime": "2016-07-20T23:31:40Z"
},
{
"path": "github.com/inconshreveable/muxado",