From d08ab9fd199434e5220276356ecf9617cfec1eb2 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 18 Dec 2017 12:26:35 -0800 Subject: [PATCH] Make some final tweaks to autopilot package --- agent/consul/autopilot/autopilot.go | 15 ++++- agent/consul/autopilot/autopilot_test.go | 86 ++++++++++++++++++++++++ agent/consul/config.go | 2 +- agent/consul/leader.go | 5 +- agent/consul/server.go | 42 +++++------- 5 files changed, 118 insertions(+), 32 deletions(-) create mode 100644 agent/consul/autopilot/autopilot_test.go diff --git a/agent/consul/autopilot/autopilot.go b/agent/consul/autopilot/autopilot.go index af36d5a2c9..dd7e5c0785 100644 --- a/agent/consul/autopilot/autopilot.go +++ b/agent/consul/autopilot/autopilot.go @@ -143,6 +143,14 @@ func NumPeers(raftConfig raft.Configuration) int { return numPeers } +// RemoveDeadServers triggers a pruning of dead servers in a non-blocking way. +func (a *Autopilot) RemoveDeadServers() { + select { + case a.removeDeadCh <- struct{}{}: + default: + } +} + // pruneDeadServers removes up to numPeers/2 failed servers func (a *Autopilot) pruneDeadServers() error { conf := a.delegate.AutopilotConfig() @@ -223,14 +231,17 @@ func (a *Autopilot) pruneDeadServers() error { // MinRaftProtocol returns the lowest supported Raft protocol among alive servers func (a *Autopilot) MinRaftProtocol() (int, error) { + return minRaftProtocol(a.delegate.Serf().Members(), a.delegate.IsServer) +} + +func minRaftProtocol(members []serf.Member, serverFunc func(serf.Member) (*ServerInfo, error)) (int, error) { minVersion := -1 - members := a.delegate.Serf().Members() for _, m := range members { if m.Status != serf.StatusAlive { continue } - server, err := a.delegate.IsServer(m) + server, err := serverFunc(m) if err != nil { return -1, err } diff --git a/agent/consul/autopilot/autopilot_test.go b/agent/consul/autopilot/autopilot_test.go new file mode 100644 index 0000000000..fd85c5d945 --- /dev/null +++ b/agent/consul/autopilot/autopilot_test.go @@ -0,0 +1,86 @@ +package autopilot + +import ( + "errors" + "net" + "testing" + + "github.com/hashicorp/serf/serf" +) + +func TestMinRaftProtocol(t *testing.T) { + t.Parallel() + makeMember := func(version string) serf.Member { + return serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Tags: map[string]string{ + "role": "consul", + "dc": "dc1", + "port": "10000", + "vsn": "1", + "raft_vsn": version, + }, + Status: serf.StatusAlive, + } + } + + cases := []struct { + members []serf.Member + expected int + err error + }{ + // No servers, error + { + members: []serf.Member{}, + expected: -1, + err: errors.New("No servers found"), + }, + // One server + { + members: []serf.Member{ + makeMember("1"), + }, + expected: 1, + }, + // One server, bad version formatting + { + members: []serf.Member{ + makeMember("asdf"), + }, + expected: -1, + err: errors.New(`strconv.Atoi: parsing "asdf": invalid syntax`), + }, + // Multiple servers, different versions + { + members: []serf.Member{ + makeMember("1"), + makeMember("2"), + }, + expected: 1, + }, + // Multiple servers, same version + { + members: []serf.Member{ + makeMember("2"), + makeMember("2"), + }, + expected: 2, + }, + } + + serverFunc := func(m serf.Member) (*ServerInfo, error) { + return &ServerInfo{}, nil + } + for _, tc := range cases { + result, err := minRaftProtocol(tc.members, serverFunc) + if result != tc.expected { + t.Fatalf("bad: %v, %v, %v", result, tc.expected, tc) + } + if tc.err != nil { + if err == nil || tc.err.Error() != err.Error() { + t.Fatalf("bad: %v, %v, %v", err, tc.err, tc) + } + } + } +} diff --git a/agent/consul/config.go b/agent/consul/config.go index a05ce86868..a8a7f249be 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -416,7 +416,7 @@ func DefaultConfig() *Config { TLSMinVersion: "tls10", - // TODO (slackpad) - Until #3744 is done, we need to keep these + // TODO (slackpad) - Until #3744 is done, we need to keep these // in sync with agent/config/default.go. AutopilotConfig: &autopilot.Config{ CleanupDeadServers: true, diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 1a72d8accf..64b7ea1cdc 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -738,10 +738,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error { } // Trigger a check to remove dead servers - select { - case s.autopilotRemoveDeadCh <- struct{}{}: - default: - } + s.autopilot.RemoveDeadServers() return nil } diff --git a/agent/consul/server.go b/agent/consul/server.go index d29496c4e0..3792ec45b4 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -90,12 +90,6 @@ type Server struct { // autopilot is the Autopilot instance for this server. autopilot *autopilot.Autopilot - // autopilotRemoveDeadCh is used to trigger a check for dead server removals. - autopilotRemoveDeadCh chan struct{} - - // autopilotShutdownCh is used to stop the Autopilot loop. - autopilotShutdownCh chan struct{} - // autopilotWaitGroup is used to block until Autopilot shuts down. autopilotWaitGroup sync.WaitGroup @@ -281,25 +275,23 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* // Create server. s := &Server{ - autopilotRemoveDeadCh: make(chan struct{}), - autopilotShutdownCh: make(chan struct{}), - config: config, - tokens: tokens, - connPool: connPool, - eventChLAN: make(chan serf.Event, 256), - eventChWAN: make(chan serf.Event, 256), - logger: logger, - leaveCh: make(chan struct{}), - reconcileCh: make(chan serf.Member, 32), - router: router.NewRouter(logger, config.Datacenter), - rpcServer: rpc.NewServer(), - rpcTLS: incomingTLS, - reassertLeaderCh: make(chan chan error), - segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), - sessionTimers: NewSessionTimers(), - tombstoneGC: gc, - serverLookup: NewServerLookup(), - shutdownCh: shutdownCh, + config: config, + tokens: tokens, + connPool: connPool, + eventChLAN: make(chan serf.Event, 256), + eventChWAN: make(chan serf.Event, 256), + logger: logger, + leaveCh: make(chan struct{}), + reconcileCh: make(chan serf.Member, 32), + router: router.NewRouter(logger, config.Datacenter), + rpcServer: rpc.NewServer(), + rpcTLS: incomingTLS, + reassertLeaderCh: make(chan chan error), + segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), + sessionTimers: NewSessionTimers(), + tombstoneGC: gc, + serverLookup: NewServerLookup(), + shutdownCh: shutdownCh, } // Set up autopilot