Merge pull request #2897 from hashicorp/autopilot-config-fix

Wait to initialize autopilot until all servers are >= 0.8.0
pull/2911/head
Kyle Havlovitz 8 years ago committed by GitHub
commit 19540bb627

@ -10,6 +10,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -33,6 +34,8 @@ func (s *Server) stopAutopilot() {
s.autopilotWaitGroup.Wait() s.autopilotWaitGroup.Wait()
} }
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. // autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove.
func (s *Server) autopilotLoop() { func (s *Server) autopilotLoop() {
defer s.autopilotWaitGroup.Done() defer s.autopilotWaitGroup.Done()
@ -46,22 +49,25 @@ func (s *Server) autopilotLoop() {
case <-s.autopilotShutdownCh: case <-s.autopilotShutdownCh:
return return
case <-ticker.C: case <-ticker.C:
state := s.fsm.State() autopilotConfig, ok := s.getOrCreateAutopilotConfig()
_, autopilotConf, err := state.AutopilotConfig() if !ok {
if err != nil { continue
s.logger.Printf("[ERR] autopilot: error retrieving config from state store: %s", err)
break
} }
if err := s.autopilotPolicy.PromoteNonVoters(autopilotConf); err != nil { if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: error checking for non-voters to promote: %s", err) s.logger.Printf("[ERR] autopilot: error checking for non-voters to promote: %s", err)
} }
if err := s.pruneDeadServers(); err != nil { if err := s.pruneDeadServers(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err) s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
} }
case <-s.autopilotRemoveDeadCh: case <-s.autopilotRemoveDeadCh:
if err := s.pruneDeadServers(); err != nil { autopilotConfig, ok := s.getOrCreateAutopilotConfig()
if !ok {
continue
}
if err := s.pruneDeadServers(autopilotConfig); err != nil {
s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err) s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
} }
} }
@ -69,19 +75,13 @@ func (s *Server) autopilotLoop() {
} }
// pruneDeadServers removes up to numPeers/2 failed servers // pruneDeadServers removes up to numPeers/2 failed servers
func (s *Server) pruneDeadServers() error { func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
state := s.fsm.State()
_, autopilotConf, err := state.AutopilotConfig()
if err != nil {
return err
}
// Find any failed servers // Find any failed servers
var failed []string var failed []string
staleRaftServers := make(map[string]raft.Server) staleRaftServers := make(map[string]raft.Server)
if autopilotConf.CleanupDeadServers { if autopilotConfig.CleanupDeadServers {
future := s.raft.GetConfiguration() future := s.raft.GetConfiguration()
if future.Error() != nil { if err := future.Error(); err != nil {
return err return err
} }
@ -155,7 +155,7 @@ type BasicAutopilot struct {
} }
// PromoteNonVoters promotes eligible non-voting servers to voters. // PromoteNonVoters promotes eligible non-voting servers to voters.
func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig) error { func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers()) minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
if err != nil { if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err) return fmt.Errorf("error getting server raft protocol versions: %s", err)
@ -178,7 +178,7 @@ func (b *BasicAutopilot) PromoteNonVoters(autopilotConf *structs.AutopilotConfig
// If this server has been stable and passing for long enough, promote it to a voter // If this server has been stable and passing for long enough, promote it to a voter
if !isVoter(server.Suffrage) { if !isVoter(server.Suffrage) {
health := b.server.getServerHealth(string(server.ID)) health := b.server.getServerHealth(string(server.ID))
if health.IsStable(time.Now(), autopilotConf) { if health.IsStable(time.Now(), autopilotConfig) {
promotions = append(promotions, server) promotions = append(promotions, server)
} }
} else { } else {

@ -191,6 +191,8 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
} }
} }
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Add s4 to peers directly // Add s4 to peers directly
s4addr := fmt.Sprintf("127.0.0.1:%d", s4addr := fmt.Sprintf("127.0.0.1:%d",
s4.config.SerfLANConfig.MemberlistConfig.BindPort) s4.config.SerfLANConfig.MemberlistConfig.BindPort)

@ -153,11 +153,8 @@ func (s *Server) establishLeadership() error {
return err return err
} }
// Setup autopilot config if we are the leader and need to // Setup autopilot config if we need to
if err := s.initializeAutopilot(); err != nil { s.getOrCreateAutopilotConfig()
s.logger.Printf("[ERR] consul: Autopilot initialization failed: %v", err)
return err
}
s.startAutopilot() s.startAutopilot()
@ -249,27 +246,31 @@ func (s *Server) initializeACL() error {
return nil return nil
} }
// initializeAutopilot is used to setup the autopilot config if we are // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
// the leader and need to do this func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
func (s *Server) initializeAutopilot() error {
// Bail if the config has already been initialized
state := s.fsm.State() state := s.fsm.State()
_, config, err := state.AutopilotConfig() _, config, err := state.AutopilotConfig()
if err != nil { if err != nil {
return fmt.Errorf("failed to get autopilot config: %v", err) s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
return nil, false
} }
if config != nil { if config != nil {
return nil return config, true
} }
req := structs.AutopilotSetConfigRequest{ if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
Config: *s.config.AutopilotConfig, s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
return nil, false
} }
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil { if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
return fmt.Errorf("failed to initialize autopilot config") s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
return nil, false
} }
return nil return config, true
} }
// reconcile is used to reconcile the differences between Serf // reconcile is used to reconcile the differences between Serf

@ -26,6 +26,9 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r
if err != nil { if err != nil {
return err return err
} }
if config == nil {
return fmt.Errorf("autopilot config not initialized yet")
}
*reply = *config *reply = *config

@ -7,6 +7,8 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -295,3 +297,17 @@ func runtimeStats() map[string]string {
"cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10), "cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10),
} }
} }
// ServersMeetMinimumVersion returns whether the given alive servers are at least on the
// given Consul version
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
for _, member := range members {
if valid, parts := agent.IsConsulServer(member); valid && parts.Status == serf.StatusAlive {
if parts.Build.LessThan(minVersion) {
return false
}
}
}
return true
}

@ -7,6 +7,7 @@ import (
"regexp" "regexp"
"testing" "testing"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -325,3 +326,72 @@ func TestGetPublicIPv6(t *testing.T) {
} }
} }
} }
func TestServersMeetMinimumVersion(t *testing.T) {
makeMember := func(version string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"build": version,
"wan_join_port": "1234",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// One server, meets reqs
{
members: []serf.Member{
makeMember("0.7.5"),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
},
// One server, doesn't meet reqs
{
members: []serf.Member{
makeMember("0.7.5"),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
},
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
},
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}

Loading…
Cancel
Save