Adds a stats fetcher to make sure we don't block the autopilot loop.

pull/2805/head
James Phillips 2017-03-17 18:42:28 -07:00
parent b333f3ea04
commit 867421b7d3
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 97 additions and 19 deletions

View File

@ -34,6 +34,7 @@ type Server struct {
Version int
RaftVersion int
Addr net.Addr
Status serf.MemberStatus
}
// Key returns the corresponding Key
@ -104,6 +105,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
Addr: addr,
Version: vsn,
RaftVersion: raft_vsn,
Status: m.Status,
}
return true, parts
}

View File

@ -62,6 +62,7 @@ func TestIsConsulServer(t *testing.T) {
"vsn": "1",
"raft_vsn": "3",
},
Status: serf.StatusLeft,
}
ok, parts := agent.IsConsulServer(m)
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
@ -82,6 +83,9 @@ func TestIsConsulServer(t *testing.T) {
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
if parts.Status != serf.StatusLeft {
t.Fatalf("bad: %v", parts.Status)
}
m.Tags["bootstrap"] = "1"
m.Tags["disabled"] = "1"
ok, parts = agent.IsConsulServer(m)

View File

@ -229,7 +229,7 @@ func (s *Server) updateClusterHealth() error {
}
// Get the the serf members which are Consul servers
serverMap := make(map[string]serf.Member)
serverMap := make(map[string]*agent.Server)
for _, member := range s.LANMembers() {
if member.Status == serf.StatusLeft {
continue
@ -237,7 +237,7 @@ func (s *Server) updateClusterHealth() error {
valid, parts := agent.IsConsulServer(member)
if valid {
serverMap[parts.ID] = member
serverMap[parts.ID] = parts
}
}
@ -259,12 +259,12 @@ func (s *Server) updateClusterHealth() error {
Voter: server.Suffrage == raft.Voter,
}
member, ok := serverMap[string(server.ID)]
parts, ok := serverMap[string(server.ID)]
if ok {
health.Name = member.Name
health.SerfStatus = member.Status
if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
s.logger.Printf("[ERR] consul: error getting server health: %s", err)
health.Name = parts.Name
health.SerfStatus = parts.Status
if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil {
s.logger.Printf("[WARN] consul: error getting server health: %s", err)
}
} else {
health.SerfStatus = serf.StatusNone
@ -306,12 +306,10 @@ func (s *Server) updateClusterHealth() error {
// updateServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
_, server := agent.IsConsulServer(member)
stats, err := s.getServerStats(server)
func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error {
stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2)
if err != nil {
return fmt.Errorf("error getting raft stats: %s", err)
return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err)
}
health.LastTerm = stats.LastTerm
@ -357,10 +355,3 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth {
}
return nil
}
func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
var args struct{}
var reply structs.ServerStats
err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
return reply, err
}

View File

@ -161,6 +161,10 @@ type Server struct {
sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex
// statsFetcher is used by autopilot to check the status of the other
// Consul servers.
statsFetcher *StatsFetcher
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *state.TombstoneGC
@ -255,6 +259,9 @@ func NewServer(config *Config) (*Server, error) {
}
s.autopilotPolicy = &BasicAutopilot{s}
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter)
// Initialize the authoritative ACL cache.
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
if err != nil {

74
consul/stats_fetcher.go Normal file
View File

@ -0,0 +1,74 @@
package consul
import (
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
)
// StatsFetcher makes sure there's only one in-flight request for stats at any
// given time, and allows us to have a timeout so the autopilot loop doesn't get
// blocked if there's a slow server.
type StatsFetcher struct {
shutdownCh <-chan struct{}
pool *ConnPool
datacenter string
inflight map[string]struct{}
inflightLock sync.Mutex
}
// NewStatsFetcher returns a stats fetcher.
func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher {
return &StatsFetcher{
shutdownCh: shutdownCh,
pool: pool,
datacenter: datacenter,
inflight: make(map[string]struct{}),
}
}
// Fetch will attempt to get the server health for up to the timeout, and will
// also return an error immediately if there is a request still outstanding. We
// throw away results from any outstanding requests since we don't want to
// ingest stale health data.
func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) {
// Don't allow another request if there's another one outstanding.
f.inflightLock.Lock()
if _, ok := f.inflight[server.ID]; ok {
f.inflightLock.Unlock()
return nil, fmt.Errorf("stats request already outstanding")
}
f.inflight[server.ID] = struct{}{}
f.inflightLock.Unlock()
// Make the request in a goroutine.
errCh := make(chan error, 1)
var reply structs.ServerStats
go func() {
var args struct{}
errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
f.inflightLock.Lock()
delete(f.inflight, server.ID)
f.inflightLock.Unlock()
}()
// Wait for something to happen.
select {
case <-f.shutdownCh:
return nil, fmt.Errorf("shutdown")
case err := <-errCh:
if err == nil {
return &reply, nil
} else {
return nil, err
}
case <-time.After(timeout):
return nil, fmt.Errorf("timeout")
}
}