You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/leafcert/watch.go

164 lines
4.6 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package leafcert
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/agent/cacheshim"
"github.com/hashicorp/consul/lib"
)
// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be canceled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be canceled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (m *Manager) Notify(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
ch chan<- cacheshim.UpdateEvent,
) error {
return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cacheshim.UpdateEvent) {
select {
case ch <- event:
case <-ctx.Done():
}
})
}
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (m *Manager) NotifyCallback(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cacheshim.Callback,
) error {
if req.Key() == "" {
return fmt.Errorf("a key is required")
}
// Lightweight copy this object so that manipulating req doesn't race.
dup := *req
req = &dup
if req.MaxQueryTime <= 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
go m.notifyBlockingQuery(ctx, req, correlationID, cb)
return nil
}
func (m *Manager) notifyBlockingQuery(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cacheshim.Callback,
) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)
for {
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Blocking request
req.MinQueryIndex = index
newValue, meta, err := m.internalGet(ctx, req)
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Check the index of the value returned in the cache entry to be sure it
// changed
if index == 0 || index < meta.Index {
cb(ctx, cacheshim.UpdateEvent{
CorrelationID: correlationID,
Result: newValue,
Meta: meta,
Err: err,
})
// Update index for next request
index = meta.Index
}
var wait time.Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
wait = backOffWait(m.config, failures)
m.logger.
With("error", err).
With("index", index).
Warn("handling error in Manager.Notify")
}
if wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if err == nil && index < 1 {
index = 1
}
}
}
func backOffWait(cfg Config, failures uint) time.Duration {
if failures > cfg.LeafCertRefreshBackoffMin {
shift := failures - cfg.LeafCertRefreshBackoffMin
waitTime := cfg.LeafCertRefreshMaxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > cfg.LeafCertRefreshMaxWait {
waitTime = cfg.LeafCertRefreshMaxWait
}
return waitTime + lib.RandomStagger(waitTime)
}
return 0
}