From 9f7626d50180f057f4361ebf33c1440be8150ac7 Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Wed, 14 Feb 2024 14:17:55 -0600 Subject: [PATCH] Ensure all topics are refreshed on FSM restore and add supervisor loop to v1 controller subscriptions (#20642) Ensure all topics are refreshed on FSM restore and add supervisor loop to v1 controller subscriptions This PR fixes two issues: 1. Not all streams were force closed whenever a snapshot restore happened. This means that anything consuming data from the stream (controllers, queries, etc.) was unaware that the data it held was potentially stale or invalid. This first part ensures that all topics are purged. 2. The v1 controllers did not properly handle stream errors (which are likely to appear much more often due to item 1 above), so this PR introduces a supervisor loop to restart the watches when these errors occur. --- .changelog/20642.txt | 7 ++ agent/consul/controller/controller.go | 84 ++++++++++++++------ agent/consul/fsm/fsm.go | 4 +- agent/consul/gateways/controller_gateways.go | 3 +- agent/consul/stream/event_publisher.go | 27 +++++-- 5 files changed, 91 insertions(+), 34 deletions(-) create mode 100644 .changelog/20642.txt diff --git a/.changelog/20642.txt b/.changelog/20642.txt new file mode 100644 index 0000000000..0f224654cb --- /dev/null +++ b/.changelog/20642.txt @@ -0,0 +1,7 @@ +```release-note:bug +server: Ensure internal streams are properly terminated on snapshot restore. +``` + +```release-note:bug +server: Ensure controllers are automatically restarted on internal stream errors. 
+``` diff --git a/agent/consul/controller/controller.go b/agent/consul/controller/controller.go index 29f864a7a6..9f49b9cb91 100644 --- a/agent/consul/controller/controller.go +++ b/agent/consul/controller/controller.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/retry" ) // much of this is a re-implementation of @@ -217,38 +218,41 @@ func (c *controller) Run(ctx context.Context) error { for _, sub := range c.subscriptions { // store a reference for the closure sub := sub + // Fetch data from subscriptions repeatedly until the context is cancelled. c.group.Go(func() error { - var index uint64 - - subscription, err := c.publisher.Subscribe(sub.request) - if err != nil { - return err + defer c.logger.Debug("stopping controller subscription", "topic", sub.request.Topic) + lastFailTime := time.Now() + retryWaiter := &retry.Waiter{ + MinFailures: 1, + MinWait: 1 * time.Second, + MaxWait: 20 * time.Second, } - defer subscription.Unsubscribe() - - for { - event, err := subscription.Next(ctx) + // Ensure the subscription is restarted when non-context errors happen. + // Stop if either the parent context or the group ctx is cancelled. + for c.groupCtx.Err() == nil { + c.logger.Debug("rewatching controller subscription", "topic", sub.request.Topic) + err := c.watchSubscription(ctx, sub) switch { case errors.Is(err, context.Canceled): return nil + case errors.Is(err, stream.ErrSubForceClosed): + c.logger.Debug("controller subscription force closed", "topic", sub.request.Topic) case err != nil: - return err - } - - if event.IsFramingEvent() { - continue - } - - if event.Index <= index { - continue - } - - index = event.Index - - if err := c.processEvent(sub, event); err != nil { - return err + // Log the error and backoff wait. Do not return the error + // or else the subscriptions will stop being watched. 
+ c.logger.Warn("error watching controller subscription", + "topic", sub.request.Topic, + "err", err) + // Reset the waiter if the last failure was more than 10 minutes ago. + // This simply prevents the backoff from being too aggressive. + if time.Now().After(lastFailTime.Add(10 * time.Minute)) { + retryWaiter.Reset() + } + lastFailTime = time.Now() + retryWaiter.Wait(c.groupCtx) } } + return nil }) } @@ -272,6 +276,38 @@ func (c *controller) Run(ctx context.Context) error { return nil } +// watchSubscription fetches events in a loop that stops on the first error. +func (c *controller) watchSubscription(ctx context.Context, sub subscription) error { + var index uint64 + subscription, err := c.publisher.Subscribe(sub.request) + if err != nil { + return err + } + defer subscription.Unsubscribe() + + for ctx.Err() == nil { + event, err := subscription.Next(ctx) + if err != nil { + return err + } + + if event.IsFramingEvent() { + continue + } + + if event.Index <= index { + continue + } + + index = event.Index + + if err := c.processEvent(sub, event); err != nil { + return err + } + } + return ctx.Err() +} + // AddTrigger allows for triggering a reconciliation request every time that the // triggering function returns, when the passed in context is canceled // the trigger must return diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 64c75913f4..7449858af2 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -303,9 +303,7 @@ func (c *FSM) Restore(old io.ReadCloser) error { // for new data. 
To prevent that inconsistency we refresh the topics while holding // the lock which ensures that any subscriptions to topics for FSM generated events if c.deps.Publisher != nil { - c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealth) - c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealthConnect) - c.deps.Publisher.RefreshTopic(state.EventTopicCARoots) + c.deps.Publisher.RefreshAllTopics() } c.stateLock.Unlock() diff --git a/agent/consul/gateways/controller_gateways.go b/agent/consul/gateways/controller_gateways.go index ab96ca7b24..fa830337fc 100644 --- a/agent/consul/gateways/controller_gateways.go +++ b/agent/consul/gateways/controller_gateways.go @@ -540,7 +540,8 @@ func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, updat logger: logger, updater: updater, } - reconciler.controller = controller.New(publisher, reconciler) + reconciler.controller = controller.New(publisher, reconciler). + WithLogger(logger.With("controller", "apiGatewayController")) return reconciler.controller.Subscribe( &stream.SubscribeRequest{ Topic: state.EventTopicAPIGateway, diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 258376e18a..04aa08334b 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -144,6 +144,21 @@ func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supp return nil } +func (e *EventPublisher) RefreshAllTopics() { + topics := make(map[Topic]struct{}) + + e.lock.Lock() + for topic := range e.snapshotHandlers { + topics[topic] = struct{}{} + e.forceEvictByTopicLocked(topic) + } + e.lock.Unlock() + + for topic := range topics { + e.subscriptions.closeAllByTopic(topic) + } +} + func (e *EventPublisher) RefreshTopic(topic Topic) error { e.lock.Lock() _, found := e.snapshotHandlers[topic] @@ -153,7 +168,9 @@ func (e *EventPublisher) RefreshTopic(topic Topic) error { return fmt.Errorf("topic %s is not registered", topic) } - 
e.forceEvictByTopic(topic) + e.lock.Lock() + e.forceEvictByTopicLocked(topic) + e.lock.Unlock() e.subscriptions.closeAllByTopic(topic) return nil @@ -444,14 +461,12 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev }) } -// forceEvictByTopic will remove all entries from the snapshot cache for a given topic. -// This method should be called while holding the publishers lock. -func (e *EventPublisher) forceEvictByTopic(topic Topic) { - e.lock.Lock() +// forceEvictByTopicLocked will remove all entries from the snapshot cache for a given topic. +// This method should be called while holding the EventPublisher's lock. +func (e *EventPublisher) forceEvictByTopicLocked(topic Topic) { for key := range e.snapCache { if key.Topic == topic.String() { delete(e.snapCache, key) } } - e.lock.Unlock() }