Ensure all topics are refreshed on FSM restore and add supervisor loop to v1 controller subscriptions (#20642)

Ensure all topics are refreshed on FSM restore and add supervisor loop to v1 controller subscriptions

This PR fixes two issues:

1. Not all streams were force closed whenever a snapshot restore happened. This means that anything consuming data from the stream (controllers, queries, etc) were unaware that the data they have is potentially stale / invalid. This first part ensures that all topics are purged.

2. The v1 controllers did not properly handle stream errors (which are likely to appear much more often due to 1 above) and so it introduces a supervisor thread to restart the watches when these errors occur.
pull/20648/head
Derek Menteer 10 months ago committed by GitHub
parent 137c9c0973
commit 9f7626d501
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,7 @@
```release-note:bug
server: Ensure internal streams are properly terminated on snapshot restore.
```
```release-note:bug
server: Ensure controllers are automatically restarted on internal stream errors.
```

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
)
// much of this is a re-implementation of
@ -217,38 +218,41 @@ func (c *controller) Run(ctx context.Context) error {
for _, sub := range c.subscriptions {
// store a reference for the closure
sub := sub
// Fetch data from subscriptions repeatedly until the context is cancelled.
c.group.Go(func() error {
var index uint64
subscription, err := c.publisher.Subscribe(sub.request)
if err != nil {
return err
defer c.logger.Debug("stopping controller subscription", "topic", sub.request.Topic)
lastFailTime := time.Now()
retryWaiter := &retry.Waiter{
MinFailures: 1,
MinWait: 1 * time.Second,
MaxWait: 20 * time.Second,
}
defer subscription.Unsubscribe()
for {
event, err := subscription.Next(ctx)
// Ensure the subscription is restarted when non-context errors happen.
// Stop if either the parent context or the group ctx is cancelled.
for c.groupCtx.Err() == nil {
c.logger.Debug("rewatching controller subscription", "topic", sub.request.Topic)
err := c.watchSubscription(ctx, sub)
switch {
case errors.Is(err, context.Canceled):
return nil
case errors.Is(err, stream.ErrSubForceClosed):
c.logger.Debug("controller subscription force closed", "topic", sub.request.Topic)
case err != nil:
return err
}
if event.IsFramingEvent() {
continue
}
if event.Index <= index {
continue
}
index = event.Index
if err := c.processEvent(sub, event); err != nil {
return err
// Log the error and backoff wait. Do not return the error
// or else the subscriptions will stop being watched.
c.logger.Warn("error watching controller subscription",
"topic", sub.request.Topic,
"err", err)
// Reset the waiter if the last failure was more than 10 minutes ago.
// This simply prevents the backoff from being too aggressive.
if time.Now().After(lastFailTime.Add(10 * time.Minute)) {
retryWaiter.Reset()
}
lastFailTime = time.Now()
retryWaiter.Wait(c.groupCtx)
}
}
return nil
})
}
@ -272,6 +276,38 @@ func (c *controller) Run(ctx context.Context) error {
return nil
}
// watchSubscription fetches events in a loop that stops on the first error.
func (c *controller) watchSubscription(ctx context.Context, sub subscription) error {
var index uint64
subscription, err := c.publisher.Subscribe(sub.request)
if err != nil {
return err
}
defer subscription.Unsubscribe()
for ctx.Err() == nil {
event, err := subscription.Next(ctx)
if err != nil {
return err
}
if event.IsFramingEvent() {
continue
}
if event.Index <= index {
continue
}
index = event.Index
if err := c.processEvent(sub, event); err != nil {
return err
}
}
return ctx.Err()
}
// AddTrigger allows for triggering a reconciliation request every time that the
// triggering function returns, when the passed in context is canceled
// the trigger must return

@ -303,9 +303,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
// for new data. To prevent that inconsistency we refresh the topics while holding
// the lock which ensures that any subscriptions to topics for FSM generated events
if c.deps.Publisher != nil {
c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealth)
c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealthConnect)
c.deps.Publisher.RefreshTopic(state.EventTopicCARoots)
c.deps.Publisher.RefreshAllTopics()
}
c.stateLock.Unlock()

@ -540,7 +540,8 @@ func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, updat
logger: logger,
updater: updater,
}
reconciler.controller = controller.New(publisher, reconciler)
reconciler.controller = controller.New(publisher, reconciler).
WithLogger(logger.With("controller", "apiGatewayController"))
return reconciler.controller.Subscribe(
&stream.SubscribeRequest{
Topic: state.EventTopicAPIGateway,

@ -144,6 +144,21 @@ func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supp
return nil
}
func (e *EventPublisher) RefreshAllTopics() {
topics := make(map[Topic]struct{})
e.lock.Lock()
for topic := range e.snapshotHandlers {
topics[topic] = struct{}{}
e.forceEvictByTopicLocked(topic)
}
e.lock.Unlock()
for topic := range topics {
e.subscriptions.closeAllByTopic(topic)
}
}
func (e *EventPublisher) RefreshTopic(topic Topic) error {
e.lock.Lock()
_, found := e.snapshotHandlers[topic]
@ -153,7 +168,9 @@ func (e *EventPublisher) RefreshTopic(topic Topic) error {
return fmt.Errorf("topic %s is not registered", topic)
}
e.forceEvictByTopic(topic)
e.lock.Lock()
e.forceEvictByTopicLocked(topic)
e.lock.Unlock()
e.subscriptions.closeAllByTopic(topic)
return nil
@ -444,14 +461,12 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev
})
}
// forceEvictByTopic will remove all entries from the snapshot cache for a given topic.
// This method should be called while holding the publishers lock.
func (e *EventPublisher) forceEvictByTopic(topic Topic) {
e.lock.Lock()
// forceEvictByTopicLocked will remove all entries from the snapshot cache for a given topic.
// This method should be called while holding the EventPublisher's lock.
func (e *EventPublisher) forceEvictByTopicLocked(topic Topic) {
for key := range e.snapCache {
if key.Topic == topic.String() {
delete(e.snapCache, key)
}
}
e.lock.Unlock()
}

Loading…
Cancel
Save