You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/cache/watch.go

266 lines
8.6 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
package cache
import (
"context"
"fmt"
"reflect"
"time"
"google.golang.org/protobuf/proto"
xds controller: setup watches for and compute leaf cert references in ProxyStateTemplate, and wire up leaf cert manager dependency (#18756) * Refactors the leafcert package to not have a dependency on agent/consul and agent/cache to avoid import cycles. This way the xds controller can just import the leafcert package to use the leafcert manager. The leaf cert logic in the controller: * Sets up watches for leaf certs that are referenced in the ProxyStateTemplate (which generates the leaf certs too). * Gets the leaf cert from the leaf cert cache * Stores the leaf cert in the ProxyState that's pushed to xds * For the cert watches, this PR also uses a bimapper + a thin wrapper to map leaf cert events to related ProxyStateTemplates Since bimapper uses a resource.Reference or resource.ID to map between two resource types, I've created an internal type for a leaf certificate to use for the resource.Reference, since it's not a v2 resource. The wrapper allows mapping events to resources (as opposed to mapping resources to resources) The controller tests: Unit: Ensure that we resolve leaf cert references Lifecycle: Ensure that when the CA is updated, the leaf cert is as well Also adds a new spiffe id type, and adds workload identity and workload identity URI to leaf certs. This is so certs are generated with the new workload identity based SPIFFE id. * Pulls out some leaf cert test helpers into a helpers file so it can be used in the xds controller tests. * Wires up leaf cert manager dependency * Support getting token from proxytracker * Add workload identity spiffe id type to the authorize and sign functions --------- Co-authored-by: John Murret <john.murret@hashicorp.com>
1 year ago
"github.com/hashicorp/consul/agent/cacheshim"
"github.com/hashicorp/consul/lib"
)
// UpdateEvent is a struct summarizing an update to a cache entry
xds controller: setup watches for and compute leaf cert references in ProxyStateTemplate, and wire up leaf cert manager dependency (#18756) * Refactors the leafcert package to not have a dependency on agent/consul and agent/cache to avoid import cycles. This way the xds controller can just import the leafcert package to use the leafcert manager. The leaf cert logic in the controller: * Sets up watches for leaf certs that are referenced in the ProxyStateTemplate (which generates the leaf certs too). * Gets the leaf cert from the leaf cert cache * Stores the leaf cert in the ProxyState that's pushed to xds * For the cert watches, this PR also uses a bimapper + a thin wrapper to map leaf cert events to related ProxyStateTemplates Since bimapper uses a resource.Reference or resource.ID to map between two resource types, I've created an internal type for a leaf certificate to use for the resource.Reference, since it's not a v2 resource. The wrapper allows mapping events to resources (as opposed to mapping resources to resources) The controller tests: Unit: Ensure that we resolve leaf cert references Lifecycle: Ensure that when the CA is updated, the leaf cert is as well Also adds a new spiffe id type, and adds workload identity and workload identity URI to leaf certs. This is so certs are generated with the new workload identity based SPIFFE id. * Pulls out some leaf cert test helpers into a helpers file so it can be used in the xds controller tests. * Wires up leaf cert manager dependency * Support getting token from proxytracker * Add workload identity spiffe id type to the authorize and sign functions --------- Co-authored-by: John Murret <john.murret@hashicorp.com>
1 year ago
type UpdateEvent = cacheshim.UpdateEvent
// Callback is the function type accepted by NotifyCallback.
xds controller: setup watches for and compute leaf cert references in ProxyStateTemplate, and wire up leaf cert manager dependency (#18756) * Refactors the leafcert package to not have a dependency on agent/consul and agent/cache to avoid import cycles. This way the xds controller can just import the leafcert package to use the leafcert manager. The leaf cert logic in the controller: * Sets up watches for leaf certs that are referenced in the ProxyStateTemplate (which generates the leaf certs too). * Gets the leaf cert from the leaf cert cache * Stores the leaf cert in the ProxyState that's pushed to xds * For the cert watches, this PR also uses a bimapper + a thin wrapper to map leaf cert events to related ProxyStateTemplates Since bimapper uses a resource.Reference or resource.ID to map between two resource types, I've created an internal type for a leaf certificate to use for the resource.Reference, since it's not a v2 resource. The wrapper allows mapping events to resources (as opposed to mapping resources to resources) The controller tests: Unit: Ensure that we resolve leaf cert references Lifecycle: Ensure that when the CA is updated, the leaf cert is as well Also adds a new spiffe id type, and adds workload identity and workload identity URI to leaf certs. This is so certs are generated with the new workload identity based SPIFFE id. * Pulls out some leaf cert test helpers into a helpers file so it can be used in the xds controller tests. * Wires up leaf cert manager dependency * Support getting token from proxytracker * Add workload identity spiffe id type to the authorize and sign functions --------- Co-authored-by: John Murret <john.murret@hashicorp.com>
1 year ago
type Callback = cacheshim.Callback
// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be canceled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be canceled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (c *Cache) Notify(
ctx context.Context,
t string,
r Request,
correlationID string,
ch chan<- UpdateEvent,
) error {
return c.NotifyCallback(ctx, t, r, correlationID, func(ctx context.Context, event UpdateEvent) {
select {
case ch <- event:
case <-ctx.Done():
}
})
}
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (c *Cache) NotifyCallback(
ctx context.Context,
t string,
r Request,
correlationID string,
cb Callback,
) error {
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
if tEntry.Opts.SupportsBlocking {
go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb)
return nil
}
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb)
return nil
}
func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)
for {
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Blocking request
r.Info.MinIndex = index
res, meta, err := c.getWithIndex(ctx, r)
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Check the index of the value returned in the cache entry to be sure it
// changed
if index == 0 || index < meta.Index {
xds controller: setup watches for and compute leaf cert references in ProxyStateTemplate, and wire up leaf cert manager dependency (#18756) * Refactors the leafcert package to not have a dependency on agent/consul and agent/cache to avoid import cycles. This way the xds controller can just import the leafcert package to use the leafcert manager. The leaf cert logic in the controller: * Sets up watches for leaf certs that are referenced in the ProxyStateTemplate (which generates the leaf certs too). * Gets the leaf cert from the leaf cert cache * Stores the leaf cert in the ProxyState that's pushed to xds * For the cert watches, this PR also uses a bimapper + a thin wrapper to map leaf cert events to related ProxyStateTemplates Since bimapper uses a resource.Reference or resource.ID to map between two resource types, I've created an internal type for a leaf certificate to use for the resource.Reference, since it's not a v2 resource. The wrapper allows mapping events to resources (as opposed to mapping resources to resources) The controller tests: Unit: Ensure that we resolve leaf cert references Lifecycle: Ensure that when the CA is updated, the leaf cert is as well Also adds a new spiffe id type, and adds workload identity and workload identity URI to leaf certs. This is so certs are generated with the new workload identity based SPIFFE id. * Pulls out some leaf cert test helpers into a helpers file so it can be used in the xds controller tests. * Wires up leaf cert manager dependency * Support getting token from proxytracker * Add workload identity spiffe id type to the authorize and sign functions --------- Co-authored-by: John Murret <john.murret@hashicorp.com>
1 year ago
cb(ctx, UpdateEvent{CorrelationID: correlationID, Result: res, Meta: meta, Err: err})
// Update index for next request
index = meta.Index
}
var wait time.Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
wait = backOffWait(failures)
c.options.Logger.
With("error", err).
With("cache-type", r.TypeEntry.Name).
With("index", index).
Warn("handling error in Cache.Notify")
}
if wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if err == nil && index < 1 {
index = 1
}
}
}
func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) {
index := uint64(0)
failures := uint(0)
var lastValue interface{} = nil
for {
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Make the request
r.Info.MinIndex = index
res, meta, err := c.getWithIndex(ctx, r)
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Check for a change in the value or an index change
if index < meta.Index || !isEqual(lastValue, res) {
xds controller: setup watches for and compute leaf cert references in ProxyStateTemplate, and wire up leaf cert manager dependency (#18756) * Refactors the leafcert package to not have a dependency on agent/consul and agent/cache to avoid import cycles. This way the xds controller can just import the leafcert package to use the leafcert manager. The leaf cert logic in the controller: * Sets up watches for leaf certs that are referenced in the ProxyStateTemplate (which generates the leaf certs too). * Gets the leaf cert from the leaf cert cache * Stores the leaf cert in the ProxyState that's pushed to xds * For the cert watches, this PR also uses a bimapper + a thin wrapper to map leaf cert events to related ProxyStateTemplates Since bimapper uses a resource.Reference or resource.ID to map between two resource types, I've created an internal type for a leaf certificate to use for the resource.Reference, since it's not a v2 resource. The wrapper allows mapping events to resources (as opposed to mapping resources to resources) The controller tests: Unit: Ensure that we resolve leaf cert references Lifecycle: Ensure that when the CA is updated, the leaf cert is as well Also adds a new spiffe id type, and adds workload identity and workload identity URI to leaf certs. This is so certs are generated with the new workload identity based SPIFFE id. * Pulls out some leaf cert test helpers into a helpers file so it can be used in the xds controller tests. * Wires up leaf cert manager dependency * Support getting token from proxytracker * Add workload identity spiffe id type to the authorize and sign functions --------- Co-authored-by: John Murret <john.murret@hashicorp.com>
1 year ago
cb(ctx, UpdateEvent{CorrelationID: correlationID, Result: res, Meta: meta, Err: err})
// Update index and lastValue
lastValue = res
index = meta.Index
}
// Reset or increment failure counter
if err == nil {
failures = 0
} else {
failures++
c.options.Logger.
With("error", err).
With("cache-type", r.TypeEntry.Name).
With("index", index).
Warn("handling error in Cache.Notify")
}
var wait time.Duration
// Determining how long to wait before the next poll is complicated.
// First off the happy path and the error path waits are handled distinctly
//
// Once fetching the data through the cache returns an error (and until a
// non-error value is returned) the wait time between each round of the loop
// gets controlled by the backOffWait function. Because we would have waited
// at least until the age of the cached data was too old the error path should
// immediately retry the fetch and backoff on the time as needed for persistent
// failures which potentially will wait much longer than the MaxAge of the request
//
// When on the happy path we just need to fetch from the cache often enough to ensure
// that the data is not older than the MaxAge. Therefore after fetching the data from
// the cache we can sleep until the age of that data would exceed the MaxAge. Sometimes
// this will be for the MaxAge duration (like when only a single notify was executed so
// only 1 go routine is keeping the cache updated). Other times this will be some smaller
// duration than MaxAge (when multiple notify calls were executed and this go routine just
// got data back from the cache that was a cache hit after the other go routine fetched it
// without a hit). We cannot just set MustRevalidate on the request and always sleep for MaxAge
// as this would eliminate the single-flighting of these requests in the cache and
// the efficiencies gained by it.
if failures > 0 {
wait = backOffWait(failures)
} else {
// Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll
if meta.Age <= r.Info.MaxAge {
wait = r.Info.MaxAge - meta.Age
}
// Add a small amount of random jitter to the polling time. One
// purpose of the jitter is to ensure that the next time
// we fetch from the cache the data will be stale (unless another
// notify go routine has updated it while this one is sleeping).
// Without this it would be possible to wake up, fetch the data
// again where the age of the data is strictly equal to the MaxAge
// and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it.
wait += lib.RandomStagger(r.Info.MaxAge / 16)
}
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}
// isEqual compares two values for deep equality. Protobuf objects
// require us to use a special comparison because cloned structs
// may have non-exported fields that differ. For non-protobuf objects,
// we use reflect.DeepEqual().
func isEqual(a, b interface{}) bool {
// TODO move this logic into an interface so that each type can determine
// its own logic for equality, rather than a centralized type-cast like this.
if a != nil && b != nil {
a, aok := a.(proto.Message)
b, bok := b.(proto.Message)
if aok && bok {
return proto.Equal(a, b)
}
}
return reflect.DeepEqual(a, b)
}