From 51b11cd3449c29e7c5938bcd23f30b4e90018b51 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 15 Mar 2017 16:09:55 -0700 Subject: [PATCH] Fix an issue with changing server IDs and add a few UX enhancements around autopilot features --- api/operator.go | 3 + command/agent/operator_endpoint.go | 1 + consul/autopilot.go | 106 +++++++++++++----- consul/autopilot_test.go | 33 ++++-- consul/leader.go | 31 +++-- consul/operator_endpoint.go | 26 +---- consul/operator_endpoint_test.go | 21 ++-- consul/serf.go | 1 + consul/server.go | 8 +- consul/structs/operator.go | 3 + .../docs/agent/http/operator.html.markdown | 2 + .../source/docs/agent/telemetry.html.markdown | 12 ++ 12 files changed, 161 insertions(+), 86 deletions(-) diff --git a/api/operator.go b/api/operator.go index 2c9961c9d1..38a5a5c48b 100644 --- a/api/operator.go +++ b/api/operator.go @@ -128,6 +128,9 @@ type ServerHealth struct { // Autopilot config. Healthy bool + // Voter is whether this is a voting server. + Voter bool + // StableSince is the last time this server's Healthy value changed. StableSince time.Time } diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index a04e801fce..fb3030e3aa 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -303,6 +303,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re LastTerm: server.LastTerm, LastIndex: server.LastIndex, Healthy: server.Healthy, + Voter: server.Voter, StableSince: server.StableSince.Round(time.Second).UTC(), }) } diff --git a/consul/autopilot.go b/consul/autopilot.go index 9156fd49e0..dafad50c32 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" @@ -102,7 +103,7 @@ func (s *Server) pruneDeadServers() error { go s.serfLAN.RemoveFailedNode(server) } } else { - s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers) + s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers) } return nil @@ -198,8 +199,6 @@ func (s *Server) serverHealthLoop() { case <-s.shutdownCh: return case <-ticker.C: - serverHealths := make(map[string]*structs.ServerHealth) - // Don't do anything if the min Raft version is too low minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) if err != nil { @@ -221,7 +220,8 @@ func (s *Server) serverHealthLoop() { break } - // Build an updated map of server healths + // Get the the serf members which are Consul servers + serverMap := make(map[string]serf.Member) for _, member := range s.LANMembers() { if member.Status == serf.StatusLeft { continue @@ -229,31 +229,78 @@ func (s *Server) serverHealthLoop() { valid, parts := agent.IsConsulServer(member) if valid { - health, err := s.queryServerHealth(member, parts, autopilotConf) - if err != nil { - s.logger.Printf("[ERR] consul: error fetching server health: %s", err) - serverHealths[parts.ID] = &structs.ServerHealth{ - ID: parts.ID, - Name: parts.Name, - Healthy: false, - } - } else { - serverHealths[parts.ID] = health - } + serverMap[parts.ID] = member } } - s.serverHealthLock.Lock() - s.serverHealths = serverHealths - s.serverHealthLock.Unlock() + future := s.raft.GetConfiguration() + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] consul: error getting Raft configuration %s", err) + break + } + + // Build a current list of server healths + var clusterHealth structs.OperatorHealthReply + servers := future.Configuration().Servers + healthyCount := 0 + voterCount := 0 + for _, server := range servers { + member, ok := serverMap[string(server.ID)] + if !ok { + s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID) + continue + } + + health, err := s.queryServerHealth(member, autopilotConf) + if err != nil { + s.logger.Printf("[ERR] consul: error fetching server health: %s", err) + clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{ + ID: string(server.ID), + Name: member.Name, + SerfStatus: serf.StatusFailed, + }) + continue + } + + if health.Healthy { + healthyCount++ + } + + if server.Suffrage != raft.Nonvoter { + health.Voter = true + voterCount++ + } + clusterHealth.Servers = append(clusterHealth.Servers, *health) + } + clusterHealth.Healthy = healthyCount == len(servers) + + // If we have extra healthy voters, update FailureTolerance + if voterCount > len(servers)/2+1 { + clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1) + } + + // Heartbeat a metric for monitoring if we're the leader + if s.IsLeader() { + metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) + if clusterHealth.Healthy { + metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1) + } else { + metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0) + } + } + + s.clusterHealthLock.Lock() + s.clusterHealth = clusterHealth + s.clusterHealthLock.Unlock() } } } // queryServerHealth fetches the raft stats for the given server and uses them // to update its ServerHealth -func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, - autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) { +func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) { + _, server := agent.IsConsulServer(member) + stats, err := s.getServerStats(server) if err != nil { return nil, fmt.Errorf("error getting raft stats: %s", err) @@ -297,14 +344,21 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server, return health, nil } +func (s *Server) getClusterHealth() structs.OperatorHealthReply { + s.clusterHealthLock.RLock() + defer s.clusterHealthLock.RUnlock() + return s.clusterHealth +} + func (s *Server) getServerHealth(id string) *structs.ServerHealth { - s.serverHealthLock.RLock() - defer s.serverHealthLock.RUnlock() - h, ok := s.serverHealths[id] - if !ok { - return nil + s.clusterHealthLock.RLock() + defer s.clusterHealthLock.RUnlock() + for _, health := range s.clusterHealth.Servers { + if health.ID == id { + return &health + } } - return h + return nil } func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go index 6d38d6a4de..ccdced9e90 100644 --- a/consul/autopilot_test.go +++ b/consul/autopilot_test.go @@ -12,15 +12,27 @@ import ( ) func TestAutopilot_CleanupDeadServer(t *testing.T) { - dir1, s1 := testServerDCBootstrap(t, "dc1", true) + for i := 1; i <= 3; i++ { + testCleanupDeadServer(t, i) + } +} + +func testCleanupDeadServer(t *testing.T, raftVersion int) { + conf := func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.BootstrapExpect = 3 + c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion) + } + dir1, s1 := testServerWithConfig(t, conf) defer os.RemoveAll(dir1) defer s1.Shutdown() - dir2, s2 := testServerDCBootstrap(t, "dc1", false) + dir2, s2 := testServerWithConfig(t, conf) defer os.RemoveAll(dir2) defer s2.Shutdown() - dir3, s3 := testServerDCBootstrap(t, "dc1", false) + dir3, s3 := testServerWithConfig(t, conf) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -45,8 +57,13 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) { }) } + // Bring up a new server + dir4, s4 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + // Kill a non-leader server - s2.Shutdown() + s3.Shutdown() testutil.WaitForResult(func() (bool, error) { alive := 0 @@ -60,15 +77,11 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) { t.Fatalf("should have 2 alive members") }) - // Bring up and join a new server - dir4, s4 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir4) - defer s4.Shutdown() - + // Join the new server if _, err := s4.JoinLAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } - servers[1] = s4 + servers[2] = s4 // Make sure the dead server is removed and we're back to 3 total peers for _, s := range servers { diff --git a/consul/leader.go b/consul/leader.go index ecdb996d50..5dd4909268 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -583,26 +583,42 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { // TODO (slackpad) - This will need to be changed once we support node IDs. addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String() + minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) + if err != nil { + return err + } + // See if it's already in the configuration. It's harmless to re-add it // but we want to avoid doing that if possible to prevent useless Raft - // log entries. + // log entries. If the address is the same but the ID changed, remove the + // old server before adding the new one. configFuture := s.raft.GetConfiguration() if err := configFuture.Error(); err != nil { s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err) return err } for _, server := range configFuture.Configuration().Servers { - if server.Address == raft.ServerAddress(addr) { + // No-op if the raft version is too low + if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) { return nil } + + // If the address or ID matches an existing server, see if we need to remove the old one first + if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) { + // Exit with no-op if this is being called on an existing server + if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) { + return nil + } else { + future := s.raft.RemoveServer(server.ID, 0, 0) + if err := future.Error(); err != nil { + return fmt.Errorf("error removing server with duplicate ID: %s", err) + } + s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID) + } + } } // Attempt to add as a peer - minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) - if err != nil { - return err - } - switch { case minRaftProtocol >= 3: addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0) @@ -635,7 +651,6 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { // removeConsulServer is used to try to remove a consul server that has left func (s *Server) removeConsulServer(m serf.Member, port int) error { - // TODO (slackpad) - This will need to be changed once we support node IDs. addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String() // See if it's already in the configuration. It's harmless to re-remove it diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index 16d8b75dc0..acbfb9ea3a 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -212,31 +212,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") } - var status structs.OperatorHealthReply - future := op.srv.raft.GetConfiguration() - if err := future.Error(); err != nil { - return err - } - - healthyCount := 0 - servers := future.Configuration().Servers - for _, s := range servers { - health := op.srv.getServerHealth(string(s.ID)) - if health != nil { - if health.Healthy { - healthyCount++ - } - status.Servers = append(status.Servers, *health) - } - } - status.Healthy = healthyCount == len(servers) - - // If we have extra healthy servers, set FailureTolerance - if healthyCount > len(servers)/2+1 { - status.FailureTolerance = healthyCount - (len(servers)/2 + 1) - } - - *reply = status + *reply = op.srv.getClusterHealth() return nil } diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index d430bd28fb..8fde7de4a8 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -429,22 +429,21 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { } func TestOperator_ServerHealth(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { + conf := func(c *Config) { c.Datacenter = "dc1" - c.Bootstrap = true + c.Bootstrap = false + c.BootstrapExpect = 3 c.RaftConfig.ProtocolVersion = 3 c.ServerHealthInterval = 100 * time.Millisecond - }) + c.AutopilotInterval = 100 * time.Millisecond + } + dir1, s1 := testServerWithConfig(t, conf) defer os.RemoveAll(dir1) defer s1.Shutdown() codec := rpcClient(t, s1) defer codec.Close() - dir2, s2 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = false - c.RaftConfig.ProtocolVersion = 3 - }) + dir2, s2 := testServerWithConfig(t, conf) defer os.RemoveAll(dir2) defer s2.Shutdown() addr := fmt.Sprintf("127.0.0.1:%d", @@ -453,11 +452,7 @@ func TestOperator_ServerHealth(t *testing.T) { t.Fatalf("err: %v", err) } - dir3, s3 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = false - c.RaftConfig.ProtocolVersion = 3 - }) + dir3, s3 := testServerWithConfig(t, conf) defer os.RemoveAll(dir3) defer s3.Shutdown() if _, err := s3.JoinLAN([]string{addr}); err != nil { diff --git a/consul/serf.go b/consul/serf.go index c9e5044720..c449d73a24 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -58,6 +58,7 @@ func (s *Server) lanEventHandler() { case serf.EventUser: s.localEvent(e.(serf.UserEvent)) case serf.EventMemberUpdate: // Ignore + s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventQuery: // Ignore default: s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e) diff --git a/consul/server.go b/consul/server.go index 112b9aff43..668177ced0 100644 --- a/consul/server.go +++ b/consul/server.go @@ -88,6 +88,10 @@ type Server struct { // autopilotWaitGroup is used to block until Autopilot shuts down. autopilotWaitGroup sync.WaitGroup + // clusterHealth stores the current view of the cluster's health. + clusterHealth structs.OperatorHealthReply + clusterHealthLock sync.RWMutex + // Consul configuration config *Config @@ -157,10 +161,6 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex - // serverHealths stores the current view of server healths. - serverHealths map[string]*structs.ServerHealth - serverHealthLock sync.RWMutex - // tombstoneGC is used to track the pending GC invocations // for the KV tombstones tombstoneGC *state.TombstoneGC diff --git a/consul/structs/operator.go b/consul/structs/operator.go index f17cd8f418..5fc6ef36df 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -127,6 +127,9 @@ type ServerHealth struct { // Autopilot config. Healthy bool + // Voter is whether this is a voting server. + Voter bool + // StableSince is the last time this server's Healthy value changed. StableSince time.Time } diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index dabc9980ac..6918dd8e21 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -365,6 +365,7 @@ A JSON body is returned that looks like this: "LastTerm": 2, "LastIndex": 46, "Healthy": true, + "Voter": true, "StableSince": "2017-03-06T22:07:51Z" }, { @@ -375,6 +376,7 @@ A JSON body is returned that looks like this: "LastTerm": 2, "LastIndex": 46, "Healthy": true, + "Voter": false, "StableSince": "2017-03-06T22:18:26Z" } ] diff --git a/website/source/docs/agent/telemetry.html.markdown b/website/source/docs/agent/telemetry.html.markdown index f23033a086..b63f686868 100644 --- a/website/source/docs/agent/telemetry.html.markdown +++ b/website/source/docs/agent/telemetry.html.markdown @@ -189,4 +189,16 @@ These metrics give insight into the health of the cluster as a whole. ms timer + + `consul.autopilot.failure_tolerance` + This tracks the number of voting servers that the cluster can lose while continuing to function. + servers + gauge + + + `consul.autopilot.healthy` + This tracks the overall health of the local server cluster. If all servers are considered healthy by Autopilot, this will be set to 1. If any are unhealthy, this will be 0. + boolean + gauge +