diff --git a/api/operator.go b/api/operator.go
index 2c9961c9d1..38a5a5c48b 100644
--- a/api/operator.go
+++ b/api/operator.go
@@ -128,6 +128,9 @@ type ServerHealth struct {
 	// Autopilot config.
 	Healthy bool
 
+	// Voter is whether this is a voting server.
+	Voter bool
+
 	// StableSince is the last time this server's Healthy value changed.
 	StableSince time.Time
 }
diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go
index a04e801fce..fb3030e3aa 100644
--- a/command/agent/operator_endpoint.go
+++ b/command/agent/operator_endpoint.go
@@ -303,6 +303,7 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
 			LastTerm:    server.LastTerm,
 			LastIndex:   server.LastIndex,
 			Healthy:     server.Healthy,
+			Voter:       server.Voter,
 			StableSince: server.StableSince.Round(time.Second).UTC(),
 		})
 	}
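The two hunks above plumb the new `Voter` field from the internal `structs` package out through the `/v1/operator/autopilot/health` HTTP response. As a quick illustration of what a consumer sees, here is a minimal sketch using the Go API client; it assumes the client exposes the health endpoint through an `Operator().AutopilotServerHealth` method, which is an assumption about the client package and not part of this patch.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// AutopilotServerHealth hits /v1/operator/autopilot/health; the method
	// name is an assumption, not something this patch adds.
	health, err := client.Operator().AutopilotServerHealth(nil)
	if err != nil {
		panic(err)
	}

	voters := 0
	for _, s := range health.Servers {
		if s.Voter {
			voters++
		}
		fmt.Printf("%s: healthy=%v voter=%v\n", s.Name, s.Healthy, s.Voter)
	}
	fmt.Printf("%d voters, failure tolerance %d\n", voters, health.FailureTolerance)
}
```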
diff --git a/consul/autopilot.go b/consul/autopilot.go
index 9156fd49e0..dafad50c32 100644
--- a/consul/autopilot.go
+++ b/consul/autopilot.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/consul/agent"
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/raft"
@@ -102,7 +103,7 @@ func (s *Server) pruneDeadServers() error {
 			go s.serfLAN.RemoveFailedNode(server)
 		}
 	} else {
-		s.logger.Printf("[ERR] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
+		s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
 	}
 
 	return nil
@@ -198,8 +199,6 @@ func (s *Server) serverHealthLoop() {
 		case <-s.shutdownCh:
 			return
 		case <-ticker.C:
-			serverHealths := make(map[string]*structs.ServerHealth)
-
 			// Don't do anything if the min Raft version is too low
 			minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
 			if err != nil {
@@ -221,7 +220,8 @@
 				break
 			}
 
-			// Build an updated map of server healths
+			// Get the serf members which are Consul servers
+			serverMap := make(map[string]serf.Member)
 			for _, member := range s.LANMembers() {
 				if member.Status == serf.StatusLeft {
 					continue
@@ -229,31 +229,78 @@
 
 				valid, parts := agent.IsConsulServer(member)
 				if valid {
-					health, err := s.queryServerHealth(member, parts, autopilotConf)
-					if err != nil {
-						s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
-						serverHealths[parts.ID] = &structs.ServerHealth{
-							ID:      parts.ID,
-							Name:    parts.Name,
-							Healthy: false,
-						}
-					} else {
-						serverHealths[parts.ID] = health
-					}
+					serverMap[parts.ID] = member
 				}
 			}
 
-			s.serverHealthLock.Lock()
-			s.serverHealths = serverHealths
-			s.serverHealthLock.Unlock()
+			future := s.raft.GetConfiguration()
+			if err := future.Error(); err != nil {
+				s.logger.Printf("[ERR] consul: error getting Raft configuration: %s", err)
+				break
+			}
+
+			// Build a current list of server healths
+			var clusterHealth structs.OperatorHealthReply
+			servers := future.Configuration().Servers
+			healthyCount := 0
+			voterCount := 0
+			for _, server := range servers {
+				member, ok := serverMap[string(server.ID)]
+				if !ok {
+					s.logger.Printf("[DEBUG] consul: couldn't find serf member for server with ID %q", server.ID)
+					continue
+				}
+
+				health, err := s.queryServerHealth(member, autopilotConf)
+				if err != nil {
+					s.logger.Printf("[ERR] consul: error fetching server health: %s", err)
+					clusterHealth.Servers = append(clusterHealth.Servers, structs.ServerHealth{
+						ID:         string(server.ID),
+						Name:       member.Name,
+						SerfStatus: serf.StatusFailed,
+					})
+					continue
+				}
+
+				if health.Healthy {
+					healthyCount++
+				}
+
+				if server.Suffrage != raft.Nonvoter {
+					health.Voter = true
+					voterCount++
+				}
+				clusterHealth.Servers = append(clusterHealth.Servers, *health)
+			}
+			clusterHealth.Healthy = healthyCount == len(servers)
+
+			// If we have extra healthy voters, update FailureTolerance
+			if voterCount > len(servers)/2+1 {
+				clusterHealth.FailureTolerance = voterCount - (len(servers)/2 + 1)
+			}
+
+			// Heartbeat a metric for monitoring if we're the leader
+			if s.IsLeader() {
+				metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
+				if clusterHealth.Healthy {
+					metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
+				} else {
+					metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
+				}
+			}
+
+			s.clusterHealthLock.Lock()
+			s.clusterHealth = clusterHealth
+			s.clusterHealthLock.Unlock()
 		}
 	}
 }
 
 // queryServerHealth fetches the raft stats for the given server and uses them
 // to update its ServerHealth
-func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
-	autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
+func (s *Server) queryServerHealth(member serf.Member, autopilotConf *structs.AutopilotConfig) (*structs.ServerHealth, error) {
+	_, server := agent.IsConsulServer(member)
+
 	stats, err := s.getServerStats(server)
 	if err != nil {
 		return nil, fmt.Errorf("error getting raft stats: %s", err)
@@ -297,14 +344,21 @@ func (s *Server) queryServerHealth(member serf.Member, server *agent.Server,
 	return health, nil
 }
 
+func (s *Server) getClusterHealth() structs.OperatorHealthReply {
+	s.clusterHealthLock.RLock()
+	defer s.clusterHealthLock.RUnlock()
+	return s.clusterHealth
+}
+
 func (s *Server) getServerHealth(id string) *structs.ServerHealth {
-	s.serverHealthLock.RLock()
-	defer s.serverHealthLock.RUnlock()
-	h, ok := s.serverHealths[id]
-	if !ok {
-		return nil
+	s.clusterHealthLock.RLock()
+	defer s.clusterHealthLock.RUnlock()
+	for _, health := range s.clusterHealth.Servers {
+		if health.ID == id {
+			return &health
+		}
 	}
-	return h
+	return nil
 }
 
 func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
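The loop above derives `FailureTolerance` with integer arithmetic over the Raft configuration: quorum for n servers is n/2+1, so any voters beyond quorum are failures the cluster can absorb. Note the quorum is computed from the total server count, while only voters count toward the tolerance. A small self-contained sketch of just that calculation (the helper name is ours, not the patch's):

```go
package main

import "fmt"

// failureTolerance mirrors the arithmetic in serverHealthLoop: quorum for
// n servers is n/2+1 (integer division), and any voters beyond quorum are
// voters the cluster can lose while staying available.
func failureTolerance(voterCount, serverCount int) int {
	quorum := serverCount/2 + 1
	if voterCount > quorum {
		return voterCount - quorum
	}
	return 0
}

func main() {
	fmt.Println(failureTolerance(3, 3)) // 1: a 3-voter cluster can lose one voter
	fmt.Println(failureTolerance(5, 5)) // 2: a 5-voter cluster can lose two
	fmt.Println(failureTolerance(2, 3)) // 0: only 2 voters of 3 servers, no headroom
}
```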
diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go
index 6d38d6a4de..ccdced9e90 100644
--- a/consul/autopilot_test.go
+++ b/consul/autopilot_test.go
@@ -12,15 +12,27 @@ import (
 )
 
 func TestAutopilot_CleanupDeadServer(t *testing.T) {
-	dir1, s1 := testServerDCBootstrap(t, "dc1", true)
+	for i := 1; i <= 3; i++ {
+		testCleanupDeadServer(t, i)
+	}
+}
+
+func testCleanupDeadServer(t *testing.T, raftVersion int) {
+	conf := func(c *Config) {
+		c.Datacenter = "dc1"
+		c.Bootstrap = false
+		c.BootstrapExpect = 3
+		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
+	}
+	dir1, s1 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
 
-	dir2, s2 := testServerDCBootstrap(t, "dc1", false)
+	dir2, s2 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir2)
 	defer s2.Shutdown()
 
-	dir3, s3 := testServerDCBootstrap(t, "dc1", false)
+	dir3, s3 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir3)
 	defer s3.Shutdown()
 
@@ -45,8 +57,13 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
 		})
 	}
 
+	// Bring up a new server
+	dir4, s4 := testServerWithConfig(t, conf)
+	defer os.RemoveAll(dir4)
+	defer s4.Shutdown()
+
 	// Kill a non-leader server
-	s2.Shutdown()
+	s3.Shutdown()
 
 	testutil.WaitForResult(func() (bool, error) {
 		alive := 0
@@ -60,15 +77,11 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
 		t.Fatalf("should have 2 alive members")
 	})
 
-	// Bring up and join a new server
-	dir4, s4 := testServerDCBootstrap(t, "dc1", false)
-	defer os.RemoveAll(dir4)
-	defer s4.Shutdown()
-
+	// Join the new server
 	if _, err := s4.JoinLAN([]string{addr}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	servers[1] = s4
+	servers[2] = s4
 
 	// Make sure the dead server is removed and we're back to 3 total peers
 	for _, s := range servers {
diff --git a/consul/leader.go b/consul/leader.go
index ecdb996d50..5dd4909268 100644
--- a/consul/leader.go
+++ b/consul/leader.go
@@ -583,26 +583,42 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
 	// TODO (slackpad) - This will need to be changed once we support node IDs.
 	addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
 
+	minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
+	if err != nil {
+		return err
+	}
+
 	// See if it's already in the configuration. It's harmless to re-add it
 	// but we want to avoid doing that if possible to prevent useless Raft
-	// log entries.
+	// log entries. If the address is the same but the ID changed, remove the
+	// old server before adding the new one.
 	configFuture := s.raft.GetConfiguration()
 	if err := configFuture.Error(); err != nil {
 		s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
 		return err
 	}
 	for _, server := range configFuture.Configuration().Servers {
-		if server.Address == raft.ServerAddress(addr) {
+		// No-op if the raft version is too low
+		if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
 			return nil
 		}
+
+		// If the address or ID matches an existing server, see if we need to remove the old one first
+		if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
+			// Exit with no-op if this is being called on an existing server
+			if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
+				return nil
+			} else {
+				future := s.raft.RemoveServer(server.ID, 0, 0)
+				if err := future.Error(); err != nil {
+					return fmt.Errorf("error removing server with duplicate ID: %s", err)
+				}
+				s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID)
+			}
+		}
 	}
 
 	// Attempt to add as a peer
-	minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
-	if err != nil {
-		return err
-	}
-
 	switch {
 	case minRaftProtocol >= 3:
 		addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
@@ -635,7 +651,6 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
 
 // removeConsulServer is used to try to remove a consul server that has left
 func (s *Server) removeConsulServer(m serf.Member, port int) error {
-	// TODO (slackpad) - This will need to be changed once we support node IDs.
 	addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
 
 	// See if it's already in the configuration. It's harmless to re-remove it
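The `joinConsulServer` change above distinguishes three cases when a joining server collides with an existing Raft configuration entry: same address and same ID (already a member, no-op), same address or same ID but not both (a stale incarnation that must be removed first), and no overlap. A condensed sketch of just that decision table, with hypothetical names, omitting the protocol-version guard:

```go
package main

import "fmt"

type action int

const (
	keep  action = iota // entry does not conflict; keep scanning
	stop                // same address and ID: already a member, no-op
	evict               // address or ID collides: remove the stale entry first
)

// classify mirrors the branch structure added to joinConsulServer above;
// the function and constant names are ours, purely for illustration.
func classify(existingAddr, existingID, newAddr, newID string) action {
	sameAddr := existingAddr == newAddr
	sameID := existingID == newID
	switch {
	case sameAddr && sameID:
		return stop
	case sameAddr || sameID:
		return evict
	default:
		return keep
	}
}

func main() {
	// A server that rejoined on the same address with a fresh ID must have
	// its old entry evicted before the leader can add the new incarnation.
	fmt.Println(classify("10.0.0.1:8300", "id-a", "10.0.0.1:8300", "id-b") == evict) // true
	fmt.Println(classify("10.0.0.1:8300", "id-a", "10.0.0.1:8300", "id-a") == stop)  // true
}
```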
diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go
index 16d8b75dc0..acbfb9ea3a 100644
--- a/consul/operator_endpoint.go
+++ b/consul/operator_endpoint.go
@@ -212,31 +212,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs
 		return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
 	}
 
-	var status structs.OperatorHealthReply
-	future := op.srv.raft.GetConfiguration()
-	if err := future.Error(); err != nil {
-		return err
-	}
-
-	healthyCount := 0
-	servers := future.Configuration().Servers
-	for _, s := range servers {
-		health := op.srv.getServerHealth(string(s.ID))
-		if health != nil {
-			if health.Healthy {
-				healthyCount++
-			}
-			status.Servers = append(status.Servers, *health)
-		}
-	}
-	status.Healthy = healthyCount == len(servers)
-
-	// If we have extra healthy servers, set FailureTolerance
-	if healthyCount > len(servers)/2+1 {
-		status.FailureTolerance = healthyCount - (len(servers)/2 + 1)
-	}
-
-	*reply = status
+	*reply = op.srv.getClusterHealth()
 
 	return nil
 }
diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go
index d430bd28fb..8fde7de4a8 100644
--- a/consul/operator_endpoint_test.go
+++ b/consul/operator_endpoint_test.go
@@ -429,22 +429,21 @@ func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
 }
 
 func TestOperator_ServerHealth(t *testing.T) {
-	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+	conf := func(c *Config) {
 		c.Datacenter = "dc1"
-		c.Bootstrap = true
+		c.Bootstrap = false
+		c.BootstrapExpect = 3
 		c.RaftConfig.ProtocolVersion = 3
 		c.ServerHealthInterval = 100 * time.Millisecond
-	})
+		c.AutopilotInterval = 100 * time.Millisecond
+	}
+	dir1, s1 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
 	codec := rpcClient(t, s1)
 	defer codec.Close()
 
-	dir2, s2 := testServerWithConfig(t, func(c *Config) {
-		c.Datacenter = "dc1"
-		c.Bootstrap = false
-		c.RaftConfig.ProtocolVersion = 3
-	})
+	dir2, s2 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir2)
 	defer s2.Shutdown()
 	addr := fmt.Sprintf("127.0.0.1:%d",
@@ -453,11 +452,7 @@ func TestOperator_ServerHealth(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	dir3, s3 := testServerWithConfig(t, func(c *Config) {
-		c.Datacenter = "dc1"
-		c.Bootstrap = false
-		c.RaftConfig.ProtocolVersion = 3
-	})
+	dir3, s3 := testServerWithConfig(t, conf)
 	defer os.RemoveAll(dir3)
 	defer s3.Shutdown()
 	if _, err := s3.JoinLAN([]string{addr}); err != nil {
diff --git a/consul/serf.go b/consul/serf.go
index c9e5044720..c449d73a24 100644
--- a/consul/serf.go
+++ b/consul/serf.go
@@ -58,6 +58,7 @@ func (s *Server) lanEventHandler() {
 		case serf.EventUser:
 			s.localEvent(e.(serf.UserEvent))
 		case serf.EventMemberUpdate: // Ignore
+			s.localMemberEvent(e.(serf.MemberEvent))
 		case serf.EventQuery: // Ignore
 		default:
 			s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e)
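With the health computation moved into the background `serverHealthLoop`, the `ServerHealth` RPC above reduces to handing back a snapshot guarded by an RWMutex. Here is the pattern boiled down to its essentials — a minimal sketch with simplified stand-in types, not the patch's actual code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// healthReply is a simplified stand-in for structs.OperatorHealthReply.
type healthReply struct {
	Healthy          bool
	FailureTolerance int
}

type server struct {
	mu       sync.RWMutex
	snapshot healthReply
}

// update is the background writer (the ticker arm of serverHealthLoop):
// it installs a freshly built snapshot under the write lock.
func (s *server) update(h healthReply) {
	s.mu.Lock()
	s.snapshot = h
	s.mu.Unlock()
}

// getClusterHealth is the cheap reader used by the RPC endpoint; it
// returns a copy, so callers never hold the lock while using the data.
func (s *server) getClusterHealth() healthReply {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.snapshot
}

func main() {
	s := &server{}
	go func() {
		for range time.Tick(10 * time.Millisecond) {
			s.update(healthReply{Healthy: true, FailureTolerance: 1})
		}
	}()
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("%+v\n", s.getClusterHealth())
}
```

One subtlety: the real `OperatorHealthReply` contains a `Servers` slice, so the returned copy still shares that slice's backing array. That is safe in this patch because the loop always installs a freshly built value rather than mutating the stored one in place.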
diff --git a/consul/server.go b/consul/server.go
index 112b9aff43..668177ced0 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -88,6 +88,10 @@ type Server struct {
 	// autopilotWaitGroup is used to block until Autopilot shuts down.
 	autopilotWaitGroup sync.WaitGroup
 
+	// clusterHealth stores the current view of the cluster's health.
+	clusterHealth     structs.OperatorHealthReply
+	clusterHealthLock sync.RWMutex
+
 	// Consul configuration
 	config *Config
 
@@ -157,10 +161,6 @@ type Server struct {
 	sessionTimers     map[string]*time.Timer
 	sessionTimersLock sync.Mutex
 
-	// serverHealths stores the current view of server healths.
-	serverHealths map[string]*structs.ServerHealth
-	serverHealthLock sync.RWMutex
-
 	// tombstoneGC is used to track the pending GC invocations
 	// for the KV tombstones
 	tombstoneGC *state.TombstoneGC
diff --git a/consul/structs/operator.go b/consul/structs/operator.go
index f17cd8f418..5fc6ef36df 100644
--- a/consul/structs/operator.go
+++ b/consul/structs/operator.go
@@ -127,6 +127,9 @@ type ServerHealth struct {
 	// Autopilot config.
 	Healthy bool
 
+	// Voter is whether this is a voting server.
+	Voter bool
+
 	// StableSince is the last time this server's Healthy value changed.
 	StableSince time.Time
 }
diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown
index dabc9980ac..6918dd8e21 100644
--- a/website/source/docs/agent/http/operator.html.markdown
+++ b/website/source/docs/agent/http/operator.html.markdown
@@ -365,6 +365,7 @@ A JSON body is returned that looks like this:
         "LastTerm": 2,
         "LastIndex": 46,
         "Healthy": true,
+        "Voter": true,
         "StableSince": "2017-03-06T22:07:51Z"
       },
       {
@@ -375,6 +376,7 @@ A JSON body is returned that looks like this:
         "LastTerm": 2,
         "LastIndex": 46,
         "Healthy": true,
+        "Voter": false,
         "StableSince": "2017-03-06T22:18:26Z"
       }
     ]
diff --git a/website/source/docs/agent/telemetry.html.markdown b/website/source/docs/agent/telemetry.html.markdown
index f23033a086..b63f686868 100644
--- a/website/source/docs/agent/telemetry.html.markdown
+++ b/website/source/docs/agent/telemetry.html.markdown
@@ -189,4 +189,16 @@ These metrics give insight into the health of the cluster as a whole.
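The telemetry hunk is truncated here, but the gauges it presumably documents are the two set on the leader in `serverHealthLoop`: `consul.autopilot.failure_tolerance` and `consul.autopilot.healthy` (the keys are taken directly from the `SetGauge` calls earlier in this diff). As a rough illustration of how those gauges surface through go-metrics, here is a runnable sketch using the library's in-memory sink; the sink wiring is illustrative, since Consul's agent configures its own sinks:

```go
package main

import (
	"fmt"
	"time"

	metrics "github.com/armon/go-metrics"
)

func main() {
	// Aggregate metrics in memory over 10-second intervals.
	sink := metrics.NewInmemSink(10*time.Second, time.Minute)

	// Empty service name and no hostname prefix, so the emitted names match
	// the keys below, which already carry the "consul" prefix.
	conf := metrics.DefaultConfig("")
	conf.EnableHostname = false
	if _, err := metrics.NewGlobal(conf, sink); err != nil {
		panic(err)
	}

	// The exact keys used by serverHealthLoop on the leader.
	metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
	metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, 2)

	for _, interval := range sink.Data() {
		for name, g := range interval.Gauges {
			fmt.Printf("%s = %v\n", name, g)
		}
	}
}
```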