consul/agent/leafcert/watch.go

agent: remove agent cache dependency from service mesh leaf certificate management (#17075)

This extracts the leaf cert management from within the agent cache. This code was produced by the following process:

1. All tests in agent/cache, agent/cache-types, agent/auto-config, and agent/consul/servercert were run at each stage.
   - The tests in agent matching .*Leaf were run at each stage.
   - The tests in agent/leafcert were run at each stage after they existed.
2. The former leaf cert Fetch implementation was extracted into a new package behind a "fake RPC" endpoint to make it look almost like all other cache type internals.
3. The old cache type was shimmed to use the fake RPC endpoint and generally cleaned up.
4. I selectively duplicated all of Get/Notify/NotifyCallback/Prepopulate from the agent/cache.Cache implementation over into the new package. This was renamed as leafcert.Manager.
   - Code that was irrelevant to the leaf cert type was deleted (inlining blocking=true, refresh=false).
5. Everything that used the leaf cert cache type (including the proxycfg code) was shifted to use the leafcert.Manager instead.
6. agent/cache-types tests were moved and gently replumbed to execute as-is against a leafcert.Manager.
7. Inspired by some of the locking changes from derek's branch, I split the fat lock into N+1 locks.
8. The waiter chan struct{} was eventually replaced with a singleflight.Group around cache updates, which was likely the biggest net structural change (see the sketch after this summary).
9. The awkward two layers of logic produced as a byproduct of marrying the agent cache management code with the leaf cert type code were slowly coalesced and flattened to remove confusion.
10. The .*Leaf tests from the agent package were copied and made to work directly against a leafcert.Manager to increase direct coverage.

I have made a best-effort attempt to port the previous leaf-cert cache type's tests over in spirit, as well as to take the e2e-ish tests in the agent package with Leaf in the test name and copy those into the agent/leafcert package to get more direct coverage, rather than coverage tangled up in the agent logic. There is no net-new test coverage, just coverage that was pushed around from elsewhere.
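Step 8 above refers to request coalescing via a singleflight.Group. The following is a minimal, standalone sketch of that general pattern, not the actual leafcert.Manager code; it uses golang.org/x/sync/singleflight, and fetchLeafCert is a hypothetical stand-in for the expensive certificate-signing RPC.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

// fetchLeafCert is a hypothetical stand-in for the RPC that signs a leaf
// certificate for a service.
func fetchLeafCert(service string) (string, error) {
	return "cert-for-" + service, nil
}

func main() {
	var (
		group singleflight.Group
		wg    sync.WaitGroup
	)

	// Concurrent callers asking for the same key share one in-flight fetch:
	// only one invocation of fetchLeafCert runs, the rest wait for its result.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cert, err, shared := group.Do("leaf:web", func() (interface{}, error) {
				return fetchLeafCert("web")
			})
			fmt.Println(cert, err, shared)
		}()
	}
	wg.Wait()
}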
package leafcert

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/lib"
)

// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that saves callers from implementing their own "blocking"
// query logic against a cache key to watch for changes and to keep the key
// actively maintained in the cache. It will continue to perform blocking Get
// requests until the context is canceled.
//
// The passed context must be canceled or time out in order to free resources
// and stop maintaining the value in the cache. Request-scoped contexts
// typically do this, but if a long-lived context like context.Background is
// used, then the caller must arrange for it to be canceled when the watch is
// no longer needed.
//
// The passed chan may be buffered or unbuffered; if the caller doesn't consume
// it fast enough, it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the cache
// type's TTL, the result might be removed from the local cache. Even in that
// case, when the chan is drained again, the next Get will re-fetch the entry
// from the servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and is returned in all UpdateEvents generated as a
// result of watching the specified request, so the caller can set it to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (m *Manager) Notify(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	ch chan<- cache.UpdateEvent,
) error {
	return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
		select {
		case ch <- event:
		case <-ctx.Done():
		}
	})
}
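
// Hypothetical usage sketch (illustrative only): a single consumer owns the
// channel and multiplexes several watches on it. The request fields shown, the
// handleLeafUpdate helper, and the in-scope ctx and m are assumptions.
//
//	ch := make(chan cache.UpdateEvent, 8)
//	if err := m.Notify(ctx, &ConnectCALeafRequest{
//		Datacenter: "dc1",
//		Service:    "web",
//	}, "leaf:web", ch); err != nil {
//		return err
//	}
//	for {
//		select {
//		case <-ctx.Done():
//			return ctx.Err()
//		case event := <-ch:
//			// Route on event.CorrelationID when several watches share ch.
//			handleLeafUpdate(event)
//		}
//	}
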
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (m *Manager) NotifyCallback(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	cb cache.Callback,
) error {
	if req.Key() == "" {
		return fmt.Errorf("a key is required")
	}

	// Take a lightweight (shallow) copy of the request so that manipulating
	// req doesn't race with the caller.
	dup := *req
	req = &dup

	if req.MaxQueryTime <= 0 {
		req.MaxQueryTime = DefaultQueryTimeout
	}

	go m.notifyBlockingQuery(ctx, req, correlationID, cb)
	return nil
}
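
// Hypothetical usage sketch (illustrative only): the callback variant avoids
// channel plumbing when the caller only needs to react to updates. The request
// fields, the storeLeaf helper, and the in-scope ctx and m are assumptions.
//
//	err := m.NotifyCallback(ctx, &ConnectCALeafRequest{
//		Datacenter: "dc1",
//		Service:    "api",
//	}, "leaf:api", func(ctx context.Context, event cache.UpdateEvent) {
//		if event.Err != nil {
//			return // errors are delivered through the event too
//		}
//		storeLeaf(event.Result)
//	})
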
func (m *Manager) notifyBlockingQuery(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	cb cache.Callback,
) {
	// Always start at index 0 to deliver the initial (possibly currently
	// cached) value.
	index := uint64(0)
	failures := uint(0)

	for {
		// Check the context hasn't been canceled.
		if ctx.Err() != nil {
			return
		}

		// Blocking request
		req.MinQueryIndex = index
		newValue, meta, err := m.internalGet(ctx, req)

		// Check the context hasn't been canceled.
		if ctx.Err() != nil {
			return
		}

		// Check the index of the value returned in the cache entry to be sure
		// it changed.
		if index == 0 || index < meta.Index {
			cb(ctx, cache.UpdateEvent{
				CorrelationID: correlationID,
				Result:        newValue,
				Meta:          meta,
				Err:           err,
			})

			// Update the index for the next request.
			index = meta.Index
		}

		var wait time.Duration
		// Handle errors with backoff. Badly behaved blocking calls that return
		// a zero index are treated as failures so that we don't get stuck in a
		// busy loop.
		if err == nil && meta.Index > 0 {
			failures = 0
		} else {
			failures++
			wait = backOffWait(m.config, failures)

			m.logger.
				With("error", err).
				With("index", index).
				Warn("handling error in Manager.Notify")
		}

		if wait > 0 {
			select {
			case <-time.After(wait):
			case <-ctx.Done():
				return
			}
		}

		// Sanity check: always request blocking on the second and later passes.
		if err == nil && index < 1 {
			index = 1
		}
	}
}
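
// Illustrative backoff progression for backOffWait below (assuming
// LeafCertRefreshBackoffMin = 3 and LeafCertRefreshMaxWait = 1m; the real
// values come from Config):
//
//	backOffWait(cfg, 1) == 0
//	backOffWait(cfg, 2) == 0
//	backOffWait(cfg, 3) == 0
//	backOffWait(cfg, 4) == 2s + jitter in [0, 2s)
//	backOffWait(cfg, 5) == 4s + jitter in [0, 4s)
//	...doubling each failure until capped at 1m + jitter in [0, 1m)
//
// lib.RandomStagger supplies the jitter so that many watchers don't retry in
// lockstep.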
func backOffWait(cfg Config, failures uint) time.Duration {
	if failures > cfg.LeafCertRefreshBackoffMin {
		shift := failures - cfg.LeafCertRefreshBackoffMin
		waitTime := cfg.LeafCertRefreshMaxWait
		if shift < 31 {
			waitTime = (1 << shift) * time.Second
		}
		if waitTime > cfg.LeafCertRefreshMaxWait {
			waitTime = cfg.LeafCertRefreshMaxWait
		}
		return waitTime + lib.RandomStagger(waitTime)
	}
	return 0
}