diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index a988d86332..638c9efdf4 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -1,6 +1,7 @@ package consul import ( + "context" "errors" "math/rand" "sort" @@ -261,9 +262,9 @@ func NewGatewayLocator( var errGatewayLocalStateNotInitialized = errors.New("local state not initialized") -func (g *GatewayLocator) Run(stopCh <-chan struct{}) { +func (g *GatewayLocator) Run(ctx context.Context) { var lastFetchIndex uint64 - retryLoopBackoff(stopCh, func() error { + retryLoopBackoff(ctx, func() error { idx, err := g.runOnce(lastFetchIndex) if err != nil { return err diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index c62eb64899..e79ae917a2 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -651,7 +651,7 @@ func (s *Server) secondaryIntermediateCertRenewalWatch(ctx context.Context) erro case <-ctx.Done(): return nil case <-time.After(structs.IntermediateCertRenewInterval): - retryLoopBackoff(ctx.Done(), func() error { + retryLoopBackoff(ctx, func() error { s.caProviderReconfigurationLock.Lock() defer s.caProviderReconfigurationLock.Unlock() @@ -724,7 +724,7 @@ func (s *Server) secondaryCARootWatch(ctx context.Context) error { connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter) - retryLoopBackoff(ctx.Done(), func() error { + retryLoopBackoff(ctx, func() error { var roots structs.IndexedCARoots if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil { return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err) @@ -780,7 +780,7 @@ func (s *Server) replicateIntentions(ctx context.Context) error { connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter) - retryLoopBackoff(ctx.Done(), func() error { + retryLoopBackoff(ctx, func() error { // Always use the latest replication token value in case it changed while looping. args.QueryOptions.Token = s.tokens.ReplicationToken() @@ -832,14 +832,14 @@ func (s *Server) replicateIntentions(ctx context.Context) error { // retryLoopBackoff loops a given function indefinitely, backing off exponentially // upon errors up to a maximum of maxRetryBackoff seconds. -func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) { +func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) { var failedAttempts uint limiter := rate.NewLimiter(loopRateLimit, retryBucketSize) for { // Rate limit how often we run the loop - limiter.Wait(context.Background()) + limiter.Wait(ctx) select { - case <-stopCh: + case <-ctx.Done(): return default: } @@ -850,8 +850,15 @@ func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(er if err := loopFn(); err != nil { errFn(err) - time.Sleep(retryTime) - continue + + timer := time.NewTimer(retryTime) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + continue + } } // Reset the failed attempts after a successful run. diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 99cf534cdd..4254e61a35 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -42,7 +42,7 @@ func (s *Server) stopFederationStateAntiEntropy() { func (s *Server) federationStateAntiEntropySync(ctx context.Context) error { var lastFetchIndex uint64 - retryLoopBackoff(ctx.Done(), func() error { + retryLoopBackoff(ctx, func() error { if !s.DatacenterSupportsFederationStates() { return nil } diff --git a/agent/consul/server.go b/agent/consul/server.go index 6bef3d130d..1803a594ca 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -525,7 +525,7 @@ func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, err } if s.gatewayLocator != nil { - go s.gatewayLocator.Run(s.shutdownCh) + go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) } // Serf and dynamic bind ports