Merge pull request #5485 from hashicorp/change-node-id

Allow nodes to change IDs when replacing a dead node
pull/5849/head
Kyle Havlovitz 2019-05-15 12:18:13 -07:00 committed by GitHub
commit b15cb60851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 276 additions and 63 deletions

View File

@ -537,6 +537,10 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
// Allow dead nodes to be replaced after 30 seconds.
conf.SerfLANConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second
conf.SerfWANConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second
// Raft protocol version 3 only works with other Consul servers running
// 0.8.0 or later.
conf.RaftConfig.ProtocolVersion = 3

View File

@ -1380,11 +1380,12 @@ AFTER_CHECK:
Status: api.HealthPassing,
Output: structs.SerfCheckAliveOutput,
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
}
if node != nil {
req.TaggedAddresses = node.TaggedAddresses
req.NodeMeta = node.Meta
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
}

View File

@ -953,6 +953,69 @@ func TestLeader_ChangeServerID(t *testing.T) {
})
}
// TestLeader_ChangeNodeID verifies that a server that died can be replaced by
// a new server that joins with the same node name but a different node ID,
// and that the cluster converges back to three healthy Raft/Serf peers.
// Relies on DeadNodeReclaimTime being set low in the test server config so
// the dead node's name can be reclaimed quickly — TODO confirm against
// testServerConfig.
func TestLeader_ChangeNodeID(t *testing.T) {
	t.Parallel()
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	dir2, s2 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	dir3, s3 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()
	servers := []*Server{s1, s2, s3}
	// Try to join and wait for all servers to get promoted
	joinLAN(t, s2, s1)
	joinLAN(t, s3, s1)
	for _, s := range servers {
		testrpc.WaitForTestAgent(t, s.RPC, "dc1")
		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
	}
	// Shut down a server, freeing up its address/port
	s3.Shutdown()
	// Wait until exactly one LAN member is seen as failed before bringing
	// up the replacement, so we know Serf has noticed the death.
	retry.Run(t, func(r *retry.R) {
		failed := 0
		for _, m := range s1.LANMembers() {
			if m.Status == serf.StatusFailed {
				failed++
			}
		}
		require.Equal(r, 1, failed)
	})
	// Bring up a new server with s3's name that will get a different ID
	dir4, s4 := testServerWithConfig(t, func(c *Config) {
		c.Bootstrap = false
		c.Datacenter = "dc1"
		c.NodeName = s3.config.NodeName
	})
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()
	joinLAN(t, s4, s1)
	servers[2] = s4
	// Make sure the dead server is gone from both Raft and Serf and we're back to 3 total peers
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 3))
		}
	})
	// The reclaimed node must show as alive — no lingering failed member.
	retry.Run(t, func(r *retry.R) {
		for _, m := range s1.LANMembers() {
			require.Equal(r, serf.StatusAlive, m.Status)
		}
	})
}
func TestLeader_ACL_Initialization(t *testing.T) {
t.Parallel()

View File

@ -69,6 +69,7 @@ func testServerConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.DeadNodeReclaimTime = 100 * time.Millisecond
config.SerfWANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfWANConfig.MemberlistConfig.BindPort = ports[2]
@ -77,6 +78,7 @@ func testServerConfig(t *testing.T) (string, *Config) {
config.SerfWANConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfWANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
config.SerfWANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.SerfWANConfig.MemberlistConfig.DeadNodeReclaimTime = 100 * time.Millisecond
config.RaftConfig.LeaderLeaseTimeout = 100 * time.Millisecond
config.RaftConfig.HeartbeatTimeout = 200 * time.Millisecond

View File

@ -369,7 +369,24 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *memdb.Txn, node *structs.Node
for nodeIt := enodes.Next(); nodeIt != nil; nodeIt = enodes.Next() {
enode := nodeIt.(*structs.Node)
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
if !(enode.ID == "" && allowClashWithoutID) {
// Look up the existing node's Serf health check to see if it's failed.
// If it is, the node can be renamed.
enodeCheck, err := tx.First("checks", "id", enode.Node, string(structs.SerfCheckID))
if err != nil {
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
}
// Get the node health. If there's no Serf health check, we consider it safe to rename
// the node as it's likely an external node registration not managed by Consul.
var nodeHealthy bool
if enodeCheck != nil {
enodeSerfCheck, ok := enodeCheck.(*structs.HealthCheck)
if ok {
nodeHealthy = enodeSerfCheck.Status != api.HealthCritical
}
}
if !(enode.ID == "" && allowClashWithoutID) && nodeHealthy {
return fmt.Errorf("Node name %s is reserved by node %s with name %s", node.Node, enode.ID, enode.Node)
}
}

View File

@ -84,6 +84,11 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) {
Address: "1.2.3.4",
TaggedAddresses: map[string]string{"hello": "world"},
NodeMeta: map[string]string{"somekey": "somevalue"},
Check: &structs.HealthCheck{
Node: "node1",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
},
}
if err := s.EnsureRegistration(1, req); err != nil {
t.Fatalf("err: %s", err)
@ -92,6 +97,11 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) {
ID: types.NodeID(""),
Node: "node2",
Address: "10.0.0.1",
Check: &structs.HealthCheck{
Node: "node2",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
},
}
if err := s.EnsureRegistration(2, req); err != nil {
t.Fatalf("err: %s", err)
@ -119,6 +129,23 @@ func TestStateStore_ensureNoNodeWithSimilarNameTxn(t *testing.T) {
t.Fatalf("Should not clash with another similar node name without ID, err:=%q", err)
}
// Set node1's Serf health to failing and replace it.
newNode := &structs.Node{
ID: makeRandomNodeID(t),
Node: "node1",
Address: "2.3.4.5",
}
if err := s.ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err == nil {
t.Fatalf("Should return an error since the previous node is still healthy")
}
s.ensureCheckTxn(tx, 5, &structs.HealthCheck{
Node: "node1",
CheckID: structs.SerfCheckID,
Status: api.HealthCritical,
})
if err := s.ensureNoNodeWithSimilarNameTxn(tx, newNode, false); err != nil {
t.Fatal(err)
}
}
func TestStateStore_EnsureRegistration(t *testing.T) {
@ -599,6 +626,13 @@ func TestNodeRenamingNodes(t *testing.T) {
if err := s.EnsureNode(1, in1); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.EnsureCheck(2, &structs.HealthCheck{
Node: "node1",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
}); err != nil {
t.Fatalf("err: %s", err)
}
// Node2 with ID
in2 := &structs.Node{
@ -607,7 +641,14 @@ func TestNodeRenamingNodes(t *testing.T) {
Address: "1.1.1.2",
}
if err := s.EnsureNode(2, in2); err != nil {
if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.EnsureCheck(4, &structs.HealthCheck{
Node: "node2",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
}); err != nil {
t.Fatalf("err: %s", err)
}
@ -617,7 +658,14 @@ func TestNodeRenamingNodes(t *testing.T) {
Address: "1.1.1.3",
}
if err := s.EnsureNode(3, in3); err != nil {
if err := s.EnsureNode(5, in3); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.EnsureCheck(6, &structs.HealthCheck{
Node: "node3",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
}); err != nil {
t.Fatalf("err: %s", err)
}
@ -635,7 +683,7 @@ func TestNodeRenamingNodes(t *testing.T) {
Node: "node1",
Address: "1.1.1.2",
}
if err := s.EnsureNode(4, in2Modify); err == nil {
if err := s.EnsureNode(7, in2Modify); err == nil {
t.Fatalf("Renaming node2 into node1 should fail")
}
@ -645,7 +693,7 @@ func TestNodeRenamingNodes(t *testing.T) {
Node: "NoDe1",
Address: "1.1.1.2",
}
if err := s.EnsureNode(5, in2Modify); err == nil {
if err := s.EnsureNode(8, in2Modify); err == nil {
t.Fatalf("Renaming node2 into node1 should fail")
}
@ -655,7 +703,7 @@ func TestNodeRenamingNodes(t *testing.T) {
Node: "NoDe3",
Address: "1.1.1.2",
}
if err := s.EnsureNode(6, in2Modify); err == nil {
if err := s.EnsureNode(9, in2Modify); err == nil {
t.Fatalf("Renaming node2 into node1 should fail")
}
@ -665,7 +713,7 @@ func TestNodeRenamingNodes(t *testing.T) {
Node: "node2bis",
Address: "1.1.1.2",
}
if err := s.EnsureNode(6, in2Modify); err != nil {
if err := s.EnsureNode(10, in2Modify); err != nil {
t.Fatalf("Renaming node2 into node1 should fail")
}
@ -825,13 +873,22 @@ func TestStateStore_EnsureNode(t *testing.T) {
newNodeID := types.NodeID("d0347693-65cc-4d9f-a6e0-5025b2e6513f")
// Set a Serf check on the new node to inform whether to allow changing ID
if err := s.EnsureCheck(8, &structs.HealthCheck{
Node: "node1-renamed",
CheckID: structs.SerfCheckID,
Status: api.HealthPassing,
}); err != nil {
t.Fatalf("err: %s", err)
}
// Adding another node with same name should fail
in = &structs.Node{
Node: "node1-renamed",
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(8, in); err == nil {
if err := s.EnsureNode(9, in); err == nil {
t.Fatalf("There should be an error since node1-renamed already exists")
}
@ -841,7 +898,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(8, in); err == nil {
if err := s.EnsureNode(9, in); err == nil {
t.Fatalf("err: %s", err)
}
@ -851,7 +908,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(9, in); err != nil {
if err := s.EnsureNode(10, in); err != nil {
t.Fatalf("err: %s", err)
}
@ -867,7 +924,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(9, in); err != nil {
if err := s.EnsureNode(10, in); err != nil {
t.Fatalf("err: %s", err)
}
@ -877,10 +934,10 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Node and indexes were updated
if out.ID != newNodeID || out.CreateIndex != 9 || out.ModifyIndex != 9 || out.Address != "1.1.1.7" || out.Node != "Node1bis" {
if out.ID != newNodeID || out.CreateIndex != 10 || out.ModifyIndex != 10 || out.Address != "1.1.1.7" || out.Node != "Node1bis" {
t.Fatalf("bad: %#v", out)
}
if idx != 9 {
if idx != 10 {
t.Fatalf("bad index: %d", idx)
}
@ -891,7 +948,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(10, in); err == nil {
if err := s.EnsureNode(11, in); err == nil {
t.Fatalf("err: %s", err)
}
@ -901,7 +958,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(10, in); err == nil {
if err := s.EnsureNode(11, in); err == nil {
t.Fatalf("err: %s", err)
}
@ -911,7 +968,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
ID: newNodeID,
Address: "1.1.1.7",
}
if err := s.EnsureNode(11, in); err != nil {
if err := s.EnsureNode(12, in); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("Node1-Renamed2")
@ -920,10 +977,10 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Node and indexes were updated
if out.ID != newNodeID || out.CreateIndex != 9 || out.ModifyIndex != 11 || out.Address != "1.1.1.7" || out.Node != "Node1-Renamed2" {
if out.ID != newNodeID || out.CreateIndex != 10 || out.ModifyIndex != 12 || out.Address != "1.1.1.7" || out.Node != "Node1-Renamed2" {
t.Fatalf("bad: %#v", out)
}
if idx != 11 {
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
@ -931,27 +988,27 @@ func TestStateStore_EnsureNode(t *testing.T) {
// See https://github.com/hashicorp/consul/pull/3983 for context
// Deprecated behavior is following
deprecatedEnsureNodeWithoutIDCanRegister(t, s, "new-node-without-id", 12)
deprecatedEnsureNodeWithoutIDCanRegister(t, s, "new-node-without-id", 13)
// Deprecated, but should work as well
deprecatedEnsureNodeWithoutIDCanRegister(t, s, "new-node-without-id", 13)
deprecatedEnsureNodeWithoutIDCanRegister(t, s, "new-node-without-id", 14)
// All of this is deprecated as well, should be removed
in = &structs.Node{
Node: "Node1-Renamed2",
Address: "1.1.1.66",
}
if err := s.EnsureNode(14, in); err != nil {
if err := s.EnsureNode(15, in); err != nil {
t.Fatalf("[DEPRECATED] it should work, err:= %q", err)
}
idx, out, err = s.GetNode("Node1-Renamed2")
if err != nil {
t.Fatalf("[DEPRECATED] err: %s", err)
}
if out.CreateIndex != 9 {
if out.CreateIndex != 10 {
t.Fatalf("[DEPRECATED] We expected to modify node previously added, but add index = %d for node %q", out.CreateIndex, out)
}
if out.Address != "1.1.1.66" || out.ModifyIndex != 14 {
if out.Address != "1.1.1.66" || out.ModifyIndex != 15 {
t.Fatalf("[DEPRECATED] Node with newNodeID should have been updated, but was: %d with content := %q", out.CreateIndex, out)
}
}

2
go.mod
View File

@ -73,7 +73,7 @@ require (
github.com/hashicorp/hil v0.0.0-20160711231837-1e86c6b523c5
github.com/hashicorp/logutils v1.0.0
github.com/hashicorp/mdns v1.0.1 // indirect
github.com/hashicorp/memberlist v0.1.3
github.com/hashicorp/memberlist v0.1.4
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1

4
go.sum
View File

@ -188,6 +188,10 @@ github.com/hashicorp/mdns v1.0.1 h1:XFSOubp8KWB+Jd2PDyaX5xUd5bhSP/+pTDZVDMzZJM8=
github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg2DmyNY=
github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/memberlist v0.1.4-0.20190515174901-e1138a6a4d8a h1:9V2hIf261IZE7KfJtTOAncg/BRrauZblzdyS9lrTl1E=
github.com/hashicorp/memberlist v0.1.4-0.20190515174901-e1138a6a4d8a/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/memberlist v0.1.4 h1:gkyML/r71w3FL8gUi74Vk76avkj/9lYAY9lvg0OcoGs=
github.com/hashicorp/memberlist v0.1.4/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472 h1:9EPzHJ1bJFaFbGOz3UV3DDFmGYANr+SF+eapmiK5zV4=

View File

@ -7,8 +7,8 @@ package memberlist
// a node out and prevent it from being considered a peer
// using application specific logic.
type AliveDelegate interface {
	// NotifyAlive is invoked when a message about a live
	// node is received from the network. Returning a non-nil
	// error prevents the node from being considered a peer.
	NotifyAlive(peer *Node) error
}

View File

@ -215,6 +215,11 @@ type Config struct {
// This is a legacy name for backward compatibility but should really be
// called PacketBufferSize now that we have generalized the transport.
UDPBufferSize int
// DeadNodeReclaimTime controls the time before a dead node's name can be
// reclaimed by one with a different address or port. By default, this is 0,
// meaning nodes cannot be reclaimed this way.
DeadNodeReclaimTime time.Duration
}
// DefaultLANConfig returns a sane set of configurations for Memberlist.

View File

@ -72,6 +72,15 @@ type Memberlist struct {
logger *log.Logger
}
// BuildVsnArray creates the array of Vsn: the three memberlist protocol
// version fields (min, max, current) followed by the three delegate
// protocol version fields, in the wire order carried by alive messages.
func (conf *Config) BuildVsnArray() []uint8 {
	vsn := make([]uint8, 0, 6)
	vsn = append(vsn,
		ProtocolVersionMin,
		ProtocolVersionMax,
		conf.ProtocolVersion,
		conf.DelegateProtocolMin,
		conf.DelegateProtocolMax,
		conf.DelegateProtocolVersion,
	)
	return vsn
}
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
@ -402,11 +411,7 @@ func (m *Memberlist) setAlive() error {
Addr: addr,
Port: uint16(port),
Meta: meta,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
m.config.DelegateProtocolVersion,
},
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&a, nil, true)
return nil
@ -447,11 +452,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
Addr: state.Addr,
Port: state.Port,
Meta: meta,
Vsn: []uint8{
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
m.config.DelegateProtocolVersion,
},
Vsn: m.config.BuildVsnArray(),
}
notifyCh := make(chan struct{})
m.aliveNode(&a, notifyCh, true)

View File

@ -9,7 +9,7 @@ import (
"sync/atomic"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
)
type nodeStateType int
@ -850,11 +850,26 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}
if len(a.Vsn) >= 3 {
pMin := a.Vsn[0]
pMax := a.Vsn[1]
pCur := a.Vsn[2]
if pMin == 0 || pMax == 0 || pMin > pMax {
m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
return
}
}
// Invoke the Alive delegate if any. This can be used to filter out
// alive messages based on custom logic. For example, using a cluster name.
// Using a merge delegate is not enough, as it is possible for passive
// cluster merging to still occur.
if m.config.Alive != nil {
if len(a.Vsn) < 6 {
m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
a.Node, net.IP(a.Addr), a.Port)
return
}
node := &Node{
Name: a.Node,
Addr: a.Addr,
@ -876,6 +891,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
// Check if we've never seen this node before, and if not, then
// store this node in our node map.
var updatesNode bool
if !ok {
state = &nodeState{
Node: Node{
@ -886,6 +902,14 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
},
State: stateDead,
}
if len(a.Vsn) > 5 {
state.PMin = a.Vsn[0]
state.PMax = a.Vsn[1]
state.PCur = a.Vsn[2]
state.DMin = a.Vsn[3]
state.DMax = a.Vsn[4]
state.DCur = a.Vsn[5]
}
// Add to map
m.nodeMap[a.Node] = state
@ -903,29 +927,40 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
// Update numNodes after we've added a new node
atomic.AddUint32(&m.numNodes, 1)
}
} else {
// Check if this address is different than the existing node unless the old node is dead.
if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
// If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died.
canReclaim := (m.config.DeadNodeReclaimTime > 0 &&
time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)
// Check if this address is different than the existing node
if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
// Allow the address to be updated if a dead node is being replaced.
if state.State == stateDead && canReclaim {
m.logger.Printf("[INFO] memberlist: Updating address for failed node %s from %v:%d to %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
updatesNode = true
} else {
m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State)
// Inform the conflict delegate if provided
if m.config.Conflict != nil {
other := Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
// Inform the conflict delegate if provided
if m.config.Conflict != nil {
other := Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
}
m.config.Conflict.NotifyConflict(&state.Node, &other)
}
return
}
m.config.Conflict.NotifyConflict(&state.Node, &other)
}
return
}
// Bail if the incarnation number is older, and this is not about us
isLocalNode := state.Name == m.config.Name
if a.Incarnation <= state.Incarnation && !isLocalNode {
if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode {
return
}
@ -965,9 +1000,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
bytes.Equal(a.Vsn, versions) {
return
}
m.refute(state, a.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
@ -984,6 +1018,8 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
// Update the state and incarnation number
state.Incarnation = a.Incarnation
state.Meta = a.Meta
state.Addr = a.Addr
state.Port = a.Port
if state.State != stateAlive {
state.State = stateAlive
state.StateChange = time.Now()

View File

@ -3,6 +3,7 @@ package yamux
import (
"fmt"
"io"
"log"
"os"
"time"
)
@ -30,8 +31,13 @@ type Config struct {
// window size that we allow for a stream.
MaxStreamWindowSize uint32
// LogOutput is used to control the log destination
// LogOutput is used to control the log destination. Either Logger or
// LogOutput can be set, not both.
LogOutput io.Writer
// Logger is used to pass in the logger to be used. Either Logger or
// LogOutput can be set, not both.
Logger *log.Logger
}
// DefaultConfig is used to return a default configuration
@ -57,6 +63,11 @@ func VerifyConfig(config *Config) error {
if config.MaxStreamWindowSize < initialStreamWindow {
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
}
if config.LogOutput != nil && config.Logger != nil {
return fmt.Errorf("both Logger and LogOutput may not be set, select one")
} else if config.LogOutput == nil && config.Logger == nil {
return fmt.Errorf("one of Logger or LogOutput must be set, select one")
}
return nil
}

View File

@ -86,9 +86,14 @@ type sendReady struct {
// newSession is used to construct a new session
func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
logger := config.Logger
if logger == nil {
logger = log.New(config.LogOutput, "", log.LstdFlags)
}
s := &Session{
config: config,
logger: log.New(config.LogOutput, "", log.LstdFlags),
logger: logger,
conn: conn,
bufRead: bufio.NewReader(conn),
pings: make(map[uint32]chan struct{}),

11
vendor/modules.txt vendored
View File

@ -188,6 +188,13 @@ github.com/gregjones/httpcache
github.com/gregjones/httpcache/diskcache
# github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
github.com/hailocab/go-hostpool
# github.com/hashicorp/consul/api v1.1.0 => ./api
github.com/hashicorp/consul/api
github.com/hashicorp/consul/api/watch
# github.com/hashicorp/consul/sdk v0.1.1 => ./sdk
github.com/hashicorp/consul/sdk/freeport
github.com/hashicorp/consul/sdk/testutil/retry
github.com/hashicorp/consul/sdk/testutil
# github.com/hashicorp/errwrap v1.0.0
github.com/hashicorp/errwrap
# github.com/hashicorp/go-bexpr v0.1.0
@ -257,7 +264,7 @@ github.com/hashicorp/hil/ast
github.com/hashicorp/logutils
# github.com/hashicorp/mdns v1.0.1
github.com/hashicorp/mdns
# github.com/hashicorp/memberlist v0.1.3
# github.com/hashicorp/memberlist v0.1.4
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
@ -320,7 +327,7 @@ github.com/hashicorp/vault/helper/keysutil
github.com/hashicorp/vault/helper/kdf
# github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443
github.com/hashicorp/vic/pkg/vsphere/tags
# github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb
# github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/hashicorp/yamux
# github.com/imdario/mergo v0.3.6
github.com/imdario/mergo