Browse Source

Move some things around to allow for license updating via config reload

The bulk of this commit is moving the LeaderRoutineManager from the agent/consul package into its own package: lib/gort. It also got a renaming and its Start method now requires a context. Requiring that context required updating a whole bunch of other places in the code.
pull/10270/head
Matt Keeler 4 years ago committed by Matt Keeler
parent
commit
da31e0449e
  1. 2
      .changelog/10267.txt
  2. 6
      agent/agent.go
  3. 6
      agent/config/config.go
  4. 4
      agent/consul/acl_token_exp.go
  5. 3
      agent/consul/enterprise_server_oss.go
  6. 50
      agent/consul/leader.go
  7. 8
      agent/consul/leader_connect.go
  8. 19
      agent/consul/leader_connect_ca.go
  9. 6
      agent/consul/leader_federation_state_ae.go
  10. 6
      agent/consul/leader_intentions.go
  11. 7
      agent/consul/replication_test.go
  12. 9
      agent/consul/server.go
  13. 2
      agent/setup.go
  14. 2
      agent/setup_oss.go
  15. 4
      agent/testagent.go
  16. 78
      lib/routine/routine.go
  17. 39
      lib/routine/routine_test.go

2
.changelog/10267.txt

@ -0,0 +1,2 @@
```release-note:improvement
licensing: **(Enterprise Only)** Consul Enterprise has gained the ability update its license via a configuration reload. The same environment variables and configurations will be used to determine the new license.```

6
agent/agent.go

