From 867421b7d32baf49365306a692a0d6710550003c Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Fri, 17 Mar 2017 18:42:28 -0700
Subject: [PATCH] Adds a stats fetcher to make sure we don't block the
 autopilot loop.

---
 consul/agent/server.go      |  2 +
 consul/agent/server_test.go |  4 ++
 consul/autopilot.go         | 29 +++++----------
 consul/server.go            |  7 ++++
 consul/stats_fetcher.go     | 74 +++++++++++++++++++++++++++++++++++++
 5 files changed, 97 insertions(+), 19 deletions(-)
 create mode 100644 consul/stats_fetcher.go

diff --git a/consul/agent/server.go b/consul/agent/server.go
index 6510f34eab..9a9ce8810b 100644
--- a/consul/agent/server.go
+++ b/consul/agent/server.go
@@ -34,6 +34,7 @@ type Server struct {
 	Version     int
 	RaftVersion int
 	Addr        net.Addr
+	Status      serf.MemberStatus
 }
 
 // Key returns the corresponding Key
@@ -104,6 +105,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
 		Addr:        addr,
 		Version:     vsn,
 		RaftVersion: raft_vsn,
+		Status:      m.Status,
 	}
 	return true, parts
 }
diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go
index 06321c7e2f..9e697d6d1b 100644
--- a/consul/agent/server_test.go
+++ b/consul/agent/server_test.go
@@ -62,6 +62,7 @@ func TestIsConsulServer(t *testing.T) {
 			"vsn":      "1",
 			"raft_vsn": "3",
 		},
+		Status: serf.StatusLeft,
 	}
 	ok, parts := agent.IsConsulServer(m)
 	if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
@@ -82,6 +83,9 @@ func TestIsConsulServer(t *testing.T) {
 	if parts.RaftVersion != 3 {
 		t.Fatalf("bad: %v", parts.RaftVersion)
 	}
+	if parts.Status != serf.StatusLeft {
+		t.Fatalf("bad: %v", parts.Status)
+	}
 	m.Tags["bootstrap"] = "1"
 	m.Tags["disabled"] = "1"
 	ok, parts = agent.IsConsulServer(m)
diff --git a/consul/autopilot.go b/consul/autopilot.go
index cc0632a1a0..e6746b7962 100644
--- a/consul/autopilot.go
+++ b/consul/autopilot.go
@@ -229,7 +229,7 @@ func (s *Server) updateClusterHealth() error {
 	}
 
 	// Get the serf members which are Consul servers
-	serverMap := make(map[string]serf.Member)
+	serverMap := make(map[string]*agent.Server)
 	for _, member := range s.LANMembers() {
 		if member.Status == serf.StatusLeft {
 			continue
@@ -237,7 +237,7 @@ func (s *Server) updateClusterHealth() error {
 
 		valid, parts := agent.IsConsulServer(member)
 		if valid {
-			serverMap[parts.ID] = member
+			serverMap[parts.ID] = parts
 		}
 	}
 
@@ -259,12 +259,12 @@ func (s *Server) updateClusterHealth() error {
 			Voter:       server.Suffrage == raft.Voter,
 		}
 
-		member, ok := serverMap[string(server.ID)]
+		parts, ok := serverMap[string(server.ID)]
 		if ok {
-			health.Name = member.Name
-			health.SerfStatus = member.Status
-			if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
-				s.logger.Printf("[ERR] consul: error getting server health: %s", err)
+			health.Name = parts.Name
+			health.SerfStatus = parts.Status
+			if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil {
+				s.logger.Printf("[WARN] consul: error getting server health: %s", err)
 			}
 		} else {
 			health.SerfStatus = serf.StatusNone
@@ -306,12 +306,10 @@ func (s *Server) updateClusterHealth() error {
 
 // updateServerHealth fetches the raft stats for the given server and uses them
 // to update its ServerHealth
-func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
-	_, server := agent.IsConsulServer(member)
-
-	stats, err := s.getServerStats(server)
+func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error {
+	stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2)
 	if err != nil {
-		return fmt.Errorf("error getting raft stats: %s", err)
+		return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err)
 	}
 
 	health.LastTerm = stats.LastTerm
@@ -357,10 +355,3 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth {
 	}
 	return nil
 }
-
-func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
-	var args struct{}
-	var reply structs.ServerStats
-	err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
-	return reply, err
-}
diff --git a/consul/server.go b/consul/server.go
index 668177ced0..82f3597cf5 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -161,6 +161,10 @@ type Server struct {
 	sessionTimers     map[string]*time.Timer
 	sessionTimersLock sync.Mutex
 
+	// statsFetcher is used by autopilot to check the status of the other
+	// Consul servers.
+	statsFetcher *StatsFetcher
+
 	// tombstoneGC is used to track the pending GC invocations
 	// for the KV tombstones
 	tombstoneGC *state.TombstoneGC
@@ -255,6 +259,9 @@ func NewServer(config *Config) (*Server, error) {
 	}
 	s.autopilotPolicy = &BasicAutopilot{s}
 
+	// Initialize the stats fetcher that autopilot will use.
+	s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter)
+
 	// Initialize the authoritative ACL cache.
 	s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
 	if err != nil {
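
The timeout handed to Fetch above is ServerHealthInterval/2, so even a completely
unresponsive server costs a health pass at most half of one interval instead of
stalling the autopilot loop indefinitely. Below is a minimal, self-contained sketch
of that relationship; the healthLoop and probe names, the interval, and the fake
servers are hypothetical stand-ins for illustration, not Consul's actual loop.

package main

import (
	"fmt"
	"time"
)

// probe stands in for a per-server stats RPC that enforces its own timeout.
type probe func(timeout time.Duration) error

// healthLoop runs one health pass per interval. Because each probe is bounded
// by interval/2, a slow or hung server delays a pass but can never block the
// loop for good.
func healthLoop(interval time.Duration, servers map[string]probe, stopCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			for name, p := range servers {
				if err := p(interval / 2); err != nil {
					// Log and move on, mirroring the [WARN] in updateClusterHealth.
					fmt.Printf("[WARN] error getting server health for %q: %v\n", name, err)
				}
			}
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	servers := map[string]probe{
		// A healthy server answers immediately.
		"server-a": func(time.Duration) error { return nil },
		// A hung server never answers; the bounded probe gives up instead.
		"server-b": func(timeout time.Duration) error {
			<-time.After(timeout)
			return fmt.Errorf("timeout after %s", timeout)
		},
	}

	go healthLoop(200*time.Millisecond, servers, stopCh)
	time.Sleep(700 * time.Millisecond)
	close(stopCh)
}

The property illustrated is only that each probe is bounded by a fraction of the
loop interval; bounding the real Status.RaftStats call is what the stats fetcher
in the new file below provides.
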
diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go
new file mode 100644
index 0000000000..ab7216d3aa
--- /dev/null
+++ b/consul/stats_fetcher.go
@@ -0,0 +1,74 @@
+package consul
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul/consul/agent"
+	"github.com/hashicorp/consul/consul/structs"
+)
+
+// StatsFetcher makes sure there's only one in-flight request for stats per
+// server at any given time, and allows us to have a timeout so the autopilot
+// loop doesn't get blocked if there's a slow server.
+type StatsFetcher struct {
+	shutdownCh   <-chan struct{}
+	pool         *ConnPool
+	datacenter   string
+	inflight     map[string]struct{}
+	inflightLock sync.Mutex
+}
+
+// NewStatsFetcher returns a stats fetcher.
+func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher {
+	return &StatsFetcher{
+		shutdownCh: shutdownCh,
+		pool:       pool,
+		datacenter: datacenter,
+		inflight:   make(map[string]struct{}),
+	}
+}
+
+// Fetch will attempt to get the server health for up to the timeout, and will
+// also return an error immediately if there is a request still outstanding. We
+// throw away results from any outstanding requests since we don't want to
+// ingest stale health data.
+func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) {
+	// Don't allow another request if there's another one outstanding.
+	f.inflightLock.Lock()
+	if _, ok := f.inflight[server.ID]; ok {
+		f.inflightLock.Unlock()
+		return nil, fmt.Errorf("stats request already outstanding")
+	}
+	f.inflight[server.ID] = struct{}{}
+	f.inflightLock.Unlock()
+
+	// Make the request in a goroutine.
+	errCh := make(chan error, 1)
+	var reply structs.ServerStats
+	go func() {
+		var args struct{}
+		errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
+
+		f.inflightLock.Lock()
+		delete(f.inflight, server.ID)
+		f.inflightLock.Unlock()
+	}()
+
+	// Wait for something to happen.
+	select {
+	case <-f.shutdownCh:
+		return nil, fmt.Errorf("shutdown")
+
+	case err := <-errCh:
+		if err == nil {
+			return &reply, nil
+		} else {
+			return nil, err
+		}
+
+	case <-time.After(timeout):
+		return nil, fmt.Errorf("timeout")
+	}
+}
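
For reference, here is a minimal, self-contained sketch of the single-flight idea
behind StatsFetcher.Fetch: at most one outstanding request per key, a bounded wait
for the caller, and removal of the in-flight marker only when the slow request
actually finishes. The fetcher type, the slowRPC stand-in, and the key names are
hypothetical; the real implementation above is keyed by server ID and issues the
Status.RaftStats RPC through the connection pool.

package main

import (
	"fmt"
	"sync"
	"time"
)

// fetcher allows at most one outstanding request per key, mirroring the
// in-flight map in the StatsFetcher added by this patch.
type fetcher struct {
	mu       sync.Mutex
	inflight map[string]struct{}
}

func newFetcher() *fetcher {
	return &fetcher{inflight: make(map[string]struct{})}
}

// fetch runs fn in a goroutine and waits up to timeout for its result. If a
// request for the same key is still outstanding, it fails immediately rather
// than queueing another probe behind a slow server.
func (f *fetcher) fetch(key string, timeout time.Duration, fn func() (string, error)) (string, error) {
	f.mu.Lock()
	if _, ok := f.inflight[key]; ok {
		f.mu.Unlock()
		return "", fmt.Errorf("request for %q already outstanding", key)
	}
	f.inflight[key] = struct{}{}
	f.mu.Unlock()

	type result struct {
		val string
		err error
	}
	// Buffered so the goroutine can finish even after the caller times out.
	resCh := make(chan result, 1)
	go func() {
		val, err := fn()
		resCh <- result{val, err}

		// Clear the in-flight marker only when the request actually completes,
		// just like the goroutine in StatsFetcher.Fetch.
		f.mu.Lock()
		delete(f.inflight, key)
		f.mu.Unlock()
	}()

	select {
	case res := <-resCh:
		return res.val, res.err
	case <-time.After(timeout):
		return "", fmt.Errorf("timeout waiting for %q", key)
	}
}

func main() {
	f := newFetcher()

	// A stand-in for a slow Status.RaftStats call.
	slowRPC := func() (string, error) {
		time.Sleep(500 * time.Millisecond)
		return "stats", nil
	}

	if _, err := f.fetch("server-a", 100*time.Millisecond, slowRPC); err != nil {
		fmt.Println("first attempt: ", err) // times out after 100ms
	}
	if _, err := f.fetch("server-a", 100*time.Millisecond, slowRPC); err != nil {
		fmt.Println("second attempt:", err) // rejected: still outstanding
	}
}

Run this and the first call times out while the fake RPC is still sleeping, and the
immediate retry is rejected as already outstanding instead of stacking a second
probe on the same server; results that arrive after the timeout are simply thrown
away, which is the same trade-off the Fetch doc comment describes.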