// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: BUSL-1.1 package leafcert import ( "context" "fmt" "time" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/lib" ) // Notify registers a desire to be updated about changes to a cache result. // // It is a helper that abstracts code from performing their own "blocking" query // logic against a cache key to watch for changes and to maintain the key in // cache actively. It will continue to perform blocking Get requests until the // context is canceled. // // The passed context must be canceled or time out in order to free resources // and stop maintaining the value in cache. Typically request-scoped resources // do this but if a long-lived context like context.Background is used, then the // caller must arrange for it to be canceled when the watch is no longer // needed. // // The passed chan may be buffered or unbuffered; if the caller doesn't consume // fast enough it will block the notification loop. When the chan is later // drained, watching resumes correctly. If the pause is longer than the // cachetype's TTL, the result might be removed from the local cache. Even in // this case though when the chan is drained again, the new Get will re-fetch // the entry from servers and resume notification behavior transparently. // // The chan is passed in to allow multiple cached results to be watched by a // single consumer without juggling extra goroutines per watch. The // correlationID is opaque and will be returned in all UpdateEvents generated as // a result of watching the specified request so the caller can set this to any // value that allows them to disambiguate between events in the returned chan // when sharing a chan between multiple cache entries. If the chan is closed, // the notify loop will terminate. 
func (m *Manager) Notify( ctx context.Context, req *ConnectCALeafRequest, correlationID string, ch chan<- cache.UpdateEvent, ) error { return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) { select { case ch <- event: case <-ctx.Done(): } }) } // NotifyCallback allows you to receive notifications about changes to a cache // result in the same way as Notify, but accepts a callback function instead of // a channel. func (m *Manager) NotifyCallback( ctx context.Context, req *ConnectCALeafRequest, correlationID string, cb cache.Callback, ) error { if req.Key() == "" { return fmt.Errorf("a key is required") } // Lightweight copy this object so that manipulating req doesn't race. dup := *req req = &dup if req.MaxQueryTime <= 0 { req.MaxQueryTime = DefaultQueryTimeout } go m.notifyBlockingQuery(ctx, req, correlationID, cb) return nil } func (m *Manager) notifyBlockingQuery( ctx context.Context, req *ConnectCALeafRequest, correlationID string, cb cache.Callback, ) { // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) failures := uint(0) for { // Check context hasn't been canceled if ctx.Err() != nil { return } // Blocking request req.MinQueryIndex = index newValue, meta, err := m.internalGet(ctx, req) // Check context hasn't been canceled if ctx.Err() != nil { return } // Check the index of the value returned in the cache entry to be sure it // changed if index == 0 || index < meta.Index { cb(ctx, cache.UpdateEvent{ CorrelationID: correlationID, Result: newValue, Meta: meta, Err: err, }) // Update index for next request index = meta.Index } var wait time.Duration // Handle errors with backoff. Badly behaved blocking calls that returned // a zero index are considered as failures since we need to not get stuck // in a busy loop. if err == nil && meta.Index > 0 { failures = 0 } else { failures++ wait = backOffWait(m.config, failures) m.logger. With("error", err). 
With("index", index). Warn("handling error in Manager.Notify") } if wait > 0 { select { case <-time.After(wait): case <-ctx.Done(): return } } // Sanity check we always request blocking on second pass if err == nil && index < 1 { index = 1 } } } func backOffWait(cfg Config, failures uint) time.Duration { if failures > cfg.LeafCertRefreshBackoffMin { shift := failures - cfg.LeafCertRefreshBackoffMin waitTime := cfg.LeafCertRefreshMaxWait if shift < 31 { waitTime = (1 << shift) * time.Second } if waitTime > cfg.LeafCertRefreshMaxWait { waitTime = cfg.LeafCertRefreshMaxWait } return waitTime + lib.RandomStagger(waitTime) } return 0 }