diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go
index 86badf67e4..1b22b02bd4 100644
--- a/agent/proxycfg-glue/glue.go
+++ b/agent/proxycfg-glue/glue.go
@@ -124,15 +124,21 @@ func (c *cacheProxyDataSource[ReqType]) Notify(
 
 func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback {
 	return func(ctx context.Context, e cache.UpdateEvent) {
-		u := proxycfg.UpdateEvent{
-			CorrelationID: e.CorrelationID,
-			Result:        e.Result,
-			Err:           e.Err,
-		}
-
 		select {
-		case ch <- u:
+		case ch <- newUpdateEvent(e.CorrelationID, e.Result, e.Err):
 		case <-ctx.Done():
 		}
 	}
 }
+
+func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent {
+	// This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError.
+	if acl.IsErrNotFound(err) {
+		err = proxycfg.TerminalError(err)
+	}
+	return proxycfg.UpdateEvent{
+		CorrelationID: correlationID,
+		Result:        result,
+		Err:           err,
+	}
+}
diff --git a/agent/proxycfg-glue/intention_upstreams.go b/agent/proxycfg-glue/intention_upstreams.go
index 186d91b357..a694d033b4 100644
--- a/agent/proxycfg-glue/intention_upstreams.go
+++ b/agent/proxycfg-glue/intention_upstreams.go
@@ -54,13 +54,8 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi
 
 func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
 	return func(ctx context.Context, correlationID string, result ResultType, err error) {
-		event := proxycfg.UpdateEvent{
-			CorrelationID: correlationID,
-			Result:        result,
-			Err:           err,
-		}
 		select {
-		case ch <- event:
+		case ch <- newUpdateEvent(correlationID, result, err):
 		case <-ctx.Done():
 		}
 	}
diff --git a/agent/proxycfg-glue/intentions.go b/agent/proxycfg-glue/intentions.go
index 57f48bdae9..69652d922d 100644
--- a/agent/proxycfg-glue/intentions.go
+++ b/agent/proxycfg-glue/intentions.go
@@ -39,12 +39,8 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi
 		QueryOptions: structs.QueryOptions{Token: req.QueryOptions.Token},
 	}
 	return c.c.NotifyCallback(ctx, cachetype.IntentionMatchName, query, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
-		e := proxycfg.UpdateEvent{
-			CorrelationID: correlationID,
-			Err:           event.Err,
-		}
-
-		if e.Err == nil {
+		var result any
+		if event.Err == nil {
 			rsp, ok := event.Result.(*structs.IndexedIntentionMatches)
 			if !ok {
 				return
@@ -54,11 +50,11 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi
 			if len(rsp.Matches) != 0 {
 				matches = rsp.Matches[0]
 			}
-			e.Result = matches
+			result = matches
 		}
 
 		select {
-		case ch <- e:
+		case ch <- newUpdateEvent(correlationID, result, event.Err):
 		case <-ctx.Done():
 		}
 	})
@@ -110,10 +106,7 @@ func (s *serverIntentions) Notify(ctx context.Context, req *structs.ServiceSpeci
 
 		sort.Sort(structs.IntentionPrecedenceSorter(intentions))
 
-		return proxycfg.UpdateEvent{
-			CorrelationID: correlationID,
-			Result:        intentions,
-		}, true
+		return newUpdateEvent(correlationID, intentions, nil), true
 	}
 
 	for subjectIdx, subject := range subjects {
diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go
index bda0226ffb..3649bed2d3 100644
--- a/agent/proxycfg/data_sources.go
+++ b/agent/proxycfg/data_sources.go
@@ -2,6 +2,7 @@ package proxycfg
 
 import (
 	"context"
+	"errors"
 
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/structs"
@@ -15,6 +16,28 @@ type UpdateEvent struct {
 	Err           error
 }
 
+// TerminalError wraps the given error to indicate that the data source is in
+// an irrecoverably broken state (e.g. because the given ACL token has been
+// deleted).
+//
+// Setting UpdateEvent.Err to a TerminalError causes all watches to be canceled
+// which, in turn, terminates the xDS streams.
+func TerminalError(err error) error {
+	return terminalError{err}
+}
+
+// IsTerminalError returns whether the given error indicates that the data
+// source is in an irrecoverably broken state so watches should be torn down
+// and retried at a higher level.
+func IsTerminalError(err error) bool {
+	return errors.As(err, &terminalError{})
+}
+
+type terminalError struct{ err error }
+
+func (e terminalError) Error() string { return e.err.Error() }
+func (e terminalError) Unwrap() error { return e.err }
+
 // DataSources contains the dependencies used to consume data used to configure
 // proxies.
 type DataSources struct {
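The new `TerminalError`/`IsTerminalError` pair in `data_sources.go` is a thin wrapper type plus `errors.As`. A minimal standalone sketch (illustrative only; it reuses the names from the diff but is not part of this change) shows that detection survives further wrapping with `%w`, which is why data sources can annotate an error without caring how it is wrapped upstream:

```go
// Standalone sketch of the terminal-error pattern: a small wrapper type plus
// errors.As lets callers separate "give up" errors from retryable ones, even
// when the error is wrapped again later.
package main

import (
	"errors"
	"fmt"
)

type terminalError struct{ err error }

func (e terminalError) Error() string { return e.err.Error() }
func (e terminalError) Unwrap() error { return e.err }

// TerminalError marks err as irrecoverable.
func TerminalError(err error) error { return terminalError{err} }

// IsTerminalError reports whether err (or anything it wraps) is terminal.
func IsTerminalError(err error) bool { return errors.As(err, &terminalError{}) }

func main() {
	base := errors.New("ACL not found")
	err := fmt.Errorf("watch failed: %w", TerminalError(base))

	fmt.Println(IsTerminalError(err)) // true: errors.As walks the wrap chain
	fmt.Println(errors.Is(err, base)) // true: Unwrap preserves the original cause
}
```

Because the wrapper implements `Unwrap`, callers can still match the underlying cause with `errors.Is` while using `IsTerminalError` purely as a control-flow signal.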
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index 3de11b3f8a..efdfe4b724 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -127,7 +127,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 		}
 
 		// We are updating the proxy, close its old state
-		state.Close()
+		state.Close(false)
 	}
 
 	// TODO: move to a function that translates ManagerConfig->stateConfig
@@ -148,14 +148,13 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 		return err
 	}
 
-	ch, err := state.Watch()
-	if err != nil {
+	if _, err = state.Watch(); err != nil {
 		return err
 	}
 	m.proxies[id] = state
 
 	// Start a goroutine that will wait for changes and broadcast them to watchers.
-	go m.notifyBroadcast(ch)
+	go m.notifyBroadcast(id, state)
 
 	return nil
 }
@@ -175,8 +174,8 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) {
 	}
 
 	// Closing state will let the goroutine we started in Register finish since
-	// watch chan is closed.
-	state.Close()
+	// watch chan is closed
+	state.Close(false)
 	delete(m.proxies, id)
 
 	// We intentionally leave potential watchers hanging here - there is no new
@@ -186,11 +185,17 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) {
 	// cleaned up naturally.
 }
 
-func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
-	// Run until ch is closed
-	for snap := range ch {
+func (m *Manager) notifyBroadcast(proxyID ProxyID, state *state) {
+	// Run until ch is closed (by a defer in state.run).
+	for snap := range state.snapCh {
 		m.notify(&snap)
 	}
+
+	// If state.run exited because of an irrecoverable error, close all of the
+	// watchers so that the consumers reconnect/retry at a higher level.
+	if state.failed() {
+		m.closeAllWatchers(proxyID)
+	}
 }
 
 func (m *Manager) notify(snap *ConfigSnapshot) {
@@ -281,6 +286,20 @@ func (m *Manager) Watch(id ProxyID) (<-chan *ConfigSnapshot, CancelFunc) {
 	}
 }
 
+func (m *Manager) closeAllWatchers(proxyID ProxyID) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	watchers, ok := m.watchers[proxyID]
+	if !ok {
+		return
+	}
+
+	for watchID := range watchers {
+		m.closeWatchLocked(proxyID, watchID)
+	}
+}
+
 // closeWatchLocked cleans up state related to a single watcher. It assumes the
 // lock is held.
 func (m *Manager) closeWatchLocked(proxyID ProxyID, watchID uint64) {
@@ -309,7 +328,7 @@ func (m *Manager) Close() error {
 
 	// Then close all states
 	for proxyID, state := range m.proxies {
-		state.Close()
+		state.Close(false)
 		delete(m.proxies, proxyID)
 	}
 	return nil
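The `manager.go` changes hinge on one pattern: `notifyBroadcast` drains the state's snapshot channel until it is closed, then checks `state.failed()` and, only in that case, closes the per-proxy watch channels so the xDS streams are torn down. A rough standalone sketch of that flow (hypothetical names, not Consul code):

```go
// Minimal sketch of the notifyBroadcast change: drain a snapshot channel until
// the producer closes it, then consult a failure flag to decide whether the
// consumer should be disconnected so it can retry at a higher level.
package main

import (
	"fmt"
	"sync/atomic"
)

type producer struct {
	snapCh chan string
	failed atomic.Bool
}

func main() {
	p := &producer{snapCh: make(chan string, 1)}
	watcher := make(chan string, 1) // stands in for a per-xDS-stream watch channel

	done := make(chan struct{})
	go func() { // plays the role of Manager.notifyBroadcast
		defer close(done)
		for snap := range p.snapCh {
			watcher <- snap // forward snapshots while the producer is healthy
		}
		if p.failed.Load() {
			close(watcher) // terminal failure: closing the watch ends the stream
		}
	}()

	p.snapCh <- "snapshot-1"
	p.failed.Store(true) // e.g. the ACL token backing the watches was deleted
	close(p.snapCh)      // producer exits; the broadcast goroutine observes the close
	<-done

	fmt.Println(<-watcher) // snapshot-1
	_, ok := <-watcher     // ok == false: watcher was closed, consumer should retry
	fmt.Println("watch open:", ok)
}
```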
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 13b22c4fd2..34d3364356 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"net"
 	"reflect"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
@@ -70,11 +71,21 @@ type state struct {
 	// in Watch.
 	cancel func()
 
+	// failedFlag is (atomically) set to 1 (by Close) when run exits because a data
+	// source is in an irrecoverable state. It can be read with failed.
+	failedFlag int32
+
 	ch     chan UpdateEvent
 	snapCh chan ConfigSnapshot
 	reqCh  chan chan *ConfigSnapshot
 }
 
+// failed returns whether run exited because a data source is in an
+// irrecoverable state.
+func (s *state) failed() bool {
+	return atomic.LoadInt32(&s.failedFlag) == 1
+}
+
 type DNSConfig struct {
 	Domain    string
 	AltDomain string
@@ -250,10 +261,13 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
 }
 
 // Close discards the state and stops any long-running watches.
-func (s *state) Close() error {
+func (s *state) Close(failed bool) error {
 	if s.cancel != nil {
 		s.cancel()
 	}
+	if failed {
+		atomic.StoreInt32(&s.failedFlag, 1)
+	}
 	return nil
 }
 
@@ -300,7 +314,13 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 		case <-ctx.Done():
 			return
 		case u := <-s.ch:
-			s.logger.Trace("A blocking query returned; handling snapshot update", "correlationID", u.CorrelationID)
+			s.logger.Trace("Data source returned; handling snapshot update", "correlationID", u.CorrelationID)
+
+			if IsTerminalError(u.Err) {
+				s.logger.Error("Data source in an irrecoverable state; exiting", "error", u.Err, "correlationID", u.CorrelationID)
+				s.Close(true)
+				return
+			}
 
 			if err := s.handler.handleUpdate(ctx, u, snap); err != nil {
 				s.logger.Error("Failed to handle update from watch",
diff --git a/agent/submatview/local_materializer.go b/agent/submatview/local_materializer.go
index 6e32b36025..b3d4480bda 100644
--- a/agent/submatview/local_materializer.go
+++ b/agent/submatview/local_materializer.go
@@ -66,6 +66,10 @@ func (m *LocalMaterializer) Run(ctx context.Context) {
 		if ctx.Err() != nil {
 			return
 		}
+		if m.isTerminalError(err) {
+			return
+		}
+
 		m.mat.handleError(req, err)
 
 		if err := m.mat.retryWaiter.Wait(ctx); err != nil {
@@ -74,6 +78,14 @@ func (m *LocalMaterializer) Run(ctx context.Context) {
 	}
 }
 
+// isTerminalError determines whether the given error cannot be recovered from
+// and should cause the materializer to halt and be evicted from the view store.
+//
+// This roughly matches the logic in agent/proxycfg-glue.newUpdateEvent.
+func (m *LocalMaterializer) isTerminalError(err error) bool {
+	return acl.IsErrNotFound(err)
+}
+
 // subscribeOnce opens a new subscription to a local backend and runs
 // for its lifetime or until the view is closed.
 func (m *LocalMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
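`LocalMaterializer.Run` now distinguishes two exits: context cancellation (normal shutdown or TTL eviction) and a terminal error such as a deleted ACL token, in which case the materializer stops instead of retrying forever. A simplified sketch of that control flow (standalone, with a stand-in for `acl.IsErrNotFound` and the retry waiter):

```go
// Sketch of a subscribe/retry loop that backs off on transient errors but
// exits entirely when the error can never be recovered from.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errACLNotFound = errors.New("ACL not found") // stands in for acl.IsErrNotFound

func isTerminal(err error) bool { return errors.Is(err, errACLNotFound) }

func run(ctx context.Context, subscribe func() error) {
	for {
		err := subscribe()
		if ctx.Err() != nil {
			return // shut down: the view was closed or evicted
		}
		if isTerminal(err) {
			fmt.Println("terminal error, halting materializer:", err)
			return // caller is expected to evict this materializer
		}
		fmt.Println("transient error, retrying:", err)
		select {
		case <-time.After(10 * time.Millisecond): // simplified retry wait
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	calls := 0
	run(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary stream disconnect")
		}
		return errACLNotFound
	})
}
```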
diff --git a/agent/submatview/store.go b/agent/submatview/store.go
index 242a0d70d7..dacf2d8bae 100644
--- a/agent/submatview/store.go
+++ b/agent/submatview/store.go
@@ -47,6 +47,9 @@ type entry struct {
 	// requests is the count of active requests using this entry. This entry will
 	// remain in the store as long as this count remains > 0.
 	requests int
+	// evicting is used to mark an entry that will be evicted when the current in-
+	// flight requests finish.
+	evicting bool
 }
 
 // NewStore creates and returns a Store that is ready for use. The caller must
@@ -89,6 +92,7 @@ func (s *Store) Run(ctx context.Context) {
 
 			// Only stop the materializer if there are no active requests.
 			if e.requests == 0 {
+				s.logger.Trace("evicting item from store", "key", he.Key())
 				e.stop()
 				delete(s.byKey, he.Key())
 			}
@@ -187,13 +191,13 @@ func (s *Store) NotifyCallback(
 				"error", err,
 				"request-type", req.Type(),
 				"index", index)
-			continue
 		}
 
 		index = result.Index
 		cb(ctx, cache.UpdateEvent{
 			CorrelationID: correlationID,
 			Result:        result.Value,
+			Err:           err,
 			Meta:          cache.ResultMeta{Index: result.Index, Hit: result.Cached},
 		})
 	}
@@ -211,6 +215,9 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
 	defer s.lock.Unlock()
 	e, ok := s.byKey[key]
 	if ok {
+		if e.evicting {
+			return "", nil, errors.New("item is marked for eviction")
+		}
 		e.requests++
 		s.byKey[key] = e
 		return key, e.materializer, nil
@@ -222,7 +229,18 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	go mat.Run(ctx)
+	go func() {
+		mat.Run(ctx)
+
+		// Materializers run until they either reach their TTL and are evicted (which
+		// cancels the given context) or encounter an irrecoverable error.
+		//
+		// If the context hasn't been canceled, we know it's the error case so we
+		// trigger an immediate eviction.
+		if ctx.Err() == nil {
+			s.evictNow(key)
+		}
+	}()
 
 	e = entry{
 		materializer: mat,
@@ -233,6 +251,28 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
 	return key, e.materializer, nil
 }
 
+// evictNow causes the item with the given key to be evicted immediately.
+//
+// If there are requests in-flight, the item is marked for eviction such that
+// once the requests have been served releaseEntry will move it to the top of
+// the expiry heap. If there are no requests in-flight, evictNow will move the
+// item to the top of the expiry heap itself.
+//
+// In either case, the entry's evicting flag prevents it from being served by
+// readEntry (and thereby gaining new in-flight requests).
+func (s *Store) evictNow(key string) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	e := s.byKey[key]
+	e.evicting = true
+	s.byKey[key] = e
+
+	if e.requests == 0 {
+		s.expireNowLocked(key)
+	}
+}
+
 // releaseEntry decrements the request count and starts an expiry timer if the
 // count has reached 0. Must be called once for every call to readEntry.
 func (s *Store) releaseEntry(key string) {
@@ -246,6 +286,11 @@ func (s *Store) releaseEntry(key string) {
 		return
 	}
 
+	if e.evicting {
+		s.expireNowLocked(key)
+		return
+	}
+
 	if e.expiry.Index() == ttlcache.NotIndexed {
 		e.expiry = s.expiryHeap.Add(key, s.idleTTL)
 		s.byKey[key] = e
@@ -255,6 +300,17 @@ func (s *Store) releaseEntry(key string) {
 	s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
 }
 
+// expireNowLocked moves the item with the given key to the top of the expiry
+// heap, causing it to be picked up by the expiry loop and evicted immediately.
+func (s *Store) expireNowLocked(key string) {
+	e := s.byKey[key]
+	if idx := e.expiry.Index(); idx != ttlcache.NotIndexed {
+		s.expiryHeap.Remove(idx)
+	}
+	e.expiry = s.expiryHeap.Add(key, time.Duration(0))
+	s.byKey[key] = e
+}
+
 // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
 	return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
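The store changes introduce an `evicting` flag with three cooperating paths: `readEntry` refuses entries marked for eviction, `evictNow` marks the entry (and expires it immediately if idle), and `releaseEntry` expires it once the last in-flight request finishes. The sketch below (standalone and simplified, with a plain map deletion standing in for the TTL heap) mirrors that bookkeeping:

```go
// Sketch of the eviction bookkeeping: an entry flagged as evicting is refused
// to new readers, and the last release (or evictNow itself, if nothing is in
// flight) removes it.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type entry struct {
	requests int
	evicting bool
}

type store struct {
	mu    sync.Mutex
	byKey map[string]*entry
}

func (s *store) read(key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.byKey[key]
	if e.evicting {
		return errors.New("item is marked for eviction")
	}
	e.requests++
	return nil
}

func (s *store) release(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.byKey[key]
	e.requests--
	if e.requests == 0 && e.evicting {
		delete(s.byKey, key) // stands in for moving the entry to the top of the expiry heap
	}
}

func (s *store) evictNow(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.byKey[key]
	e.evicting = true
	if e.requests == 0 {
		delete(s.byKey, key)
	}
}

func main() {
	s := &store{byKey: map[string]*entry{"k": {}}}

	_ = s.read("k")          // one request in flight
	s.evictNow("k")          // materializer failed: mark for eviction
	fmt.Println(s.read("k")) // item is marked for eviction
	s.release("k")           // last in-flight request finishes -> entry is removed

	_, ok := s.byKey["k"]
	fmt.Println("still present:", ok) // false
}
```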
diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go
index 1d5789c054..aab0995998 100644
--- a/agent/submatview/store_test.go
+++ b/agent/submatview/store_test.go
@@ -509,3 +509,75 @@ func TestStore_Run_ExpiresEntries(t *testing.T) {
 	require.Len(t, store.byKey, 0)
 	require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
 }
+
+func TestStore_Run_FailingMaterializer(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	store := NewStore(hclog.NewNullLogger())
+	store.idleTTL = 24 * time.Hour
+	go store.Run(ctx)
+
+	t.Run("with an in-flight request", func(t *testing.T) {
+		req := &failingMaterializerRequest{
+			doneCh: make(chan struct{}),
+		}
+
+		ch := make(chan cache.UpdateEvent)
+		reqCtx, reqCancel := context.WithCancel(context.Background())
+		t.Cleanup(reqCancel)
+		require.NoError(t, store.Notify(reqCtx, req, "", ch))
+
+		assertRequestCount(t, store, req, 1)
+
+		// Cause the materializer to "fail" (exit before its context is canceled).
+		close(req.doneCh)
+
+		// End the in-flight request.
+		reqCancel()
+
+		// Check that the item was evicted.
+		retry.Run(t, func(r *retry.R) {
+			store.lock.Lock()
+			defer store.lock.Unlock()
+
+			require.Len(r, store.byKey, 0)
+		})
+	})
+
+	t.Run("with no in-flight requests", func(t *testing.T) {
+		req := &failingMaterializerRequest{
+			doneCh: make(chan struct{}),
+		}
+
+		// Cause the materializer to "fail" (exit before its context is canceled).
+		close(req.doneCh)
+
+		// Check that the item was evicted.
+		retry.Run(t, func(r *retry.R) {
+			store.lock.Lock()
+			defer store.lock.Unlock()
+
+			require.Len(r, store.byKey, 0)
+		})
+	})
+}
+
+type failingMaterializerRequest struct {
+	doneCh chan struct{}
+}
+
+func (failingMaterializerRequest) CacheInfo() cache.RequestInfo { return cache.RequestInfo{} }
+func (failingMaterializerRequest) Type() string { return "test.FailingMaterializerRequest" }
+
+func (r *failingMaterializerRequest) NewMaterializer() (Materializer, error) {
+	return &failingMaterializer{doneCh: r.doneCh}, nil
+}
+
+type failingMaterializer struct {
+	doneCh <-chan struct{}
+}
+
+func (failingMaterializer) Query(context.Context, uint64) (Result, error) { return Result{}, nil }
+
+func (m *failingMaterializer) Run(context.Context) { <-m.doneCh }
diff --git a/agent/xds/delta.go b/agent/xds/delta.go
index 701c04f2ed..71c1edcb0f 100644
--- a/agent/xds/delta.go
+++ b/agent/xds/delta.go
@@ -81,6 +81,11 @@ const (
 )
 
 func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error {
+	// Handle invalid ACL tokens up-front.
+	if _, err := s.authenticate(stream.Context()); err != nil {
+		return err
+	}
+
 	// Loop state
 	var (
 		cfgSnap *proxycfg.ConfigSnapshot
@@ -200,7 +205,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
 				}
 			}
 
-		case cfgSnap = <-stateCh:
+		case cs, ok := <-stateCh:
+			if !ok {
+				// stateCh is closed either when *we* cancel the watch (on-exit via defer)
+				// or by the proxycfg.Manager when an irrecoverable error is encountered
+				// such as the ACL token getting deleted.
+				//
+				// We know for sure that this is the latter case, because in the former we
+				// would've already exited this loop.
+				return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again")
+			}
+			cfgSnap = cs
+
 			newRes, err := generator.allResourcesFromSnapshot(cfgSnap)
 			if err != nil {
 				return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err)
diff --git a/agent/xds/server.go b/agent/xds/server.go
index cc27f3fde7..3ee42e77b0 100644
--- a/agent/xds/server.go
+++ b/agent/xds/server.go
@@ -186,6 +186,18 @@ func (s *Server) Register(srv *grpc.Server) {
 	envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s)
 }
 
+func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) {
+	authz, err := s.ResolveToken(external.TokenFromContext(ctx))
+	if acl.IsErrNotFound(err) {
+		return nil, status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
+	} else if acl.IsErrPermissionDenied(err) {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	} else if err != nil {
+		return nil, status.Errorf(codes.Internal, "error resolving acl token: %v", err)
+	}
+	return authz, nil
+}
+
 // authorize the xDS request using the token stored in ctx. This authorization is
 // a bit different from most interfaces. Instead of explicitly authorizing or
 // filtering each piece of data in the response, the request is authorized
@@ -201,13 +213,9 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot
 		return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
 	}
 
-	authz, err := s.ResolveToken(external.TokenFromContext(ctx))
-	if acl.IsErrNotFound(err) {
-		return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
-	} else if acl.IsErrPermissionDenied(err) {
-		return status.Error(codes.PermissionDenied, err.Error())
-	} else if err != nil {
-		return status.Errorf(codes.Internal, "error resolving acl token: %v", err)
+	authz, err := s.authenticate(ctx)
+	if err != nil {
+		return err
 	}
 
 	var authzContext acl.AuthorizerContext
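From an xDS client's point of view, the net effect of this change is a clearer set of gRPC status codes: `Unauthenticated`/`PermissionDenied` up-front for bad tokens, and `Aborted` when the proxycfg watch is torn down mid-stream. A small sketch of how a caller might classify them (assumes the `google.golang.org/grpc` module; illustrative only, not Consul code):

```go
// Sketch: classify the status codes surfaced by processDelta/authenticate into
// "reconnect and retry" versus "the supplied token can never work".
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func shouldRetry(err error) bool {
	switch status.Code(err) {
	case codes.Aborted, codes.Unavailable:
		return true // stream was torn down (e.g. proxycfg watch canceled); dial again
	case codes.Unauthenticated, codes.PermissionDenied:
		return false // the token itself is invalid; retrying with it cannot succeed
	default:
		return false
	}
}

func main() {
	errs := []error{
		status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again"),
		status.Error(codes.Unauthenticated, "unauthenticated: ACL not found"),
	}
	for _, err := range errs {
		fmt.Printf("%v -> retry=%v\n", err, shouldRetry(err))
	}
}
```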