Make some final tweaks to autopilot package

pull/3667/merge
Kyle Havlovitz 2017-12-18 12:26:35 -08:00
parent a86d11ec0a
commit d08ab9fd19
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
5 changed files with 118 additions and 32 deletions

View File

@ -143,6 +143,14 @@ func NumPeers(raftConfig raft.Configuration) int {
return numPeers
}
// RemoveDeadServers triggers a pruning of dead servers in a non-blocking way.
func (a *Autopilot) RemoveDeadServers() {
select {
case a.removeDeadCh <- struct{}{}:
default:
}
}
// pruneDeadServers removes up to numPeers/2 failed servers
func (a *Autopilot) pruneDeadServers() error {
conf := a.delegate.AutopilotConfig()
@ -223,14 +231,17 @@ func (a *Autopilot) pruneDeadServers() error {
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
func (a *Autopilot) MinRaftProtocol() (int, error) {
return minRaftProtocol(a.delegate.Serf().Members(), a.delegate.IsServer)
}
func minRaftProtocol(members []serf.Member, serverFunc func(serf.Member) (*ServerInfo, error)) (int, error) {
minVersion := -1
members := a.delegate.Serf().Members()
for _, m := range members {
if m.Status != serf.StatusAlive {
continue
}
server, err := a.delegate.IsServer(m)
server, err := serverFunc(m)
if err != nil {
return -1, err
}

View File

@ -0,0 +1,86 @@
package autopilot
import (
"errors"
"net"
"testing"
"github.com/hashicorp/serf/serf"
)
func TestMinRaftProtocol(t *testing.T) {
t.Parallel()
makeMember := func(version string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"dc": "dc1",
"port": "10000",
"vsn": "1",
"raft_vsn": version,
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
expected int
err error
}{
// No servers, error
{
members: []serf.Member{},
expected: -1,
err: errors.New("No servers found"),
},
// One server
{
members: []serf.Member{
makeMember("1"),
},
expected: 1,
},
// One server, bad version formatting
{
members: []serf.Member{
makeMember("asdf"),
},
expected: -1,
err: errors.New(`strconv.Atoi: parsing "asdf": invalid syntax`),
},
// Multiple servers, different versions
{
members: []serf.Member{
makeMember("1"),
makeMember("2"),
},
expected: 1,
},
// Multiple servers, same version
{
members: []serf.Member{
makeMember("2"),
makeMember("2"),
},
expected: 2,
},
}
serverFunc := func(m serf.Member) (*ServerInfo, error) {
return &ServerInfo{}, nil
}
for _, tc := range cases {
result, err := minRaftProtocol(tc.members, serverFunc)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.expected, tc)
}
if tc.err != nil {
if err == nil || tc.err.Error() != err.Error() {
t.Fatalf("bad: %v, %v, %v", err, tc.err, tc)
}
}
}
}

View File

@ -416,7 +416,7 @@ func DefaultConfig() *Config {
TLSMinVersion: "tls10",
// TODO (slackpad) - Until #3744 is done, we need to keep these
// TODO (slackpad) - Until #3744 is done, we need to keep these
// in sync with agent/config/default.go.
AutopilotConfig: &autopilot.Config{
CleanupDeadServers: true,

View File

@ -738,10 +738,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
}
// Trigger a check to remove dead servers
select {
case s.autopilotRemoveDeadCh <- struct{}{}:
default:
}
s.autopilot.RemoveDeadServers()
return nil
}

View File

@ -90,12 +90,6 @@ type Server struct {
// autopilot is the Autopilot instance for this server.
autopilot *autopilot.Autopilot
// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
autopilotRemoveDeadCh chan struct{}
// autopilotShutdownCh is used to stop the Autopilot loop.
autopilotShutdownCh chan struct{}
// autopilotWaitGroup is used to block until Autopilot shuts down.
autopilotWaitGroup sync.WaitGroup
@ -281,25 +275,23 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Create server.
s := &Server{
autopilotRemoveDeadCh: make(chan struct{}),
autopilotShutdownCh: make(chan struct{}),
config: config,
tokens: tokens,
connPool: connPool,
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, 32),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
config: config,
tokens: tokens,
connPool: connPool,
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, 32),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
}
// Set up autopilot