mirror of https://github.com/hashicorp/consul
Merge pull request #10161 from hashicorp/dnephin/update-deps
Update a couple dependenciespull/10178/head
commit
981fb5322a
|
@ -0,0 +1,9 @@
|
||||||
|
```release-note:bug
|
||||||
|
memberlist: fixes a couple bugs which allowed malformed input to cause a crash in a Consul
|
||||||
|
client or server.
|
||||||
|
```
|
||||||
|
|
||||||
|
```release-note:bug
|
||||||
|
telemetry: fixes a bug with Prometheus metrics where Gauges and Summaries were incorrectly
|
||||||
|
being expired.
|
||||||
|
```
|
|
@ -21,7 +21,6 @@ import (
|
||||||
connlimit "github.com/hashicorp/go-connlimit"
|
connlimit "github.com/hashicorp/go-connlimit"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/memberlist"
|
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
autopilot "github.com/hashicorp/raft-autopilot"
|
autopilot "github.com/hashicorp/raft-autopilot"
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||||
|
@ -35,6 +34,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
@ -251,7 +251,7 @@ type Server struct {
|
||||||
// serfWAN is the Serf cluster maintained between DC's
|
// serfWAN is the Serf cluster maintained between DC's
|
||||||
// which SHOULD only consist of Consul servers
|
// which SHOULD only consist of Consul servers
|
||||||
serfWAN *serf.Serf
|
serfWAN *serf.Serf
|
||||||
memberlistTransportWAN memberlist.IngestionAwareTransport
|
memberlistTransportWAN wanfed.IngestionAwareTransport
|
||||||
gatewayLocator *GatewayLocator
|
gatewayLocator *GatewayLocator
|
||||||
|
|
||||||
// serverLookup tracks server consuls in the local datacenter.
|
// serverLookup tracks server consuls in the local datacenter.
|
||||||
|
@ -500,7 +500,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||||
|
|
||||||
// This is always a *memberlist.NetTransport or something which wraps
|
// This is always a *memberlist.NetTransport or something which wraps
|
||||||
// it which satisfies this interface.
|
// it which satisfies this interface.
|
||||||
s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(memberlist.IngestionAwareTransport)
|
s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(wanfed.IngestionAwareTransport)
|
||||||
|
|
||||||
// See big comment above why we are doing this.
|
// See big comment above why we are doing this.
|
||||||
if serfBindPortWAN == 0 {
|
if serfBindPortWAN == 0 {
|
||||||
|
|
|
@ -8,10 +8,11 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/memberlist"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -29,9 +30,15 @@ const (
|
||||||
|
|
||||||
type MeshGatewayResolver func(datacenter string) string
|
type MeshGatewayResolver func(datacenter string) string
|
||||||
|
|
||||||
|
type IngestionAwareTransport interface {
|
||||||
|
memberlist.NodeAwareTransport
|
||||||
|
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
|
||||||
|
IngestStream(conn net.Conn) error
|
||||||
|
}
|
||||||
|
|
||||||
func NewTransport(
|
func NewTransport(
|
||||||
tlsConfigurator *tlsutil.Configurator,
|
tlsConfigurator *tlsutil.Configurator,
|
||||||
transport memberlist.NodeAwareTransport,
|
transport IngestionAwareTransport,
|
||||||
datacenter string,
|
datacenter string,
|
||||||
gwResolver MeshGatewayResolver,
|
gwResolver MeshGatewayResolver,
|
||||||
) (*Transport, error) {
|
) (*Transport, error) {
|
||||||
|
@ -48,17 +55,17 @@ func NewTransport(
|
||||||
}
|
}
|
||||||
|
|
||||||
t := &Transport{
|
t := &Transport{
|
||||||
NodeAwareTransport: transport,
|
IngestionAwareTransport: transport,
|
||||||
tlsConfigurator: tlsConfigurator,
|
tlsConfigurator: tlsConfigurator,
|
||||||
datacenter: datacenter,
|
datacenter: datacenter,
|
||||||
gwResolver: gwResolver,
|
gwResolver: gwResolver,
|
||||||
pool: cp,
|
pool: cp,
|
||||||
}
|
}
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Transport struct {
|
type Transport struct {
|
||||||
memberlist.NodeAwareTransport
|
IngestionAwareTransport
|
||||||
|
|
||||||
tlsConfigurator *tlsutil.Configurator
|
tlsConfigurator *tlsutil.Configurator
|
||||||
datacenter string
|
datacenter string
|
||||||
|
@ -71,7 +78,7 @@ var _ memberlist.NodeAwareTransport = (*Transport)(nil)
|
||||||
// Shutdown implements memberlist.Transport.
|
// Shutdown implements memberlist.Transport.
|
||||||
func (t *Transport) Shutdown() error {
|
func (t *Transport) Shutdown() error {
|
||||||
err1 := t.pool.Close()
|
err1 := t.pool.Close()
|
||||||
err2 := t.NodeAwareTransport.Shutdown()
|
err2 := t.IngestionAwareTransport.Shutdown()
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
// the more important error is err2
|
// the more important error is err2
|
||||||
return err2
|
return err2
|
||||||
|
@ -118,7 +125,7 @@ func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time
|
||||||
return time.Now(), nil
|
return time.Now(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.NodeAwareTransport.WriteToAddress(b, addr)
|
return t.IngestionAwareTransport.WriteToAddress(b, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialAddressTimeout implements memberlist.NodeAwareTransport.
|
// DialAddressTimeout implements memberlist.NodeAwareTransport.
|
||||||
|
@ -137,7 +144,7 @@ func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Dur
|
||||||
return t.dial(dc, node, pool.ALPN_WANGossipStream, gwAddr)
|
return t.dial(dc, node, pool.ALPN_WANGossipStream, gwAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.NodeAwareTransport.DialAddressTimeout(addr, timeout)
|
return t.IngestionAwareTransport.DialAddressTimeout(addr, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: There is a close mirror of this method in agent/pool/pool.go:DialTimeoutWithRPCType
|
// NOTE: There is a close mirror of this method in agent/pool/pool.go:DialTimeoutWithRPCType
|
||||||
|
|
6
go.mod
6
go.mod
|
@ -12,7 +12,7 @@ require (
|
||||||
github.com/Microsoft/go-winio v0.4.3 // indirect
|
github.com/Microsoft/go-winio v0.4.3 // indirect
|
||||||
github.com/NYTimes/gziphandler v1.0.1
|
github.com/NYTimes/gziphandler v1.0.1
|
||||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
|
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
|
||||||
github.com/armon/go-metrics v0.3.7
|
github.com/armon/go-metrics v0.3.8
|
||||||
github.com/armon/go-radix v1.0.0
|
github.com/armon/go-radix v1.0.0
|
||||||
github.com/aws/aws-sdk-go v1.25.41
|
github.com/aws/aws-sdk-go v1.25.41
|
||||||
github.com/coredns/coredns v1.1.2
|
github.com/coredns/coredns v1.1.2
|
||||||
|
@ -50,9 +50,9 @@ require (
|
||||||
github.com/hashicorp/hcl v1.0.0
|
github.com/hashicorp/hcl v1.0.0
|
||||||
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
|
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
|
||||||
github.com/hashicorp/mdns v1.0.4 // indirect
|
github.com/hashicorp/mdns v1.0.4 // indirect
|
||||||
github.com/hashicorp/memberlist v0.2.3
|
github.com/hashicorp/memberlist v0.2.4
|
||||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
||||||
github.com/hashicorp/raft v1.3.0
|
github.com/hashicorp/raft v1.3.1
|
||||||
github.com/hashicorp/raft-autopilot v0.1.2
|
github.com/hashicorp/raft-autopilot v0.1.2
|
||||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||||
github.com/hashicorp/serf v0.9.5
|
github.com/hashicorp/serf v0.9.5
|
||||||
|
|
12
go.sum
12
go.sum
|
@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
|
||||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||||
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
|
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
|
||||||
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
|
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
|
||||||
github.com/armon/go-metrics v0.3.7 h1:c/oCtWzYpboy6+6f6LjXRlyW7NwA2SWf+a9KMlHq/bM=
|
github.com/armon/go-metrics v0.3.8 h1:oOxq3KPj0WhCuy50EhzwiyMyG2ovRQZpZLXQuOh2a/M=
|
||||||
github.com/armon/go-metrics v0.3.7/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
|
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
|
||||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
|
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
|
||||||
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||||
|
@ -274,14 +274,14 @@ github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg
|
||||||
github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ=
|
github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ=
|
||||||
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
|
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
|
||||||
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
|
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
|
||||||
github.com/hashicorp/memberlist v0.2.3 h1:BwZa5IjREr75J0am7nblP+X5i95Rmp8EEbMI5vkUWdA=
|
github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg=
|
||||||
github.com/hashicorp/memberlist v0.2.3/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
|
github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
|
||||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
|
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
|
||||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
|
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
|
||||||
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||||
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||||
github.com/hashicorp/raft v1.3.0 h1:Wox4J4R7J2FOJLtTa6hdk0VJfiNUSP32pYoYR738bkE=
|
github.com/hashicorp/raft v1.3.1 h1:zDT8ke8y2aP4wf9zPTB2uSIeavJ3Hx/ceY4jxI2JxuY=
|
||||||
github.com/hashicorp/raft v1.3.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
|
github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
|
||||||
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
|
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
|
||||||
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
|
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
|
||||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
|
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
|
||||||
|
|
|
@ -245,6 +245,8 @@ func (i *InmemSink) Data() []*IntervalMetrics {
|
||||||
copyCurrent := intervals[n-1]
|
copyCurrent := intervals[n-1]
|
||||||
current.RLock()
|
current.RLock()
|
||||||
*copyCurrent = *current
|
*copyCurrent = *current
|
||||||
|
// RWMutex is not safe to copy, so create a new instance on the copy
|
||||||
|
copyCurrent.RWMutex = sync.RWMutex{}
|
||||||
|
|
||||||
copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
|
copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
|
||||||
for k, v := range current.Gauges {
|
for k, v := range current.Gauges {
|
||||||
|
|
|
@ -355,6 +355,10 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
|
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
|
||||||
|
if len(buf) < 1 {
|
||||||
|
m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from))
|
||||||
|
return
|
||||||
|
}
|
||||||
// Decode the message type
|
// Decode the message type
|
||||||
msgType := messageType(buf[0])
|
msgType := messageType(buf[0])
|
||||||
buf = buf[1:]
|
buf = buf[1:]
|
||||||
|
|
|
@ -82,16 +82,18 @@ func (a *Address) String() string {
|
||||||
return a.Addr
|
return a.Addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IngestionAwareTransport is not used.
|
||||||
|
//
|
||||||
|
// Deprecated: IngestionAwareTransport is not used and may be removed in a future
|
||||||
|
// version. Define the interface locally instead of referencing this exported
|
||||||
|
// interface.
|
||||||
type IngestionAwareTransport interface {
|
type IngestionAwareTransport interface {
|
||||||
Transport
|
|
||||||
// IngestPacket pulls a single packet off the conn, and only closes it if shouldClose is true.
|
|
||||||
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
|
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
|
||||||
// IngestStream hands off the conn to the transport and doesn't close it.
|
|
||||||
IngestStream(conn net.Conn) error
|
IngestStream(conn net.Conn) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodeAwareTransport interface {
|
type NodeAwareTransport interface {
|
||||||
IngestionAwareTransport
|
Transport
|
||||||
WriteToAddress(b []byte, addr Address) (time.Time, error)
|
WriteToAddress(b []byte, addr Address) (time.Time, error)
|
||||||
DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
|
DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
|
||||||
}
|
}
|
||||||
|
@ -102,22 +104,6 @@ type shimNodeAwareTransport struct {
|
||||||
|
|
||||||
var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
|
var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
|
||||||
|
|
||||||
func (t *shimNodeAwareTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
|
|
||||||
iat, ok := t.Transport.(IngestionAwareTransport)
|
|
||||||
if !ok {
|
|
||||||
panic("shimNodeAwareTransport does not support IngestPacket")
|
|
||||||
}
|
|
||||||
return iat.IngestPacket(conn, addr, now, shouldClose)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *shimNodeAwareTransport) IngestStream(conn net.Conn) error {
|
|
||||||
iat, ok := t.Transport.(IngestionAwareTransport)
|
|
||||||
if !ok {
|
|
||||||
panic("shimNodeAwareTransport does not support IngestStream")
|
|
||||||
}
|
|
||||||
return iat.IngestStream(conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
|
func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
|
||||||
return t.WriteTo(b, addr.Addr)
|
return t.WriteTo(b, addr.Addr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,18 +185,18 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
|
||||||
err = fmt.Errorf("missing compound length byte")
|
err = fmt.Errorf("missing compound length byte")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
numParts := uint8(buf[0])
|
numParts := int(buf[0])
|
||||||
buf = buf[1:]
|
buf = buf[1:]
|
||||||
|
|
||||||
// Check we have enough bytes
|
// Check we have enough bytes
|
||||||
if len(buf) < int(numParts*2) {
|
if len(buf) < numParts*2 {
|
||||||
err = fmt.Errorf("truncated len slice")
|
err = fmt.Errorf("truncated len slice")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode the lengths
|
// Decode the lengths
|
||||||
lengths := make([]uint16, numParts)
|
lengths := make([]uint16, numParts)
|
||||||
for i := 0; i < int(numParts); i++ {
|
for i := 0; i < numParts; i++ {
|
||||||
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
|
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
|
||||||
}
|
}
|
||||||
buf = buf[numParts*2:]
|
buf = buf[numParts*2:]
|
||||||
|
@ -204,7 +204,7 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
|
||||||
// Split each message
|
// Split each message
|
||||||
for idx, msgLen := range lengths {
|
for idx, msgLen := range lengths {
|
||||||
if len(buf) < int(msgLen) {
|
if len(buf) < int(msgLen) {
|
||||||
trunc = int(numParts) - idx
|
trunc = numParts - idx
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -564,7 +564,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
|
||||||
r.logger.Error("failed to get log", "index", index, "error", err)
|
r.logger.Error("failed to get log", "index", index, "error", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
r.processConfigurationLogEntry(&entry)
|
if err := r.processConfigurationLogEntry(&entry); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
r.logger.Info("initial configuration",
|
r.logger.Info("initial configuration",
|
||||||
"index", r.configurations.latestIndex,
|
"index", r.configurations.latestIndex,
|
||||||
|
@ -627,7 +629,10 @@ func (r *Raft) restoreSnapshot() error {
|
||||||
conf = snapshot.Configuration
|
conf = snapshot.Configuration
|
||||||
index = snapshot.ConfigurationIndex
|
index = snapshot.ConfigurationIndex
|
||||||
} else {
|
} else {
|
||||||
conf = decodePeers(snapshot.Peers, r.trans)
|
var err error
|
||||||
|
if conf, err = decodePeers(snapshot.Peers, r.trans); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
index = snapshot.Index
|
index = snapshot.Index
|
||||||
}
|
}
|
||||||
r.setCommittedConfiguration(conf, index)
|
r.setCommittedConfiguration(conf, index)
|
||||||
|
|
|
@ -319,11 +319,11 @@ func encodePeers(configuration Configuration, trans Transport) []byte {
|
||||||
// decodePeers is used to deserialize an old list of peers into a Configuration.
|
// decodePeers is used to deserialize an old list of peers into a Configuration.
|
||||||
// This is here for backwards compatibility with old log entries and snapshots;
|
// This is here for backwards compatibility with old log entries and snapshots;
|
||||||
// it should be removed eventually.
|
// it should be removed eventually.
|
||||||
func decodePeers(buf []byte, trans Transport) Configuration {
|
func decodePeers(buf []byte, trans Transport) (Configuration, error) {
|
||||||
// Decode the buffer first.
|
// Decode the buffer first.
|
||||||
var encPeers [][]byte
|
var encPeers [][]byte
|
||||||
if err := decodeMsgPack(buf, &encPeers); err != nil {
|
if err := decodeMsgPack(buf, &encPeers); err != nil {
|
||||||
panic(fmt.Errorf("failed to decode peers: %v", err))
|
return Configuration{}, fmt.Errorf("failed to decode peers: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deserialize each peer.
|
// Deserialize each peer.
|
||||||
|
@ -333,13 +333,11 @@ func decodePeers(buf []byte, trans Transport) Configuration {
|
||||||
servers = append(servers, Server{
|
servers = append(servers, Server{
|
||||||
Suffrage: Voter,
|
Suffrage: Voter,
|
||||||
ID: ServerID(p),
|
ID: ServerID(p),
|
||||||
Address: ServerAddress(p),
|
Address: p,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return Configuration{
|
return Configuration{Servers: servers}, nil
|
||||||
Servers: servers,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
|
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
|
||||||
|
|
|
@ -244,8 +244,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
|
||||||
}
|
}
|
||||||
r.setCurrentTerm(1)
|
r.setCurrentTerm(1)
|
||||||
r.setLastLog(entry.Index, entry.Term)
|
r.setLastLog(entry.Index, entry.Term)
|
||||||
r.processConfigurationLogEntry(&entry)
|
return r.processConfigurationLogEntry(&entry)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// runCandidate runs the FSM for a candidate.
|
// runCandidate runs the FSM for a candidate.
|
||||||
|
@ -1383,7 +1382,13 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
|
||||||
|
|
||||||
// Handle any new configuration changes
|
// Handle any new configuration changes
|
||||||
for _, newEntry := range newEntries {
|
for _, newEntry := range newEntries {
|
||||||
r.processConfigurationLogEntry(newEntry)
|
if err := r.processConfigurationLogEntry(newEntry); err != nil {
|
||||||
|
r.logger.Warn("failed to append entry",
|
||||||
|
"index", newEntry.Index,
|
||||||
|
"error", err)
|
||||||
|
rpcErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the lastLog
|
// Update the lastLog
|
||||||
|
@ -1415,14 +1420,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
|
||||||
// processConfigurationLogEntry takes a log entry and updates the latest
|
// processConfigurationLogEntry takes a log entry and updates the latest
|
||||||
// configuration if the entry results in a new configuration. This must only be
|
// configuration if the entry results in a new configuration. This must only be
|
||||||
// called from the main thread, or from NewRaft() before any threads have begun.
|
// called from the main thread, or from NewRaft() before any threads have begun.
|
||||||
func (r *Raft) processConfigurationLogEntry(entry *Log) {
|
func (r *Raft) processConfigurationLogEntry(entry *Log) error {
|
||||||
if entry.Type == LogConfiguration {
|
switch entry.Type {
|
||||||
|
case LogConfiguration:
|
||||||
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
|
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
|
||||||
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
|
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
|
||||||
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
|
|
||||||
|
case LogAddPeerDeprecated, LogRemovePeerDeprecated:
|
||||||
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
|
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
|
||||||
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
|
conf, err := decodePeers(entry.Data, r.trans)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.setLatestConfiguration(conf, entry.Index)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestVote is invoked when we get an request vote RPC call.
|
// requestVote is invoked when we get an request vote RPC call.
|
||||||
|
@ -1574,7 +1586,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
|
||||||
reqConfiguration = DecodeConfiguration(req.Configuration)
|
reqConfiguration = DecodeConfiguration(req.Configuration)
|
||||||
reqConfigurationIndex = req.ConfigurationIndex
|
reqConfigurationIndex = req.ConfigurationIndex
|
||||||
} else {
|
} else {
|
||||||
reqConfiguration = decodePeers(req.Peers, r.trans)
|
reqConfiguration, rpcErr = decodePeers(req.Peers, r.trans)
|
||||||
|
if rpcErr != nil {
|
||||||
|
r.logger.Error("failed to install snapshot", "error", rpcErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
reqConfigurationIndex = req.LastLogIndex
|
reqConfigurationIndex = req.LastLogIndex
|
||||||
}
|
}
|
||||||
version := getSnapshotVersion(r.protocolVersion)
|
version := getSnapshotVersion(r.protocolVersion)
|
||||||
|
|
|
@ -32,7 +32,7 @@ github.com/NYTimes/gziphandler
|
||||||
github.com/StackExchange/wmi
|
github.com/StackExchange/wmi
|
||||||
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
|
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
|
||||||
github.com/armon/circbuf
|
github.com/armon/circbuf
|
||||||
# github.com/armon/go-metrics v0.3.7
|
# github.com/armon/go-metrics v0.3.8
|
||||||
github.com/armon/go-metrics
|
github.com/armon/go-metrics
|
||||||
github.com/armon/go-metrics/circonus
|
github.com/armon/go-metrics/circonus
|
||||||
github.com/armon/go-metrics/datadog
|
github.com/armon/go-metrics/datadog
|
||||||
|
@ -477,11 +477,11 @@ github.com/hashicorp/hil/parser
|
||||||
github.com/hashicorp/hil/scanner
|
github.com/hashicorp/hil/scanner
|
||||||
# github.com/hashicorp/mdns v1.0.4
|
# github.com/hashicorp/mdns v1.0.4
|
||||||
github.com/hashicorp/mdns
|
github.com/hashicorp/mdns
|
||||||
# github.com/hashicorp/memberlist v0.2.3
|
# github.com/hashicorp/memberlist v0.2.4
|
||||||
github.com/hashicorp/memberlist
|
github.com/hashicorp/memberlist
|
||||||
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
||||||
github.com/hashicorp/net-rpc-msgpackrpc
|
github.com/hashicorp/net-rpc-msgpackrpc
|
||||||
# github.com/hashicorp/raft v1.3.0
|
# github.com/hashicorp/raft v1.3.1
|
||||||
github.com/hashicorp/raft
|
github.com/hashicorp/raft
|
||||||
# github.com/hashicorp/raft-autopilot v0.1.2
|
# github.com/hashicorp/raft-autopilot v0.1.2
|
||||||
github.com/hashicorp/raft-autopilot
|
github.com/hashicorp/raft-autopilot
|
||||||
|
|
Loading…
Reference in New Issue