@ -49,6 +49,7 @@ import (
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
@ -327,6 +328,10 @@ type Agent struct {
// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client
// routineManager is responsible for managing longer running go routines
// run by the Agent
routineManager *routine.Manager
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
@ -371,6 +376,7 @@ func New(bd BaseDeps) (*Agent, error) {
tlsConfigurator: bd.TLSConfigurator,
config: bd.RuntimeConfig,
cache: bd.Cache,
routineManager: routine.NewManager(bd.Logger),
}
// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent

6
agent/config/config.go

@ -290,6 +290,12 @@ type Config struct {
SegmentName *string `mapstructure:"segment"`
// Enterprise Only
Segments []Segment `mapstructure:"segments"`
// Enterprise Only - not user configurable
LicensePollBaseTime *string `mapstructure:"license_poll_base_time"`
LicensePollMaxTime *string `mapstructure:"license_poll_max_time"`
LicenseUpdateBaseTime *string `mapstructure:"license_update_base_time"`
LicenseUpdateMaxTime *string `mapstructure:"license_update_max_time"`
}
type GossipLANConfig struct {

4
agent/consul/acl_token_exp.go

@ -30,7 +30,7 @@ func (s *Server) reapExpiredTokens(ctx context.Context) error {
}
}
func (s *Server) startACLTokenReaping() {
func (s *Server) startACLTokenReaping(ctx context.Context) {
// Do a quick check for config settings that would imply the goroutine
// below will just spin forever.
//
@ -41,7 +41,7 @@ func (s *Server) startACLTokenReaping() {
return
}
s.leaderRoutineManager.Start(aclTokenReapingRoutineName, s.reapExpiredTokens)
s.leaderRoutineManager.Start(ctx, aclTokenReapingRoutineName, s.reapExpiredTokens)
}
func (s *Server) stopACLTokenReaping() {

3
agent/consul/enterprise_server_oss.go

@ -3,6 +3,7 @@
package consul
import (
"context"
"errors"
"net"
"strings"
@ -46,7 +47,7 @@ func (s *Server) handleEnterpriseLeave() {
return
}
func (s *Server) establishEnterpriseLeadership() error {
func (s *Server) establishEnterpriseLeadership(_ context.Context) error {
return nil
}

50
agent/consul/leader.go

@ -115,7 +115,7 @@ func (s *Server) monitorLeadership() {
if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade {
if weAreLeaderCh != nil {
if err := s.initializeACLs(true); err != nil {
if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil {
s.logger.Error("error transitioning to using new ACLs", "error", err)
continue
}
@ -308,12 +308,12 @@ func (s *Server) establishLeadership(ctx context.Context) error {
// check for the upgrade here - this helps us transition to new ACLs much
// quicker if this is a new cluster or this is a test agent
if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade {
if err := s.initializeACLs(true); err != nil {
if err := s.initializeACLs(ctx, true); err != nil {
return err
}
atomic.StoreInt32(&s.useNewACLs, 1)
s.updateACLAdvertisement()
} else if err := s.initializeACLs(false); err != nil {
} else if err := s.initializeACLs(ctx, false); err != nil {
return err
}
@ -337,20 +337,20 @@ func (s *Server) establishLeadership(ctx context.Context) error {
return err
}
if err := s.establishEnterpriseLeadership(); err != nil {
if err := s.establishEnterpriseLeadership(ctx); err != nil {
return err
}
s.getOrCreateAutopilotConfig()
s.autopilot.Start(ctx)
s.startConfigReplication()
s.startConfigReplication(ctx)
s.startFederationStateReplication()
s.startFederationStateReplication(ctx)
s.startFederationStateAntiEntropy()
s.startFederationStateAntiEntropy(ctx)
if err := s.startConnectLeader(); err != nil {
if err := s.startConnectLeader(ctx); err != nil {
return err
}
@ -499,7 +499,7 @@ func (s *Server) initializeLegacyACL() error {
// initializeACLs is used to setup the ACLs if we are the leader
// and need to do this.
func (s *Server) initializeACLs(upgrade bool) error {
func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
if !s.config.ACLsEnabled {
return nil
}
@ -673,11 +673,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
}
}
// launch the upgrade go routine to generate accessors for everything
s.startACLUpgrade()
s.startACLUpgrade(ctx)
} else {
if s.UseLegacyACLs() && !upgrade {
if s.IsACLReplicationEnabled() {
s.startLegacyACLReplication()
s.startLegacyACLReplication(ctx)
}
// return early as we don't want to start new ACL replication
// or ACL token reaping as these are new ACL features.
@ -689,10 +689,10 @@ func (s *Server) initializeACLs(upgrade bool) error {
}
// ACL replication is now mandatory
s.startACLReplication()
s.startACLReplication(ctx)
}
s.startACLTokenReaping()
s.startACLTokenReaping(ctx)
return nil
}
@ -771,13 +771,13 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
}
}
func (s *Server) startACLUpgrade() {
func (s *Server) startACLUpgrade(ctx context.Context) {
if s.config.PrimaryDatacenter != s.config.Datacenter {
// token upgrades should only run in the primary
return
}
s.leaderRoutineManager.Start(aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
s.leaderRoutineManager.Start(ctx, aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
}
func (s *Server) stopACLUpgrade() {
@ -826,7 +826,7 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error {
}
}
func (s *Server) startLegacyACLReplication() {
func (s *Server) startLegacyACLReplication(ctx context.Context) {
if s.InACLDatacenter() {
return
}
@ -840,12 +840,12 @@ func (s *Server) startLegacyACLReplication() {
s.initReplicationStatus()
s.leaderRoutineManager.Start(legacyACLReplicationRoutineName, s.runLegacyACLReplication)
s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
s.logger.Info("started legacy ACL replication")
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
}
func (s *Server) startACLReplication() {
func (s *Server) startACLReplication(ctx context.Context) {
if s.InACLDatacenter() {
return
}
@ -858,11 +858,11 @@ func (s *Server) startACLReplication() {
}
s.initReplicationStatus()
s.leaderRoutineManager.Start(aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
s.leaderRoutineManager.Start(aclRoleReplicationRoutineName, s.runACLRoleReplicator)
s.leaderRoutineManager.Start(ctx, aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
s.leaderRoutineManager.Start(ctx, aclRoleReplicationRoutineName, s.runACLRoleReplicator)
if s.config.ACLTokenReplication {
s.leaderRoutineManager.Start(aclTokenReplicationRoutineName, s.runACLTokenReplicator)
s.leaderRoutineManager.Start(ctx, aclTokenReplicationRoutineName, s.runACLTokenReplicator)
s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
} else {
s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
@ -973,13 +973,13 @@ func (s *Server) stopACLReplication() {
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
}
func (s *Server) startConfigReplication() {
func (s *Server) startConfigReplication(ctx context.Context) {
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
// replication shouldn't run in the primary DC
return
}
s.leaderRoutineManager.Start(configReplicationRoutineName, s.configReplicator.Run)
s.leaderRoutineManager.Start(ctx, configReplicationRoutineName, s.configReplicator.Run)
}
func (s *Server) stopConfigReplication() {
@ -987,7 +987,7 @@ func (s *Server) stopConfigReplication() {
s.leaderRoutineManager.Stop(configReplicationRoutineName)
}
func (s *Server) startFederationStateReplication() {
func (s *Server) startFederationStateReplication(ctx context.Context) {
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
// replication shouldn't run in the primary DC
return
@ -998,7 +998,7 @@ func (s *Server) startFederationStateReplication() {
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
}
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
s.leaderRoutineManager.Start(ctx, federationStateReplicationRoutineName, s.federationStateReplicator.Run)
}
func (s *Server) stopFederationStateReplication() {

8
agent/consul/leader_connect.go

@ -29,15 +29,15 @@ var (
)
// startConnectLeader starts multi-dc connect leader routines.
func (s *Server) startConnectLeader() error {
func (s *Server) startConnectLeader(ctx context.Context) error {
if !s.config.ConnectEnabled {
return nil
}
s.caManager.Start()
s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
s.caManager.Start(ctx)
s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning)
return s.startIntentionConfigEntryMigration()
return s.startIntentionConfigEntryMigration(ctx)
}
// stopConnectLeader stops connect specific leader functions.

19
agent/consul/leader_connect_ca.go

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
)
@ -65,7 +66,7 @@ type CAManager struct {
primaryRoots structs.IndexedCARoots // The most recently seen state of the root CAs from the primary datacenter.
actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA.
leaderRoutineManager *LeaderRoutineManager
leaderRoutineManager *routine.Manager
}
type caDelegateWithState struct {
@ -76,7 +77,7 @@ func (c *caDelegateWithState) State() *state.Store {
return c.fsm.State()
}
func NewCAManager(delegate caServerDelegate, leaderRoutineManager *LeaderRoutineManager, logger hclog.Logger, config *Config) *CAManager {
func NewCAManager(delegate caServerDelegate, leaderRoutineManager *routine.Manager, logger hclog.Logger, config *Config) *CAManager {
return &CAManager{
delegate: delegate,
logger: logger,
@ -247,7 +248,7 @@ func (c *CAManager) setCAProvider(newProvider ca.Provider, root *structs.CARoot)
c.providerLock.Unlock()
}
func (c *CAManager) Start() {
func (c *CAManager) Start(ctx context.Context) {
// Attempt to initialize the Connect CA now. This will
// happen during leader establishment and it would be great
// if the CA was ready to go once that process was finished.
@ -257,11 +258,11 @@ func (c *CAManager) Start() {
// we failed to fully initialize the CA so we need to spawn a
// go routine to retry this process until it succeeds or we lose
// leadership and the go routine gets stopped.
c.leaderRoutineManager.Start(backgroundCAInitializationRoutineName, c.backgroundCAInitialization)
c.leaderRoutineManager.Start(ctx, backgroundCAInitializationRoutineName, c.backgroundCAInitialization)
} else {
// We only start these if CA initialization was successful. If not the completion of the
// background CA initialization will start these routines.
c.startPostInitializeRoutines()
c.startPostInitializeRoutines(ctx)
}
}
@ -271,13 +272,13 @@ func (c *CAManager) Stop() {
c.leaderRoutineManager.Stop(backgroundCAInitializationRoutineName)
}
func (c *CAManager) startPostInitializeRoutines() {
func (c *CAManager) startPostInitializeRoutines(ctx context.Context) {
// Start the Connect secondary DC actions if enabled.
if c.serverConf.Datacenter != c.serverConf.PrimaryDatacenter {
c.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, c.secondaryCARootWatch)
c.leaderRoutineManager.Start(ctx, secondaryCARootWatchRoutineName, c.secondaryCARootWatch)
}
c.leaderRoutineManager.Start(intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch)
c.leaderRoutineManager.Start(ctx, intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch)
}
func (c *CAManager) backgroundCAInitialization(ctx context.Context) error {
@ -294,7 +295,7 @@ func (c *CAManager) backgroundCAInitialization(ctx context.Context) error {
c.logger.Info("Successfully initialized the Connect CA")
c.startPostInitializeRoutines()
c.startPostInitializeRoutines(ctx)
return nil
}

6
agent/consul/leader_federation_state_ae.go

@ -17,7 +17,7 @@ const (
federationStatePruneInterval = time.Hour
)
func (s *Server) startFederationStateAntiEntropy() {
func (s *Server) startFederationStateAntiEntropy(ctx context.Context) {
// Check to see if we can skip waiting for serf feature detection below.
if !s.DatacenterSupportsFederationStates() {
_, fedStates, err := s.fsm.State().FederationStateList(nil)
@ -31,12 +31,12 @@ func (s *Server) startFederationStateAntiEntropy() {
if s.config.DisableFederationStateAntiEntropy {
return
}
s.leaderRoutineManager.Start(federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync)
s.leaderRoutineManager.Start(ctx, federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync)
// If this is the primary, then also prune any stale datacenters from the
// list of federation states.
if s.config.PrimaryDatacenter == s.config.Datacenter {
s.leaderRoutineManager.Start(federationStatePruningRoutineName, s.federationStatePruning)
s.leaderRoutineManager.Start(ctx, federationStatePruningRoutineName, s.federationStatePruning)
}
}

6
agent/consul/leader_intentions.go

@ -15,7 +15,7 @@ const (
maxIntentionTxnSize = raftWarnSize / 4
)
func (s *Server) startIntentionConfigEntryMigration() error {
func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error {
if !s.config.ConnectEnabled {
return nil
}
@ -56,14 +56,14 @@ func (s *Server) startIntentionConfigEntryMigration() error {
}
// When running in the primary we do all of the real work.
s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigration)
s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigration)
} else {
// When running in the secondary we mostly just wait until the
// primary finishes, and then wait until we're pretty sure the main
// config entry replication thread has seen all of the
// migration-related config entry edits before zeroing OUR copy of
// the old intentions table.
s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC)
s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC)
}
return nil

7
agent/consul/replication_test.go

@ -5,6 +5,7 @@ import (
"fmt"
"testing"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
@ -12,7 +13,7 @@ import (
)
func TestReplicationRestart(t *testing.T) {
mgr := NewLeaderRoutineManager(testutil.Logger(t))
mgr := routine.NewManager(testutil.Logger(t))
config := ReplicatorConfig{
Name: "mock",
@ -30,9 +31,9 @@ func TestReplicationRestart(t *testing.T) {
repl, err := NewReplicator(&config)
require.NoError(t, err)
mgr.Start("mock", repl.Run)
mgr.Start(context.Background(), "mock", repl.Run)
mgr.Stop("mock")
mgr.Start("mock", repl.Run)
mgr.Start(context.Background(), "mock", repl.Run)
// Previously this would have segfaulted
mgr.Stop("mock")
}

9
agent/consul/server.go

@ -43,6 +43,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
@ -298,7 +299,7 @@ type Server struct {
dcSupportsIntentionsAsConfigEntries int32
// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
leaderRoutineManager *LeaderRoutineManager
leaderRoutineManager *routine.Manager
// embedded struct to hold all the enterprise specific data
EnterpriseServer
@ -375,7 +376,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
tombstoneGC: gc,
serverLookup: NewServerLookup(),
shutdownCh: shutdownCh,
leaderRoutineManager: NewLeaderRoutineManager(logger),
leaderRoutineManager: routine.NewManager(logger.Named(logging.Leader)),
aclAuthMethodValidators: authmethod.NewCache(),
fsm: newFSMFromConfig(flat.Logger, gc, config),
}
@ -1319,6 +1320,10 @@ func (s *Server) RegisterEndpoint(name string, handler interface{}) error {
return s.rpcServer.RegisterName(name, handler)
}
func (s *Server) FSM() *fsm.FSM {
return s.fsm
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {

2
agent/setup.go

@ -125,7 +125,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
TLSConfigurator: d.TLSConfigurator,
Cache: d.Cache,
Tokens: d.Tokens,
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps),
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg),
}
d.AutoConfig, err = autoconf.New(acConf)

2
agent/setup_oss.go

@ -15,6 +15,6 @@ func initEnterpriseBaseDeps(d BaseDeps, _ *config.RuntimeConfig) (BaseDeps, erro
}
// initEnterpriseAutoConfig is responsible for setting up auto-config for enterprise
func initEnterpriseAutoConfig(_ consul.EnterpriseDeps) autoconf.EnterpriseConfig {
func initEnterpriseAutoConfig(_ consul.EnterpriseDeps, _ *config.RuntimeConfig) autoconf.EnterpriseConfig {
return autoconf.EnterpriseConfig{}
}

4
agent/testagent.go

@ -86,7 +86,9 @@ type TestAgent struct {
// The caller is responsible for calling Shutdown() to stop the agent and remove
// temporary directories.
func NewTestAgent(t *testing.T, hcl string) *TestAgent {
return StartTestAgent(t, TestAgent{HCL: hcl})
a := StartTestAgent(t, TestAgent{HCL: hcl})
t.Cleanup(func() { a.Shutdown() })
return a
}
// StartTestAgent and wait for it to become available. If the agent fails to

78
agent/consul/leader_routine_manager.go → lib/routine/routine.go

@ -1,22 +1,21 @@
package consul
package routine
import (
"context"
"os"
"sync"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
)
type LeaderRoutine func(ctx context.Context) error
type Routine func(ctx context.Context) error
type leaderRoutine struct {
type routineTracker struct {
cancel context.CancelFunc
stoppedCh chan struct{} // closed when no longer running
}
func (r *leaderRoutine) running() bool {
func (r *routineTracker) running() bool {
select {
case <-r.stoppedCh:
return false
@ -25,27 +24,27 @@ func (r *leaderRoutine) running() bool {
}
}
type LeaderRoutineManager struct {
type Manager struct {
lock sync.RWMutex
logger hclog.Logger
routines map[string]*leaderRoutine
routines map[string]*routineTracker
}
func NewLeaderRoutineManager(logger hclog.Logger) *LeaderRoutineManager {
func NewManager(logger hclog.Logger) *Manager {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{
Output: os.Stderr,
})
}
return &LeaderRoutineManager{
logger: logger.Named(logging.Leader),
routines: make(map[string]*leaderRoutine),
return &Manager{
logger: logger,
routines: make(map[string]*routineTracker),
}
}
func (m *LeaderRoutineManager) IsRunning(name string) bool {
func (m *Manager) IsRunning(name string) bool {
m.lock.Lock()
defer m.lock.Unlock()
@ -56,11 +55,7 @@ func (m *LeaderRoutineManager) IsRunning(name string) bool {
return false
}
func (m *LeaderRoutineManager) Start(name string, routine LeaderRoutine) error {
return m.StartWithContext(context.TODO(), name, routine)
}
func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name string, routine LeaderRoutine) error {
func (m *Manager) Start(ctx context.Context, name string, routine Routine) error {
m.lock.Lock()
defer m.lock.Unlock()
@ -68,38 +63,41 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
return nil
}
if parentCtx == nil {
parentCtx = context.Background()
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithCancel(parentCtx)
instance := &leaderRoutine{
rtCtx, cancel := context.WithCancel(ctx)
instance := &routineTracker{
cancel: cancel,
stoppedCh: make(chan struct{}),
}
go func() {
defer func() {
close(instance.stoppedCh)
}()
err := routine(ctx)
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
m.logger.Error("routine exited with error",
"routine", name,
"error", err,
)
} else {
m.logger.Debug("stopped routine", "routine", name)
}
}()
go m.execute(rtCtx, name, routine, instance.stoppedCh)
m.routines[name] = instance
m.logger.Info("started routine", "routine", name)
return nil
}
func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
// execute will run the given routine in the foreground and close the given channel when its done executing
func (m *Manager) execute(ctx context.Context, name string, routine Routine, done chan struct{}) {
defer func() {
close(done)
}()
err := routine(ctx)
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
m.logger.Error("routine exited with error",
"routine", name,
"error", err,
)
} else {
m.logger.Debug("stopped routine", "routine", name)
}
}
func (m *Manager) Stop(name string) <-chan struct{} {
instance := m.stopInstance(name)
if instance == nil {
// Fabricate a closed channel so it won't block forever.
@ -111,7 +109,7 @@ func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
return instance.stoppedCh
}
func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
func (m *Manager) stopInstance(name string) *routineTracker {
m.lock.Lock()
defer m.lock.Unlock()
@ -133,7 +131,7 @@ func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
return instance
}
func (m *LeaderRoutineManager) StopAll() {
func (m *Manager) StopAll() {
m.lock.Lock()
defer m.lock.Unlock()
@ -146,5 +144,5 @@ func (m *LeaderRoutineManager) StopAll() {
}
// just wipe out the entire map
m.routines = make(map[string]*leaderRoutine)
m.routines = make(map[string]*routineTracker)
}

39
agent/consul/leader_routine_manager_test.go → lib/routine/routine_test.go

@ -1,4 +1,4 @@
package consul
package routine
import (
"context"
@ -10,13 +10,11 @@ import (
"github.com/stretchr/testify/require"
)
func TestLeaderRoutineManager(t *testing.T) {
func TestManager(t *testing.T) {
t.Parallel()
var runs uint32
var running uint32
// tlog := testutil.NewCancellableTestLogger(t)
// defer tlog.Cancel()
mgr := NewLeaderRoutineManager(testutil.Logger(t))
mgr := NewManager(testutil.Logger(t))
run := func(ctx context.Context) error {
atomic.StoreUint32(&running, 1)
@ -30,7 +28,7 @@ func TestLeaderRoutineManager(t *testing.T) {
require.False(t, mgr.IsRunning("not-found"))
// start
require.NoError(t, mgr.Start("run", run))
require.NoError(t, mgr.Start(context.Background(), "run", run))
require.True(t, mgr.IsRunning("run"))
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
@ -47,7 +45,7 @@ func TestLeaderRoutineManager(t *testing.T) {
})
// restart and stop
require.NoError(t, mgr.Start("run", run))
require.NoError(t, mgr.Start(context.Background(), "run", run))
retry.Run(t, func(r *retry.R) {
require.Equal(r, uint32(2), atomic.LoadUint32(&runs))
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
@ -63,7 +61,7 @@ func TestLeaderRoutineManager(t *testing.T) {
// start with a context
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, mgr.StartWithContext(ctx, "run", run))
require.NoError(t, mgr.Start(ctx, "run", run))
cancel()
// The function should exit of its own accord due to the parent
@ -76,3 +74,28 @@ func TestLeaderRoutineManager(t *testing.T) {
require.False(r, mgr.IsRunning("run"))
})
}
func TestManager_StopAll(t *testing.T) {
t.Parallel()
var runs uint32
var running uint32
mgr := NewManager(testutil.Logger(t))
run := func(ctx context.Context) error {
atomic.StoreUint32(&running, 1)
defer atomic.StoreUint32(&running, 0)
atomic.AddUint32(&runs, 1)
<-ctx.Done()
return nil
}
require.NoError(t, mgr.Start(context.Background(), "run1", run))
require.NoError(t, mgr.Start(context.Background(), "run2", run))
mgr.StopAll()
retry.Run(t, func(r *retry.R) {
require.False(r, mgr.IsRunning("run1"))
require.False(r, mgr.IsRunning("run2"))
})
}
Loading…
Cancel
Save