diff --git a/agent/agent.go b/agent/agent.go index 92f4a06e9d..d30ef3281b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -37,6 +37,7 @@ import ( "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/servercert" "github.com/hashicorp/consul/agent/dns" external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/local" @@ -353,6 +354,9 @@ type Agent struct { // based on the current consul configuration. tlsConfigurator *tlsutil.Configurator + // certManager manages the lifecycle of the internally-managed server certificate. + certManager *servercert.CertManager + // httpConnLimiter is used to limit connections to the HTTP server by client // IP. httpConnLimiter connlimit.Limiter @@ -583,6 +587,24 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("Failed to start Consul server: %v", err) } a.delegate = server + + if a.config.PeeringEnabled && a.config.ConnectEnabled { + d := servercert.Deps{ + Logger: a.logger.Named("server.cert-manager"), + Config: servercert.Config{ + Datacenter: a.config.Datacenter, + ACLsEnabled: a.config.ACLsEnabled, + }, + Cache: a.cache, + GetStore: func() servercert.Store { return server.FSM().State() }, + TLSConfigurator: a.tlsConfigurator, + } + a.certManager = servercert.NewCertManager(d) + if err := a.certManager.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { + return fmt.Errorf("failed to start server cert manager: %w", err) + } + } + } else { client, err := consul.NewClient(consulCfg, a.baseDeps.Deps) if err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index ea15358213..65593710eb 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -5984,6 +5984,71 @@ func TestAgent_startListeners(t *testing.T) { } +func TestAgent_ServerCertificate(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + const expectURI = "spiffe://11111111-2222-3333-4444-555555555555.consul/agent/server/dc/dc1" + + // Leader should acquire a sever cert after bootstrapping. + a1 := NewTestAgent(t, ` +node_name = "a1" +acl { + enabled = true + tokens { + initial_management = "root" + default = "root" + } +} +connect { + enabled = true +} +peering { + enabled = true +}`) + defer a1.Shutdown() + testrpc.WaitForTestAgent(t, a1.RPC, "dc1") + + retry.Run(t, func(r *retry.R) { + cert := a1.tlsConfigurator.AutoEncryptCert() + require.NotNil(r, cert) + require.Len(r, cert.URIs, 1) + require.Equal(r, expectURI, cert.URIs[0].String()) + }) + + // Join a follower, and it should be able to acquire a server cert as well. + a2 := NewTestAgent(t, ` +node_name = "a2" +bootstrap = false +acl { + enabled = true + tokens { + initial_management = "root" + default = "root" + } +} +connect { + enabled = true +} +peering { + enabled = true +}`) + defer a2.Shutdown() + + _, err := a2.JoinLAN([]string{fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortLAN)}, nil) + require.NoError(t, err) + + testrpc.WaitForTestAgent(t, a2.RPC, "dc1") + + retry.Run(t, func(r *retry.R) { + cert := a2.tlsConfigurator.AutoEncryptCert() + require.NotNil(r, cert) + require.Len(r, cert.URIs, 1) + require.Equal(r, expectURI, cert.URIs[0].String()) + }) +} + func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool { pool := x509.NewCertPool() data, err := ioutil.ReadFile("../test/ca/root.cer") diff --git a/agent/cache-types/connect_ca_leaf.go b/agent/cache-types/connect_ca_leaf.go index ce98955c6e..b93882c236 100644 --- a/agent/cache-types/connect_ca_leaf.go +++ b/agent/cache-types/connect_ca_leaf.go @@ -688,19 +688,21 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, // since this is only used for cache-related requests and not forwarded // directly to any Consul servers. type ConnectCALeafRequest struct { - Token string - Datacenter string - Service string // Service name, not ID - Agent string // Agent name, not ID - Kind structs.ServiceKind // only mesh-gateway for now - Server bool - DNSSAN []string - IPSAN []net.IP - MinQueryIndex uint64 - MaxQueryTime time.Duration + Token string + Datacenter string + DNSSAN []string + IPSAN []net.IP + MinQueryIndex uint64 + MaxQueryTime time.Duration + acl.EnterpriseMeta MustRevalidate bool - acl.EnterpriseMeta + // The following flags indicate the entity we are requesting a cert for. + // Only one of these must be specified. + Service string // Given a Service name, not ID, the request is for a SpiffeIDService. + Agent string // Given an Agent name, not ID, the request is for a SpiffeIDAgent. + Kind structs.ServiceKind // Given "mesh-gateway", the request is for a SpiffeIDMeshGateway. No other kinds supported. + Server bool // If true, the request is for a SpiffeIDServer. } func (r *ConnectCALeafRequest) Key() string { diff --git a/agent/consul/acl_server.go b/agent/consul/acl_server.go index f9f88e4087..8e14d502a4 100644 --- a/agent/consul/acl_server.go +++ b/agent/consul/acl_server.go @@ -110,7 +110,7 @@ type serverACLResolverBackend struct { } func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool { - mgmt, err := s.getSystemMetadata(structs.ServerManagementToken) + mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID) if err != nil { s.logger.Debug("failed to fetch server management token: %w", err) return false diff --git a/agent/consul/acl_test.go b/agent/consul/acl_test.go index f08f966e90..7c5288d1ec 100644 --- a/agent/consul/acl_test.go +++ b/agent/consul/acl_test.go @@ -2209,7 +2209,7 @@ func TestACLResolver_ServerManagementToken(t *testing.T) { authz, err := r.ResolveToken(testToken) require.NoError(t, err) require.NotNil(t, authz.ACLIdentity) - require.Equal(t, structs.ServerManagementToken, authz.ACLIdentity.ID()) + require.Equal(t, structs.ServerManagementTokenAccessorID, authz.ACLIdentity.ID()) require.NotNil(t, authz.Authorizer) require.Equal(t, acl.ManageAll(), authz.Authorizer) } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 35626d1bc9..94aeeb3bb4 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -538,7 +538,7 @@ func (s *Server) initializeACLs(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err) } - if err := s.setSystemMetadataKey(structs.ServerManagementToken, secretID); err != nil { + if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil { return fmt.Errorf("failed to persist server management token: %w", err) } diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index a1d19dd36c..3ba328672e 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1297,7 +1297,7 @@ func TestLeader_ACL_Initialization(t *testing.T) { require.NoError(t, err) require.NotNil(t, policy) - serverToken, err := s1.getSystemMetadata(structs.ServerManagementToken) + serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken) diff --git a/agent/consul/servercert/manager.go b/agent/consul/servercert/manager.go new file mode 100644 index 0000000000..d600fa6e6f --- /dev/null +++ b/agent/consul/servercert/manager.go @@ -0,0 +1,267 @@ +package servercert + +import ( + "context" + "fmt" + "time" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" +) + +// Correlation ID for leaf cert watches. +const leafWatchID = "leaf" + +// Cache is an interface to represent the necessary methods of the agent/cache.Cache. +// It is used to request and renew the server leaf certificate. +type Cache interface { + Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error +} + +// TLSConfigurator is an interface to represent the necessary methods of the tlsutil.Configurator. +// It is used to apply the server leaf certificate and server name. +type TLSConfigurator interface { + UpdateAutoTLSCert(pub, priv string) error + UpdateAutoTLSPeeringServerName(name string) +} + +// Store is an interface to represent the necessary methods of the state.Store. +// It is used to fetch the CA Config to getStore the trust domain in the TLSConfigurator. +type Store interface { + CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) + SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) + AbandonCh() <-chan struct{} +} + +type Config struct { + // Datacenter is the datacenter name the server is configured with. + Datacenter string + + // ACLsEnabled indicates whether the ACL system is enabled on this server. + ACLsEnabled bool +} + +type Deps struct { + Config Config + Logger hclog.Logger + Cache Cache + GetStore func() Store + TLSConfigurator TLSConfigurator + waiter retry.Waiter +} + +// CertManager is responsible for requesting and renewing the leaf cert for server agents. +// The server certificate is managed internally and used for peering control-plane traffic +// to the TLS-enabled external gRPC port. +type CertManager struct { + logger hclog.Logger + + // config contains agent configuration necessary for the cert manager to operate. + config Config + + // cache provides an API to issue internal RPC requests and receive notifications + // when there are changes. + cache Cache + + // cacheUpdateCh receives notifications of cache update events for resources watched. + cacheUpdateCh chan cache.UpdateEvent + + // getStore returns the server state getStore for read-only access. + getStore func() Store + + // tlsConfigurator receives the leaf cert and peering server name updates from the cert manager. + tlsConfigurator TLSConfigurator + + // waiter contains the waiter for exponential backoff between retries. + waiter retry.Waiter +} + +func NewCertManager(deps Deps) *CertManager { + return &CertManager{ + config: deps.Config, + logger: deps.Logger, + cache: deps.Cache, + cacheUpdateCh: make(chan cache.UpdateEvent, 1), + getStore: deps.GetStore, + tlsConfigurator: deps.TLSConfigurator, + waiter: retry.Waiter{ + MinFailures: 1, + MinWait: 1 * time.Second, + MaxWait: 5 * time.Minute, + Jitter: retry.NewJitter(20), + }, + } +} + +func (m *CertManager) Start(ctx context.Context) error { + if err := m.initializeWatches(ctx); err != nil { + return fmt.Errorf("failed to set up certificate watches: %w", err) + } + go m.handleUpdates(ctx) + + m.logger.Info("initialized server certificate management") + return nil +} + +func (m *CertManager) initializeWatches(ctx context.Context) error { + if m.config.ACLsEnabled { + // If ACLs are enabled we need to watch for server token updates and set/reset + // leaf cert updates as token updates arrive. + go m.watchServerToken(ctx) + } else { + // If ACLs are disabled we set up a single cache notification for leaf certs. + if err := m.watchLeafCert(ctx); err != nil { + return fmt.Errorf("failed to watch leaf: %w", err) + } + } + go m.watchCAConfig(ctx) + + return nil +} + +func (m *CertManager) watchServerToken(ctx context.Context) { + // We keep the last iteration's cancel function to reset watches. + var ( + notifyCtx context.Context + cancel context.CancelFunc = func() {} + ) + retryLoopBackoff(ctx, m.waiter, func() error { + ws := memdb.NewWatchSet() + ws.Add(m.getStore().AbandonCh()) + + _, token, err := m.getStore().SystemMetadataGet(ws, structs.ServerManagementTokenAccessorID) + if err != nil { + return err + } + if token == nil { + m.logger.Debug("ACLs have not finished initializing") + return nil + } + if token.Value == "" { + // This should never happen. If the leader stored a token with this key it will not be empty. + return fmt.Errorf("empty token") + } + m.logger.Debug("server management token watch fired - resetting leaf cert watch") + + // Cancel existing the leaf cert watch and spin up new one any time the server token changes. + // The watch needs the current token as set by the leader since certificate signing requests go to the leader. + fmt.Println("canceling and resetting") + cancel() + notifyCtx, cancel = context.WithCancel(ctx) + + req := cachetype.ConnectCALeafRequest{ + Datacenter: m.config.Datacenter, + Token: token.Value, + Server: true, + } + if err := m.cache.Notify(notifyCtx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil { + return fmt.Errorf("failed to setup leaf cert notifications: %w", err) + } + + ws.WatchCtx(ctx) + return nil + + }, func(err error) { + m.logger.Error("failed to watch server management token", "error", err) + }) +} + +func (m *CertManager) watchLeafCert(ctx context.Context) error { + req := cachetype.ConnectCALeafRequest{ + Datacenter: m.config.Datacenter, + Server: true, + } + if err := m.cache.Notify(ctx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil { + return fmt.Errorf("failed to setup leaf cert notifications: %w", err) + } + + return nil +} + +func (m *CertManager) watchCAConfig(ctx context.Context) { + retryLoopBackoff(ctx, m.waiter, func() error { + ws := memdb.NewWatchSet() + ws.Add(m.getStore().AbandonCh()) + + _, conf, err := m.getStore().CAConfig(ws) + if err != nil { + return fmt.Errorf("failed to fetch CA configuration from the state getStore: %w", err) + } + if conf == nil || conf.ClusterID == "" { + m.logger.Debug("CA has not finished initializing") + return nil + } + + id := connect.SpiffeIDSigningForCluster(conf.ClusterID) + name := connect.PeeringServerSAN(m.config.Datacenter, id.Host()) + + m.logger.Debug("CA config watch fired - updating auto TLS server name", "name", name) + m.tlsConfigurator.UpdateAutoTLSPeeringServerName(name) + + ws.WatchCtx(ctx) + return nil + + }, func(err error) { + m.logger.Error("failed to watch CA config", "error", err) + }) +} + +func retryLoopBackoff(ctx context.Context, waiter retry.Waiter, loopFn func() error, errorFn func(error)) { + for { + if err := waiter.Wait(ctx); err != nil { + // The error will only be non-nil if the context is canceled. + return + } + + if err := loopFn(); err != nil { + errorFn(err) + continue + } + + // Reset the failure count seen by the waiter if there was no error. + waiter.Reset() + } +} + +func (m *CertManager) handleUpdates(ctx context.Context) { + for { + select { + case <-ctx.Done(): + m.logger.Debug("context canceled") + return + + case event := <-m.cacheUpdateCh: + m.logger.Debug("got cache update event", "correlationID", event.CorrelationID, "error", event.Err) + + if err := m.handleLeafUpdate(event); err != nil { + m.logger.Error("failed to handle cache update event", "error", err) + } + } + } +} + +func (m *CertManager) handleLeafUpdate(event cache.UpdateEvent) error { + if event.Err != nil { + return fmt.Errorf("leaf cert watch returned an error: %w", event.Err) + } + if event.CorrelationID != leafWatchID { + return fmt.Errorf("got unexpected update correlation ID %q while expecting %q", event.CorrelationID, leafWatchID) + } + + leaf, ok := event.Result.(*structs.IssuedCert) + if !ok { + return fmt.Errorf("got invalid type in leaf cert watch response: %T", event.Result) + } + + m.logger.Debug("leaf certificate watch fired - updating auto TLS certificate", "uri", leaf.ServerURI) + + if err := m.tlsConfigurator.UpdateAutoTLSCert(leaf.CertPEM, leaf.PrivateKeyPEM); err != nil { + return fmt.Errorf("failed to getStore the server leaf cert: %w", err) + } + return nil +} diff --git a/agent/consul/servercert/manager_test.go b/agent/consul/servercert/manager_test.go new file mode 100644 index 0000000000..6beec683fb --- /dev/null +++ b/agent/consul/servercert/manager_test.go @@ -0,0 +1,296 @@ +package servercert + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +type fakeStore struct { + // conf is the current CA configuration stored in the fakeStore. + conf chan *structs.CAConfiguration + + // tokenEntry is the current server token entry stored in the fakeStore. + tokenEntry chan *structs.SystemMetadataEntry + + // tokenCanceler will unblock the WatchSet for the token entry. + tokenCanceler <-chan struct{} +} + +func (s *fakeStore) CAConfig(_ memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { + select { + case conf := <-s.conf: + return 0, conf, nil + default: + return 0, nil, nil + } +} + +func (s *fakeStore) setCAConfig() { + s.conf <- &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + } +} + +func (s *fakeStore) SystemMetadataGet(ws memdb.WatchSet, _ string) (uint64, *structs.SystemMetadataEntry, error) { + select { + case entry := <-s.tokenEntry: + ws.Add(s.tokenCanceler) + return 0, entry, nil + default: + return 0, nil, nil + } +} + +func (s *fakeStore) setServerToken(token string, canceler <-chan struct{}) { + s.tokenCanceler = canceler + s.tokenEntry <- &structs.SystemMetadataEntry{ + Key: structs.ServerManagementTokenAccessorID, + Value: token, + } +} + +func (s *fakeStore) AbandonCh() <-chan struct{} { + return make(<-chan struct{}) +} + +type testCert struct { + pub string + priv string +} + +type fakeTLSConfigurator struct { + cert testCert + peeringServerName string + + // syncCh is used to signal that an update was handled. + // It synchronizes readers and writers in different goroutines. + syncCh chan struct{} +} + +func (u *fakeTLSConfigurator) UpdateAutoTLSCert(pub, priv string) error { + u.cert = testCert{ + pub: pub, + priv: priv, + } + u.syncCh <- struct{}{} + return nil +} + +func (u *fakeTLSConfigurator) UpdateAutoTLSPeeringServerName(name string) { + u.peeringServerName = name + u.syncCh <- struct{}{} +} + +func (u *fakeTLSConfigurator) timeoutIfNotUpdated(t *testing.T) error { + t.Helper() + + select { + case <-u.syncCh: + case <-time.After(100 * time.Millisecond): + t.Fatalf("timed out") + } + return nil +} + +type watchInfo struct { + ctx context.Context + token string +} + +type fakeCache struct { + updateCh chan<- cache.UpdateEvent + + // watched is a map of watched correlation IDs to the ACL token of the request. + watched map[string]watchInfo + + // syncCh is used to signal that Notify was called. + // It synchronizes readers and writers in different goroutines. + syncCh chan struct{} +} + +func (c *fakeCache) triggerLeafUpdate() { + c.updateCh <- cache.UpdateEvent{ + CorrelationID: leafWatchID, + Result: &structs.IssuedCert{ + CertPEM: "cert-pem", + PrivateKeyPEM: "key-pem", + ServerURI: "test-uri", + }, + } +} + +func (c *fakeCache) Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error { + c.watched[correlationID] = watchInfo{ctx: ctx, token: r.CacheInfo().Token} + c.updateCh = ch + c.syncCh <- struct{}{} + return nil +} + +func (c *fakeCache) timeoutIfNotUpdated(t *testing.T) error { + t.Helper() + + select { + case <-c.syncCh: + case <-time.After(100 * time.Millisecond): + t.Fatalf("timed out") + } + return nil +} + +func testWaiter() retry.Waiter { + return retry.Waiter{ + MinFailures: 1, + MinWait: 20 * time.Millisecond, + MaxWait: 20 * time.Millisecond, + } +} + +func TestCertManager_ACLsDisabled(t *testing.T) { + tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)} + cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)} + store := fakeStore{ + conf: make(chan *structs.CAConfiguration, 1), + tokenEntry: make(chan *structs.SystemMetadataEntry, 1), + } + + mgr := NewCertManager(Deps{ + Logger: testutil.Logger(t), + Config: Config{ + Datacenter: "my-dc", + ACLsEnabled: false, + }, + TLSConfigurator: &tlsConfigurator, + Cache: &cache, + GetStore: func() Store { return &store }, + }) + + // Override the default waiter to reduce time between retries. + mgr.waiter = testWaiter() + + require.NoError(t, mgr.Start(context.Background())) + + testutil.RunStep(t, "initial empty state", func(t *testing.T) { + require.Empty(t, tlsConfigurator.cert) + require.Empty(t, tlsConfigurator.peeringServerName) + + require.Contains(t, cache.watched, leafWatchID) + }) + + testutil.RunStep(t, "leaf cert update", func(t *testing.T) { + cache.triggerLeafUpdate() + + // Wait for the update to arrive. + require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t)) + + expect := testCert{ + pub: "cert-pem", + priv: "key-pem", + } + require.Equal(t, expect, tlsConfigurator.cert) + }) + + testutil.RunStep(t, "ca config update", func(t *testing.T) { + store.setCAConfig() + + // Wait for the update to arrive. + require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t)) + + expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain) + require.Equal(t, expect, tlsConfigurator.peeringServerName) + }) +} + +func TestCertManager_ACLsEnabled(t *testing.T) { + tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)} + cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)} + store := fakeStore{ + conf: make(chan *structs.CAConfiguration, 1), + tokenEntry: make(chan *structs.SystemMetadataEntry, 1), + } + + mgr := NewCertManager(Deps{ + Logger: testutil.Logger(t), + Config: Config{ + Datacenter: "my-dc", + ACLsEnabled: true, + }, + TLSConfigurator: &tlsConfigurator, + Cache: &cache, + GetStore: func() Store { return &store }, + }) + + // Override the default waiter to reduce time between retries. + mgr.waiter = testWaiter() + + require.NoError(t, mgr.Start(context.Background())) + + testutil.RunStep(t, "initial empty state", func(t *testing.T) { + require.Empty(t, tlsConfigurator.cert) + require.Empty(t, tlsConfigurator.peeringServerName) + + require.Empty(t, cache.watched) + }) + + var leafCtx context.Context + tokenCanceler := make(chan struct{}) + + testutil.RunStep(t, "server token update", func(t *testing.T) { + store.setServerToken("first-secret", tokenCanceler) + + require.NoError(t, cache.timeoutIfNotUpdated(t)) + + require.Contains(t, cache.watched, leafWatchID) + require.Equal(t, "first-secret", cache.watched[leafWatchID].token) + + leafCtx = cache.watched[leafWatchID].ctx + }) + + testutil.RunStep(t, "leaf cert update", func(t *testing.T) { + cache.triggerLeafUpdate() + + // Wait for the update to arrive. + require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t)) + + expect := testCert{ + pub: "cert-pem", + priv: "key-pem", + } + require.Equal(t, expect, tlsConfigurator.cert) + }) + + testutil.RunStep(t, "another server token update", func(t *testing.T) { + store.setServerToken("second-secret", nil) + + // Fire the existing WatchSet to simulate a state store update. + tokenCanceler <- struct{}{} + + // The leaf watch in the cache should have been reset. + require.NoError(t, cache.timeoutIfNotUpdated(t)) + + // The original leaf watch context should have been canceled. + require.Error(t, leafCtx.Err()) + + // A new leaf watch is expected with the new token. + require.Contains(t, cache.watched, leafWatchID) + require.Equal(t, "second-secret", cache.watched[leafWatchID].token) + }) + + testutil.RunStep(t, "ca config update", func(t *testing.T) { + store.setCAConfig() + + // Wait for the update to arrive. + require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t)) + + expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain) + require.Equal(t, expect, tlsConfigurator.peeringServerName) + }) +} diff --git a/agent/structs/acl.go b/agent/structs/acl.go index ce6b871eaf..a5243c4758 100644 --- a/agent/structs/acl.go +++ b/agent/structs/acl.go @@ -1840,7 +1840,7 @@ func (id *AgentRecoveryTokenIdentity) EnterpriseMetadata() *acl.EnterpriseMeta { return nil } -const ServerManagementToken = "server-management-token" +const ServerManagementTokenAccessorID = "server-management-token" type ACLServerIdentity struct { secretID string @@ -1853,7 +1853,7 @@ func NewACLServerIdentity(secretID string) *ACLServerIdentity { } func (i *ACLServerIdentity) ID() string { - return ServerManagementToken + return ServerManagementTokenAccessorID } func (i *ACLServerIdentity) SecretToken() string { diff --git a/lib/retry/retry.go b/lib/retry/retry.go index 88ccd04df7..59a979fbcc 100644 --- a/lib/retry/retry.go +++ b/lib/retry/retry.go @@ -79,6 +79,7 @@ func (w *Waiter) delay() time.Duration { } // Reset the failure count to 0. +// Reset must be called if the operation done after Wait did not fail. func (w *Waiter) Reset() { w.failures = 0 } @@ -88,9 +89,13 @@ func (w *Waiter) Failures() int { return int(w.failures) } -// Wait increase the number of failures by one, and then blocks until the context +// Wait increases the number of failures by one, and then blocks until the context // is cancelled, or until the wait time is reached. +// // The wait time increases exponentially as the number of failures increases. +// Every call to Wait increments the failures count, so Reset must be called +// after Wait when there wasn't a failure. +// // Wait will return ctx.Err() if the context is cancelled. func (w *Waiter) Wait(ctx context.Context) error { w.failures++