From 0638e09b6ee33dec18ed3d38b69d719e4cb2dfe7 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Thu, 10 Jan 2019 12:46:11 +0000 Subject: [PATCH] connect: agent leaf cert caching improvements (#5091) * Add State storage and LastResult argument into Cache so that cache.Types can safely store additional data that is eventually expired. * New Leaf cache type working and basic tests passing. TODO: more extensive testing for the Root change jitter across blocking requests, test concurrent fetches for different leaves interact nicely with rootsWatcher. * Add multi-client and delayed rotation tests. * Typos and cleanup error handling in roots watch * Add comment about how the FetchResult can be used and change ca leaf state to use a non-pointer state. * Plumb test override of root CA jitter through TestAgent so that tests are deterministic again! * Fix failing config test --- agent/agent.go | 6 +- agent/cache-types/connect_ca_leaf.go | 554 +++++++++++++------ agent/cache-types/connect_ca_leaf_test.go | 643 ++++++++++++++++------ agent/cache/cache.go | 8 + agent/cache/cache_test.go | 35 +- agent/cache/entry.go | 7 + agent/cache/type.go | 52 +- agent/config/runtime.go | 9 +- agent/config/runtime_test.go | 1 + agent/connect/testing_ca.go | 14 +- agent/structs/connect_ca.go | 5 +- agent/testagent.go | 4 + 12 files changed, 990 insertions(+), 348 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 6daa168107..0ed27cb610 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3419,8 +3419,10 @@ func (a *Agent) registerCache() { }) a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{ - RPC: a, - Cache: a.cache, + RPC: a, + Cache: a.cache, + Datacenter: a.config.Datacenter, + TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread, }, &cache.RegisterOptions{ // Maintain a blocking query, retry dropped connections quickly Refresh: true, diff --git a/agent/cache-types/connect_ca_leaf.go b/agent/cache-types/connect_ca_leaf.go index b85beb4c26..258d84f0da 100644 --- a/agent/cache-types/connect_ca_leaf.go +++ b/agent/cache-types/connect_ca_leaf.go @@ -1,13 +1,15 @@ package cachetype import ( - "crypto/sha256" + "context" "errors" "fmt" "sync" "sync/atomic" "time" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" @@ -16,29 +18,242 @@ import ( // Recommended name for registration. const ConnectCALeafName = "connect-ca-leaf" +// caChangeInitialSpreadDefault is the jitter we apply after noticing the CA +// changed before requesting a new cert. Since we don't know how many services +// are in the cluster we can't be too smart about setting this so it's a +// tradeoff between not making root rotations take unnecessarily long on small +// clusters and not hammering the servers to hard on large ones. Note that +// server's will soon have CSR rate limiting that will limit the impact on big +// clusters, but a small spread in the initial requests still seems like a good +// idea and limits how many clients will hit the rate limit. +const caChangeInitialSpreadDefault = 20 * time.Second + // ConnectCALeaf supports fetching and generating Connect leaf // certificates. 
 type ConnectCALeaf struct {
 	caIndex uint64 // Current index for CA roots
 
-	issuedCertsLock sync.RWMutex
-	issuedCerts     map[string]*structs.IssuedCert
+	// rootWatchMu protects access to the rootWatchSubscribers map and
+	// rootWatchCancel.
+	rootWatchMu sync.Mutex
+	// rootWatchSubscribers is a set of chans, one for each currently in-flight
+	// Fetch. These chans have root updates delivered from the root watcher.
+	rootWatchSubscribers map[chan struct{}]struct{}
+	// rootWatchCancel is a func to call to stop the background root watch if any.
+	// You must hold rootWatchMu to read (e.g. call) or write the value.
+	rootWatchCancel func()
+
+	// testRootWatchStart/StopCount are testing helpers that allow tests to
+	// observe the reference counting behavior that governs the shared root watch.
+	// It's not exactly pretty to expose internals like this, but it seems cleaner
+	// than constructing elaborate and brittle test cases that we can infer
+	// correct behavior from, and simpler than trying to probe runtime goroutine
+	// traces to infer correct behavior that way. They must be accessed
+	// atomically.
+	testRootWatchStartCount uint32
+	testRootWatchStopCount  uint32
+
+	RPC        RPC          // RPC client for remote requests
+	Cache      *cache.Cache // Cache that has CA root certs via ConnectCARoot
+	Datacenter string       // This agent's datacenter
+
+	// TestOverrideCAChangeInitialDelay allows overriding the random jitter after a
+	// root change with a fixed delay. So far this is only done in tests. If it's
+	// zero the caChangeInitialSpreadDefault maximum jitter will be used, but if
+	// set it overrides and provides a fixed delay. To essentially disable the
+	// delay in tests they can set it to 1 nanosecond. We may separately allow
+	// configuring the jitter limit by users later but this is different and for
+	// tests only since we need to set a deterministic time delay in order to test
+	// the behaviour here fully and deterministically.
+	TestOverrideCAChangeInitialDelay time.Duration
+}
+
+// fetchState is some additional metadata we store with each cert in the cache
+// to track things like expiry and coordinate paced root rotations. It's
+// important this doesn't contain any pointer types since we rely on the struct
+// being copied to avoid modifying the actual state in the cache entry during
+// Fetch. Pointer fields themselves would be OK, but if we pointed to another
+// struct and then called a method on it or modified it, we would be directly
+// mutating the cache entry and causing problems. We'd need to deep-clone in
+// that case in Fetch below.
+type fetchState struct {
+	// authorityKeyID is the key ID of the CA root that signed the current cert.
+	// This is just to save parsing the whole cert every time we have to check if
+	// the root changed.
+	authorityKeyID string
+
+	// forceExpireAfter is used to coordinate renewing certs after a CA rotation
+	// in a staggered way so that we don't overwhelm the servers.
+	forceExpireAfter time.Time
+}
+
+// fetchStart is called on each fetch that is about to block and wait for
+// changes to the leaf. It subscribes a chan to receive updates from the shared
+// root watcher and starts the root watcher if it's not already running.
+func (c *ConnectCALeaf) fetchStart(rootUpdateCh chan struct{}) {
+	c.rootWatchMu.Lock()
+	defer c.rootWatchMu.Unlock()
+	// Lazy allocation
+	if c.rootWatchSubscribers == nil {
+		c.rootWatchSubscribers = make(map[chan struct{}]struct{})
+	}
+	// Make sure a root watcher is running.
We don't only do this on first request + // to be more tolerant of errors that could cause the root watcher to fail and + // exit. + if c.rootWatchCancel == nil { + ctx, cancel := context.WithCancel(context.Background()) + c.rootWatchCancel = cancel + go c.rootWatcher(ctx) + } + c.rootWatchSubscribers[rootUpdateCh] = struct{}{} +} + +// fetchDone is called when a blocking call exits to unsubscribe from root +// updates and possibly stop the shared root watcher if it's no longer needed. +// Note that typically root CA is still being watched by clients directly and +// probably by the ProxyConfigManager so it will stay hot in cache for a while, +// we are just not monitoring it for updates any more. +func (c *ConnectCALeaf) fetchDone(rootUpdateCh chan struct{}) { + c.rootWatchMu.Lock() + defer c.rootWatchMu.Unlock() + delete(c.rootWatchSubscribers, rootUpdateCh) + if len(c.rootWatchSubscribers) == 0 && c.rootWatchCancel != nil { + // This was the last request. Stop the root watcher. + c.rootWatchCancel() + } +} + +// rootWatcher is the shared rootWatcher that runs in a background goroutine +// while needed by one or more inflight Fetch calls. +func (c *ConnectCALeaf) rootWatcher(ctx context.Context) { + atomic.AddUint32(&c.testRootWatchStartCount, 1) + defer atomic.AddUint32(&c.testRootWatchStopCount, 1) + + ch := make(chan cache.UpdateEvent, 1) + err := c.Cache.Notify(ctx, ConnectCARootName, &structs.DCSpecificRequest{ + Datacenter: c.Datacenter, + }, "roots", ch) + + notifyChange := func() { + c.rootWatchMu.Lock() + defer c.rootWatchMu.Unlock() + + for ch := range c.rootWatchSubscribers { + select { + case ch <- struct{}{}: + default: + // Don't block - chans are 1-buffered so act as an edge trigger and + // reload CA state directly from cache so they never "miss" updates. + } + } + } + + if err != nil { + // Trigger all inflight watchers. We don't pass the error, but they will + // reload from cache and observe the same error and return it to the caller, + // or if it's transient, will continue and the next Fetch will get us back + // into the right state. Seems better than busy loop-retrying here given + // that almost any error we would see here would also be returned from the + // cache get this will trigger. + notifyChange() + return + } + + var oldRoots *structs.IndexedCARoots + // Wait for updates to roots or all requests to stop + for { + select { + case <-ctx.Done(): + return + case e := <-ch: + // Root response changed in some way. Note this might be the initial + // fetch. + if e.Err != nil { + // See above rationale about the error propagation + notifyChange() + continue + } + + roots, ok := e.Result.(*structs.IndexedCARoots) + if !ok { + // See above rationale about the error propagation + notifyChange() + continue + } + + // Check that the active root is actually different from the last CA + // config there are many reasons the config might have changed without + // actually updating the CA root that is signing certs in the cluster. + // The Fetch calls will also validate this since the first call here we + // don't know if it changed or not, but there is no point waking up all + // Fetch calls to check this if we know none of them will need to act on + // this update. 
+ if oldRoots != nil && oldRoots.ActiveRootID == roots.ActiveRootID { + continue + } - RPC RPC // RPC client for remote requests - Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot + // Distribute the update to all inflight requests - they will decide + // whether or not they need to act on it. + notifyChange() + oldRoots = roots + } + } } -// issuedKey returns the issuedCerts cache key for a given service and token. We -// use a hash rather than concatenating strings to provide resilience against -// user input containing our separator - both service name and token ID can be -// freely manipulated by user so may contain any delimiter we choose. It also -// has the benefit of not leaking the ACL token to a new place in memory it -// might get accidentally dumped etc. -func issuedKey(service, token string) string { - hash := sha256.New() - hash.Write([]byte(service)) - hash.Write([]byte(token)) - return fmt.Sprintf("%x", hash.Sum(nil)) +// calculateSoftExpiry encapsulates our logic for when to renew a cert based on +// it's age. It returns a pair of times min, max which makes it easier to test +// the logic without non-determinisic jitter to account for. The caller should +// choose a time randomly in between these. +// +// We want to balance a few factors here: +// - renew too early and it increases the aggregate CSR rate in the cluster +// - renew too late and it risks disruption to the service if a transient +// error prevents the renewal +// - we want a broad amount of jitter so if there is an outage, we don't end +// up with all services in sync and causing a thundering herd every +// renewal period. Broader is better for smoothing requests but pushes +// both earlier and later tradeoffs above. +// +// Somewhat arbitrarily the current strategy looks like this: +// +// 0 60% 90% +// Issued [------------------------------|===============|!!!!!] Expires +// 72h TTL: 0 ~43h ~65h +// 1h TTL: 0 36m 54m +// +// Where |===| is the soft renewal period where we jitter for the first attempt +// and |!!!| is the danger zone where we just try immediately. +// +// In the happy path (no outages) the average renewal occurs half way through +// the soft renewal region or at 75% of the cert lifetime which is ~54 hours for +// a 72 hour cert, or 45 mins for a 1 hour cert. +// +// If we are already in the softRenewal period, we randomly pick a time between +// now and the start of the danger zone. +// +// We pass in now to make testing easier. +func calculateSoftExpiry(now time.Time, cert *structs.IssuedCert) (min time.Time, max time.Time) { + + certLifetime := cert.ValidBefore.Sub(cert.ValidAfter) + if certLifetime < 10*time.Minute { + // Shouldn't happen as we limit to 1 hour shortest elsewhere but just be + // defensive against strange times or bugs. + return now, now + } + + // Find the 60% mark in diagram above + softRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.6)) + hardRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.9)) + + if now.After(hardRenewTime) { + // In the hard renew period, or already expired. Renew now! 
+ return now, now + } + + if now.After(softRenewTime) { + // Already in the soft renew period, make now the lower bound for jitter + softRenewTime = now + } + return softRenewTime, hardRenewTime } func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { @@ -51,92 +266,165 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache "Internal cache failure: request wrong type: %T", req) } - // This channel watches our overall timeout. The other goroutines - // launched in this function should end all around the same time so - // they clean themselves up. - timeoutCh := time.After(opts.Timeout) - - // Kick off the goroutine that waits for new CA roots. The channel buffer - // is so that the goroutine doesn't block forever if we return for other - // reasons. - newRootCACh := make(chan error, 1) - go c.waitNewRootCA(reqReal.Datacenter, newRootCACh, opts.Timeout) - - // Generate a cache key to lookup/store the cert. We MUST generate a new cert - // per token used to ensure revocation by ACL token is robust. - issuedKey := issuedKey(reqReal.Service, reqReal.Token) - - // Get our prior cert (if we had one) and use that to determine our - // expiration time. If no cert exists, we expire immediately since we - // need to generate. - c.issuedCertsLock.RLock() - lastCert := c.issuedCerts[issuedKey] - c.issuedCertsLock.RUnlock() - - var leafExpiryCh <-chan time.Time - if lastCert != nil { - // Determine how long we wait until triggering. If we've already - // expired, we trigger immediately. - if expiryDur := lastCert.ValidBefore.Sub(time.Now()); expiryDur > 0 { - leafExpiryCh = time.After(expiryDur - 1*time.Hour) - // TODO(mitchellh): 1 hour buffer is hardcoded above - - // We should not depend on the cache package de-duplicating requests for - // the same service/token (which is all we care about keying our local - // issued cert cache on) since it might later make sense to partition - // clients for other reasons too. So if the request has a 0 MinIndex, and - // the cached cert is still valid, then the client is expecting an - // immediate response and hasn't already seen the cached cert, return it - // now. - if opts.MinIndex == 0 { - result.Value = lastCert - result.Index = lastCert.ModifyIndex - return result, nil - } + // Do we already have a cert in the cache? + var existing *structs.IssuedCert + // Really important this is not a pointer type since otherwise we would set it + // to point to the actual fetchState in the cache entry below and then would + // be directly modifying that in the cache entry even when we might later + // return an error and not update index etc. By being a value, we force a copy + var state fetchState + if opts.LastResult != nil { + existing, ok = opts.LastResult.Value.(*structs.IssuedCert) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: last value wrong type: %T", req) + } + state, ok = opts.LastResult.State.(fetchState) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: last state wrong type: %T", req) } + } else { + state = fetchState{} + } + + // Handle brand new request first as it's simplest. + if existing == nil { + return c.generateNewLeaf(reqReal, &state) + } + + // We have a certificate in cache already. Check it's still valid. 
+	now := time.Now()
+	minExpire, maxExpire := calculateSoftExpiry(now, existing)
+	expiresAt := minExpire.Add(lib.RandomStagger(maxExpire.Sub(minExpire)))
+
+	// Check if we have been force-expired by a root update that jittered beyond
+	// the timeout of the query it was running.
+	if !state.forceExpireAfter.IsZero() && state.forceExpireAfter.Before(expiresAt) {
+		expiresAt = state.forceExpireAfter
 	}
 
-	if leafExpiryCh == nil {
-		// If the channel is still nil then it means we need to generate
-		// a cert no matter what: we either don't have an existing one or
-		// it is expired.
-		leafExpiryCh = time.After(0)
+	if expiresAt == now || expiresAt.Before(now) {
+		// Already expired, just make a new one right away
+		return c.generateNewLeaf(reqReal, &state)
 	}
 
-	// Block on the events that wake us up.
-	select {
-	case <-timeoutCh:
-		// On a timeout, we just return the empty result and no error.
-		// It isn't an error to timeout, its just the limit of time the
-		// caching system wants us to block for. By returning an empty result
-		// the caching system will ignore.
-		return result, nil
-
-	case err := <-newRootCACh:
-		// A new root CA triggers us to refresh the leaf certificate.
-		// If there was an error while getting the root CA then we return.
-		// Otherwise, we leave the select statement and move to generation.
-		if err != nil {
-			return result, err
+	// We are about to block and wait for a change or timeout.
+
+	// Make a chan we can be notified of changes to CA roots on. It must be
+	// buffered so we don't miss broadcasts from the root watcher. It is an edge
+	// trigger, so a single buffered element is sufficient regardless of whether
+	// we consume the updates fast enough: as soon as we see an element in it, we
+	// reload the latest CA from cache.
+	rootUpdateCh := make(chan struct{}, 1)
+
+	// Subscribe our chan to get root update notifications.
+	c.fetchStart(rootUpdateCh)
+	defer c.fetchDone(rootUpdateCh)
+
+	// Set up the result to mirror the current value in case we time out. This
+	// allows us to update the state even if we don't generate a new cert.
+	result.Value = existing
+	result.Index = existing.ModifyIndex
+	result.State = state
+
+	// Set up the timeout chan outside the loop so we don't keep bumping the
+	// timeout later if we loop around.
+	timeoutCh := time.After(opts.Timeout)
+
+	// Set up the initial expiry chan. We may change this if a root update occurs
+	// in the loop below.
+	expiresCh := time.After(expiresAt.Sub(now))
+
+	// The current cert is valid so just wait until it expires or we time out.
+	for {
+		select {
+		case <-timeoutCh:
+			// We timed out the request with the same cert.
+			return result, nil
+
+		case <-expiresCh:
+			// Cert expired or was force-expired by a root change.
+			return c.generateNewLeaf(reqReal, &state)
+
+		case <-rootUpdateCh:
+			// A root cache change occurred, so reload roots from cache.
+			roots, err := c.rootsFromCache()
+			if err != nil {
+				return result, err
+			}
+
+			// Handle _possibly_ changed roots. We still need to verify that the new
+			// active root is not the same as the one our current cert was signed by,
+			// since we can be notified spuriously if we are the first request and the
+			// root watcher didn't yet know which CA we were signed by.
+			if activeRootHasKey(roots, state.authorityKeyID) {
+				// The current active CA is the same one that signed our current cert
+				// so keep waiting for a change.
+				continue
+			}
+			// CA root changed. We add some jitter here to avoid a thundering herd.
+			// See docs on the caChangeInitialSpreadDefault const.
+ delay := lib.RandomStagger(caChangeInitialSpreadDefault) + if c.TestOverrideCAChangeInitialDelay > 0 { + delay = c.TestOverrideCAChangeInitialDelay + } + // Force the cert to be expired after the jitter - the delay above might + // be longer than we have left on our timeout. We set forceExpireAfter in + // the cache state so the next request will notice we still need to renew + // and do it at the right time. This is cleared once a new cert is + // returned by generateNewLeaf. + state.forceExpireAfter = time.Now().Add(delay) + // If the delay time is within the current timeout, we want to renew the + // as soon as it's up. We change the expire time and chan so that when we + // loop back around, we'll wait at most delay until generating a new cert. + if state.forceExpireAfter.Before(expiresAt) { + expiresAt = state.forceExpireAfter + expiresCh = time.After(delay) + } + continue } + } +} - case <-leafExpiryCh: - // The existing leaf certificate is expiring soon, so we generate a - // new cert with a healthy overlapping validity period (determined - // by the above channel). +func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string) bool { + for _, ca := range roots.Roots { + if ca.Active { + if ca.SigningKeyID == currentSigningKeyID { + return true + } + // Found the active CA but it has changed + return false + } } + // Shouldn't be possible since at least one root should be active. + return false +} - // Need to lookup RootCAs response to discover trust domain. First just lookup - // with no blocking info - this should be a cache hit most of the time. +func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) { rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{ - Datacenter: reqReal.Datacenter, + Datacenter: c.Datacenter, }) if err != nil { - return result, err + return nil, err } roots, ok := rawRoots.(*structs.IndexedCARoots) if !ok { - return result, errors.New("invalid RootCA response type") + return nil, errors.New("invalid RootCA response type") + } + return roots, nil +} + +// generateNewLeaf does the actual work of creating a new private key, +// generating a CSR and getting it signed by the servers. +func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchState) (cache.FetchResult, error) { + var result cache.FetchResult + + // Need to lookup RootCAs response to discover trust domain. This should be a + // cache hit. 
+ roots, err := c.rootsFromCache() + if err != nil { + return result, err } if roots.TrustDomain == "" { return result, errors.New("cluster has no CA bootstrapped yet") @@ -145,9 +433,9 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache // Build the service ID serviceID := &connect.SpiffeIDService{ Host: roots.TrustDomain, - Datacenter: reqReal.Datacenter, + Datacenter: req.Datacenter, Namespace: "default", - Service: reqReal.Service, + Service: req.Service, } // Create a new private key @@ -165,8 +453,8 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache // Request signing var reply structs.IssuedCert args := structs.CASignRequest{ - WriteRequest: structs.WriteRequest{Token: reqReal.Token}, - Datacenter: reqReal.Datacenter, + WriteRequest: structs.WriteRequest{Token: req.Token}, + Datacenter: req.Datacenter, CSR: csr, } if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil { @@ -174,80 +462,22 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache } reply.PrivateKeyPEM = pkPEM - // Lock the issued certs map so we can insert it. We only insert if - // we didn't happen to get a newer one. This should never happen since - // the Cache should ensure only one Fetch per service, but we sanity - // check just in case. - c.issuedCertsLock.Lock() - defer c.issuedCertsLock.Unlock() - lastCert = c.issuedCerts[issuedKey] - if lastCert == nil || lastCert.ModifyIndex < reply.ModifyIndex { - if c.issuedCerts == nil { - c.issuedCerts = make(map[string]*structs.IssuedCert) - } + // Reset the forcedExpiry in the state + state.forceExpireAfter = time.Time{} - c.issuedCerts[issuedKey] = &reply - lastCert = &reply - } - - result.Value = lastCert - result.Index = lastCert.ModifyIndex - return result, nil -} - -// waitNewRootCA blocks until a new root CA is available or the timeout is -// reached (on timeout ErrTimeout is returned on the channel). -func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error, - timeout time.Duration) { - // We always want to block on at least an initial value. If this isn't - minIndex := atomic.LoadUint64(&c.caIndex) - if minIndex == 0 { - minIndex = 1 - } - - // Fetch some new roots. This will block until our MinQueryIndex is - // matched or the timeout is reached. - rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{ - Datacenter: datacenter, - QueryOptions: structs.QueryOptions{ - MinQueryIndex: minIndex, - MaxQueryTime: timeout, - }, - }) + cert, err := connect.ParseCert(reply.CertPEM) if err != nil { - ch <- err - return - } - - roots, ok := rawRoots.(*structs.IndexedCARoots) - if !ok { - // This should never happen but we don't want to even risk a panic - ch <- fmt.Errorf( - "internal error: CA root cache returned bad type: %T", rawRoots) - return - } - - // We do a loop here because there can be multiple waitNewRootCA calls - // happening simultaneously. Each Fetch kicks off one call. These are - // multiplexed through Cache.Get which should ensure we only ever - // actually make a single RPC call. However, there is a race to set - // the caIndex field so do a basic CAS loop here. - for { - // We only set our index if its newer than what is previously set. - old := atomic.LoadUint64(&c.caIndex) - if old == roots.Index || old > roots.Index { - break - } - - // Set the new index atomically. If the caIndex value changed - // in the meantime, retry. 
- if atomic.CompareAndSwapUint64(&c.caIndex, old, roots.Index) { - break - } + return result, err } - - // Trigger the channel since we updated. - ch <- nil + // Set the CA key ID so we can easily tell when a active root has changed. + state.authorityKeyID = connect.HexString(cert.AuthorityKeyId) + + result.Value = &reply + // Store value not pointer so we don't accidentally mutate the cache entry + // state in Fetch. + result.State = *state + result.Index = reply.ModifyIndex + return result, nil } func (c *ConnectCALeaf) SupportsBlocking() bool { diff --git a/agent/cache-types/connect_ca_leaf_test.go b/agent/cache-types/connect_ca_leaf_test.go index c52c8ce020..130f74fc9d 100644 --- a/agent/cache-types/connect_ca_leaf_test.go +++ b/agent/cache-types/connect_ca_leaf_test.go @@ -6,12 +6,139 @@ import ( "testing" "time" + "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +func TestCalculateSoftExpire(t *testing.T) { + tests := []struct { + name string + now string + issued string + lifetime time.Duration + wantMin string + wantMax string + }{ + { + name: "72h just issued", + now: "2018-01-01 00:00:01", + issued: "2018-01-01 00:00:00", + lifetime: 72 * time.Hour, + // Should jitter between 60% and 90% of the lifetime which is 43.2/64.8 + // hours after issued + wantMin: "2018-01-02 19:12:00", + wantMax: "2018-01-03 16:48:00", + }, + { + name: "72h in renew range", + // This time should be inside the renewal range. + now: "2018-01-02 20:00:20", + issued: "2018-01-01 00:00:00", + lifetime: 72 * time.Hour, + // Min should be the "now" time + wantMin: "2018-01-02 20:00:20", + wantMax: "2018-01-03 16:48:00", + }, + { + name: "72h in hard renew", + // This time should be inside the renewal range. + now: "2018-01-03 18:00:00", + issued: "2018-01-01 00:00:00", + lifetime: 72 * time.Hour, + // Min and max should both be the "now" time + wantMin: "2018-01-03 18:00:00", + wantMax: "2018-01-03 18:00:00", + }, + { + name: "72h expired", + // This time is after expiry + now: "2018-01-05 00:00:00", + issued: "2018-01-01 00:00:00", + lifetime: 72 * time.Hour, + // Min and max should both be the "now" time + wantMin: "2018-01-05 00:00:00", + wantMax: "2018-01-05 00:00:00", + }, + { + name: "1h just issued", + now: "2018-01-01 00:00:01", + issued: "2018-01-01 00:00:00", + lifetime: 1 * time.Hour, + // Should jitter between 60% and 90% of the lifetime which is 36/54 mins + // hours after issued + wantMin: "2018-01-01 00:36:00", + wantMax: "2018-01-01 00:54:00", + }, + { + name: "1h in renew range", + // This time should be inside the renewal range. + now: "2018-01-01 00:40:00", + issued: "2018-01-01 00:00:00", + lifetime: 1 * time.Hour, + // Min should be the "now" time + wantMin: "2018-01-01 00:40:00", + wantMax: "2018-01-01 00:54:00", + }, + { + name: "1h in hard renew", + // This time should be inside the renewal range. 
+ now: "2018-01-01 00:55:00", + issued: "2018-01-01 00:00:00", + lifetime: 1 * time.Hour, + // Min and max should both be the "now" time + wantMin: "2018-01-01 00:55:00", + wantMax: "2018-01-01 00:55:00", + }, + { + name: "1h expired", + // This time is after expiry + now: "2018-01-01 01:01:01", + issued: "2018-01-01 00:00:00", + lifetime: 1 * time.Hour, + // Min and max should both be the "now" time + wantMin: "2018-01-01 01:01:01", + wantMax: "2018-01-01 01:01:01", + }, + { + name: "too short lifetime", + // This time is after expiry + now: "2018-01-01 01:01:01", + issued: "2018-01-01 00:00:00", + lifetime: 1 * time.Minute, + // Min and max should both be the "now" time + wantMin: "2018-01-01 01:01:01", + wantMax: "2018-01-01 01:01:01", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require := require.New(t) + now, err := time.Parse("2006-01-02 15:04:05", tc.now) + require.NoError(err) + issued, err := time.Parse("2006-01-02 15:04:05", tc.issued) + require.NoError(err) + wantMin, err := time.Parse("2006-01-02 15:04:05", tc.wantMin) + require.NoError(err) + wantMax, err := time.Parse("2006-01-02 15:04:05", tc.wantMax) + require.NoError(err) + + min, max := calculateSoftExpiry(now, &structs.IssuedCert{ + ValidAfter: issued, + ValidBefore: issued.Add(tc.lifetime), + }) + + require.Equal(wantMin, min) + require.Equal(wantMax, max) + }) + } +} + // Test that after an initial signing, new CA roots (new ID) will // trigger a blocking query to execute. func TestConnectCALeaf_changingRoots(t *testing.T) { @@ -23,10 +150,16 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { typ, rootsCh := testCALeafType(t, rpc) defer close(rootsCh) + + caRoot := connect.TestCA(t, nil) + caRoot.Active = true rootsCh <- structs.IndexedCARoots{ - ActiveRootID: "1", + ActiveRootID: caRoot.ID, TrustDomain: "fake-trust-domain.consul", - QueryMeta: structs.QueryMeta{Index: 1}, + Roots: []*structs.CARoot{ + caRoot, + }, + QueryMeta: structs.QueryMeta{Index: 1}, } // Instrument ConnectCA.Sign to return signed cert @@ -35,7 +168,10 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { reply := args.Get(2).(*structs.IssuedCert) - reply.ValidBefore = time.Now().Add(12 * time.Hour) + leaf, _ := connect.TestLeaf(t, "web", caRoot) + reply.CertPEM = leaf + reply.ValidAfter = time.Now().Add(-1 * time.Hour) + reply.ValidBefore = time.Now().Add(11 * time.Hour) reply.CreateIndex = atomic.AddUint64(&idx, 1) reply.ModifyIndex = reply.CreateIndex resp = reply @@ -51,10 +187,11 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Fatal("shouldn't block waiting for fetch") case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 1, - }, result) + v := mustFetchResult(t, result) + require.Equal(resp, v.Value) + require.Equal(uint64(1), v.Index) + // Set the LastResult for subsequent fetches + opts.LastResult = &v } // Second fetch should block with set index @@ -66,20 +203,28 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { case <-time.After(100 * time.Millisecond): } - // Let's send in new roots, which should trigger the sign req + // Let's send in new roots, which should trigger the sign req. 
We need to take + // care to set the new root as active + caRoot2 := connect.TestCA(t, nil) + caRoot2.Active = true + caRoot.Active = false rootsCh <- structs.IndexedCARoots{ - ActiveRootID: "2", + ActiveRootID: caRoot2.ID, TrustDomain: "fake-trust-domain.consul", - QueryMeta: structs.QueryMeta{Index: 2}, + Roots: []*structs.CARoot{ + caRoot2, + caRoot, + }, + QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)}, } select { case <-time.After(100 * time.Millisecond): t.Fatal("shouldn't block waiting for fetch") case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 2, - }, result) + v := mustFetchResult(t, result) + require.Equal(resp, v.Value) + // 3 since the second CA "update" used up 2 + require.Equal(uint64(3), v.Index) } // Third fetch should block @@ -91,9 +236,10 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { } } -// Test that after an initial signing, an expiringLeaf will trigger a -// blocking query to resign. -func TestConnectCALeaf_expiringLeaf(t *testing.T) { +// Tests that if the root change jitter is longer than the time left on the +// timeout, we return normally but then still renew the cert on a subsequent +// call. +func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) { t.Parallel() require := require.New(t) @@ -102,32 +248,47 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) { typ, rootsCh := testCALeafType(t, rpc) defer close(rootsCh) + + // Override the root-change delay so we will timeout first. We can't set it to + // a crazy high value otherwise we'll have to wait that long in the test to + // see if it actually happens on subsequent calls. We instead reduce the + // timeout in FetchOptions to be much shorter than this. + typ.TestOverrideCAChangeInitialDelay = 100 * time.Millisecond + + caRoot := connect.TestCA(t, nil) + caRoot.Active = true rootsCh <- structs.IndexedCARoots{ - ActiveRootID: "1", + ActiveRootID: caRoot.ID, TrustDomain: "fake-trust-domain.consul", - QueryMeta: structs.QueryMeta{Index: 1}, + Roots: []*structs.CARoot{ + caRoot, + }, + QueryMeta: structs.QueryMeta{Index: 1}, } - // Instrument ConnectCA.Sign to + // Instrument ConnectCA.Sign to return signed cert var resp *structs.IssuedCert var idx uint64 rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { reply := args.Get(2).(*structs.IssuedCert) + leaf, _ := connect.TestLeaf(t, "web", caRoot) + reply.CertPEM = leaf + reply.ValidAfter = time.Now().Add(-1 * time.Hour) + reply.ValidBefore = time.Now().Add(11 * time.Hour) reply.CreateIndex = atomic.AddUint64(&idx, 1) reply.ModifyIndex = reply.CreateIndex - - // This sets the validity to 0 on the first call, and - // 12 hours+ on subsequent calls. This means that our first - // cert expires immediately. - reply.ValidBefore = time.Now().Add((12 * time.Hour) * - time.Duration(reply.CreateIndex-1)) - resp = reply }) - // We'll reuse the fetch options and request - opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second} + // We'll reuse the fetch options and request. Timeout must be much shorter + // than the initial root delay. 20ms means that if we deliver the root change + // during the first blocking call, we should need to block fully for 5 more + // calls before the cert is renewed. We pick a timeout that is not an exact + // multiple of the 100ms delay above to reduce the chance that timing works + // out in a way that makes it hard to tell a timeout from an early return due + // to a cert renewal. 
+	opts := cache.FetchOptions{MinIndex: 0, Timeout: 35 * time.Millisecond}
 	req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
 
 	// First fetch should return immediately
@@ -136,48 +297,92 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
 	case <-time.After(100 * time.Millisecond):
 		t.Fatal("shouldn't block waiting for fetch")
 	case result := <-fetchCh:
-		require.Equal(cache.FetchResult{
-			Value: resp,
-			Index: 1,
-		}, result)
+		v := mustFetchResult(t, result)
+		require.Equal(resp, v.Value)
+		require.Equal(uint64(1), v.Index)
+		// Set the LastResult for subsequent fetches
+		opts.LastResult = &v
 	}
 
-	// Second fetch should return immediately despite there being
-	// no updated CA roots, because we issued an expired cert.
-	fetchCh = TestFetchCh(t, typ, opts, req)
-	select {
-	case <-time.After(100 * time.Millisecond):
-		t.Fatal("shouldn't block waiting for fetch")
-	case result := <-fetchCh:
-		require.Equal(cache.FetchResult{
-			Value: resp,
-			Index: 2,
-		}, result)
+	// Let's send in new roots, which should eventually trigger the sign req. We
+	// need to take care to set the new root as active.
+	caRoot2 := connect.TestCA(t, nil)
+	caRoot2.Active = true
+	caRoot.Active = false
+	rootsCh <- structs.IndexedCARoots{
+		ActiveRootID: caRoot2.ID,
+		TrustDomain:  "fake-trust-domain.consul",
+		Roots: []*structs.CARoot{
+			caRoot2,
+			caRoot,
+		},
+		QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
 	}
-
-	// Third fetch should block since the cert is not expiring and
-	// we also didn't update CA certs.
-	opts.MinIndex = 2
-	fetchCh = TestFetchCh(t, typ, opts, req)
-	select {
-	case result := <-fetchCh:
-		t.Fatalf("should not return: %#v", result)
-	case <-time.After(100 * time.Millisecond):
+	earliestRootDelivery := time.Now()
+
+	// Some number of fetches (likely 2-4) should time out, and once roughly 100ms
+	// has elapsed in total we should see the new cert. Since this is all very
+	// timing dependent, we don't hard-code exact numbers here; instead we loop
+	// for plenty of time, make as many calls as it takes, and just assert on the
+	// time taken and that each call either blocks and returns the cached cert or
+	// returns the new one.
+	opts.MinIndex = 1
+	var shouldExpireAfter time.Time
+	i := 1
+	rootsDelivered := false
+	for !rootsDelivered {
+		start := time.Now()
+		fetchCh = TestFetchCh(t, typ, opts, req)
+		select {
+		case result := <-fetchCh:
+			v := mustFetchResult(t, result)
+			timeTaken := time.Since(start)
+
+			// There are two options: either it blocked waiting for the delay after
+			// the rotation, or it returned the new CA cert before the timeout was
+			// done. To be more robust against timing, we take the value as the
+			// decider for which case it is, and assert that the timing matches our
+			// expected bounds rather than vice versa.
+
+			if v.Index > uint64(1) {
+				// Got a new cert
+				require.Equal(resp, v.Value)
+				require.Equal(uint64(3), v.Index)
+				// Should not have been delivered before the delay
+				require.True(time.Since(earliestRootDelivery) > typ.TestOverrideCAChangeInitialDelay)
+				// All good. We are done!
+				rootsDelivered = true
+			} else {
+				// Should be the cached cert
+				require.Equal(resp, v.Value)
+				require.Equal(uint64(1), v.Index)
+				// Sanity check we blocked for the whole timeout
+				require.Truef(timeTaken > opts.Timeout,
+					"should block for at least %s, returned after %s",
+					opts.Timeout, timeTaken)
+				// Sanity check that the forceExpireAfter state was set correctly.
+				// State is stored as a value (not a pointer) so assert accordingly.
+				shouldExpireAfter = v.State.(fetchState).forceExpireAfter
+				require.True(shouldExpireAfter.After(time.Now()))
+				require.True(shouldExpireAfter.Before(time.Now().Add(typ.TestOverrideCAChangeInitialDelay)))
+			}
+			// Set the LastResult for subsequent fetches
+			opts.LastResult = &v
+		case <-time.After(50 * time.Millisecond):
+			t.Fatalf("request %d blocked too long", i)
+		}
+		i++
+
+		// Sanity check that we've not gone way beyond the deadline without a
+		// new cert. We give some leeway to make it less brittle.
+		require.Falsef(
+			time.Now().After(shouldExpireAfter.Add(100*time.Millisecond)),
+			"waited extra 100ms and delayed CA rotate renew didn't happen")
 	}
 }
 
-// Test that once one client (e.g. the proxycfg.Manager) has fetched a cert,
-// that subsequent clients get it returned immediately and don't block until it
-// expires or their request times out. Note that typically FEtches at this level
-// are de-duped by the cache higher up, but if the two clients are using
-// different ACL tokens for example (common) that may not be the case, and we
-// should wtill deliver correct blocking semantics to both.
-//
-// Additionally, we want to make sure that clients with different tokens
-// generate distinct certs since we might later want to revoke all certs fetched
-// with a given token but can't if a client using that token was served a cert
-// generated under a different token (say the agent token).
-func TestConnectCALeaf_multipleClientsDifferentTokens(t *testing.T) {
+// This test runs multiple concurrent callers watching different leaf certs and
+// tries to ensure that the background root watch activity behaves correctly.
+func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {
 	t.Parallel()
 
 	require := require.New(t)
@@ -186,95 +391,179 @@ func TestConnectCALeaf_multipleClientsDifferentTokens(t *testing.T) {
 	typ, rootsCh := testCALeafType(t, rpc)
 	defer close(rootsCh)
+
+	caRoot := connect.TestCA(t, nil)
+	caRoot.Active = true
 	rootsCh <- structs.IndexedCARoots{
-		ActiveRootID: "1",
+		ActiveRootID: caRoot.ID,
 		TrustDomain:  "fake-trust-domain.consul",
-		QueryMeta:    structs.QueryMeta{Index: 1},
+		Roots: []*structs.CARoot{
+			caRoot,
+		},
+		QueryMeta: structs.QueryMeta{Index: 1},
 	}
 
-	// Instrument ConnectCA.Sign to
-	var resp *structs.IssuedCert
+	// Instrument ConnectCA.Sign to return signed cert
 	var idx uint64
 	rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
 		Run(func(args mock.Arguments) {
 			reply := args.Get(2).(*structs.IssuedCert)
+			// Note we will sign certs for the same service name each time because
+			// otherwise we would have to reinvent the whole CSR endpoint here to be
+			// able to control things (parse the PEM, sign with the right key, etc.).
+			// It doesn't matter: we use the CreateIndex to differentiate the "right"
+			// results.
+ leaf, _ := connect.TestLeaf(t, "web", caRoot) + reply.CertPEM = leaf + reply.ValidAfter = time.Now().Add(-1 * time.Hour) + reply.ValidBefore = time.Now().Add(11 * time.Hour) reply.CreateIndex = atomic.AddUint64(&idx, 1) reply.ModifyIndex = reply.CreateIndex - reply.ValidBefore = time.Now().Add(12 * time.Hour) - resp = reply }) - // We'll reuse the fetch options and request - opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute} - reqA := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "A-token"} - reqB := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "B-token"} + // n is the number of clients we'll run + n := 3 + + // setup/testDoneCh are used for coordinating clients such that each has + // initial cert delivered and is blocking before the root changes. It's not a + // wait group since we want to be able to timeout the main test goroutine if + // one of the clients gets stuck. Instead it's a buffered chan. + setupDoneCh := make(chan struct{}, n) + testDoneCh := make(chan struct{}, n) + // rootsUpdate is used to coordinate clients so they know when they should + // expect to see leaf renewed after root change. + rootsUpdatedCh := make(chan struct{}) + + // Create a function that models a single client. It should go through the + // steps of getting an initial cert and then watching for changes until root + // updates. + client := func(i int) { + // We'll reuse the fetch options and request + opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second} + req := &ConnectCALeafRequest{Datacenter: "dc1", Service: fmt.Sprintf("web-%d", i)} + + // First fetch should return immediately + fetchCh := TestFetchCh(t, typ, opts, req) + select { + case <-time.After(100 * time.Millisecond): + t.Fatal("shouldn't block waiting for fetch") + case result := <-fetchCh: + v := mustFetchResult(t, result) + opts.LastResult = &v + } + + // Second fetch should block with set index + opts.MinIndex = 1 + fetchCh = TestFetchCh(t, typ, opts, req) + select { + case result := <-fetchCh: + t.Fatalf("should not return: %#v", result) + case <-time.After(100 * time.Millisecond): + } + + // We're done with setup and the blocking call is still blocking in + // background. + setupDoneCh <- struct{}{} + + // Wait until all others are also done and roots change incase there are + // stragglers delaying the root update. 
+ select { + case <-rootsUpdatedCh: + case <-time.After(200 * time.Millisecond): + t.Fatalf("waited too long for root update") + } + + // Now we should see root update within a short period + select { + case <-time.After(100 * time.Millisecond): + t.Fatal("shouldn't block waiting for fetch") + case result := <-fetchCh: + v := mustFetchResult(t, result) + // Index must be different + require.NotEqual(opts.MinIndex, v.Value.(*structs.IssuedCert).CreateIndex) + } + + testDoneCh <- struct{}{} + } - // First fetch (Client A, MinIndex = 0) should return immediately - fetchCh := TestFetchCh(t, typ, opts, reqA) - var certA *structs.IssuedCert - select { - case <-time.After(100 * time.Millisecond): - t.Fatal("shouldn't block waiting for fetch") - case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 1, - }, result) - certA = result.(cache.FetchResult).Value.(*structs.IssuedCert) + // Sanity check the roots watcher is not running yet + assertRootsWatchCounts(t, typ, 0, 0) + + for i := 0; i < n; i++ { + go client(i) } - // Second fetch (Client B, MinIndex = 0) should return immediately - fetchCh = TestFetchCh(t, typ, opts, reqB) - select { - case <-time.After(100 * time.Millisecond): - t.Fatal("shouldn't block waiting for fetch") - case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 2, - }, result) - // Different tokens should result in different certs. Note that we don't - // actually generate and sign real certs in this test with our mock RPC but - // this is enough to be sure we actually generated a different Private Key - // for each one and aren't just differnt due to index values. - require.NotEqual(certA.PrivateKeyPEM, - result.(cache.FetchResult).Value.(*structs.IssuedCert).PrivateKeyPEM) + timeoutCh := time.After(200 * time.Millisecond) + + for i := 0; i < n; i++ { + select { + case <-timeoutCh: + t.Fatal("timed out waiting for clients") + case <-setupDoneCh: + } } - // Third fetch (Client A, MinIndex = > 0) should block - opts.MinIndex = 2 - fetchCh = TestFetchCh(t, typ, opts, reqA) - select { - case result := <-fetchCh: - t.Fatalf("should not return: %#v", result) - case <-time.After(100 * time.Millisecond): + // Should be 3 clients running now, so the roots watcher should have started + // once and not stopped. + assertRootsWatchCounts(t, typ, 1, 0) + + // Now we deliver the root update + caRoot2 := connect.TestCA(t, nil) + caRoot2.Active = true + caRoot.Active = false + rootsCh <- structs.IndexedCARoots{ + ActiveRootID: caRoot2.ID, + TrustDomain: "fake-trust-domain.consul", + Roots: []*structs.CARoot{ + caRoot2, + caRoot, + }, + QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)}, + } + // And notify clients + close(rootsUpdatedCh) + + timeoutCh = time.After(200 * time.Millisecond) + for i := 0; i < n; i++ { + select { + case <-timeoutCh: + t.Fatalf("timed out waiting for %d of %d clients to renew after root change", n-i, n) + case <-testDoneCh: + } } - // Fourth fetch (Client B, MinIndex = > 0) should block - fetchCh = TestFetchCh(t, typ, opts, reqB) - select { - case result := <-fetchCh: - t.Fatalf("should not return: %#v", result) - case <-time.After(100 * time.Millisecond): + // All active requests have returned the new cert so the rootsWatcher should + // have stopped. 
This is timing dependent though so retry a few times + retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) { + assertRootsWatchCounts(r, typ, 1, 1) + }) +} + +func assertRootsWatchCounts(t require.TestingT, typ *ConnectCALeaf, wantStarts, wantStops int) { + if tt, ok := t.(*testing.T); ok { + tt.Helper() } + starts := atomic.LoadUint32(&typ.testRootWatchStartCount) + stops := atomic.LoadUint32(&typ.testRootWatchStopCount) + require.Equal(t, wantStarts, int(starts)) + require.Equal(t, wantStops, int(stops)) } -// Test that once one client (e.g. the proxycfg.Manager) has fetched a cert, -// that subsequent clients get it returned immediately and don't block until it -// expires or their request times out. Note that typically Fetches at this level -// are de-duped by the cache higher up, the test above explicitly tests the case -// where two clients with different tokens request the same cert. However two -// clients sharing a token _may_ share the certificate, but the cachetype should -// not implicitly depend on the cache mechanism de-duping these clients. -// -// Genrally we _shouldn't_ rely on implementation details in the cache package -// about partitioning to behave correctly as that is likely to lead to subtle -// errors later when the implementation there changes, so this test ensures that -// even if the cache for some reason decides to not share an existing cache -// entry with a second client despite using the same token, that we don't block -// it's initial request assuming that it's already recieved the in-memory and -// still valid cert. -func TestConnectCALeaf_multipleClientsSameToken(t *testing.T) { +func mustFetchResult(t *testing.T, result interface{}) cache.FetchResult { + t.Helper() + switch v := result.(type) { + case error: + require.NoError(t, v) + case cache.FetchResult: + return v + default: + t.Fatalf("unexpected type from fetch %T", v) + } + return cache.FetchResult{} +} + +// Test that after an initial signing, an expiringLeaf will trigger a +// blocking query to resign. +func TestConnectCALeaf_expiringLeaf(t *testing.T) { t.Parallel() require := require.New(t) @@ -283,10 +572,16 @@ func TestConnectCALeaf_multipleClientsSameToken(t *testing.T) { typ, rootsCh := testCALeafType(t, rpc) defer close(rootsCh) + + caRoot := connect.TestCA(t, nil) + caRoot.Active = true rootsCh <- structs.IndexedCARoots{ - ActiveRootID: "1", + ActiveRootID: caRoot.ID, TrustDomain: "fake-trust-domain.consul", - QueryMeta: structs.QueryMeta{Index: 1}, + Roots: []*structs.CARoot{ + caRoot, + }, + QueryMeta: structs.QueryMeta{Index: 1}, } // Instrument ConnectCA.Sign to @@ -297,50 +592,65 @@ func TestConnectCALeaf_multipleClientsSameToken(t *testing.T) { reply := args.Get(2).(*structs.IssuedCert) reply.CreateIndex = atomic.AddUint64(&idx, 1) reply.ModifyIndex = reply.CreateIndex - reply.ValidBefore = time.Now().Add(12 * time.Hour) + + leaf, _ := connect.TestLeaf(t, "web", caRoot) + reply.CertPEM = leaf + + if reply.CreateIndex == 1 { + // First call returns expired cert to prime cache with an expired one. 
+ reply.ValidAfter = time.Now().Add(-13 * time.Hour) + reply.ValidBefore = time.Now().Add(-1 * time.Hour) + } else { + reply.ValidAfter = time.Now().Add(-1 * time.Hour) + reply.ValidBefore = time.Now().Add(11 * time.Hour) + } + resp = reply }) // We'll reuse the fetch options and request - opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute} - reqA := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "shared-token"} - reqB := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "shared-token"} + opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second} + req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"} - // First fetch (Client A, MinIndex = 0) should return immediately - fetchCh := TestFetchCh(t, typ, opts, reqA) + // First fetch should return immediately + fetchCh := TestFetchCh(t, typ, opts, req) select { case <-time.After(100 * time.Millisecond): t.Fatal("shouldn't block waiting for fetch") case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 1, - }, result) + switch v := result.(type) { + case error: + require.NoError(v) + case cache.FetchResult: + require.Equal(resp, v.Value) + require.Equal(uint64(1), v.Index) + // Set the LastResult for subsequent fetches + opts.LastResult = &v + } } - // Second fetch (Client B, MinIndex = 0) should return immediately - fetchCh = TestFetchCh(t, typ, opts, reqB) + // Second fetch should return immediately despite there being + // no updated CA roots, because we issued an expired cert. + fetchCh = TestFetchCh(t, typ, opts, req) select { case <-time.After(100 * time.Millisecond): t.Fatal("shouldn't block waiting for fetch") case result := <-fetchCh: - require.Equal(cache.FetchResult{ - Value: resp, - Index: 1, // Same result as last fetch - }, result) - } - - // Third fetch (Client A, MinIndex = > 0) should block - opts.MinIndex = 1 - fetchCh = TestFetchCh(t, typ, opts, reqA) - select { - case result := <-fetchCh: - t.Fatalf("should not return: %#v", result) - case <-time.After(100 * time.Millisecond): + switch v := result.(type) { + case error: + require.NoError(v) + case cache.FetchResult: + require.Equal(resp, v.Value) + require.Equal(uint64(2), v.Index) + // Set the LastResult for subsequent fetches + opts.LastResult = &v + } } - // Fourth fetch (Client B, MinIndex = > 0) should block - fetchCh = TestFetchCh(t, typ, opts, reqB) + // Third fetch should block since the cert is not expiring and + // we also didn't update CA certs. + opts.MinIndex = 2 + fetchCh = TestFetchCh(t, typ, opts, req) select { case result := <-fetchCh: t.Fatalf("should not return: %#v", result) @@ -368,7 +678,16 @@ func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.Indexed }) // Create the leaf type - return &ConnectCALeaf{RPC: rpc, Cache: c}, rootsCh + return &ConnectCALeaf{ + RPC: rpc, + Cache: c, + Datacenter: "dc1", + // Override the root-change spread so we don't have to wait up to 20 seconds + // to see root changes work. Can be changed back for specific tests that + // need to test this, Note it's not 0 since that used default but is + // effectively the same. 
+ TestOverrideCAChangeInitialDelay: 1 * time.Microsecond, + }, rootsCh } // testGatedRootsRPC will send each subsequent value on the channel as the diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 228c909ffc..1637ecd88c 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -454,6 +454,13 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- fOpts.MinIndex = entry.Index fOpts.Timeout = tEntry.Opts.RefreshTimeout } + if entry.Valid { + fOpts.LastResult = &FetchResult{ + Value: entry.Value, + State: entry.State, + Index: entry.Index, + } + } // Start building the new entry by blocking on the fetch. result, err := tEntry.Type.Fetch(fOpts, r) @@ -476,6 +483,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- if result.Value != nil { // A new value was given, so we create a brand new entry. newEntry.Value = result.Value + newEntry.State = result.State newEntry.Index = result.Index newEntry.FetchedAt = time.Now() if newEntry.Index < 1 { diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index e8cb1ddf3e..d6ef3e1ac6 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -379,9 +379,17 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { c := TestCache(t) c.RegisterType("t", typ, nil) + stateCh := make(chan int, 1) + // Configure the type - typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1) - typ.Static(FetchResult{Value: nil}, nil) + typ.Static(FetchResult{Value: 42, State: 31, Index: 1}, nil).Times(1) + // Return different State, it should be ignored + typ.Static(FetchResult{Value: nil, State: 32}, nil).Run(func(args mock.Arguments) { + // We should get back the original state + opts := args.Get(0).(FetchOptions) + require.NotNil(opts.LastResult) + stateCh <- opts.LastResult.State.(int) + }) // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) @@ -398,6 +406,29 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { require.Equal(42, result) require.False(meta.Hit) + // State delivered to second call should be the result from first call. + select { + case state := <-stateCh: + require.Equal(31, state) + case <-time.After(20 * time.Millisecond): + t.Fatal("timed out") + } + + // Next request should get the first returned state too since last Fetch + // returned nil result. + req = TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond}) + result, meta, err = c.Get("t", req) + require.NoError(err) + require.Equal(42, result) + require.False(meta.Hit) + select { + case state := <-stateCh: + require.Equal(31, state) + case <-time.After(20 * time.Millisecond): + t.Fatal("timed out") + } + // Sleep a tiny bit just to let maybe some background calls happen // then verify that we still only got the one call time.Sleep(20 * time.Millisecond) diff --git a/agent/cache/entry.go b/agent/cache/entry.go index f8660afbe9..f38a15c7f6 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -12,6 +12,13 @@ import ( type cacheEntry struct { // Fields pertaining to the actual value Value interface{} + // State can be used to store info needed by the cache type but that should + // not be part of the result the client gets. For example the Connect Leaf + // type needs to store additional data about when it last attempted a renewal + // that is not part of the actual IssuedCert struct it returns. 
It's opaque to
+ the Cache but allows types to store additional data that is coupled to the
+ cache entry's lifetime and will be aged out by TTL etc.
+ State interface{}
Error error
Index uint64
diff --git a/agent/cache/type.go b/agent/cache/type.go
index 73d8195aad..31c525eeec 100644
--- a/agent/cache/type.go
+++ b/agent/cache/type.go
@@ -8,20 +8,25 @@ import (
type Type interface {
// Fetch fetches a single unique item.
//
- // The FetchOptions contain the index and timeouts for blocking queries.
- // The MinIndex value on the Request itself should NOT be used
- // as the blocking index since a request may be reused multiple times
- // as part of Refresh behavior.
+ // The FetchOptions contain the index and timeouts for blocking queries. The
+ // MinIndex value on the Request itself should NOT be used as the blocking
+ // index since a request may be reused multiple times as part of Refresh
+ // behavior.
//
- // The return value is a FetchResult which contains information about
- // the fetch. If an error is given, the FetchResult is ignored. The
- // cache does not support backends that return partial values.
+ // The return value is a FetchResult which contains information about the
+ // fetch. If an error is given, the FetchResult is ignored. The cache does not
+ // support backends that return partial values. Optional State can be added to
+ // the FetchResult which will be stored with the cache entry and provided to
+ // the next Fetch call but will not be returned to clients. This allows types
+ // to add additional bookkeeping data per cache entry that will still be aged
+ // out along with the entry's TTL.
//
- // On timeout, FetchResult can behave one of two ways. First, it can
- // return the last known value. This is the default behavior of blocking
- // RPC calls in Consul so this allows cache types to be implemented with
- // no extra logic. Second, FetchResult can return an unset value and index.
- // In this case, the cache will reuse the last value automatically.
+ // On timeout, FetchResult can behave one of two ways. First, it can return
+ // the last known value. This is the default behavior of blocking RPC calls in
+ // Consul so this allows cache types to be implemented with no extra logic.
+ // Second, FetchResult can return an unset value and index. In this case, the
+ // cache will reuse the last value automatically. If an unset Value is
+ // returned, the State field will also be ignored currently.
Fetch(FetchOptions, Request) (FetchResult, error)
// SupportsBlocking should return true if the type supports blocking queries.
@@ -41,6 +46,23 @@ type FetchOptions struct {
// Timeout is the maximum time for the query. This must be implemented
// in the Fetch itself.
Timeout time.Duration
+
+ // LastResult is the result from the last successful Fetch and represents the
+ // value currently stored in the cache at the time Fetch is invoked. It will
+ // be nil on first call where there is no current cache value. There may have
+ // been other Fetch attempts that resulted in an error in the meantime. These
+ // are not explicitly represented currently. We could add that if needed; this
+ // was just simpler for now.
+ //
+ // The FetchResult is read-only! It is constructed per Fetch call so modifying
+ // the struct directly (e.g. changing its Index or Value field) will have no
+ // effect, however the Value and State fields may be pointers to the actual
+ // values stored in the cache entry.
It is not safe to modify the Value
+ or State via those pointers since readers may be concurrently inspecting those
+ values under the entry lock (although we guarantee only one Fetch call per
+ entry), and modifying them, even if the index doesn't change or the Fetch
+ eventually errors, will likely break logical invariants in the cache too!
+ LastResult *FetchResult
}
// FetchResult is the result of a Type Fetch operation and contains the
@@ -49,6 +71,12 @@ type FetchResult struct {
// Value is the result of the fetch.
Value interface{}
+ // State is opaque data stored in the cache but not returned to clients. It
+ // can be used by Types to maintain any bookkeeping they need between fetches
+ // (using FetchOptions.LastResult) in a way that gets automatically cleaned up
+ // by TTL expiry etc.
+ State interface{}
+
// Index is the corresponding index value for this data.
Index uint64
}
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index 4d912cb1d2..b9f732e945 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -531,7 +531,7 @@ type RuntimeConfig struct {
// ConnectReplicationToken is the ACL token used for replicating intentions.
ConnectReplicationToken string
- // ConnectTestDisableManagedProxies is not exposed to public config but us
+ // ConnectTestDisableManagedProxies is not exposed to public config but is
// used by TestAgent to prevent self-executing the test binary in the
// background if a managed proxy is created for a test. The only place we
// actually want to test processes really being spun up and managed is in
@@ -541,6 +541,13 @@
// processes up.
ConnectTestDisableManagedProxies bool
+ // ConnectTestCALeafRootChangeSpread is used to control how long the CA leaf
+ // cache will spread CSRs over when a root change occurs. For now we don't
+ // expose this in public config intentionally but could later with a rename.
+ // We only set this during tests to effectively make CA rotation tests
+ // deterministic again.
+ ConnectTestCALeafRootChangeSpread time.Duration
+
// DNSAddrs contains the list of TCP and UDP addresses the DNS server will
// bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is
// empty.
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index b1f2c1631c..d28ffd6b69 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -4981,6 +4981,7 @@ func TestSanitize(t *testing.T) {
"ConnectProxyDefaultScriptCommand": [],
"ConnectSidecarMaxPort": 0,
"ConnectSidecarMinPort": 0,
+ "ConnectTestCALeafRootChangeSpread": "0s",
"ConnectReplicationToken": "hidden",
"ConnectTestDisableManagedProxies": false,
"ConsulCoordinateUpdateBatchSize": 0,
diff --git a/agent/connect/testing_ca.go b/agent/connect/testing_ca.go
index 27a0dd20a0..02f360594b 100644
--- a/agent/connect/testing_ca.go
+++ b/agent/connect/testing_ca.go
@@ -41,6 +41,7 @@ func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
// Create the private key we'll use for this CA cert.
signer, keyPEM := testPrivateKey(t)
result.SigningKey = keyPEM
+ result.SigningKeyID = HexString(testKeyID(t, signer.Public()))
// The serial number for the cert
sn, err := testSerialNumber()
@@ -83,6 +84,9 @@ func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
if err != nil {
t.Fatalf("error generating CA ID fingerprint: %s", err)
}
+ result.SerialNumber = uint64(sn.Int64())
+ result.NotBefore = template.NotBefore.UTC()
+ result.NotAfter = template.NotAfter.UTC()
// If there is a prior CA to cross-sign with, then we need to create that
// and set it as the signing cert.
@@ -269,12 +273,12 @@ func testPrivateKey(t testing.T) (crypto.Signer, string) {
return pk, buf.String()
}
-// testSerialNumber generates a serial number suitable for a certificate.
-// For testing, this just sets it to a random number.
-//
-// This function is taken directly from the Vault implementation.
+// testSerialNumber generates a serial number suitable for a certificate. For
+// testing, this just sets it to a random number, but one that can fit in a
+// uint64 since we use that in our data structures and assume cert serials will
+// fit in that for now.
func testSerialNumber() (*big.Int, error) {
- return rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(159), nil))
+ return rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(63), nil))
}
// testUUID generates a UUID for testing.
diff --git a/agent/structs/connect_ca.go b/agent/structs/connect_ca.go
index a51f9b4bd3..1511b47e2e 100644
--- a/agent/structs/connect_ca.go
+++ b/agent/structs/connect_ca.go
@@ -59,8 +59,9 @@ type CARoot struct {
// SerialNumber is the x509 serial number of the certificate.
SerialNumber uint64
- // SigningKeyID is the ID of the public key that corresponds to the
- // private key used to sign the certificate.
+ // SigningKeyID is the ID of the public key that corresponds to the private
+ // key used to sign the certificate. It is the HexString format of the raw
+ // AuthorityKeyID bytes.
SigningKeyID string
// ExternalTrustDomain is the trust domain this root was generated under. It
diff --git a/agent/testagent.go b/agent/testagent.go
index 301908ed96..c9f74d6116 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -379,6 +379,10 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig {
// Disable connect proxy execution since it causes all kinds of problems with
// self-executing tests etc.
cfg.ConnectTestDisableManagedProxies = true
+ // Effectively disables the delay after root rotation before requesting CSRs
+ // to make tests deterministic. 0 results in the default jitter being applied
+ // but a tiny delay is effectively the same.
+ cfg.ConnectTestCALeafRootChangeSpread = 1 * time.Nanosecond
return &cfg
}
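For reference, here is a minimal, illustrative sketch (not part of the change above) of how a cache.Type implementation can use the new FetchOptions.LastResult and FetchResult.State plumbing to carry per-entry bookkeeping between blocking fetches. The exampleType and exampleState names are hypothetical, and the sketch assumes the Type interface only requires Fetch and SupportsBlocking; the real consumer of this mechanism is the ConnectCALeaf type in agent/cache-types/connect_ca_leaf.go.

package cachetype

import (
	"time"

	"github.com/hashicorp/consul/agent/cache"
)

// exampleState is opaque bookkeeping stored with the cache entry via
// FetchResult.State. It is never returned to cache clients and is aged out
// along with the entry by the cache's TTL handling.
type exampleState struct {
	lastFetchedAt time.Time
}

// exampleType is a hypothetical cache.Type showing the State/LastResult flow.
type exampleType struct{}

func (t *exampleType) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	var state exampleState
	if opts.LastResult != nil {
		// Recover the bookkeeping from the previous successful Fetch. The
		// struct is stored by value, so copying it here avoids mutating the
		// cache entry directly.
		if prev, ok := opts.LastResult.State.(exampleState); ok {
			state = prev
		}
	}

	// A real implementation would perform its (possibly blocking) query here,
	// honoring opts.MinIndex and opts.Timeout, and consult the state (for
	// example a forced-expiry deadline) to decide whether to return early.

	state.lastFetchedAt = time.Now()
	return cache.FetchResult{
		Value: "example-value",   // what cache clients receive
		State: state,             // opaque bookkeeping kept with the entry
		Index: opts.MinIndex + 1, // placeholder index for the sketch
	}, nil
}

func (t *exampleType) SupportsBlocking() bool { return true }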