mirror of https://github.com/hashicorp/consul
Removes support for muxado and protocol version 1.
parent
70691d6aa4
commit
a984a6703c
|
@ -14,27 +14,14 @@ import (
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/yamux"
|
"github.com/hashicorp/yamux"
|
||||||
"github.com/inconshreveable/muxado"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// muxSession is used to provide an interface for either muxado or yamux
|
// muxSession is used to provide an interface for a stream multiplexer.
|
||||||
type muxSession interface {
|
type muxSession interface {
|
||||||
Open() (net.Conn, error)
|
Open() (net.Conn, error)
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type muxadoWrapper struct {
|
|
||||||
m muxado.Session
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *muxadoWrapper) Open() (net.Conn, error) {
|
|
||||||
return w.m.Open()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *muxadoWrapper) Close() error {
|
|
||||||
return w.m.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// streamClient is used to wrap a stream with an RPC client
|
// streamClient is used to wrap a stream with an RPC client
|
||||||
type StreamClient struct {
|
type StreamClient struct {
|
||||||
stream net.Conn
|
stream net.Conn
|
||||||
|
@ -295,15 +282,7 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, err
|
||||||
// Switch the multiplexing based on version
|
// Switch the multiplexing based on version
|
||||||
var session muxSession
|
var session muxSession
|
||||||
if version < 2 {
|
if version < 2 {
|
||||||
// Write the Consul multiplex byte to set the mode
|
return nil, fmt.Errorf("cannot make client connection, unsupported protocol version %d", version)
|
||||||
if _, err := conn.Write([]byte{byte(rpcMultiplex)}); err != nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a multiplexed session
|
|
||||||
session = &muxadoWrapper{muxado.Client(conn)}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// Write the Consul multiplex byte to set the mode
|
// Write the Consul multiplex byte to set the mode
|
||||||
if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
|
if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
|
||||||
|
|
|
@ -30,7 +30,7 @@ import (
|
||||||
// Consul-level protocol versions, that are used to configure the Serf
|
// Consul-level protocol versions, that are used to configure the Serf
|
||||||
// protocol versions.
|
// protocol versions.
|
||||||
const (
|
const (
|
||||||
ProtocolVersionMin uint8 = 1
|
ProtocolVersionMin uint8 = 2
|
||||||
|
|
||||||
// Version 3 added support for network coordinates but we kept the
|
// Version 3 added support for network coordinates but we kept the
|
||||||
// default protocol version at 2 to ease the transition to this new
|
// default protocol version at 2 to ease the transition to this new
|
||||||
|
|
|
@ -1,13 +0,0 @@
|
||||||
Copyright 2013 Alan Shreve
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
|
@ -1,122 +0,0 @@
|
||||||
# muxado - Stream multiplexing for Go
|
|
||||||
|
|
||||||
## What is stream multiplexing?
|
|
||||||
Imagine you have a single stream (a bi-directional stream of bytes) like a TCP connection. Stream multiplexing
|
|
||||||
is a method for enabling the transmission of multiple simultaneous streams over the one underlying transport stream.
|
|
||||||
|
|
||||||
## What is muxado?
|
|
||||||
muxado is an implementation of a stream multiplexing library in Go that can be layered on top of a net.Conn to multiplex that stream.
|
|
||||||
muxado's protocol is not currently documented explicitly, but it is very nearly an implementation of the HTTP2
|
|
||||||
framing layer with all of the HTTP-specific bits removed. It is heavily inspired by HTTP2, SPDY, and WebMUX.
|
|
||||||
|
|
||||||
## How does it work?
|
|
||||||
Simplifying, muxado chunks data sent over each multiplexed stream and transmits each piece
|
|
||||||
as a "frame" over the transport stream. It then sends these frames,
|
|
||||||
often interleaving data for multiple streams, to the remote side.
|
|
||||||
The remote endpoint then reassembles the frames into distinct streams
|
|
||||||
of data which are presented to the application layer.
|
|
||||||
|
|
||||||
## What good is it anyways?
|
|
||||||
A stream multiplexing library is a powerful tool for an application developer's toolbox which solves a number of problems:
|
|
||||||
|
|
||||||
- It allows developers to implement asynchronous/pipelined protocols with ease. Instead of matching requests with responses in your protocols, just open a new stream for each request and communicate over that.
|
|
||||||
- muxado can do application-level keep-alives and dead-session detection so that you don't have to write heartbeat code ever again.
|
|
||||||
- You never need to build connection pools for services running your protocol. You can open as many independent, concurrent streams as you need without incurring any round-trip latency costs.
|
|
||||||
- muxado allows the server to initiate new streams to clients which is normally very difficult without NAT-busting trickery.
|
|
||||||
|
|
||||||
## Show me the code!
|
|
||||||
As much as possible, the muxado library strives to look and feel just like the standard library's net package. Here's how you initiate a new client session:
|
|
||||||
|
|
||||||
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
|
|
||||||
|
|
||||||
And a server:
|
|
||||||
|
|
||||||
l, err := muxado.ListenTLS("tcp", ":1234", tlsConfig))
|
|
||||||
for {
|
|
||||||
sess, err := l.Accept()
|
|
||||||
go handleSession(sess)
|
|
||||||
}
|
|
||||||
|
|
||||||
Once you have a session, you can open new streams on it:
|
|
||||||
|
|
||||||
stream, err := sess.Open()
|
|
||||||
|
|
||||||
And accept streams opened by the remote side:
|
|
||||||
|
|
||||||
stream, err := sess.Accept()
|
|
||||||
|
|
||||||
Streams satisfy the net.Conn interface, so they're very familiar to work with:
|
|
||||||
|
|
||||||
n, err := stream.Write(buf)
|
|
||||||
n, err = stream.Read(buf)
|
|
||||||
|
|
||||||
muxado sessions and streams implement the net.Listener and net.Conn interfaces (with a small shim), so you can use them with existing golang libraries!
|
|
||||||
|
|
||||||
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
|
|
||||||
http.Serve(sess.NetListener(), handler)
|
|
||||||
|
|
||||||
## A more extensive muxado client
|
|
||||||
|
|
||||||
// open a new session to a remote endpoint
|
|
||||||
sess, err := muxado.Dial("tcp", "example.com:1234")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle streams initiated by the server
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
stream, err := sess.Accept()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go handleStream(stream)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// open new streams for application requests
|
|
||||||
for req := range requests {
|
|
||||||
str, err := sess.Open()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(stream muxado.Stream) {
|
|
||||||
defer stream.Close()
|
|
||||||
|
|
||||||
// send request
|
|
||||||
if _, err = stream.Write(req.serialize()); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// read response
|
|
||||||
if buf, err := ioutil.ReadAll(stream); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
handleResponse(buf)
|
|
||||||
}(str)
|
|
||||||
}
|
|
||||||
|
|
||||||
## How did you build it?
|
|
||||||
muxado is a modified implementation of the HTTP2 framing protocol with all of the HTTP-specific bits removed. It aims
|
|
||||||
for simplicity in the protocol by removing everything that is not core to multiplexing streams. The muxado code
|
|
||||||
is also built with the intention that its performance should be moderately good within the bounds of working in Go. As a result,
|
|
||||||
muxado does contain some unidiomatic code.
|
|
||||||
|
|
||||||
## API documentation
|
|
||||||
API documentation is available on godoc.org:
|
|
||||||
|
|
||||||
[muxado API documentation](https://godoc.org/github.com/inconshreveable/muxado)
|
|
||||||
|
|
||||||
## What are its biggest drawbacks?
|
|
||||||
Any stream-multiplexing library over TCP will suffer from head-of-line blocking if the next packet to service gets dropped.
|
|
||||||
muxado is also a poor choice when sending large payloads and speed is a priority.
|
|
||||||
It shines best when the application workload needs to quickly open a large number of small-payload streams.
|
|
||||||
|
|
||||||
## Status
|
|
||||||
Most of muxado's features are implemented (and tested!), but there are many that are still rough or could be improved. See the TODO file for suggestions on what needs to improve.
|
|
||||||
|
|
||||||
## License
|
|
||||||
Apache
|
|
|
@ -1,35 +0,0 @@
|
||||||
improve the formatting of the docs to look nice for godoc
|
|
||||||
use a better example in the docs first before showing the clever integration with the net.Listener/net.Conn APIs
|
|
||||||
Make all errors support Temporary() API so applications can better decide what to do
|
|
||||||
Handle case of running out of stream ids + test
|
|
||||||
writeFrame errors should kill the session, but only if it's not a timeout + test
|
|
||||||
Short read should cause an error + test
|
|
||||||
Decrement() in outBuffer needs to have deadline support
|
|
||||||
|
|
||||||
Extensions:
|
|
||||||
Heartbeat extension needs tests
|
|
||||||
Make extensions a public API instead of a private API
|
|
||||||
Document how extensions work
|
|
||||||
Don't include any extensions by default
|
|
||||||
|
|
||||||
heartbeat test
|
|
||||||
Finish writing buffer tests
|
|
||||||
Write stress test
|
|
||||||
Write multi-frame write test
|
|
||||||
More session tests
|
|
||||||
More stream tests
|
|
||||||
Write frame/transport tests - verify read correct type, verify unknown type causes error, verify ioerror is propogated
|
|
||||||
Write frame/syn tests
|
|
||||||
Write frame/goaway tests
|
|
||||||
|
|
||||||
### Low priority:
|
|
||||||
- Add the ability to differentiate stream errors which allow you to safely retry
|
|
||||||
- Decide what to do if the application isn't handling its accepted streams fast enough. Refuse stream? Wait and block reading more frames?
|
|
||||||
- Figure out whether to die with/without lock - in GoAway/OpenStream
|
|
||||||
- Add priority APIs to stream
|
|
||||||
- Add priority extension
|
|
||||||
- Add Reset() stream API
|
|
||||||
- Eliminate unlikely race on s.remoteDebug between handleFrame() and die()
|
|
||||||
- Should writeFrame calls for rst/wndinc set the write deadline?
|
|
||||||
- don't send reset if the stream is fully closed
|
|
||||||
- include muxado pun somewhere in the docs
|
|
|
@ -1,54 +0,0 @@
|
||||||
package muxado
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/inconshreveable/muxado/proto"
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
)
|
|
||||||
|
|
||||||
// streamAdaptor recasts the types of some function calls by the proto/Stream implementation
|
|
||||||
// so that it satisfies the public interface
|
|
||||||
type streamAdaptor struct {
|
|
||||||
proto.IStream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *streamAdaptor) Id() StreamId {
|
|
||||||
return StreamId(a.IStream.Id())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *streamAdaptor) StreamType() StreamType {
|
|
||||||
return StreamType(a.IStream.StreamType())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *streamAdaptor) Session() Session {
|
|
||||||
return &sessionAdaptor{a.IStream.Session()}
|
|
||||||
}
|
|
||||||
|
|
||||||
// sessionAdaptor recasts the types of some function calls by the proto/Session implementation
|
|
||||||
// so that it satisfies the public interface
|
|
||||||
type sessionAdaptor struct {
|
|
||||||
proto.ISession
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *sessionAdaptor) Accept() (Stream, error) {
|
|
||||||
str, err := a.ISession.Accept()
|
|
||||||
return &streamAdaptor{str}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *sessionAdaptor) Open() (Stream, error) {
|
|
||||||
str, err := a.ISession.Open()
|
|
||||||
return &streamAdaptor{str}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *sessionAdaptor) OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error) {
|
|
||||||
str, err := a.ISession.OpenStream(frame.StreamPriority(priority), frame.StreamType(streamType), fin)
|
|
||||||
return &streamAdaptor{str}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *sessionAdaptor) GoAway(code ErrorCode, debug []byte) error {
|
|
||||||
return a.ISession.GoAway(frame.ErrorCode(code), debug)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *sessionAdaptor) Wait() (ErrorCode, error, []byte) {
|
|
||||||
code, err, debug := a.ISession.Wait()
|
|
||||||
return ErrorCode(code), err, debug
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
package muxado
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"github.com/inconshreveable/muxado/proto"
|
|
||||||
"github.com/inconshreveable/muxado/proto/ext"
|
|
||||||
"net"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client returns a new muxado client-side connection using conn as the transport.
|
|
||||||
func Client(conn net.Conn) Session {
|
|
||||||
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, true, []proto.Extension{ext.NewDefaultHeartbeat()})}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dial opens a new connection to the given network/address and then beings a muxado client session on it.
|
|
||||||
func Dial(network, addr string) (sess Session, err error) {
|
|
||||||
conn, err := net.Dial(network, addr)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return Client(conn), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DialTLS opens a new TLS encrytped connection with the givent configuration
|
|
||||||
// to the network/address and then beings a muxado client session on it.
|
|
||||||
func DialTLS(network, addr string, tlsConfig *tls.Config) (sess Session, err error) {
|
|
||||||
conn, err := tls.Dial(network, addr, tlsConfig)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return Client(conn), nil
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
// muxado is an implementation of a general-purpose stream-multiplexing protocol.
|
|
||||||
//
|
|
||||||
// muxado allows clients applications to multiplex a single stream-oriented connection,
|
|
||||||
// like a TCP connection, and communicate over many streams on top of it. muxado accomplishes
|
|
||||||
// this by chunking data sent over each stream into frames and then reassembling the
|
|
||||||
// frames and buffering the data before being passed up to the application
|
|
||||||
// layer on the other side.
|
|
||||||
//
|
|
||||||
// muxado is very nearly an exact implementation of the HTTP2 framing layer while leaving out all
|
|
||||||
// the HTTP-specific parts. It is heavily inspired by HTTP2/SPDY/WebMUX.
|
|
||||||
//
|
|
||||||
// muxado's documentation uses the following terms consistently for easier communication:
|
|
||||||
// - "a transport" is an underlying stream (typically TCP) over which frames are sent between
|
|
||||||
// endpoints
|
|
||||||
// - "a stream" is any of the full-duplex byte-streams multiplexed over the transport
|
|
||||||
// - "a session" refers to an instance of the muxado protocol running over a transport between
|
|
||||||
// two endpoints
|
|
||||||
//
|
|
||||||
// Perhaps the best part of muxado is the interface exposed to client libraries. Since new
|
|
||||||
// streams may be initiated by both sides at any time, a muxado.Session implements the net.Listener
|
|
||||||
// interface (almost! Go unfortunately doesn't support covariant interface satisfaction so there's
|
|
||||||
// a shim). Each muxado stream implements the net.Conn interface. This allows you to integrate
|
|
||||||
// muxado into existing code which works with these interfaces (which is most Golang networking code)
|
|
||||||
// with very little difficulty. Consider the following toy example. Here we'll initiate a new secure
|
|
||||||
// connection to a server, and then ask it which application it wants via an HTTP request over a muxado stream
|
|
||||||
// and then serve an entire HTTP application *to the server*.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// sess, err := muxado.DialTLS("tcp", "example.com:1234", new(tls.Config))
|
|
||||||
// client := &http.Client{Transport: &http.Transport{Dial: sess.NetDial}}
|
|
||||||
// resp, err := client.Get("http://example.com/appchoice")
|
|
||||||
// switch getChoice(resp.Body) {
|
|
||||||
// case "foo":
|
|
||||||
// http.Serve(sess.NetListener(), fooHandler)
|
|
||||||
// case "bar":
|
|
||||||
// http.Serve(sess.NetListener(), barHandler)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// In addition to enabling multiple streams over a single connection, muxado enables other
|
|
||||||
// behaviors which can be useful to the application layer:
|
|
||||||
// - Both sides of a muxado session may initiate new streams
|
|
||||||
// - muxado can transparently run application-level heartbeats and timeout dead sessions
|
|
||||||
// - When connections fail, muxado indicates to the application which streams may be safely retried
|
|
||||||
// - muxado supports prioritizing streams to maximize useful throughput when bandwidth-constrained
|
|
||||||
//
|
|
||||||
// A few examples of what these capabilities might make muxado useful for:
|
|
||||||
// - eliminating custom async/pipeling code for your protocols
|
|
||||||
// - eliminating connection pools in your protocols
|
|
||||||
// - eliminating custom NAT traversal logic for enabling server-initiated streams
|
|
||||||
//
|
|
||||||
// muxado has been tuned to be very performant within the limits of what you can expect of pure-Go code.
|
|
||||||
// Some of muxado's code looks unidiomatic in the quest for better performance. (Locks over channels, never allocating
|
|
||||||
// from the heap, etc). muxado will typically outperform TCP connections when rapidly initiating many new
|
|
||||||
// streams with small payloads. When sending a large payload over a single stream, muxado's worst case, it can
|
|
||||||
// be 2-3x slower and does not parallelize well.
|
|
||||||
package muxado
|
|
|
@ -1,115 +0,0 @@
|
||||||
package muxado
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type StreamId frame.StreamId
|
|
||||||
type StreamPriority frame.StreamPriority
|
|
||||||
type StreamType frame.StreamType
|
|
||||||
type ErrorCode frame.ErrorCode
|
|
||||||
|
|
||||||
// Stream is a full duplex stream-oriented connection that is multiplexed over a Session.
|
|
||||||
// Stream implement the net.Conn inteface.
|
|
||||||
type Stream interface {
|
|
||||||
// Write writes the bytes in the given buffer to the stream
|
|
||||||
Write([]byte) (int, error)
|
|
||||||
|
|
||||||
// Read reads the next bytes on the stream into the given buffer
|
|
||||||
Read([]byte) (int, error)
|
|
||||||
|
|
||||||
// Close closes the stream. It attempts to behave as Close does for a TCP conn in that it
|
|
||||||
// half-closes the stream for sending, and it will send an RST if any more data is received
|
|
||||||
// from the remote side.
|
|
||||||
Close() error
|
|
||||||
|
|
||||||
// SetDeadline sets a time after which future Read and Write operations will fail.
|
|
||||||
SetDeadline(time.Time) error
|
|
||||||
|
|
||||||
// SetReadDeadline sets a time after which future Read operations will fail.
|
|
||||||
SetReadDeadline(time.Time) error
|
|
||||||
|
|
||||||
// SetWriteDeadline sets a time after which future Write operations will fail.
|
|
||||||
SetWriteDeadline(time.Time) error
|
|
||||||
|
|
||||||
// HalfClose sends a data frame with a fin flag set to half-close the stream from the local side.
|
|
||||||
HalfClose([]byte) (int, error)
|
|
||||||
|
|
||||||
// Id returns the stream's id.
|
|
||||||
Id() StreamId
|
|
||||||
|
|
||||||
// StreamType returns the stream's type
|
|
||||||
StreamType() StreamType
|
|
||||||
|
|
||||||
// Session returns the session object this stream is running on.
|
|
||||||
Session() Session
|
|
||||||
|
|
||||||
// RemoteAddr returns the session transport's remote address.
|
|
||||||
RemoteAddr() net.Addr
|
|
||||||
|
|
||||||
// LocalAddr returns the session transport's local address.
|
|
||||||
LocalAddr() net.Addr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Session multiplexes many Streams over a single underlying stream transport.
|
|
||||||
// Both sides of a muxado session can open new Streams. Sessions can also accept
|
|
||||||
// new streams from the remote side.
|
|
||||||
//
|
|
||||||
// A muxado Session implements the net.Listener interface, returning new Streams from the remote side.
|
|
||||||
type Session interface {
|
|
||||||
|
|
||||||
// Open initiates a new stream on the session. It is equivalent to OpenStream(0, 0, false)
|
|
||||||
Open() (Stream, error)
|
|
||||||
|
|
||||||
// OpenStream initiates a new stream on the session. A caller can specify a stream's priority and an opaque stream type.
|
|
||||||
// Setting fin to true will cause the stream to be half-closed from the local side immediately upon creation.
|
|
||||||
OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error)
|
|
||||||
|
|
||||||
// Accept returns the next stream initiated by the remote side
|
|
||||||
Accept() (Stream, error)
|
|
||||||
|
|
||||||
// Kill closes the underlying transport stream immediately.
|
|
||||||
//
|
|
||||||
// You SHOULD always perfer to call Close() instead so that the connection
|
|
||||||
// closes cleanly by sending a GoAway frame.
|
|
||||||
Kill() error
|
|
||||||
|
|
||||||
// Close instructs the session to close cleanly, sending a GoAway frame if one hasn't already been sent.
|
|
||||||
//
|
|
||||||
// This implementation does not "linger". Pending writes on streams may fail.
|
|
||||||
//
|
|
||||||
// You MAY call Close() more than once. Each time after
|
|
||||||
// the first, Close() will return an error.
|
|
||||||
Close() error
|
|
||||||
|
|
||||||
// GoAway instructs the other side of the connection to stop
|
|
||||||
// initiating new streams by sending a GoAway frame. Most clients
|
|
||||||
// will just call Close(), but you may want explicit control of this
|
|
||||||
// in order to facilitate clean shutdowns.
|
|
||||||
//
|
|
||||||
// You MAY call GoAway() more than once. Each time after the first,
|
|
||||||
// GoAway() will return an error.
|
|
||||||
GoAway(ErrorCode, []byte) error
|
|
||||||
|
|
||||||
// LocalAddr returns the local address of the transport stream over which the session is running.
|
|
||||||
LocalAddr() net.Addr
|
|
||||||
|
|
||||||
// RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
|
|
||||||
RemoteAddr() net.Addr
|
|
||||||
|
|
||||||
// Wait blocks until the session has shutdown and returns the error code for session termination. It also
|
|
||||||
// returns the error that caused the session to terminate as well as any debug information sent in the GoAway
|
|
||||||
// frame by the remote side.
|
|
||||||
Wait() (code ErrorCode, err error, debug []byte)
|
|
||||||
|
|
||||||
// NetListener returns an adaptor object which allows this Session to be used as a net.Listener. The returned
|
|
||||||
// net.Listener returns new streams initiated by the remote side as net.Conn's when calling Accept().
|
|
||||||
NetListener() net.Listener
|
|
||||||
|
|
||||||
// NetDial is a function that implements the same API as net.Dial and can be used in place of it. Users should keep
|
|
||||||
// in mind that it is the same as a call to Open(). It ignores both arguments passed to it, always initiate a new stream
|
|
||||||
// to the remote side.
|
|
||||||
NetDial(_, _ string) (net.Conn, error)
|
|
||||||
}
|
|
|
@ -1,87 +0,0 @@
|
||||||
package buffer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
FullError = errors.New("Buffer is full")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Reads as much data
|
|
||||||
func readInto(rd io.Reader, p []byte) (n int, err error) {
|
|
||||||
var nr int
|
|
||||||
for n < len(p) {
|
|
||||||
nr, err = rd.Read(p[n:])
|
|
||||||
n += nr
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// A circular buffer on top of a byte-array
|
|
||||||
// NOTE: It does not implement the Write() method, it implements ReadFrom()
|
|
||||||
// to avoid copies
|
|
||||||
type Circular struct {
|
|
||||||
buf []byte // the bytes
|
|
||||||
size int // == len(buf)
|
|
||||||
head int // index of the next byte to read
|
|
||||||
tail int // index of the last byte available to read
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a new circular buffer of the given size
|
|
||||||
func NewCircular(size int) *Circular {
|
|
||||||
return &Circular{
|
|
||||||
buf: make([]byte, size+1),
|
|
||||||
size: size + 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy data from the given reader into the buffer
|
|
||||||
// Any errors encountered while reading are returned EXCEPT io.EOF.
|
|
||||||
// If the reader fills the buffer, it returns buffer.FullError
|
|
||||||
func (c *Circular) ReadFrom(rd io.Reader) (n int, err error) {
|
|
||||||
// IF:
|
|
||||||
// [---H+++T--]
|
|
||||||
if c.tail >= c.head {
|
|
||||||
n, err = readInto(rd, c.buf[c.tail:])
|
|
||||||
c.tail = (c.tail + n) % c.size
|
|
||||||
if err == io.EOF {
|
|
||||||
return n, nil
|
|
||||||
} else if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOW:
|
|
||||||
// [T---H++++] or [++T--H+++]
|
|
||||||
n2, err := readInto(rd, c.buf[c.tail:c.head])
|
|
||||||
n += n2
|
|
||||||
c.tail += n2
|
|
||||||
if err == nil {
|
|
||||||
err = FullError
|
|
||||||
} else if err == io.EOF {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read data out of the buffer. This never fails but may
|
|
||||||
// return n==0 if there is no data to be read
|
|
||||||
func (c *Circular) Read(p []byte) (n int, err error) {
|
|
||||||
if c.head > c.tail {
|
|
||||||
n = copy(p, c.buf[c.head:])
|
|
||||||
c.head = (c.head + n) % c.size
|
|
||||||
if c.head != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n2 := copy(p[n:], c.buf[c.head:c.tail])
|
|
||||||
n += n2
|
|
||||||
c.head += n2
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,160 +0,0 @@
|
||||||
package buffer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
AlreadyClosed = errors.New("Buffer already closed")
|
|
||||||
)
|
|
||||||
|
|
||||||
// A specialized concurrent circular buffer intended to buffer a stream's inbound data with the following properties:
|
|
||||||
// - Minimizes copies by skipping the buffer if a write occurs while a reader is waiting
|
|
||||||
// - Provides a mechnaism to time out reads after a deadline
|
|
||||||
// - Provides a mechanism to set an 'error' that will fail reads when the buffer is empty
|
|
||||||
type waitingReader struct {
|
|
||||||
buf []byte
|
|
||||||
n int
|
|
||||||
}
|
|
||||||
|
|
||||||
type Inbound struct {
|
|
||||||
*Circular
|
|
||||||
*sync.Cond
|
|
||||||
err error
|
|
||||||
waitingReader
|
|
||||||
deadline time.Time
|
|
||||||
timer *time.Timer
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewInbound(size int) *Inbound {
|
|
||||||
return &Inbound{
|
|
||||||
Circular: NewCircular(size),
|
|
||||||
Cond: sync.NewCond(new(sync.Mutex)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Inbound) SetDeadline(t time.Time) {
|
|
||||||
b.L.Lock()
|
|
||||||
|
|
||||||
// set the deadline
|
|
||||||
b.deadline = t
|
|
||||||
|
|
||||||
// how long until the deadline
|
|
||||||
delay := t.Sub(time.Now())
|
|
||||||
|
|
||||||
if b.timer != nil {
|
|
||||||
b.timer.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// after the delay, wake up waiters
|
|
||||||
b.timer = time.AfterFunc(delay, func() {
|
|
||||||
b.Broadcast()
|
|
||||||
})
|
|
||||||
|
|
||||||
b.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Inbound) SetError(err error) {
|
|
||||||
b.L.Lock()
|
|
||||||
b.err = err
|
|
||||||
b.Broadcast()
|
|
||||||
b.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Inbound) GetError() (err error) {
|
|
||||||
b.L.Lock()
|
|
||||||
err = b.err
|
|
||||||
b.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Inbound) ReadFrom(rd io.Reader) (n int, err error) {
|
|
||||||
b.L.Lock()
|
|
||||||
|
|
||||||
if b.err != nil {
|
|
||||||
b.L.Unlock()
|
|
||||||
if _, err = ioutil.ReadAll(rd); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return 0, AlreadyClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
// write directly to a reader's buffer, if possible
|
|
||||||
if b.waitingReader.buf != nil {
|
|
||||||
b.waitingReader.n, err = readInto(rd, b.waitingReader.buf)
|
|
||||||
n += b.waitingReader.n
|
|
||||||
b.waitingReader.buf = nil
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
// EOF is not an error
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
b.Broadcast()
|
|
||||||
b.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// write the rest to buffer
|
|
||||||
var writeN int
|
|
||||||
writeN, err = b.Circular.ReadFrom(rd)
|
|
||||||
n += writeN
|
|
||||||
|
|
||||||
b.Broadcast()
|
|
||||||
b.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Inbound) Read(p []byte) (n int, err error) {
|
|
||||||
b.L.Lock()
|
|
||||||
|
|
||||||
var wait *waitingReader
|
|
||||||
|
|
||||||
for {
|
|
||||||
// we got a direct write to our buffer
|
|
||||||
if wait != nil && wait.n != 0 {
|
|
||||||
n = wait.n
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// check for timeout
|
|
||||||
if !b.deadline.IsZero() {
|
|
||||||
if time.Now().After(b.deadline) {
|
|
||||||
err = errors.New("Read timeout")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// try to read from the buffer
|
|
||||||
n, _ = b.Circular.Read(p)
|
|
||||||
|
|
||||||
// successfully read some data
|
|
||||||
if n != 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// there's an error
|
|
||||||
if b.err != nil {
|
|
||||||
err = b.err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// register for a direct write
|
|
||||||
if b.waitingReader.buf == nil {
|
|
||||||
wait = &b.waitingReader
|
|
||||||
wait.buf = p
|
|
||||||
wait.n = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data, wait
|
|
||||||
b.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
b.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
package buffer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Outbound struct {
|
|
||||||
val int
|
|
||||||
err error
|
|
||||||
*sync.Cond
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewOutbound(size int) *Outbound {
|
|
||||||
return &Outbound{val: size, Cond: sync.NewCond(new(sync.Mutex))}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Outbound) Increment(inc int) {
|
|
||||||
b.L.Lock()
|
|
||||||
b.val += inc
|
|
||||||
b.Broadcast()
|
|
||||||
b.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Outbound) SetError(err error) {
|
|
||||||
b.L.Lock()
|
|
||||||
b.err = err
|
|
||||||
b.Broadcast()
|
|
||||||
b.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Outbound) Decrement(dec int) (ret int, err error) {
|
|
||||||
if dec == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
b.L.Lock()
|
|
||||||
for {
|
|
||||||
if b.err != nil {
|
|
||||||
err = b.err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if b.val > 0 {
|
|
||||||
if dec > b.val {
|
|
||||||
ret = b.val
|
|
||||||
b.val = 0
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
b.val -= dec
|
|
||||||
ret = dec
|
|
||||||
break
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
b.Wait()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
b.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,10 +0,0 @@
|
||||||
package buffer
|
|
||||||
|
|
||||||
func BothClosed(in *Inbound, out *Outbound) (closed bool) {
|
|
||||||
in.L.Lock()
|
|
||||||
out.L.Lock()
|
|
||||||
closed = (in.err != nil && out.err != nil)
|
|
||||||
out.L.Unlock()
|
|
||||||
in.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
package ext
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/inconshreveable/muxado/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
heartbeatExtensionType = proto.MinExtensionType + iota
|
|
||||||
)
|
|
|
@ -1,125 +0,0 @@
|
||||||
package ext
|
|
||||||
|
|
||||||
// XXX: There's no logging around heartbeats - how can we do this in a way that is useful
|
|
||||||
// as a library?
|
|
||||||
//
|
|
||||||
// XXX: When we close the session because of a lost heartbeat or because of an error in the
|
|
||||||
// heartbeating, there is no way to tell that; a Session will just appear to stop working
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
proto "github.com/inconshreveable/muxado/proto"
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultHeartbeatInterval = 3 * time.Second
|
|
||||||
defaultHeartbeatTolerance = 10 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type Heartbeat struct {
|
|
||||||
sess proto.ISession
|
|
||||||
accept proto.ExtAccept
|
|
||||||
|
|
||||||
mark chan int
|
|
||||||
interval time.Duration
|
|
||||||
tolerance time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDefaultHeartbeat() *Heartbeat {
|
|
||||||
return NewHeartbeat(defaultHeartbeatInterval, defaultHeartbeatTolerance)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHeartbeat(interval, tolerance time.Duration) *Heartbeat {
|
|
||||||
return &Heartbeat{
|
|
||||||
mark: make(chan int),
|
|
||||||
interval: interval,
|
|
||||||
tolerance: tolerance,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Heartbeat) Start(sess proto.ISession, accept proto.ExtAccept) frame.StreamType {
|
|
||||||
h.sess = sess
|
|
||||||
h.accept = accept
|
|
||||||
go h.respond()
|
|
||||||
go h.request()
|
|
||||||
go h.check()
|
|
||||||
|
|
||||||
return heartbeatExtensionType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Heartbeat) check() {
|
|
||||||
t := time.NewTimer(h.interval + h.tolerance)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
// time out waiting for a response!
|
|
||||||
h.sess.Close()
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-h.mark:
|
|
||||||
t.Reset(h.interval + h.tolerance)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Heartbeat) respond() {
|
|
||||||
// close the session on any errors
|
|
||||||
defer h.sess.Close()
|
|
||||||
|
|
||||||
stream, err := h.accept()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// read the next heartbeat id and respond
|
|
||||||
buf := make([]byte, 4)
|
|
||||||
for {
|
|
||||||
_, err := io.ReadFull(stream, buf)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = stream.Write(buf)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Heartbeat) request() {
|
|
||||||
// close the session on any errors
|
|
||||||
defer h.sess.Close()
|
|
||||||
|
|
||||||
// request highest possible priority for heartbeats
|
|
||||||
priority := frame.StreamPriority(0x7FFFFFFF)
|
|
||||||
stream, err := h.sess.OpenStream(priority, heartbeatExtensionType, false)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// send heartbeats and then check that we got them back
|
|
||||||
var id uint32
|
|
||||||
for {
|
|
||||||
time.Sleep(h.interval)
|
|
||||||
|
|
||||||
if err := binary.Write(stream, binary.BigEndian, id); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var respId uint32
|
|
||||||
if err := binary.Read(stream, binary.BigEndian, &respId); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if id != respId {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// record the time
|
|
||||||
h.mark <- 1
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,68 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// data frames are actually longer, but they are variable length
|
|
||||||
dataFrameSize = headerSize
|
|
||||||
)
|
|
||||||
|
|
||||||
type RStreamData struct {
|
|
||||||
Header
|
|
||||||
fixed [dataFrameSize]byte
|
|
||||||
|
|
||||||
toRead io.LimitedReader // when reading, the underlying connection's io.Reader is handed up
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamData) Reader() io.Reader {
|
|
||||||
return &f.toRead
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamData) readFrom(d deserializer) (err error) {
|
|
||||||
// not using io.LimitReader to avoid a heap memory allocation in the hot path
|
|
||||||
f.toRead.R = d
|
|
||||||
f.toRead.N = int64(f.Length())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// WStreamData is a StreamData frame that you can write
|
|
||||||
// It delivers opaque data on a stream to the application layer
|
|
||||||
type WStreamData struct {
|
|
||||||
Header
|
|
||||||
fixed [dataFrameSize]byte
|
|
||||||
toWrite []byte // when writing, you just pass a byte slice to write
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamData) writeTo(s serializer) (err error) {
|
|
||||||
if _, err = s.Write(f.fixed[:]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = s.Write(f.toWrite); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamData) Set(streamId StreamId, data []byte, fin bool) (err error) {
|
|
||||||
var flags flagsType
|
|
||||||
if fin {
|
|
||||||
flags.Set(flagFin)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = f.Header.SetAll(TypeStreamData, len(data), streamId, flags); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
f.toWrite = data
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWStreamData() (f *WStreamData) {
|
|
||||||
f = new(WStreamData)
|
|
||||||
f.Header = f.fixed[:headerSize]
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
type DebugTransport struct {
|
|
||||||
prefix string
|
|
||||||
*BasicTransport
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *DebugTransport) Write(buf []byte) (int, error) {
|
|
||||||
fmt.Printf("%v writes %d bytes: %x\n", t.prefix, len(buf), buf)
|
|
||||||
return t.BasicTransport.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *DebugTransport) WriteFrame(frame WFrame) (err error) {
|
|
||||||
// each frame knows how to write iteself to the framer
|
|
||||||
return frame.writeTo(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *DebugTransport) ReadFrame() (f RFrame, err error) {
|
|
||||||
f, err = t.BasicTransport.ReadFrame()
|
|
||||||
|
|
||||||
fmt.Printf("%v reads Header length: %v\n", t.prefix, t.Header.Length())
|
|
||||||
fmt.Printf("%v reads Header type: %v\n", t.prefix, t.Header.Type())
|
|
||||||
fmt.Printf("%v reads Header stream id: %v\n", t.prefix, t.Header.StreamId())
|
|
||||||
fmt.Printf("%v reads Header fin: %v\n", t.prefix, t.Header.Fin())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDebugTransport(rwc io.ReadWriteCloser, prefix string) *DebugTransport {
|
|
||||||
trans := &DebugTransport{
|
|
||||||
prefix: prefix,
|
|
||||||
BasicTransport: &BasicTransport{ReadWriteCloser: rwc, Header: make([]byte, headerSize)},
|
|
||||||
}
|
|
||||||
return trans
|
|
||||||
}
|
|
|
@ -1,25 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
NoError = iota
|
|
||||||
ProtocolError
|
|
||||||
InternalError
|
|
||||||
FlowControlError
|
|
||||||
StreamClosed
|
|
||||||
FrameSizeError
|
|
||||||
RefusedStream
|
|
||||||
Cancel
|
|
||||||
NoSuchError
|
|
||||||
)
|
|
||||||
|
|
||||||
type FramingError struct {
|
|
||||||
error
|
|
||||||
}
|
|
||||||
|
|
||||||
func protoError(fmtstr string, args ...interface{}) FramingError {
|
|
||||||
return FramingError{fmt.Errorf(fmtstr, args...)}
|
|
||||||
}
|
|
|
@ -1,79 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
const (
|
|
||||||
goAwayBodySize = 8
|
|
||||||
goAwayFrameSize = headerSize + goAwayBodySize
|
|
||||||
)
|
|
||||||
|
|
||||||
// Instruct the remote side not to initiate new streams
|
|
||||||
type RGoAway struct {
|
|
||||||
Header
|
|
||||||
body [goAwayBodySize]byte
|
|
||||||
debug []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RGoAway) LastStreamId() StreamId {
|
|
||||||
return StreamId(order.Uint32(f.body[0:]) & streamMask)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RGoAway) ErrorCode() ErrorCode {
|
|
||||||
return ErrorCode(order.Uint32(f.body[4:]))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RGoAway) Debug() []byte {
|
|
||||||
return f.debug
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RGoAway) readFrom(d deserializer) (err error) {
|
|
||||||
if _, err = io.ReadFull(d, f.body[:]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
f.debug = make([]byte, f.Length()-goAwayBodySize)
|
|
||||||
if _, err = io.ReadFull(d, f.debug); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type WGoAway struct {
|
|
||||||
Header
|
|
||||||
data [goAwayFrameSize]byte
|
|
||||||
debug []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WGoAway) writeTo(s serializer) (err error) {
|
|
||||||
if _, err = s.Write(f.data[:]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = s.Write(f.debug); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WGoAway) Set(lastStreamId StreamId, errorCode ErrorCode, debug []byte) (err error) {
|
|
||||||
if f.Header.SetAll(TypeGoAway, len(debug)+goAwayFrameSize, 0, 0); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastStreamId > streamMask {
|
|
||||||
err = protoError("Related stream id %d is out of range", lastStreamId)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
order.PutUint32(f.data[headerSize:], uint32(lastStreamId))
|
|
||||||
order.PutUint32(f.data[headerSize+4:], uint32(errorCode))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWGoAway() (f *WGoAway) {
|
|
||||||
f = new(WGoAway)
|
|
||||||
f.Header = Header(f.data[:headerSize])
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,85 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
const (
|
|
||||||
headerSize = 8
|
|
||||||
)
|
|
||||||
|
|
||||||
type Header []byte
|
|
||||||
|
|
||||||
func newHeader() Header {
|
|
||||||
return Header(make([]byte, headerSize))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) readFrom(d deserializer) (err error) {
|
|
||||||
// read the header
|
|
||||||
if _, err = io.ReadFull(d, []byte(b)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) Length() uint16 {
|
|
||||||
return order.Uint16(b[:2]) & lengthMask
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) SetLength(length int) (err error) {
|
|
||||||
if length > lengthMask || length < 0 {
|
|
||||||
return protoError("Frame length %d out of range", length)
|
|
||||||
}
|
|
||||||
|
|
||||||
order.PutUint16(b[:2], uint16(length))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) Type() FrameType {
|
|
||||||
return FrameType((b[3]) & typeMask)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) SetType(t FrameType) (err error) {
|
|
||||||
b[3] = byte(t & typeMask)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) StreamId() StreamId {
|
|
||||||
return StreamId(order.Uint32(b[4:]) & streamMask)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) SetStreamId(streamId StreamId) (err error) {
|
|
||||||
if streamId > streamMask {
|
|
||||||
return protoError("Stream id %d out of range", streamId)
|
|
||||||
}
|
|
||||||
|
|
||||||
order.PutUint32(b[4:], uint32(streamId))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) Flags() flagsType {
|
|
||||||
return flagsType(b[2])
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) SetFlags(fl flagsType) (err error) {
|
|
||||||
b[2] = byte(fl)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) Fin() bool {
|
|
||||||
return b.Flags().IsSet(flagFin)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Header) SetAll(ftype FrameType, length int, streamId StreamId, flags flagsType) (err error) {
|
|
||||||
if err = b.SetType(ftype); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = b.SetLength(length); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = b.SetStreamId(streamId); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = b.SetFlags(flags); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,25 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Transport interface {
|
|
||||||
WriteFrame(WFrame) error
|
|
||||||
ReadFrame() (RFrame, error)
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// A frame can read and write itself to a serializer/deserializer
|
|
||||||
type RFrame interface {
|
|
||||||
StreamId() StreamId
|
|
||||||
Type() FrameType
|
|
||||||
readFrom(deserializer) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type WFrame interface {
|
|
||||||
writeTo(serializer) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type deserializer io.Reader
|
|
||||||
type serializer io.Writer
|
|
|
@ -1,67 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
const (
|
|
||||||
rstBodySize = 4
|
|
||||||
rstFrameSize = headerSize + rstBodySize
|
|
||||||
)
|
|
||||||
|
|
||||||
// RsStreamRst is a STREAM_RST frame that is read from a transport
|
|
||||||
type RStreamRst struct {
|
|
||||||
Header
|
|
||||||
body [rstBodySize]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamRst) readFrom(d deserializer) (err error) {
|
|
||||||
if f.Length() != rstBodySize {
|
|
||||||
return protoError("STREAM_RST length must be %d, got %d", rstBodySize, f.Length())
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = io.ReadFull(d, f.body[:]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamRst) ErrorCode() ErrorCode {
|
|
||||||
return ErrorCode(order.Uint32(f.body[0:]))
|
|
||||||
}
|
|
||||||
|
|
||||||
// WStreamRst is a STREAM_RST frame that can be written, it terminate a stream ungracefully
|
|
||||||
type WStreamRst struct {
|
|
||||||
Header
|
|
||||||
all [rstFrameSize]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWStreamRst() (f *WStreamRst) {
|
|
||||||
f = new(WStreamRst)
|
|
||||||
f.Header = Header(f.all[:headerSize])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamRst) writeTo(s serializer) (err error) {
|
|
||||||
_, err = s.Write(f.all[:])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamRst) Set(streamId StreamId, errorCode ErrorCode) (err error) {
|
|
||||||
if err = f.Header.SetAll(TypeStreamRst, rstBodySize, streamId, 0); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = validRstErrorCode(errorCode); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
order.PutUint32(f.all[headerSize:], uint32(errorCode))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func validRstErrorCode(errorCode ErrorCode) error {
|
|
||||||
if errorCode >= NoSuchError {
|
|
||||||
return protoError("Invalid error code %d for STREAM_RST", errorCode)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,120 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
maxSynBodySize = 8
|
|
||||||
maxSynFrameSize = headerSize + maxSynBodySize
|
|
||||||
)
|
|
||||||
|
|
||||||
type RStreamSyn struct {
|
|
||||||
Header
|
|
||||||
body [maxSynBodySize]byte
|
|
||||||
streamPriority StreamPriority
|
|
||||||
streamType StreamType
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamType returns the stream's defined type as specified by
|
|
||||||
// the remote endpoint
|
|
||||||
func (f *RStreamSyn) StreamType() StreamType {
|
|
||||||
return f.streamType
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamPriority returns the stream priority set on this frame
|
|
||||||
func (f *RStreamSyn) StreamPriority() StreamPriority {
|
|
||||||
return f.streamPriority
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamSyn) parseFields() error {
|
|
||||||
var length uint16 = 0
|
|
||||||
flags := f.Flags()
|
|
||||||
|
|
||||||
if flags.IsSet(flagStreamPriority) {
|
|
||||||
f.streamPriority = StreamPriority(order.Uint32(f.body[length : length+4]))
|
|
||||||
length += 4
|
|
||||||
} else {
|
|
||||||
f.streamPriority = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
if flags.IsSet(flagStreamType) {
|
|
||||||
f.streamType = StreamType(order.Uint32(f.body[length : length+4]))
|
|
||||||
length += 4
|
|
||||||
} else {
|
|
||||||
f.streamType = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
if length != f.Length() {
|
|
||||||
return fmt.Errorf("Expected length %d for flags %v, but got %v", length, flags, f.Length())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamSyn) readFrom(d deserializer) (err error) {
|
|
||||||
if _, err = io.ReadFull(d, f.body[:f.Length()]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = f.parseFields(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type WStreamSyn struct {
|
|
||||||
Header
|
|
||||||
data [maxSynFrameSize]byte
|
|
||||||
length int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamSyn) writeTo(s serializer) (err error) {
|
|
||||||
_, err = s.Write(f.data[:headerSize+f.Length()])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamSyn) Set(streamId StreamId, streamPriority StreamPriority, streamType StreamType, fin bool) (err error) {
|
|
||||||
var (
|
|
||||||
flags flagsType
|
|
||||||
length int = 0
|
|
||||||
)
|
|
||||||
|
|
||||||
// set fin bit
|
|
||||||
if fin {
|
|
||||||
flags.Set(flagFin)
|
|
||||||
}
|
|
||||||
|
|
||||||
if streamPriority != 0 {
|
|
||||||
if streamPriority > priorityMask {
|
|
||||||
err = protoError("Priority %d is out of range", streamPriority)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
flags.Set(flagStreamPriority)
|
|
||||||
start := headerSize + length
|
|
||||||
order.PutUint32(f.data[start:start+4], uint32(streamPriority))
|
|
||||||
length += 4
|
|
||||||
}
|
|
||||||
|
|
||||||
if streamType != 0 {
|
|
||||||
flags.Set(flagStreamType)
|
|
||||||
start := headerSize + length
|
|
||||||
order.PutUint32(f.data[start:start+4], uint32(streamType))
|
|
||||||
length += 4
|
|
||||||
}
|
|
||||||
|
|
||||||
// make the frame
|
|
||||||
if err = f.Header.SetAll(TypeStreamSyn, length, streamId, flags); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWStreamSyn() (f *WStreamSyn) {
|
|
||||||
f = new(WStreamSyn)
|
|
||||||
f.Header = Header(f.data[:headerSize])
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,79 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
order = binary.BigEndian
|
|
||||||
)
|
|
||||||
|
|
||||||
// BasicTransport can serialize/deserialize frames on an underlying
|
|
||||||
// net.Conn to implement the muxado protocol.
|
|
||||||
type BasicTransport struct {
|
|
||||||
io.ReadWriteCloser
|
|
||||||
Header
|
|
||||||
RStreamSyn
|
|
||||||
RStreamRst
|
|
||||||
RStreamData
|
|
||||||
RStreamWndInc
|
|
||||||
RGoAway
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteFrame writes the given frame to the underlying transport
|
|
||||||
func (t *BasicTransport) WriteFrame(frame WFrame) (err error) {
|
|
||||||
// each frame knows how to write iteself to the framer
|
|
||||||
err = frame.writeTo(t)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadFrame reads the next frame from the underlying transport
|
|
||||||
func (t *BasicTransport) ReadFrame() (f RFrame, err error) {
|
|
||||||
// read the header
|
|
||||||
if _, err = io.ReadFull(t, []byte(t.Header)); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch t.Header.Type() {
|
|
||||||
case TypeStreamSyn:
|
|
||||||
frame := &t.RStreamSyn
|
|
||||||
frame.Header = t.Header
|
|
||||||
err = frame.readFrom(t)
|
|
||||||
return frame, err
|
|
||||||
|
|
||||||
case TypeStreamRst:
|
|
||||||
frame := &t.RStreamRst
|
|
||||||
frame.Header = t.Header
|
|
||||||
err = frame.readFrom(t)
|
|
||||||
return frame, err
|
|
||||||
|
|
||||||
case TypeStreamData:
|
|
||||||
frame := &t.RStreamData
|
|
||||||
frame.Header = t.Header
|
|
||||||
err = frame.readFrom(t)
|
|
||||||
return frame, err
|
|
||||||
|
|
||||||
case TypeStreamWndInc:
|
|
||||||
frame := &t.RStreamWndInc
|
|
||||||
frame.Header = t.Header
|
|
||||||
err = frame.readFrom(t)
|
|
||||||
return frame, err
|
|
||||||
|
|
||||||
case TypeGoAway:
|
|
||||||
frame := &t.RGoAway
|
|
||||||
frame.Header = t.Header
|
|
||||||
err = frame.readFrom(t)
|
|
||||||
return frame, err
|
|
||||||
|
|
||||||
default:
|
|
||||||
return nil, protoError("Illegal frame type: %d", t.Header.Type())
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBasicTransport(rwc io.ReadWriteCloser) *BasicTransport {
|
|
||||||
trans := &BasicTransport{ReadWriteCloser: rwc, Header: make([]byte, headerSize)}
|
|
||||||
return trans
|
|
||||||
}
|
|
|
@ -1,61 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
const (
|
|
||||||
// offsets for packing/unpacking frames
|
|
||||||
lengthOffset = 32 + 16
|
|
||||||
flagsOffset = 32 + 8
|
|
||||||
typeOffset = 32 + 3
|
|
||||||
|
|
||||||
// masks for packing/unpacking frames
|
|
||||||
lengthMask = 0x3FFF
|
|
||||||
streamMask = 0x7FFFFFFF
|
|
||||||
flagsMask = 0xFF
|
|
||||||
typeMask = 0x1F
|
|
||||||
wndIncMask = 0x7FFFFFFF
|
|
||||||
priorityMask = 0x7FFFFFFF
|
|
||||||
)
|
|
||||||
|
|
||||||
// a frameType is a 5-bit integer in the frame header that identifies the type of frame
|
|
||||||
type FrameType uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
TypeStreamSyn = 0x1
|
|
||||||
TypeStreamRst = 0x2
|
|
||||||
TypeStreamData = 0x3
|
|
||||||
TypeStreamWndInc = 0x4
|
|
||||||
TypeStreamPri = 0x5
|
|
||||||
TypeGoAway = 0x6
|
|
||||||
)
|
|
||||||
|
|
||||||
// a flagsType is an 8-bit integer containing frame-specific flag bits in the frame header
|
|
||||||
type flagsType uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
flagFin = 0x1
|
|
||||||
flagStreamPriority = 0x2
|
|
||||||
flagStreamType = 0x4
|
|
||||||
)
|
|
||||||
|
|
||||||
func (ft flagsType) IsSet(f flagsType) bool {
|
|
||||||
return (ft & f) != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ft *flagsType) Set(f flagsType) {
|
|
||||||
*ft |= f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ft *flagsType) Unset(f flagsType) {
|
|
||||||
*ft = *ft &^ f
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamId is 31-bit integer uniquely identifying a stream within a session
|
|
||||||
type StreamId uint32
|
|
||||||
|
|
||||||
// StreamPriority is 31-bit integer specifying a stream's priority
|
|
||||||
type StreamPriority uint32
|
|
||||||
|
|
||||||
// StreamType is 32-bit integer specifying a stream's type
|
|
||||||
type StreamType uint32
|
|
||||||
|
|
||||||
// ErrorCode is a 32-bit integer indicating a error condition included in rst/goaway frames
|
|
||||||
type ErrorCode uint32
|
|
|
@ -1,57 +0,0 @@
|
||||||
package frame
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
const (
|
|
||||||
wndIncBodySize = 4
|
|
||||||
wndIncFrameSize = headerSize + wndIncBodySize
|
|
||||||
)
|
|
||||||
|
|
||||||
// Increase a stream's flow control window size
|
|
||||||
type RStreamWndInc struct {
|
|
||||||
Header
|
|
||||||
body [wndIncBodySize]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamWndInc) WindowIncrement() (inc uint32) {
|
|
||||||
return order.Uint32(f.body[:]) & wndIncMask
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *RStreamWndInc) readFrom(d deserializer) (err error) {
|
|
||||||
if f.Length() != wndIncBodySize {
|
|
||||||
return protoError("WND_INC length must be %d, got %d", wndIncBodySize, f.Length())
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = io.ReadFull(d, f.body[:])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type WStreamWndInc struct {
|
|
||||||
Header
|
|
||||||
data [wndIncFrameSize]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamWndInc) writeTo(s serializer) (err error) {
|
|
||||||
_, err = s.Write(f.data[:])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *WStreamWndInc) Set(streamId StreamId, inc uint32) (err error) {
|
|
||||||
if inc > wndIncMask {
|
|
||||||
return protoError("Window increment %d out of range", inc)
|
|
||||||
}
|
|
||||||
|
|
||||||
order.PutUint32(f.data[headerSize:], inc)
|
|
||||||
|
|
||||||
if err = f.Header.SetAll(TypeStreamWndInc, wndIncBodySize, streamId, 0); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWStreamWndInc() (f *WStreamWndInc) {
|
|
||||||
f = new(WStreamWndInc)
|
|
||||||
f.Header = Header(f.data[:headerSize])
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
package proto
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type IStream interface {
|
|
||||||
Write([]byte) (int, error)
|
|
||||||
Read([]byte) (int, error)
|
|
||||||
Close() error
|
|
||||||
SetDeadline(time.Time) error
|
|
||||||
SetReadDeadline(time.Time) error
|
|
||||||
SetWriteDeadline(time.Time) error
|
|
||||||
HalfClose([]byte) (int, error)
|
|
||||||
Id() frame.StreamId
|
|
||||||
StreamType() frame.StreamType
|
|
||||||
Session() ISession
|
|
||||||
RemoteAddr() net.Addr
|
|
||||||
LocalAddr() net.Addr
|
|
||||||
}
|
|
||||||
|
|
||||||
type ISession interface {
|
|
||||||
Open() (IStream, error)
|
|
||||||
OpenStream(frame.StreamPriority, frame.StreamType, bool) (IStream, error)
|
|
||||||
Accept() (IStream, error)
|
|
||||||
Kill() error
|
|
||||||
GoAway(frame.ErrorCode, []byte) error
|
|
||||||
LocalAddr() net.Addr
|
|
||||||
RemoteAddr() net.Addr
|
|
||||||
Close() error
|
|
||||||
Wait() (frame.ErrorCode, error, []byte)
|
|
||||||
NetListener() net.Listener
|
|
||||||
NetDial(_, _ string) (net.Conn, error)
|
|
||||||
}
|
|
|
@ -1,474 +0,0 @@
|
||||||
package proto
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"reflect"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultWindowSize = 0x10000 // 64KB
|
|
||||||
defaultAcceptQueueDepth = 100
|
|
||||||
MinExtensionType = 0xFFFFFFFF - 0x100 // 512 extensions
|
|
||||||
)
|
|
||||||
|
|
||||||
// private interface for Sessions to call Streams
|
|
||||||
type stream interface {
|
|
||||||
IStream
|
|
||||||
handleStreamData(*frame.RStreamData)
|
|
||||||
handleStreamWndInc(*frame.RStreamWndInc)
|
|
||||||
handleStreamRst(*frame.RStreamRst)
|
|
||||||
closeWith(error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// for extensions
|
|
||||||
type ExtAccept func() (IStream, error)
|
|
||||||
type Extension interface {
|
|
||||||
Start(ISession, ExtAccept) frame.StreamType
|
|
||||||
}
|
|
||||||
|
|
||||||
type deadReason struct {
|
|
||||||
errorCode frame.ErrorCode
|
|
||||||
err error
|
|
||||||
remoteDebug []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// factory function that creates new streams
|
|
||||||
type streamFactory func(id frame.StreamId, priority frame.StreamPriority, streamType frame.StreamType, finLocal bool, finRemote bool, windowSize uint32, sess session) stream
|
|
||||||
|
|
||||||
// checks the parity of a stream id (local vs remote, client vs server)
|
|
||||||
type parityFn func(frame.StreamId) bool
|
|
||||||
|
|
||||||
// state for each half of the session (remote and local)
|
|
||||||
type halfState struct {
|
|
||||||
goneAway int32 // true if that half of the stream has gone away
|
|
||||||
lastId uint32 // last id used/seen from one half of the session
|
|
||||||
}
|
|
||||||
|
|
||||||
// Session implements a simple streaming session manager. It has the following characteristics:
|
|
||||||
//
|
|
||||||
// - When closing the Session, it does not linger, all pending write operations will fail immediately.
|
|
||||||
// - It completely ignores stream priority when processing and writing frames
|
|
||||||
// - It offers no customization of settings like window size/ping time
|
|
||||||
type Session struct {
|
|
||||||
conn net.Conn // connection the transport is running over
|
|
||||||
transport frame.Transport // transport
|
|
||||||
streams StreamMap // all active streams
|
|
||||||
local halfState // client state
|
|
||||||
remote halfState // server state
|
|
||||||
syn *frame.WStreamSyn // STREAM_SYN frame for opens
|
|
||||||
wr sync.Mutex // synchronization when writing frames
|
|
||||||
accept chan stream // new streams opened by the remote
|
|
||||||
diebit int32 // true if we're dying
|
|
||||||
remoteDebug []byte // debugging data sent in the remote's GoAway frame
|
|
||||||
defaultWindowSize uint32 // window size when creating new streams
|
|
||||||
newStream streamFactory // factory function to make new streams
|
|
||||||
dead chan deadReason // dead
|
|
||||||
isLocal parityFn // determines if a stream id is local or remote
|
|
||||||
exts map[frame.StreamType]chan stream // map of extension stream type -> accept channel for the extension
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSession(conn net.Conn, newStream streamFactory, isClient bool, exts []Extension) ISession {
|
|
||||||
sess := &Session{
|
|
||||||
conn: conn,
|
|
||||||
transport: frame.NewBasicTransport(conn),
|
|
||||||
streams: NewConcurrentStreamMap(),
|
|
||||||
local: halfState{lastId: 0},
|
|
||||||
remote: halfState{lastId: 0},
|
|
||||||
syn: frame.NewWStreamSyn(),
|
|
||||||
diebit: 0,
|
|
||||||
defaultWindowSize: defaultWindowSize,
|
|
||||||
accept: make(chan stream, defaultAcceptQueueDepth),
|
|
||||||
newStream: newStream,
|
|
||||||
dead: make(chan deadReason, 1), // don't block die() if there is no Wait call
|
|
||||||
exts: make(map[frame.StreamType]chan stream),
|
|
||||||
}
|
|
||||||
|
|
||||||
if isClient {
|
|
||||||
sess.isLocal = sess.isClient
|
|
||||||
sess.local.lastId += 1
|
|
||||||
} else {
|
|
||||||
sess.isLocal = sess.isServer
|
|
||||||
sess.remote.lastId += 1
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ext := range exts {
|
|
||||||
sess.startExtension(ext)
|
|
||||||
}
|
|
||||||
|
|
||||||
go sess.reader()
|
|
||||||
|
|
||||||
return sess
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////
|
|
||||||
// public interface
|
|
||||||
////////////////////////////////
|
|
||||||
|
|
||||||
func (s *Session) Open() (IStream, error) {
|
|
||||||
return s.OpenStream(0, 0, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) OpenStream(priority frame.StreamPriority, streamType frame.StreamType, fin bool) (ret IStream, err error) {
|
|
||||||
// check if the remote has gone away
|
|
||||||
if atomic.LoadInt32(&s.remote.goneAway) == 1 {
|
|
||||||
return nil, fmt.Errorf("Failed to create stream, remote has gone away.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// this lock prevents the following race:
|
|
||||||
// goroutine1 goroutine2
|
|
||||||
// - inc stream id
|
|
||||||
// - inc stream id
|
|
||||||
// - send streamsyn
|
|
||||||
// - send streamsyn
|
|
||||||
s.wr.Lock()
|
|
||||||
|
|
||||||
// get the next id we can use
|
|
||||||
nextId := frame.StreamId(atomic.AddUint32(&s.local.lastId, 2))
|
|
||||||
|
|
||||||
// make the stream
|
|
||||||
str := s.newStream(nextId, priority, streamType, fin, false, s.defaultWindowSize, s)
|
|
||||||
|
|
||||||
// add to to the stream map
|
|
||||||
s.streams.Set(nextId, str)
|
|
||||||
|
|
||||||
// write the frame
|
|
||||||
if err = s.syn.Set(nextId, priority, streamType, fin); err != nil {
|
|
||||||
s.wr.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = s.transport.WriteFrame(s.syn); err != nil {
|
|
||||||
s.wr.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wr.Unlock()
|
|
||||||
return str, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) Accept() (str IStream, err error) {
|
|
||||||
var ok bool
|
|
||||||
if str, ok = <-s.accept; !ok {
|
|
||||||
return nil, fmt.Errorf("Session closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) Kill() error {
|
|
||||||
return s.transport.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) Close() error {
|
|
||||||
return s.die(frame.NoError, fmt.Errorf("Session Close()"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) GoAway(errorCode frame.ErrorCode, debug []byte) (err error) {
|
|
||||||
if !atomic.CompareAndSwapInt32(&s.local.goneAway, 0, 1) {
|
|
||||||
return fmt.Errorf("Already sent GoAway!")
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wr.Lock()
|
|
||||||
f := frame.NewWGoAway()
|
|
||||||
remoteId := frame.StreamId(atomic.LoadUint32(&s.remote.lastId))
|
|
||||||
if err = f.Set(remoteId, errorCode, debug); err != nil {
|
|
||||||
s.wr.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = s.transport.WriteFrame(f); err != nil {
|
|
||||||
s.wr.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wr.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) LocalAddr() net.Addr {
|
|
||||||
return s.conn.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) RemoteAddr() net.Addr {
|
|
||||||
return s.conn.RemoteAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) Wait() (frame.ErrorCode, error, []byte) {
|
|
||||||
reason := <-s.dead
|
|
||||||
return reason.errorCode, reason.err, reason.remoteDebug
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////
|
|
||||||
// private interface for streams
|
|
||||||
////////////////////////////////
|
|
||||||
|
|
||||||
// removeStream removes a stream from this session's stream registry
|
|
||||||
//
|
|
||||||
// It does not error if the stream is not present
|
|
||||||
func (s *Session) removeStream(id frame.StreamId) {
|
|
||||||
s.streams.Delete(id)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeFrame writes the given frame to the transport and returns the error from the write operation
|
|
||||||
func (s *Session) writeFrame(f frame.WFrame, dl time.Time) (err error) {
|
|
||||||
s.wr.Lock()
|
|
||||||
s.conn.SetWriteDeadline(dl)
|
|
||||||
err = s.transport.WriteFrame(f)
|
|
||||||
s.wr.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// die closes the session cleanly with the given error and protocol error code
|
|
||||||
func (s *Session) die(errorCode frame.ErrorCode, err error) error {
|
|
||||||
// only one shutdown ever happens
|
|
||||||
if !atomic.CompareAndSwapInt32(&s.diebit, 0, 1) {
|
|
||||||
return fmt.Errorf("Shutdown already in progress")
|
|
||||||
}
|
|
||||||
|
|
||||||
// send a go away frame
|
|
||||||
s.GoAway(errorCode, []byte(err.Error()))
|
|
||||||
|
|
||||||
// now we're safe to stop accepting incoming connections
|
|
||||||
close(s.accept)
|
|
||||||
|
|
||||||
// we cleaned up as best as possible, close the transport
|
|
||||||
s.transport.Close()
|
|
||||||
|
|
||||||
// notify all of the streams that we're closing
|
|
||||||
s.streams.Each(func(id frame.StreamId, str stream) {
|
|
||||||
str.closeWith(fmt.Errorf("Session closed"))
|
|
||||||
})
|
|
||||||
|
|
||||||
s.dead <- deadReason{errorCode, err, s.remoteDebug}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////
|
|
||||||
// internal methods
|
|
||||||
////////////////////////////////
|
|
||||||
|
|
||||||
// reader() reads frames from the underlying transport and handles passes them to handleFrame
|
|
||||||
func (s *Session) reader() {
|
|
||||||
defer s.recoverPanic("reader()")
|
|
||||||
|
|
||||||
// close all of the extension accept channels when we're done
|
|
||||||
// we do this here instead of in die() since otherwise it wouldn't
|
|
||||||
// be safe to access s.exts
|
|
||||||
defer func() {
|
|
||||||
for _, extAccept := range s.exts {
|
|
||||||
close(extAccept)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
f, err := s.transport.ReadFrame()
|
|
||||||
if err != nil {
|
|
||||||
// if we fail to read a frame, terminate the session
|
|
||||||
_, ok := err.(*frame.FramingError)
|
|
||||||
if ok {
|
|
||||||
s.die(frame.ProtocolError, err)
|
|
||||||
} else {
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.handleFrame(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) handleFrame(rf frame.RFrame) {
|
|
||||||
switch f := rf.(type) {
|
|
||||||
case *frame.RStreamSyn:
|
|
||||||
// if we're going away, refuse new streams
|
|
||||||
if atomic.LoadInt32(&s.local.goneAway) == 1 {
|
|
||||||
rstF := frame.NewWStreamRst()
|
|
||||||
rstF.Set(f.StreamId(), frame.RefusedStream)
|
|
||||||
go s.writeFrame(rstF, time.Time{})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.StreamId() <= frame.StreamId(atomic.LoadUint32(&s.remote.lastId)) {
|
|
||||||
s.die(frame.ProtocolError, fmt.Errorf("Stream id %d is less than last remote id.", f.StreamId()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.isLocal(f.StreamId()) {
|
|
||||||
s.die(frame.ProtocolError, fmt.Errorf("Stream id has wrong parity for remote endpoint: %d", f.StreamId()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// update last remote id
|
|
||||||
atomic.StoreUint32(&s.remote.lastId, uint32(f.StreamId()))
|
|
||||||
|
|
||||||
// make the new stream
|
|
||||||
str := s.newStream(f.StreamId(), f.StreamPriority(), f.StreamType(), false, f.Fin(), s.defaultWindowSize, s)
|
|
||||||
|
|
||||||
// add it to the stream map
|
|
||||||
s.streams.Set(f.StreamId(), str)
|
|
||||||
|
|
||||||
// check if this is an extension stream
|
|
||||||
if f.StreamType() >= MinExtensionType {
|
|
||||||
extAccept, ok := s.exts[f.StreamType()]
|
|
||||||
if !ok {
|
|
||||||
// Extension type of stream not registered
|
|
||||||
fRst := frame.NewWStreamRst()
|
|
||||||
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wr.Lock()
|
|
||||||
defer s.wr.Unlock()
|
|
||||||
s.transport.WriteFrame(fRst)
|
|
||||||
} else {
|
|
||||||
extAccept <- str
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// put the new stream on the accept channel
|
|
||||||
s.accept <- str
|
|
||||||
|
|
||||||
case *frame.RStreamData:
|
|
||||||
if str := s.getStream(f.StreamId()); str != nil {
|
|
||||||
str.handleStreamData(f)
|
|
||||||
} else {
|
|
||||||
// if we get a data frame on a non-existent connection, we still
|
|
||||||
// need to read out the frame body so that the stream stays in a
|
|
||||||
// good state. read the payload into a throwaway buffer
|
|
||||||
discard := make([]byte, f.Length())
|
|
||||||
io.ReadFull(f.Reader(), discard)
|
|
||||||
|
|
||||||
// DATA frames on closed connections are just stream-level errors
|
|
||||||
fRst := frame.NewWStreamRst()
|
|
||||||
if err := fRst.Set(f.StreamId(), frame.StreamClosed); err != nil {
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wr.Lock()
|
|
||||||
defer s.wr.Unlock()
|
|
||||||
s.transport.WriteFrame(fRst)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
case *frame.RStreamRst:
|
|
||||||
// delegate to the stream to handle these frames
|
|
||||||
if str := s.getStream(f.StreamId()); str != nil {
|
|
||||||
str.handleStreamRst(f)
|
|
||||||
}
|
|
||||||
case *frame.RStreamWndInc:
|
|
||||||
// delegate to the stream to handle these frames
|
|
||||||
if str := s.getStream(f.StreamId()); str != nil {
|
|
||||||
str.handleStreamWndInc(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
case *frame.RGoAway:
|
|
||||||
atomic.StoreInt32(&s.remote.goneAway, 1)
|
|
||||||
s.remoteDebug = f.Debug()
|
|
||||||
|
|
||||||
lastId := f.LastStreamId()
|
|
||||||
s.streams.Each(func(id frame.StreamId, str stream) {
|
|
||||||
// close all streams that we opened above the last handled id
|
|
||||||
if s.isLocal(str.Id()) && str.Id() > lastId {
|
|
||||||
str.closeWith(fmt.Errorf("Remote is going away"))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
default:
|
|
||||||
s.die(frame.ProtocolError, fmt.Errorf("Unrecognized frame type: %v", reflect.TypeOf(f)))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) recoverPanic(prefix string) {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
s.die(frame.InternalError, fmt.Errorf("%s panic: %v", prefix, r))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) getStream(id frame.StreamId) (str stream) {
|
|
||||||
// decide if this id is in the "idle" state (i.e. greater than any we've seen for that parity)
|
|
||||||
var lastId *uint32
|
|
||||||
if s.isLocal(id) {
|
|
||||||
lastId = &s.local.lastId
|
|
||||||
} else {
|
|
||||||
lastId = &s.remote.lastId
|
|
||||||
}
|
|
||||||
|
|
||||||
if uint32(id) > atomic.LoadUint32(lastId) {
|
|
||||||
s.die(frame.ProtocolError, fmt.Errorf("%d is an invalid, unassigned stream id", id))
|
|
||||||
}
|
|
||||||
|
|
||||||
// find the stream in the stream map
|
|
||||||
var ok bool
|
|
||||||
if str, ok = s.streams.Get(id); !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if a stream id is for a client stream. client streams are odd
|
|
||||||
func (s *Session) isClient(id frame.StreamId) bool {
|
|
||||||
return uint32(id)&1 == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) isServer(id frame.StreamId) bool {
|
|
||||||
return !s.isClient(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
//////////////////////////////////////////////
|
|
||||||
// session extensions
|
|
||||||
//////////////////////////////////////////////
|
|
||||||
func (s *Session) startExtension(ext Extension) {
|
|
||||||
accept := make(chan stream)
|
|
||||||
extAccept := func() (IStream, error) {
|
|
||||||
s, ok := <-accept
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("Failed to accept connection, shutting down")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
extType := ext.Start(s, extAccept)
|
|
||||||
s.exts[extType] = accept
|
|
||||||
}
|
|
||||||
|
|
||||||
//////////////////////////////////////////////
|
|
||||||
// net adaptors
|
|
||||||
//////////////////////////////////////////////
|
|
||||||
func (s *Session) NetDial(_, _ string) (net.Conn, error) {
|
|
||||||
str, err := s.Open()
|
|
||||||
return net.Conn(str), err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Session) NetListener() net.Listener {
|
|
||||||
return &netListenerAdaptor{s}
|
|
||||||
}
|
|
||||||
|
|
||||||
type netListenerAdaptor struct {
|
|
||||||
*Session
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *netListenerAdaptor) Addr() net.Addr {
|
|
||||||
return a.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *netListenerAdaptor) Accept() (net.Conn, error) {
|
|
||||||
str, err := a.Session.Accept()
|
|
||||||
return net.Conn(str), err
|
|
||||||
}
|
|
|
@ -1,319 +0,0 @@
|
||||||
package proto
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/inconshreveable/muxado/proto/buffer"
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
zeroTime time.Time
|
|
||||||
resetRemoveDelay = 10 * time.Second
|
|
||||||
closeError = fmt.Errorf("Stream closed")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Stream struct {
|
|
||||||
id frame.StreamId // stream id (const)
|
|
||||||
streamType frame.StreamType // related stream id (const)
|
|
||||||
session session // the parent session (const)
|
|
||||||
inBuffer *buffer.Inbound // buffer for data coming in from the remote side
|
|
||||||
outBuffer *buffer.Outbound // manages size of the outbound window
|
|
||||||
sentRst uint32 // == 1 only if we sent a reset to close this connection
|
|
||||||
writer sync.Mutex // only one writer at a time
|
|
||||||
wdata *frame.WStreamData // the frame this stream is currently writing
|
|
||||||
winc *frame.WStreamWndInc // window increment currently being written
|
|
||||||
readDeadline time.Time // deadline for reads (protected by buffer mutex)
|
|
||||||
writeDeadline time.Time // deadline for writes (protected by writer mutex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// private interface for Streams to call Sessions
|
|
||||||
type session interface {
|
|
||||||
ISession
|
|
||||||
writeFrame(frame.WFrame, time.Time) error
|
|
||||||
die(frame.ErrorCode, error) error
|
|
||||||
removeStream(frame.StreamId)
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////
|
|
||||||
// public interface
|
|
||||||
////////////////////////////////
|
|
||||||
func NewStream(id frame.StreamId, priority frame.StreamPriority, streamType frame.StreamType, finLocal bool, finRemote bool, windowSize uint32, sess session) stream {
|
|
||||||
str := &Stream{
|
|
||||||
id: id,
|
|
||||||
inBuffer: buffer.NewInbound(int(windowSize)),
|
|
||||||
outBuffer: buffer.NewOutbound(int(windowSize)),
|
|
||||||
streamType: streamType,
|
|
||||||
session: sess,
|
|
||||||
wdata: frame.NewWStreamData(),
|
|
||||||
winc: frame.NewWStreamWndInc(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if finLocal {
|
|
||||||
str.inBuffer.SetError(io.EOF)
|
|
||||||
}
|
|
||||||
|
|
||||||
if finRemote {
|
|
||||||
str.outBuffer.SetError(fmt.Errorf("Stream closed"))
|
|
||||||
}
|
|
||||||
|
|
||||||
return str
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) Write(buf []byte) (n int, err error) {
|
|
||||||
return s.write(buf, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) Read(buf []byte) (n int, err error) {
|
|
||||||
// read from the buffer
|
|
||||||
n, err = s.inBuffer.Read(buf)
|
|
||||||
|
|
||||||
// if we read more than zero, we send a window update
|
|
||||||
if n > 0 {
|
|
||||||
errWnd := s.sendWindowUpdate(uint32(n))
|
|
||||||
if errWnd != nil {
|
|
||||||
err = errWnd
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the stream in a manner that attempts to emulate a net.Conn's Close():
|
|
||||||
// - It calls HalfClose() with an empty buffer to half-close the stream on the remote side
|
|
||||||
// - It calls closeWith() so that all future Read/Write operations will fail
|
|
||||||
// - If the stream receives another STREAM_DATA frame from the remote side, it will send a STREAM_RST with a CANCELED error code
|
|
||||||
func (s *Stream) Close() error {
|
|
||||||
s.HalfClose([]byte{})
|
|
||||||
s.closeWith(closeError)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) SetDeadline(deadline time.Time) (err error) {
|
|
||||||
if err = s.SetReadDeadline(deadline); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = s.SetWriteDeadline(deadline); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) SetReadDeadline(dl time.Time) error {
|
|
||||||
s.inBuffer.SetDeadline(dl)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) SetWriteDeadline(dl time.Time) error {
|
|
||||||
s.writer.Lock()
|
|
||||||
s.writeDeadline = dl
|
|
||||||
s.writer.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) HalfClose(buf []byte) (n int, err error) {
|
|
||||||
return s.write(buf, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) Id() frame.StreamId {
|
|
||||||
return s.id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) StreamType() frame.StreamType {
|
|
||||||
return s.streamType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) Session() ISession {
|
|
||||||
return s.session
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) LocalAddr() net.Addr {
|
|
||||||
return s.session.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) RemoteAddr() net.Addr {
|
|
||||||
return s.session.RemoteAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
/////////////////////////////////////
|
|
||||||
// session's stream interface
|
|
||||||
/////////////////////////////////////
|
|
||||||
func (s *Stream) handleStreamData(f *frame.RStreamData) {
|
|
||||||
// skip writing for zero-length frames (typically for sending FIN)
|
|
||||||
if f.Length() > 0 {
|
|
||||||
// write the data into the buffer
|
|
||||||
if _, err := s.inBuffer.ReadFrom(f.Reader()); err != nil {
|
|
||||||
if err == buffer.FullError {
|
|
||||||
s.resetWith(frame.FlowControlError, fmt.Errorf("Flow control buffer overflowed"))
|
|
||||||
} else if err == closeError {
|
|
||||||
// We're trying to emulate net.Conn's Close() behavior where we close our side of the connection,
|
|
||||||
// and if we get any more frames from the other side, we RST it.
|
|
||||||
s.resetWith(frame.Cancel, fmt.Errorf("Stream closed"))
|
|
||||||
} else if err == buffer.AlreadyClosed {
|
|
||||||
// there was already an error set
|
|
||||||
s.resetWith(frame.StreamClosed, err)
|
|
||||||
} else {
|
|
||||||
// the transport returned some sort of IO error
|
|
||||||
s.die(frame.ProtocolError, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.Fin() {
|
|
||||||
s.inBuffer.SetError(io.EOF)
|
|
||||||
s.maybeRemove()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) handleStreamRst(f *frame.RStreamRst) {
|
|
||||||
s.closeWith(fmt.Errorf("Stream reset by peer with error %d", f.ErrorCode()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) handleStreamWndInc(f *frame.RStreamWndInc) {
|
|
||||||
s.outBuffer.Increment(int(f.WindowIncrement()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) closeWith(err error) {
|
|
||||||
s.outBuffer.SetError(err)
|
|
||||||
s.inBuffer.SetError(err)
|
|
||||||
s.session.removeStream(s.id)
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////
|
|
||||||
// internal methods
|
|
||||||
////////////////////////////////
|
|
||||||
|
|
||||||
func (s *Stream) closeWithAndRemoveLater(err error) {
|
|
||||||
s.outBuffer.SetError(err)
|
|
||||||
s.inBuffer.SetError(err)
|
|
||||||
time.AfterFunc(resetRemoveDelay, func() {
|
|
||||||
s.session.removeStream(s.id)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) maybeRemove() {
|
|
||||||
if buffer.BothClosed(s.inBuffer, s.outBuffer) {
|
|
||||||
s.session.removeStream(s.id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) resetWith(errorCode frame.ErrorCode, resetErr error) {
|
|
||||||
// only ever send one reset
|
|
||||||
if !atomic.CompareAndSwapUint32(&s.sentRst, 0, 1) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// close the stream
|
|
||||||
s.closeWithAndRemoveLater(resetErr)
|
|
||||||
|
|
||||||
// make the reset frame
|
|
||||||
rst := frame.NewWStreamRst()
|
|
||||||
if err := rst.Set(s.id, errorCode); err != nil {
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// need write lock to make sure no data frames get sent after we send the reset
|
|
||||||
s.writer.Lock()
|
|
||||||
|
|
||||||
// send it
|
|
||||||
if err := s.session.writeFrame(rst, zeroTime); err != nil {
|
|
||||||
s.writer.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.writer.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) write(buf []byte, fin bool) (n int, err error) {
|
|
||||||
// a write call can pass a buffer larger that we can send in a single frame
|
|
||||||
// only allow one writer at a time to prevent interleaving frames from concurrent writes
|
|
||||||
s.writer.Lock()
|
|
||||||
|
|
||||||
bufSize := len(buf)
|
|
||||||
bytesRemaining := bufSize
|
|
||||||
for bytesRemaining > 0 || fin {
|
|
||||||
// figure out the most we can write in a single frame
|
|
||||||
writeReqSize := min(0x3FFF, bytesRemaining)
|
|
||||||
|
|
||||||
// and then reduce that to however much is available in the window
|
|
||||||
// this blocks until window is available and may not return all that we asked for
|
|
||||||
var writeSize int
|
|
||||||
if writeSize, err = s.outBuffer.Decrement(writeReqSize); err != nil {
|
|
||||||
s.writer.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculate the slice of the buffer we'll write
|
|
||||||
start, end := n, n+writeSize
|
|
||||||
|
|
||||||
// only send fin for the last frame
|
|
||||||
finBit := fin && end == bufSize
|
|
||||||
|
|
||||||
// make the frame
|
|
||||||
if err = s.wdata.Set(s.id, buf[start:end], finBit); err != nil {
|
|
||||||
s.writer.Unlock()
|
|
||||||
s.die(frame.InternalError, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// write the frame
|
|
||||||
if err = s.session.writeFrame(s.wdata, s.writeDeadline); err != nil {
|
|
||||||
s.writer.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// update our counts
|
|
||||||
n += writeSize
|
|
||||||
bytesRemaining -= writeSize
|
|
||||||
|
|
||||||
if finBit {
|
|
||||||
s.outBuffer.SetError(fmt.Errorf("Stream closed"))
|
|
||||||
s.maybeRemove()
|
|
||||||
|
|
||||||
// handles the empty buffer case with fin case
|
|
||||||
fin = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.writer.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendWindowUpdate sends a window increment frame
|
|
||||||
// with the given increment
|
|
||||||
func (s *Stream) sendWindowUpdate(inc uint32) (err error) {
|
|
||||||
// send a window update
|
|
||||||
if err = s.winc.Set(s.id, inc); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX: write this async? We can only write one at
|
|
||||||
// a time if we're not allocating new ones from the heap
|
|
||||||
if err = s.session.writeFrame(s.winc, zeroTime); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// die is called when a protocol error occurs and the entire
|
|
||||||
// session must be destroyed.
|
|
||||||
func (s *Stream) die(errorCode frame.ErrorCode, err error) {
|
|
||||||
s.closeWith(fmt.Errorf("Stream closed on error: %v", err))
|
|
||||||
s.session.die(errorCode, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func min(n1, n2 int) int {
|
|
||||||
if n1 > n2 {
|
|
||||||
return n2
|
|
||||||
} else {
|
|
||||||
return n1
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
package proto
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/inconshreveable/muxado/proto/frame"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
initMapCapacity = 128 // not too much extra memory wasted to avoid allocations
|
|
||||||
)
|
|
||||||
|
|
||||||
type StreamMap interface {
|
|
||||||
Get(frame.StreamId) (stream, bool)
|
|
||||||
Set(frame.StreamId, stream)
|
|
||||||
Delete(frame.StreamId)
|
|
||||||
Each(func(frame.StreamId, stream))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConcurrentStreamMap is a map of stream ids -> streams guarded by a read/write lock
|
|
||||||
type ConcurrentStreamMap struct {
|
|
||||||
sync.RWMutex
|
|
||||||
table map[frame.StreamId]stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *ConcurrentStreamMap) Get(id frame.StreamId) (s stream, ok bool) {
|
|
||||||
m.RLock()
|
|
||||||
s, ok = m.table[id]
|
|
||||||
m.RUnlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *ConcurrentStreamMap) Set(id frame.StreamId, str stream) {
|
|
||||||
m.Lock()
|
|
||||||
m.table[id] = str
|
|
||||||
m.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *ConcurrentStreamMap) Delete(id frame.StreamId) {
|
|
||||||
m.Lock()
|
|
||||||
delete(m.table, id)
|
|
||||||
m.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *ConcurrentStreamMap) Each(fn func(frame.StreamId, stream)) {
|
|
||||||
m.Lock()
|
|
||||||
streams := make(map[frame.StreamId]stream, len(m.table))
|
|
||||||
for k, v := range m.table {
|
|
||||||
streams[k] = v
|
|
||||||
}
|
|
||||||
m.Unlock()
|
|
||||||
|
|
||||||
for id, str := range streams {
|
|
||||||
fn(id, str)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConcurrentStreamMap() *ConcurrentStreamMap {
|
|
||||||
return &ConcurrentStreamMap{table: make(map[frame.StreamId]stream, initMapCapacity)}
|
|
||||||
}
|
|
|
@ -1,71 +0,0 @@
|
||||||
package muxado
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"github.com/inconshreveable/muxado/proto"
|
|
||||||
"github.com/inconshreveable/muxado/proto/ext"
|
|
||||||
"net"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A Listener accepts new connections from its net.Listener
|
|
||||||
// and begins muxado server connections on them.
|
|
||||||
//
|
|
||||||
// It's API is very similar to a net.Listener, but it returns
|
|
||||||
// muxado.Sessions instead of net.Conn's.
|
|
||||||
type Listener struct {
|
|
||||||
wrapped net.Listener
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept the next connection from the listener and begin
|
|
||||||
// a muxado session on it.
|
|
||||||
func (l *Listener) Accept() (Session, error) {
|
|
||||||
conn, err := l.wrapped.Accept()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return Server(conn), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Addr returns the bound address of the wrapped net.Listener
|
|
||||||
func (l *Listener) Addr() net.Addr {
|
|
||||||
return l.wrapped.Addr()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the wrapped net.Listener
|
|
||||||
func (l *Listener) Close() error {
|
|
||||||
return l.wrapped.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Server returns a muxado server session using conn as the transport.
|
|
||||||
func Server(conn net.Conn) Session {
|
|
||||||
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, false, []proto.Extension{ext.NewDefaultHeartbeat()})}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Listen binds to a network address and returns a Listener which accepts
|
|
||||||
// new connections and starts muxado server sessions on them.
|
|
||||||
func Listen(network, addr string) (*Listener, error) {
|
|
||||||
l, err := net.Listen(network, addr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Listener{l}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListenTLS binds to a network address and accepts new TLS-encrypted connections.
|
|
||||||
// It returns a Listener which starts new muxado server sessions on the connections.
|
|
||||||
func ListenTLS(network, addr string, tlsConfig *tls.Config) (*Listener, error) {
|
|
||||||
l, err := tls.Listen(network, addr, tlsConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Listener{l}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewListener creates a new muxado listener which creates new muxado server sessions
|
|
||||||
// by accepting connections from the given net.Listener
|
|
||||||
func NewListener(l net.Listener) *Listener {
|
|
||||||
return &Listener{l}
|
|
||||||
}
|
|
|
@ -399,26 +399,6 @@
|
||||||
"revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd",
|
"revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd",
|
||||||
"revisionTime": "2016-07-20T23:31:40Z"
|
"revisionTime": "2016-07-20T23:31:40Z"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"path": "github.com/inconshreveable/muxado",
|
|
||||||
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "github.com/inconshreveable/muxado/proto",
|
|
||||||
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "github.com/inconshreveable/muxado/proto/buffer",
|
|
||||||
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "github.com/inconshreveable/muxado/proto/ext",
|
|
||||||
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "github.com/inconshreveable/muxado/proto/frame",
|
|
||||||
"revision": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"checksumSHA1": "xZuhljnmBysJPta/lMyYmJdujCg=",
|
"checksumSHA1": "xZuhljnmBysJPta/lMyYmJdujCg=",
|
||||||
"path": "github.com/mattn/go-isatty",
|
"path": "github.com/mattn/go-isatty",
|
||||||
|
|
|
@ -16,6 +16,15 @@ standard upgrade flow.
|
||||||
|
|
||||||
## Consul 0.7
|
## Consul 0.7
|
||||||
|
|
||||||
|
#### Dropped Support for Protocol Version 1
|
||||||
|
|
||||||
|
Consul version 0.7 dropped support for protocol version 1, which means it
|
||||||
|
is no longer compatible with versions of Consul prior to 0.3. You will need
|
||||||
|
to upgrade all agents to a newer version of Consul before upgrading to Consul
|
||||||
|
0.7.
|
||||||
|
|
||||||
|
#### Prepared Query Changes
|
||||||
|
|
||||||
Consul version 0.7 adds a feature which allows prepared queries to store a
|
Consul version 0.7 adds a feature which allows prepared queries to store a
|
||||||
["Near" parameter](/docs/agent/http/query.html#near) in the query definition
|
["Near" parameter](/docs/agent/http/query.html#near) in the query definition
|
||||||
itself. This feature enables using the distance sorting features of prepared
|
itself. This feature enables using the distance sorting features of prepared
|
||||||
|
|
Loading…
Reference in New Issue