mirror of https://github.com/hashicorp/consul
This certificate manager will request a leaf certificate for server agents and then keep them up to date.
pull/14556/head
freddygv
2 years ago
11 changed files with 675 additions and 18 deletions
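
For context, the sketch below shows roughly how a server could construct and start the manager introduced in this change. The Deps fields and the NewCertManager/Start signatures come from the diff below; the wrapper function, its parameters, and the import path for the servercert package are illustrative assumptions and are not part of this commit.

// Hypothetical wiring sketch; only the servercert types and functions used here
// are defined by this change. The import path is assumed, since the diff only
// shows "package servercert".
package server

import (
    "context"
    "fmt"

    "github.com/hashicorp/consul/agent/consul/servercert" // assumed path
    "github.com/hashicorp/go-hclog"
)

func startServerCertManager(
    ctx context.Context,
    logger hclog.Logger,
    c servercert.Cache,
    store servercert.Store,
    tlsCfg servercert.TLSConfigurator,
    datacenter string,
    aclsEnabled bool,
) (*servercert.CertManager, error) {
    mgr := servercert.NewCertManager(servercert.Deps{
        Config: servercert.Config{
            Datacenter:  datacenter,
            ACLsEnabled: aclsEnabled,
        },
        Logger:          logger.Named("server-cert-manager"),
        Cache:           c,
        GetStore:        func() servercert.Store { return store },
        TLSConfigurator: tlsCfg,
    })
    // Start sets up the leaf cert and CA config watches and returns; updates
    // are applied by background goroutines until ctx is canceled.
    if err := mgr.Start(ctx); err != nil {
        return nil, fmt.Errorf("failed to start server cert manager: %w", err)
    }
    return mgr, nil
}
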
@@ -0,0 +1,267 @@
package servercert

import (
    "context"
    "fmt"
    "time"

    "github.com/hashicorp/consul/agent/cache"
    cachetype "github.com/hashicorp/consul/agent/cache-types"
    "github.com/hashicorp/consul/agent/connect"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/lib/retry"
    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/go-memdb"
)

// Correlation ID for leaf cert watches.
const leafWatchID = "leaf"

// Cache is an interface to represent the necessary methods of the agent/cache.Cache.
// It is used to request and renew the server leaf certificate.
type Cache interface {
    Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error
}

// TLSConfigurator is an interface to represent the necessary methods of the tlsutil.Configurator.
// It is used to apply the server leaf certificate and server name.
type TLSConfigurator interface {
    UpdateAutoTLSCert(pub, priv string) error
    UpdateAutoTLSPeeringServerName(name string)
}

// Store is an interface to represent the necessary methods of the state.Store.
// It is used to fetch the CA Config to store the trust domain in the TLSConfigurator.
type Store interface {
    CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
    SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error)
    AbandonCh() <-chan struct{}
}

type Config struct {
    // Datacenter is the datacenter name the server is configured with.
    Datacenter string

    // ACLsEnabled indicates whether the ACL system is enabled on this server.
    ACLsEnabled bool
}

type Deps struct {
    Config          Config
    Logger          hclog.Logger
    Cache           Cache
    GetStore        func() Store
    TLSConfigurator TLSConfigurator
    waiter          retry.Waiter
}

// CertManager is responsible for requesting and renewing the leaf cert for server agents.
// The server certificate is managed internally and used for peering control-plane traffic
// to the TLS-enabled external gRPC port.
type CertManager struct {
    logger hclog.Logger

    // config contains agent configuration necessary for the cert manager to operate.
    config Config

    // cache provides an API to issue internal RPC requests and receive notifications
    // when there are changes.
    cache Cache

    // cacheUpdateCh receives notifications of cache update events for watched resources.
    cacheUpdateCh chan cache.UpdateEvent

    // getStore returns the server state store for read-only access.
    getStore func() Store

    // tlsConfigurator receives the leaf cert and peering server name updates from the cert manager.
    tlsConfigurator TLSConfigurator

    // waiter contains the waiter for exponential backoff between retries.
    waiter retry.Waiter
}

func NewCertManager(deps Deps) *CertManager {
    return &CertManager{
        config:          deps.Config,
        logger:          deps.Logger,
        cache:           deps.Cache,
        cacheUpdateCh:   make(chan cache.UpdateEvent, 1),
        getStore:        deps.GetStore,
        tlsConfigurator: deps.TLSConfigurator,
        waiter: retry.Waiter{
            MinFailures: 1,
            MinWait:     1 * time.Second,
            MaxWait:     5 * time.Minute,
            Jitter:      retry.NewJitter(20),
        },
    }
}

func (m *CertManager) Start(ctx context.Context) error {
    if err := m.initializeWatches(ctx); err != nil {
        return fmt.Errorf("failed to set up certificate watches: %w", err)
    }
    go m.handleUpdates(ctx)

    m.logger.Info("initialized server certificate management")
    return nil
}

func (m *CertManager) initializeWatches(ctx context.Context) error {
    if m.config.ACLsEnabled {
        // If ACLs are enabled we need to watch for server token updates and set/reset
        // the leaf cert watch as token updates arrive.
        go m.watchServerToken(ctx)
    } else {
        // If ACLs are disabled we set up a single cache notification for leaf certs.
        if err := m.watchLeafCert(ctx); err != nil {
            return fmt.Errorf("failed to watch leaf: %w", err)
        }
    }
    go m.watchCAConfig(ctx)

    return nil
}

func (m *CertManager) watchServerToken(ctx context.Context) {
    // We keep the last iteration's cancel function to reset watches.
    var (
        notifyCtx context.Context
        cancel    context.CancelFunc = func() {}
    )
    retryLoopBackoff(ctx, m.waiter, func() error {
        ws := memdb.NewWatchSet()
        ws.Add(m.getStore().AbandonCh())

        _, token, err := m.getStore().SystemMetadataGet(ws, structs.ServerManagementTokenAccessorID)
        if err != nil {
            return err
        }
        if token == nil {
            m.logger.Debug("ACLs have not finished initializing")
            return nil
        }
        if token.Value == "" {
            // This should never happen. If the leader stored a token with this key it will not be empty.
            return fmt.Errorf("empty token")
        }
        m.logger.Debug("server management token watch fired - resetting leaf cert watch")

        // Cancel the existing leaf cert watch and spin up a new one any time the server token changes.
        // The watch needs the current token as set by the leader since certificate signing requests go to the leader.
        cancel()
        notifyCtx, cancel = context.WithCancel(ctx)

        req := cachetype.ConnectCALeafRequest{
            Datacenter: m.config.Datacenter,
            Token:      token.Value,
            Server:     true,
        }
        if err := m.cache.Notify(notifyCtx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
            return fmt.Errorf("failed to set up leaf cert notifications: %w", err)
        }

        ws.WatchCtx(ctx)
        return nil

    }, func(err error) {
        m.logger.Error("failed to watch server management token", "error", err)
    })
}

func (m *CertManager) watchLeafCert(ctx context.Context) error {
    req := cachetype.ConnectCALeafRequest{
        Datacenter: m.config.Datacenter,
        Server:     true,
    }
    if err := m.cache.Notify(ctx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
        return fmt.Errorf("failed to set up leaf cert notifications: %w", err)
    }

    return nil
}

func (m *CertManager) watchCAConfig(ctx context.Context) {
    retryLoopBackoff(ctx, m.waiter, func() error {
        ws := memdb.NewWatchSet()
        ws.Add(m.getStore().AbandonCh())

        _, conf, err := m.getStore().CAConfig(ws)
        if err != nil {
            return fmt.Errorf("failed to fetch CA configuration from the state store: %w", err)
        }
        if conf == nil || conf.ClusterID == "" {
            m.logger.Debug("CA has not finished initializing")
            return nil
        }

        id := connect.SpiffeIDSigningForCluster(conf.ClusterID)
        name := connect.PeeringServerSAN(m.config.Datacenter, id.Host())

        m.logger.Debug("CA config watch fired - updating auto TLS server name", "name", name)
        m.tlsConfigurator.UpdateAutoTLSPeeringServerName(name)

        ws.WatchCtx(ctx)
        return nil

    }, func(err error) {
        m.logger.Error("failed to watch CA config", "error", err)
    })
}

func retryLoopBackoff(ctx context.Context, waiter retry.Waiter, loopFn func() error, errorFn func(error)) {
    for {
        if err := waiter.Wait(ctx); err != nil {
            // The error will only be non-nil if the context is canceled.
            return
        }

        if err := loopFn(); err != nil {
            errorFn(err)
            continue
        }

        // Reset the failure count seen by the waiter if there was no error.
        waiter.Reset()
    }
}

func (m *CertManager) handleUpdates(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            m.logger.Debug("context canceled")
            return

        case event := <-m.cacheUpdateCh:
            m.logger.Debug("got cache update event", "correlationID", event.CorrelationID, "error", event.Err)

            if err := m.handleLeafUpdate(event); err != nil {
                m.logger.Error("failed to handle cache update event", "error", err)
            }
        }
    }
}

func (m *CertManager) handleLeafUpdate(event cache.UpdateEvent) error {
    if event.Err != nil {
        return fmt.Errorf("leaf cert watch returned an error: %w", event.Err)
    }
    if event.CorrelationID != leafWatchID {
        return fmt.Errorf("got unexpected update correlation ID %q while expecting %q", event.CorrelationID, leafWatchID)
    }

    leaf, ok := event.Result.(*structs.IssuedCert)
    if !ok {
        return fmt.Errorf("got invalid type in leaf cert watch response: %T", event.Result)
    }

    m.logger.Debug("leaf certificate watch fired - updating auto TLS certificate", "uri", leaf.ServerURI)

    if err := m.tlsConfigurator.UpdateAutoTLSCert(leaf.CertPEM, leaf.PrivateKeyPEM); err != nil {
        return fmt.Errorf("failed to store the server leaf cert: %w", err)
    }
    return nil
}
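
The retryLoopBackoff helper above is the backbone of both watch loops: it waits with exponential backoff before each attempt and resets the backoff after a successful pass. The following small, self-contained sketch of the same pattern, with a hypothetical loopFn that fails twice before succeeding, may make the Wait/Reset interplay easier to see. Only the retry.Waiter API is taken from the code above; the loop function, timings, and main function are illustrative.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/hashicorp/consul/lib/retry"
)

func main() {
    // Hypothetical loop function: fails twice, then succeeds on every later attempt.
    attempts := 0
    loopFn := func() error {
        attempts++
        if attempts <= 2 {
            return errors.New("not ready yet")
        }
        return nil
    }

    // Bound the demo with a timeout so the loop eventually exits.
    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer cancel()

    waiter := retry.Waiter{
        MinFailures: 1,
        MinWait:     10 * time.Millisecond,
        MaxWait:     50 * time.Millisecond,
    }

    for {
        // Wait blocks for the current backoff; it returns an error only when ctx is done.
        if err := waiter.Wait(ctx); err != nil {
            fmt.Println("context canceled - exiting loop")
            return
        }
        if err := loopFn(); err != nil {
            fmt.Println("loop error:", err)
            continue // the failure count grows, so the next Wait backs off longer
        }
        // A successful pass resets the failure count so future waits start small again.
        waiter.Reset()
    }
}
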
@@ -0,0 +1,296 @@
package servercert

import (
    "context"
    "testing"
    "time"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/connect"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/lib/retry"
    "github.com/hashicorp/consul/sdk/testutil"
    "github.com/hashicorp/go-memdb"
    "github.com/stretchr/testify/require"
)

type fakeStore struct {
    // conf is the current CA configuration stored in the fakeStore.
    conf chan *structs.CAConfiguration

    // tokenEntry is the current server token entry stored in the fakeStore.
    tokenEntry chan *structs.SystemMetadataEntry

    // tokenCanceler will unblock the WatchSet for the token entry.
    tokenCanceler <-chan struct{}
}

func (s *fakeStore) CAConfig(_ memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
    select {
    case conf := <-s.conf:
        return 0, conf, nil
    default:
        return 0, nil, nil
    }
}

func (s *fakeStore) setCAConfig() {
    s.conf <- &structs.CAConfiguration{
        ClusterID: connect.TestClusterID,
    }
}

func (s *fakeStore) SystemMetadataGet(ws memdb.WatchSet, _ string) (uint64, *structs.SystemMetadataEntry, error) {
    select {
    case entry := <-s.tokenEntry:
        ws.Add(s.tokenCanceler)
        return 0, entry, nil
    default:
        return 0, nil, nil
    }
}

func (s *fakeStore) setServerToken(token string, canceler <-chan struct{}) {
    s.tokenCanceler = canceler
    s.tokenEntry <- &structs.SystemMetadataEntry{
        Key:   structs.ServerManagementTokenAccessorID,
        Value: token,
    }
}

func (s *fakeStore) AbandonCh() <-chan struct{} {
    return make(<-chan struct{})
}

type testCert struct {
    pub  string
    priv string
}

type fakeTLSConfigurator struct {
    cert              testCert
    peeringServerName string

    // syncCh is used to signal that an update was handled.
    // It synchronizes readers and writers in different goroutines.
    syncCh chan struct{}
}

func (u *fakeTLSConfigurator) UpdateAutoTLSCert(pub, priv string) error {
    u.cert = testCert{
        pub:  pub,
        priv: priv,
    }
    u.syncCh <- struct{}{}
    return nil
}

func (u *fakeTLSConfigurator) UpdateAutoTLSPeeringServerName(name string) {
    u.peeringServerName = name
    u.syncCh <- struct{}{}
}

func (u *fakeTLSConfigurator) timeoutIfNotUpdated(t *testing.T) error {
    t.Helper()

    select {
    case <-u.syncCh:
    case <-time.After(100 * time.Millisecond):
        t.Fatalf("timed out")
    }
    return nil
}

type watchInfo struct {
    ctx   context.Context
    token string
}

type fakeCache struct {
    updateCh chan<- cache.UpdateEvent

    // watched is a map of watched correlation IDs to the ACL token of the request.
    watched map[string]watchInfo

    // syncCh is used to signal that Notify was called.
    // It synchronizes readers and writers in different goroutines.
    syncCh chan struct{}
}

func (c *fakeCache) triggerLeafUpdate() {
    c.updateCh <- cache.UpdateEvent{
        CorrelationID: leafWatchID,
        Result: &structs.IssuedCert{
            CertPEM:       "cert-pem",
            PrivateKeyPEM: "key-pem",
            ServerURI:     "test-uri",
        },
    }
}

func (c *fakeCache) Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error {
    c.watched[correlationID] = watchInfo{ctx: ctx, token: r.CacheInfo().Token}
    c.updateCh = ch
    c.syncCh <- struct{}{}
    return nil
}

func (c *fakeCache) timeoutIfNotUpdated(t *testing.T) error {
    t.Helper()

    select {
    case <-c.syncCh:
    case <-time.After(100 * time.Millisecond):
        t.Fatalf("timed out")
    }
    return nil
}

func testWaiter() retry.Waiter {
    return retry.Waiter{
        MinFailures: 1,
        MinWait:     20 * time.Millisecond,
        MaxWait:     20 * time.Millisecond,
    }
}

func TestCertManager_ACLsDisabled(t *testing.T) {
    tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
    cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
    store := fakeStore{
        conf:       make(chan *structs.CAConfiguration, 1),
        tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
    }

    mgr := NewCertManager(Deps{
        Logger: testutil.Logger(t),
        Config: Config{
            Datacenter:  "my-dc",
            ACLsEnabled: false,
        },
        TLSConfigurator: &tlsConfigurator,
        Cache:           &cache,
        GetStore:        func() Store { return &store },
    })

    // Override the default waiter to reduce time between retries.
    mgr.waiter = testWaiter()

    require.NoError(t, mgr.Start(context.Background()))

    testutil.RunStep(t, "initial empty state", func(t *testing.T) {
        require.Empty(t, tlsConfigurator.cert)
        require.Empty(t, tlsConfigurator.peeringServerName)

        require.Contains(t, cache.watched, leafWatchID)
    })

    testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
        cache.triggerLeafUpdate()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := testCert{
            pub:  "cert-pem",
            priv: "key-pem",
        }
        require.Equal(t, expect, tlsConfigurator.cert)
    })

    testutil.RunStep(t, "ca config update", func(t *testing.T) {
        store.setCAConfig()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain)
        require.Equal(t, expect, tlsConfigurator.peeringServerName)
    })
}

func TestCertManager_ACLsEnabled(t *testing.T) {
    tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
    cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
    store := fakeStore{
        conf:       make(chan *structs.CAConfiguration, 1),
        tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
    }

    mgr := NewCertManager(Deps{
        Logger: testutil.Logger(t),
        Config: Config{
            Datacenter:  "my-dc",
            ACLsEnabled: true,
        },
        TLSConfigurator: &tlsConfigurator,
        Cache:           &cache,
        GetStore:        func() Store { return &store },
    })

    // Override the default waiter to reduce time between retries.
    mgr.waiter = testWaiter()

    require.NoError(t, mgr.Start(context.Background()))

    testutil.RunStep(t, "initial empty state", func(t *testing.T) {
        require.Empty(t, tlsConfigurator.cert)
        require.Empty(t, tlsConfigurator.peeringServerName)

        require.Empty(t, cache.watched)
    })

    var leafCtx context.Context
    tokenCanceler := make(chan struct{})

    testutil.RunStep(t, "server token update", func(t *testing.T) {
        store.setServerToken("first-secret", tokenCanceler)

        require.NoError(t, cache.timeoutIfNotUpdated(t))

        require.Contains(t, cache.watched, leafWatchID)
        require.Equal(t, "first-secret", cache.watched[leafWatchID].token)

        leafCtx = cache.watched[leafWatchID].ctx
    })

    testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
        cache.triggerLeafUpdate()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := testCert{
            pub:  "cert-pem",
            priv: "key-pem",
        }
        require.Equal(t, expect, tlsConfigurator.cert)
    })

    testutil.RunStep(t, "another server token update", func(t *testing.T) {
        store.setServerToken("second-secret", nil)

        // Fire the existing WatchSet to simulate a state store update.
        tokenCanceler <- struct{}{}

        // The leaf watch in the cache should have been reset.
        require.NoError(t, cache.timeoutIfNotUpdated(t))

        // The original leaf watch context should have been canceled.
        require.Error(t, leafCtx.Err())

        // A new leaf watch is expected with the new token.
        require.Contains(t, cache.watched, leafWatchID)
        require.Equal(t, "second-secret", cache.watched[leafWatchID].token)
    })

    testutil.RunStep(t, "ca config update", func(t *testing.T) {
        store.setCAConfig()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain)
        require.Equal(t, expect, tlsConfigurator.peeringServerName)
    })
}