From da31e0449ea6323956f8ebc942a6d76332c52e27 Mon Sep 17 00:00:00 2001
From: Matt Keeler
Date: Thu, 20 May 2021 10:07:23 -0400
Subject: [PATCH] Move some things around to allow for license updating via
 config reload

The bulk of this commit is moving the LeaderRoutineManager from the
agent/consul package into its own package: lib/routine. It was also renamed
to Manager, and its Start method now requires a context. Requiring that
context meant updating a number of other call sites in the code.
---
 .changelog/10267.txt                       |  2 +
 agent/agent.go                             |  6 ++
 agent/config/config.go                     |  6 ++
 agent/consul/acl_token_exp.go              |  4 +-
 agent/consul/enterprise_server_oss.go      |  3 +-
 agent/consul/leader.go                     | 50 ++++++------
 agent/consul/leader_connect.go             |  8 +-
 agent/consul/leader_connect_ca.go          | 19 ++---
 agent/consul/leader_federation_state_ae.go |  6 +-
 agent/consul/leader_intentions.go          |  6 +-
 agent/consul/replication_test.go           |  7 +-
 agent/consul/server.go                     |  9 ++-
 agent/setup.go                             |  2 +-
 agent/setup_oss.go                         |  2 +-
 agent/testagent.go                         |  4 +-
 .../routine/routine.go                     | 78 +++++++++----------
 .../routine/routine_test.go                | 39 ++++++++--
 17 files changed, 148 insertions(+), 103 deletions(-)
 create mode 100644 .changelog/10267.txt
 rename agent/consul/leader_routine_manager.go => lib/routine/routine.go (51%)
 rename agent/consul/leader_routine_manager_test.go => lib/routine/routine_test.go (67%)

diff --git a/.changelog/10267.txt b/.changelog/10267.txt
new file mode 100644
index 0000000000..4979519f24
--- /dev/null
+++ b/.changelog/10267.txt
@@ -0,0 +1,2 @@
+```release-note:improvement
+licensing: **(Enterprise Only)** Consul Enterprise has gained the ability to update its license via a configuration reload. The same environment variables and configurations will be used to determine the new license.```
\ No newline at end of file
diff --git a/agent/agent.go b/agent/agent.go
index 6f85f502b1..14b474ef08 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -49,6 +49,7 @@ import (
 	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/lib/file"
 	"github.com/hashicorp/consul/lib/mutex"
+	"github.com/hashicorp/consul/lib/routine"
 	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/consul/types"
@@ -327,6 +328,10 @@ type Agent struct {
 	// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client + // routineManager is responsible for managing longer running go routines + // run by the Agent + routineManager *routine.Manager + // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent } @@ -371,6 +376,7 @@ func New(bd BaseDeps) (*Agent, error) { tlsConfigurator: bd.TLSConfigurator, config: bd.RuntimeConfig, cache: bd.Cache, + routineManager: routine.NewManager(bd.Logger), } // TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent diff --git a/agent/config/config.go b/agent/config/config.go index b654c12b31..51e8f94455 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -290,6 +290,12 @@ type Config struct { SegmentName *string `mapstructure:"segment"` // Enterprise Only Segments []Segment `mapstructure:"segments"` + + // Enterprise Only - not user configurable + LicensePollBaseTime *string `mapstructure:"license_poll_base_time"` + LicensePollMaxTime *string `mapstructure:"license_poll_max_time"` + LicenseUpdateBaseTime *string `mapstructure:"license_update_base_time"` + LicenseUpdateMaxTime *string `mapstructure:"license_update_max_time"` } type GossipLANConfig struct { diff --git a/agent/consul/acl_token_exp.go b/agent/consul/acl_token_exp.go index ab2b35f085..5ea72b8fe3 100644 --- a/agent/consul/acl_token_exp.go +++ b/agent/consul/acl_token_exp.go @@ -30,7 +30,7 @@ func (s *Server) reapExpiredTokens(ctx context.Context) error { } } -func (s *Server) startACLTokenReaping() { +func (s *Server) startACLTokenReaping(ctx context.Context) { // Do a quick check for config settings that would imply the goroutine // below will just spin forever. // @@ -41,7 +41,7 @@ func (s *Server) startACLTokenReaping() { return } - s.leaderRoutineManager.Start(aclTokenReapingRoutineName, s.reapExpiredTokens) + s.leaderRoutineManager.Start(ctx, aclTokenReapingRoutineName, s.reapExpiredTokens) } func (s *Server) stopACLTokenReaping() { diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 097b94b4b3..90bf131303 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -3,6 +3,7 @@ package consul import ( + "context" "errors" "net" "strings" @@ -46,7 +47,7 @@ func (s *Server) handleEnterpriseLeave() { return } -func (s *Server) establishEnterpriseLeadership() error { +func (s *Server) establishEnterpriseLeadership(_ context.Context) error { return nil } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index dc88f280db..391b73ca76 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -115,7 +115,7 @@ func (s *Server) monitorLeadership() { if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade { if weAreLeaderCh != nil { - if err := s.initializeACLs(true); err != nil { + if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil { s.logger.Error("error transitioning to using new ACLs", "error", err) continue } @@ -308,12 +308,12 @@ func (s *Server) establishLeadership(ctx context.Context) error { // check for the upgrade here - this helps us transition to new ACLs much // quicker if this is a new cluster or this is a test agent if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade { - if err := s.initializeACLs(true); err != nil { + if err := s.initializeACLs(ctx, true); err != nil { return err } atomic.StoreInt32(&s.useNewACLs, 1) s.updateACLAdvertisement() - } else if err := s.initializeACLs(false); err != nil { + } else if err := 
s.initializeACLs(ctx, false); err != nil { return err } @@ -337,20 +337,20 @@ func (s *Server) establishLeadership(ctx context.Context) error { return err } - if err := s.establishEnterpriseLeadership(); err != nil { + if err := s.establishEnterpriseLeadership(ctx); err != nil { return err } s.getOrCreateAutopilotConfig() s.autopilot.Start(ctx) - s.startConfigReplication() + s.startConfigReplication(ctx) - s.startFederationStateReplication() + s.startFederationStateReplication(ctx) - s.startFederationStateAntiEntropy() + s.startFederationStateAntiEntropy(ctx) - if err := s.startConnectLeader(); err != nil { + if err := s.startConnectLeader(ctx); err != nil { return err } @@ -499,7 +499,7 @@ func (s *Server) initializeLegacyACL() error { // initializeACLs is used to setup the ACLs if we are the leader // and need to do this. -func (s *Server) initializeACLs(upgrade bool) error { +func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error { if !s.config.ACLsEnabled { return nil } @@ -673,11 +673,11 @@ func (s *Server) initializeACLs(upgrade bool) error { } } // launch the upgrade go routine to generate accessors for everything - s.startACLUpgrade() + s.startACLUpgrade(ctx) } else { if s.UseLegacyACLs() && !upgrade { if s.IsACLReplicationEnabled() { - s.startLegacyACLReplication() + s.startLegacyACLReplication(ctx) } // return early as we don't want to start new ACL replication // or ACL token reaping as these are new ACL features. @@ -689,10 +689,10 @@ func (s *Server) initializeACLs(upgrade bool) error { } // ACL replication is now mandatory - s.startACLReplication() + s.startACLReplication(ctx) } - s.startACLTokenReaping() + s.startACLTokenReaping(ctx) return nil } @@ -771,13 +771,13 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { } } -func (s *Server) startACLUpgrade() { +func (s *Server) startACLUpgrade(ctx context.Context) { if s.config.PrimaryDatacenter != s.config.Datacenter { // token upgrades should only run in the primary return } - s.leaderRoutineManager.Start(aclUpgradeRoutineName, s.legacyACLTokenUpgrade) + s.leaderRoutineManager.Start(ctx, aclUpgradeRoutineName, s.legacyACLTokenUpgrade) } func (s *Server) stopACLUpgrade() { @@ -826,7 +826,7 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error { } } -func (s *Server) startLegacyACLReplication() { +func (s *Server) startLegacyACLReplication(ctx context.Context) { if s.InACLDatacenter() { return } @@ -840,12 +840,12 @@ func (s *Server) startLegacyACLReplication() { s.initReplicationStatus() - s.leaderRoutineManager.Start(legacyACLReplicationRoutineName, s.runLegacyACLReplication) + s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication) s.logger.Info("started legacy ACL replication") s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy) } -func (s *Server) startACLReplication() { +func (s *Server) startACLReplication(ctx context.Context) { if s.InACLDatacenter() { return } @@ -858,11 +858,11 @@ func (s *Server) startACLReplication() { } s.initReplicationStatus() - s.leaderRoutineManager.Start(aclPolicyReplicationRoutineName, s.runACLPolicyReplicator) - s.leaderRoutineManager.Start(aclRoleReplicationRoutineName, s.runACLRoleReplicator) + s.leaderRoutineManager.Start(ctx, aclPolicyReplicationRoutineName, s.runACLPolicyReplicator) + s.leaderRoutineManager.Start(ctx, aclRoleReplicationRoutineName, s.runACLRoleReplicator) if s.config.ACLTokenReplication { - s.leaderRoutineManager.Start(aclTokenReplicationRoutineName, 
s.runACLTokenReplicator) + s.leaderRoutineManager.Start(ctx, aclTokenReplicationRoutineName, s.runACLTokenReplicator) s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens) } else { s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies) @@ -973,13 +973,13 @@ func (s *Server) stopACLReplication() { s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName) } -func (s *Server) startConfigReplication() { +func (s *Server) startConfigReplication(ctx context.Context) { if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter { // replication shouldn't run in the primary DC return } - s.leaderRoutineManager.Start(configReplicationRoutineName, s.configReplicator.Run) + s.leaderRoutineManager.Start(ctx, configReplicationRoutineName, s.configReplicator.Run) } func (s *Server) stopConfigReplication() { @@ -987,7 +987,7 @@ func (s *Server) stopConfigReplication() { s.leaderRoutineManager.Stop(configReplicationRoutineName) } -func (s *Server) startFederationStateReplication() { +func (s *Server) startFederationStateReplication(ctx context.Context) { if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter { // replication shouldn't run in the primary DC return @@ -998,7 +998,7 @@ func (s *Server) startFederationStateReplication() { s.gatewayLocator.SetLastFederationStateReplicationError(nil, false) } - s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run) + s.leaderRoutineManager.Start(ctx, federationStateReplicationRoutineName, s.federationStateReplicator.Run) } func (s *Server) stopFederationStateReplication() { diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 18ca3d114f..1b724d2302 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -29,15 +29,15 @@ var ( ) // startConnectLeader starts multi-dc connect leader routines. -func (s *Server) startConnectLeader() error { +func (s *Server) startConnectLeader(ctx context.Context) error { if !s.config.ConnectEnabled { return nil } - s.caManager.Start() - s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning) + s.caManager.Start(ctx) + s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning) - return s.startIntentionConfigEntryMigration() + return s.startIntentionConfigEntryMigration(ctx) } // stopConnectLeader stops connect specific leader functions. diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index eb4189094e..2f44ce1a82 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/go-hclog" uuid "github.com/hashicorp/go-uuid" ) @@ -65,7 +66,7 @@ type CAManager struct { primaryRoots structs.IndexedCARoots // The most recently seen state of the root CAs from the primary datacenter. actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA. 
- leaderRoutineManager *LeaderRoutineManager + leaderRoutineManager *routine.Manager } type caDelegateWithState struct { @@ -76,7 +77,7 @@ func (c *caDelegateWithState) State() *state.Store { return c.fsm.State() } -func NewCAManager(delegate caServerDelegate, leaderRoutineManager *LeaderRoutineManager, logger hclog.Logger, config *Config) *CAManager { +func NewCAManager(delegate caServerDelegate, leaderRoutineManager *routine.Manager, logger hclog.Logger, config *Config) *CAManager { return &CAManager{ delegate: delegate, logger: logger, @@ -247,7 +248,7 @@ func (c *CAManager) setCAProvider(newProvider ca.Provider, root *structs.CARoot) c.providerLock.Unlock() } -func (c *CAManager) Start() { +func (c *CAManager) Start(ctx context.Context) { // Attempt to initialize the Connect CA now. This will // happen during leader establishment and it would be great // if the CA was ready to go once that process was finished. @@ -257,11 +258,11 @@ func (c *CAManager) Start() { // we failed to fully initialize the CA so we need to spawn a // go routine to retry this process until it succeeds or we lose // leadership and the go routine gets stopped. - c.leaderRoutineManager.Start(backgroundCAInitializationRoutineName, c.backgroundCAInitialization) + c.leaderRoutineManager.Start(ctx, backgroundCAInitializationRoutineName, c.backgroundCAInitialization) } else { // We only start these if CA initialization was successful. If not the completion of the // background CA initialization will start these routines. - c.startPostInitializeRoutines() + c.startPostInitializeRoutines(ctx) } } @@ -271,13 +272,13 @@ func (c *CAManager) Stop() { c.leaderRoutineManager.Stop(backgroundCAInitializationRoutineName) } -func (c *CAManager) startPostInitializeRoutines() { +func (c *CAManager) startPostInitializeRoutines(ctx context.Context) { // Start the Connect secondary DC actions if enabled. if c.serverConf.Datacenter != c.serverConf.PrimaryDatacenter { - c.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, c.secondaryCARootWatch) + c.leaderRoutineManager.Start(ctx, secondaryCARootWatchRoutineName, c.secondaryCARootWatch) } - c.leaderRoutineManager.Start(intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch) + c.leaderRoutineManager.Start(ctx, intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch) } func (c *CAManager) backgroundCAInitialization(ctx context.Context) error { @@ -294,7 +295,7 @@ func (c *CAManager) backgroundCAInitialization(ctx context.Context) error { c.logger.Info("Successfully initialized the Connect CA") - c.startPostInitializeRoutines() + c.startPostInitializeRoutines(ctx) return nil } diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 060962a700..e283744219 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -17,7 +17,7 @@ const ( federationStatePruneInterval = time.Hour ) -func (s *Server) startFederationStateAntiEntropy() { +func (s *Server) startFederationStateAntiEntropy(ctx context.Context) { // Check to see if we can skip waiting for serf feature detection below. 
if !s.DatacenterSupportsFederationStates() { _, fedStates, err := s.fsm.State().FederationStateList(nil) @@ -31,12 +31,12 @@ func (s *Server) startFederationStateAntiEntropy() { if s.config.DisableFederationStateAntiEntropy { return } - s.leaderRoutineManager.Start(federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync) + s.leaderRoutineManager.Start(ctx, federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync) // If this is the primary, then also prune any stale datacenters from the // list of federation states. if s.config.PrimaryDatacenter == s.config.Datacenter { - s.leaderRoutineManager.Start(federationStatePruningRoutineName, s.federationStatePruning) + s.leaderRoutineManager.Start(ctx, federationStatePruningRoutineName, s.federationStatePruning) } } diff --git a/agent/consul/leader_intentions.go b/agent/consul/leader_intentions.go index 4fedd29601..24afc5e093 100644 --- a/agent/consul/leader_intentions.go +++ b/agent/consul/leader_intentions.go @@ -15,7 +15,7 @@ const ( maxIntentionTxnSize = raftWarnSize / 4 ) -func (s *Server) startIntentionConfigEntryMigration() error { +func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error { if !s.config.ConnectEnabled { return nil } @@ -56,14 +56,14 @@ func (s *Server) startIntentionConfigEntryMigration() error { } // When running in the primary we do all of the real work. - s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigration) + s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigration) } else { // When running in the secondary we mostly just wait until the // primary finishes, and then wait until we're pretty sure the main // config entry replication thread has seen all of the // migration-related config entry edits before zeroing OUR copy of // the old intentions table. 
- s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC) + s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC) } return nil diff --git a/agent/consul/replication_test.go b/agent/consul/replication_test.go index 468f0d617c..64d0388bf9 100644 --- a/agent/consul/replication_test.go +++ b/agent/consul/replication_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/mock" @@ -12,7 +13,7 @@ import ( ) func TestReplicationRestart(t *testing.T) { - mgr := NewLeaderRoutineManager(testutil.Logger(t)) + mgr := routine.NewManager(testutil.Logger(t)) config := ReplicatorConfig{ Name: "mock", @@ -30,9 +31,9 @@ func TestReplicationRestart(t *testing.T) { repl, err := NewReplicator(&config) require.NoError(t, err) - mgr.Start("mock", repl.Run) + mgr.Start(context.Background(), "mock", repl.Run) mgr.Stop("mock") - mgr.Start("mock", repl.Run) + mgr.Start(context.Background(), "mock", repl.Run) // Previously this would have segfaulted mgr.Stop("mock") } diff --git a/agent/consul/server.go b/agent/consul/server.go index e73a4208dc..a53916aa48 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -43,6 +43,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" @@ -298,7 +299,7 @@ type Server struct { dcSupportsIntentionsAsConfigEntries int32 // Manager to handle starting/stopping go routines when establishing/revoking raft leadership - leaderRoutineManager *LeaderRoutineManager + leaderRoutineManager *routine.Manager // embedded struct to hold all the enterprise specific data EnterpriseServer @@ -375,7 +376,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { tombstoneGC: gc, serverLookup: NewServerLookup(), shutdownCh: shutdownCh, - leaderRoutineManager: NewLeaderRoutineManager(logger), + leaderRoutineManager: routine.NewManager(logger.Named(logging.Leader)), aclAuthMethodValidators: authmethod.NewCache(), fsm: newFSMFromConfig(flat.Logger, gc, config), } @@ -1319,6 +1320,10 @@ func (s *Server) RegisterEndpoint(name string, handler interface{}) error { return s.rpcServer.RegisterName(name, handler) } +func (s *Server) FSM() *fsm.FSM { + return s.fsm +} + // Stats is used to return statistics for debugging and insight // for various sub-systems func (s *Server) Stats() map[string]map[string]string { diff --git a/agent/setup.go b/agent/setup.go index 04ff509758..93fd9e6cc5 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -125,7 +125,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) TLSConfigurator: d.TLSConfigurator, Cache: d.Cache, Tokens: d.Tokens, - EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps), + EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg), } d.AutoConfig, err = autoconf.New(acConf) diff --git a/agent/setup_oss.go b/agent/setup_oss.go index 7bda56a173..a0a23c19c4 100644 --- a/agent/setup_oss.go +++ b/agent/setup_oss.go @@ -15,6 +15,6 @@ func initEnterpriseBaseDeps(d BaseDeps, _ *config.RuntimeConfig) (BaseDeps, erro } // initEnterpriseAutoConfig is responsible for setting up auto-config for enterprise -func 
initEnterpriseAutoConfig(_ consul.EnterpriseDeps) autoconf.EnterpriseConfig { +func initEnterpriseAutoConfig(_ consul.EnterpriseDeps, _ *config.RuntimeConfig) autoconf.EnterpriseConfig { return autoconf.EnterpriseConfig{} } diff --git a/agent/testagent.go b/agent/testagent.go index 999ca038b6..11410f2080 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -86,7 +86,9 @@ type TestAgent struct { // The caller is responsible for calling Shutdown() to stop the agent and remove // temporary directories. func NewTestAgent(t *testing.T, hcl string) *TestAgent { - return StartTestAgent(t, TestAgent{HCL: hcl}) + a := StartTestAgent(t, TestAgent{HCL: hcl}) + t.Cleanup(func() { a.Shutdown() }) + return a } // StartTestAgent and wait for it to become available. If the agent fails to diff --git a/agent/consul/leader_routine_manager.go b/lib/routine/routine.go similarity index 51% rename from agent/consul/leader_routine_manager.go rename to lib/routine/routine.go index 8b268ca832..e53bbb59a5 100644 --- a/agent/consul/leader_routine_manager.go +++ b/lib/routine/routine.go @@ -1,22 +1,21 @@ -package consul +package routine import ( "context" "os" "sync" - "github.com/hashicorp/consul/logging" "github.com/hashicorp/go-hclog" ) -type LeaderRoutine func(ctx context.Context) error +type Routine func(ctx context.Context) error -type leaderRoutine struct { +type routineTracker struct { cancel context.CancelFunc stoppedCh chan struct{} // closed when no longer running } -func (r *leaderRoutine) running() bool { +func (r *routineTracker) running() bool { select { case <-r.stoppedCh: return false @@ -25,27 +24,27 @@ func (r *leaderRoutine) running() bool { } } -type LeaderRoutineManager struct { +type Manager struct { lock sync.RWMutex logger hclog.Logger - routines map[string]*leaderRoutine + routines map[string]*routineTracker } -func NewLeaderRoutineManager(logger hclog.Logger) *LeaderRoutineManager { +func NewManager(logger hclog.Logger) *Manager { if logger == nil { logger = hclog.New(&hclog.LoggerOptions{ Output: os.Stderr, }) } - return &LeaderRoutineManager{ - logger: logger.Named(logging.Leader), - routines: make(map[string]*leaderRoutine), + return &Manager{ + logger: logger, + routines: make(map[string]*routineTracker), } } -func (m *LeaderRoutineManager) IsRunning(name string) bool { +func (m *Manager) IsRunning(name string) bool { m.lock.Lock() defer m.lock.Unlock() @@ -56,11 +55,7 @@ func (m *LeaderRoutineManager) IsRunning(name string) bool { return false } -func (m *LeaderRoutineManager) Start(name string, routine LeaderRoutine) error { - return m.StartWithContext(context.TODO(), name, routine) -} - -func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name string, routine LeaderRoutine) error { +func (m *Manager) Start(ctx context.Context, name string, routine Routine) error { m.lock.Lock() defer m.lock.Unlock() @@ -68,38 +63,41 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name return nil } - if parentCtx == nil { - parentCtx = context.Background() + if ctx == nil { + ctx = context.Background() } - ctx, cancel := context.WithCancel(parentCtx) - instance := &leaderRoutine{ + rtCtx, cancel := context.WithCancel(ctx) + instance := &routineTracker{ cancel: cancel, stoppedCh: make(chan struct{}), } - go func() { - defer func() { - close(instance.stoppedCh) - }() - - err := routine(ctx) - if err != nil && err != context.DeadlineExceeded && err != context.Canceled { - m.logger.Error("routine exited with error", - "routine", name, - "error", 
err, - ) - } else { - m.logger.Debug("stopped routine", "routine", name) - } - }() + go m.execute(rtCtx, name, routine, instance.stoppedCh) m.routines[name] = instance m.logger.Info("started routine", "routine", name) return nil } -func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} { +// execute will run the given routine in the foreground and close the given channel when its done executing +func (m *Manager) execute(ctx context.Context, name string, routine Routine, done chan struct{}) { + defer func() { + close(done) + }() + + err := routine(ctx) + if err != nil && err != context.DeadlineExceeded && err != context.Canceled { + m.logger.Error("routine exited with error", + "routine", name, + "error", err, + ) + } else { + m.logger.Debug("stopped routine", "routine", name) + } +} + +func (m *Manager) Stop(name string) <-chan struct{} { instance := m.stopInstance(name) if instance == nil { // Fabricate a closed channel so it won't block forever. @@ -111,7 +109,7 @@ func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} { return instance.stoppedCh } -func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine { +func (m *Manager) stopInstance(name string) *routineTracker { m.lock.Lock() defer m.lock.Unlock() @@ -133,7 +131,7 @@ func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine { return instance } -func (m *LeaderRoutineManager) StopAll() { +func (m *Manager) StopAll() { m.lock.Lock() defer m.lock.Unlock() @@ -146,5 +144,5 @@ func (m *LeaderRoutineManager) StopAll() { } // just wipe out the entire map - m.routines = make(map[string]*leaderRoutine) + m.routines = make(map[string]*routineTracker) } diff --git a/agent/consul/leader_routine_manager_test.go b/lib/routine/routine_test.go similarity index 67% rename from agent/consul/leader_routine_manager_test.go rename to lib/routine/routine_test.go index 96b0175ec4..1bfdfa2191 100644 --- a/agent/consul/leader_routine_manager_test.go +++ b/lib/routine/routine_test.go @@ -1,4 +1,4 @@ -package consul +package routine import ( "context" @@ -10,13 +10,11 @@ import ( "github.com/stretchr/testify/require" ) -func TestLeaderRoutineManager(t *testing.T) { +func TestManager(t *testing.T) { t.Parallel() var runs uint32 var running uint32 - // tlog := testutil.NewCancellableTestLogger(t) - // defer tlog.Cancel() - mgr := NewLeaderRoutineManager(testutil.Logger(t)) + mgr := NewManager(testutil.Logger(t)) run := func(ctx context.Context) error { atomic.StoreUint32(&running, 1) @@ -30,7 +28,7 @@ func TestLeaderRoutineManager(t *testing.T) { require.False(t, mgr.IsRunning("not-found")) // start - require.NoError(t, mgr.Start("run", run)) + require.NoError(t, mgr.Start(context.Background(), "run", run)) require.True(t, mgr.IsRunning("run")) retry.Run(t, func(r *retry.R) { require.Equal(r, uint32(1), atomic.LoadUint32(&runs)) @@ -47,7 +45,7 @@ func TestLeaderRoutineManager(t *testing.T) { }) // restart and stop - require.NoError(t, mgr.Start("run", run)) + require.NoError(t, mgr.Start(context.Background(), "run", run)) retry.Run(t, func(r *retry.R) { require.Equal(r, uint32(2), atomic.LoadUint32(&runs)) require.Equal(r, uint32(1), atomic.LoadUint32(&running)) @@ -63,7 +61,7 @@ func TestLeaderRoutineManager(t *testing.T) { // start with a context ctx, cancel := context.WithCancel(context.Background()) - require.NoError(t, mgr.StartWithContext(ctx, "run", run)) + require.NoError(t, mgr.Start(ctx, "run", run)) cancel() // The function should exit of its own accord due to the parent @@ -76,3 +74,28 @@ func 
TestLeaderRoutineManager(t *testing.T) { require.False(r, mgr.IsRunning("run")) }) } + +func TestManager_StopAll(t *testing.T) { + t.Parallel() + var runs uint32 + var running uint32 + mgr := NewManager(testutil.Logger(t)) + + run := func(ctx context.Context) error { + atomic.StoreUint32(&running, 1) + defer atomic.StoreUint32(&running, 0) + atomic.AddUint32(&runs, 1) + <-ctx.Done() + return nil + } + + require.NoError(t, mgr.Start(context.Background(), "run1", run)) + require.NoError(t, mgr.Start(context.Background(), "run2", run)) + + mgr.StopAll() + + retry.Run(t, func(r *retry.R) { + require.False(r, mgr.IsRunning("run1")) + require.False(r, mgr.IsRunning("run2")) + }) +}
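
After this change the lib/routine package exposes a small surface: NewManager, a context-aware Start, Stop (which returns a channel that closes once the routine has exited), StopAll, and IsRunning. The sketch below shows how a caller is expected to drive it; the "metrics-flush" name and the ticker body are made up for illustration, but the Manager calls match the API in the diff above.

```go
package main

import (
	"context"
	"time"

	"github.com/hashicorp/consul/lib/routine"
	"github.com/hashicorp/go-hclog"
)

func main() {
	// NewManager accepts any hclog.Logger; passing nil makes it build a default one.
	mgr := routine.NewManager(hclog.New(&hclog.LoggerOptions{Name: "example"}))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start launches the routine in its own goroutine under a child of ctx.
	// The routine is expected to return once that context is canceled.
	mgr.Start(ctx, "metrics-flush", func(ctx context.Context) error {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-ticker.C:
				// hypothetical periodic work goes here
			}
		}
	})

	// Stop cancels the routine's context and returns a channel that is closed
	// once the routine has actually returned, so callers can wait for shutdown.
	<-mgr.Stop("metrics-flush")

	// StopAll cancels and forgets everything still registered with the manager.
	mgr.StopAll()
}
```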
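
Restarting under the same name is part of the contract, and the replication_test change above leans on it ("Previously this would have segfaulted"). A minimal sketch of that stop/restart cycle, with a throwaway routine name and body:

```go
package main

import (
	"context"

	"github.com/hashicorp/consul/lib/routine"
)

func main() {
	mgr := routine.NewManager(nil) // nil logger is accepted; a default is created

	wait := func(ctx context.Context) error {
		<-ctx.Done() // block until the manager cancels this routine
		return nil
	}

	// Start, stop, and start again under the same name, as the replication
	// test does. Waiting on the channel returned by Stop ensures the previous
	// instance has fully exited before the name is reused.
	mgr.Start(context.Background(), "mock", wait)
	<-mgr.Stop("mock")
	mgr.Start(context.Background(), "mock", wait)
	<-mgr.Stop("mock")
}
```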
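
Because Start now takes a context, leadership loss propagates as cancellation: the ctx given to establishLeadership is handed to every routine it starts. In monitorLeadership, where only the weAreLeaderCh stop channel exists, the diff wraps it in lib.StopChannelContext. The type below is an illustrative approximation of that adapter pattern (the real implementation lives in Consul's lib package and may differ in detail); it simply exposes a stop channel through the context.Context interface.

```go
package main

import (
	"context"
	"time"
)

// stopChannelContext adapts a bare stop channel (such as weAreLeaderCh) into a
// context.Context so it can be handed to context-aware APIs like Manager.Start.
// This is an illustrative approximation, not the actual lib.StopChannelContext.
type stopChannelContext struct {
	stopCh <-chan struct{}
}

func (c *stopChannelContext) Deadline() (deadline time.Time, ok bool) {
	return time.Time{}, false // no deadline; cancellation only via the channel
}

func (c *stopChannelContext) Done() <-chan struct{} {
	return c.stopCh // "canceled" exactly when the stop channel is closed
}

func (c *stopChannelContext) Err() error {
	select {
	case <-c.stopCh:
		return context.Canceled
	default:
		return nil
	}
}

func (c *stopChannelContext) Value(key interface{}) interface{} {
	return nil // carries no request-scoped values
}

var _ context.Context = (*stopChannelContext)(nil)

func main() {
	weAreLeaderCh := make(chan struct{})
	ctx := &stopChannelContext{stopCh: weAreLeaderCh}

	// ctx could now be passed anywhere a context.Context is expected,
	// e.g. something shaped like initializeACLs(ctx, true) in the diff above.
	_ = ctx

	close(weAreLeaderCh) // simulates losing leadership: ctx.Done() fires
}
```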