Ensure that retryLoopBackoff can be cancelled

We needed to pass a cancellable context into the limiter.Wait instead of context.Background. So I made the func take a context instead of a chan as most places were just passing through a Done chan from a context anyways.

Fix go routine leak in the gateway locator
pull/8184/head
Matt Keeler 2020-06-24 12:36:14 -04:00
parent 1093212176
commit 15e7b3940c
No known key found for this signature in database
GPG Key ID: 04DBAE1857E0081B
4 changed files with 20 additions and 12 deletions

View File

@ -1,6 +1,7 @@
package consul
import (
"context"
"errors"
"math/rand"
"sort"
@ -261,9 +262,9 @@ func NewGatewayLocator(
var errGatewayLocalStateNotInitialized = errors.New("local state not initialized")
func (g *GatewayLocator) Run(stopCh <-chan struct{}) {
func (g *GatewayLocator) Run(ctx context.Context) {
var lastFetchIndex uint64
retryLoopBackoff(stopCh, func() error {
retryLoopBackoff(ctx, func() error {
idx, err := g.runOnce(lastFetchIndex)
if err != nil {
return err

View File

@ -651,7 +651,7 @@ func (s *Server) secondaryIntermediateCertRenewalWatch(ctx context.Context) erro
case <-ctx.Done():
return nil
case <-time.After(structs.IntermediateCertRenewInterval):
retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
s.caProviderReconfigurationLock.Lock()
defer s.caProviderReconfigurationLock.Unlock()
@ -724,7 +724,7 @@ func (s *Server) secondaryCARootWatch(ctx context.Context) error {
connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter)
retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
var roots structs.IndexedCARoots
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
@ -780,7 +780,7 @@ func (s *Server) replicateIntentions(ctx context.Context) error {
connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter)
retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()
@ -832,14 +832,14 @@ func (s *Server) replicateIntentions(ctx context.Context) error {
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) {
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
var failedAttempts uint
limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
for {
// Rate limit how often we run the loop
limiter.Wait(context.Background())
limiter.Wait(ctx)
select {
case <-stopCh:
case <-ctx.Done():
return
default:
}
@ -850,9 +850,16 @@ func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(er
if err := loopFn(); err != nil {
errFn(err)
time.Sleep(retryTime)
timer := time.NewTimer(retryTime)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
continue
}
}
// Reset the failed attempts after a successful run.
failedAttempts = 0

View File

@ -42,7 +42,7 @@ func (s *Server) stopFederationStateAntiEntropy() {
func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
var lastFetchIndex uint64
retryLoopBackoff(ctx.Done(), func() error {
retryLoopBackoff(ctx, func() error {
if !s.DatacenterSupportsFederationStates() {
return nil
}

View File

@ -525,7 +525,7 @@ func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, err
}
if s.gatewayLocator != nil {
go s.gatewayLocator.Run(s.shutdownCh)
go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
}
// Serf and dynamic bind ports