From 17b833b4c947e4b8367ced26b674c5a77eca78c5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 19 Jun 2020 12:26:36 -0400 Subject: [PATCH] Add a context for stopping EventPublisher goroutine --- agent/consul/state/event_publisher.go | 21 ++++++++++++++++----- agent/consul/state/state_store.go | 7 ++++++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index d4a97768fb..b2ad5b5f36 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -67,7 +67,12 @@ type commitUpdate struct { events []stream.Event } -func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher { +// NewEventPublisher returns an EventPublisher for publishing change events. +// Handlers are used to convert the memDB changes into events. +// A goroutine is run in the background to publish events to all subscribes. +// Cancelling the context will shutdown the goroutine, to free resources, +// and stop all publishing. +func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]topicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[stream.Topic]*stream.EventBuffer), @@ -79,7 +84,7 @@ func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time handlers: handlers, } - go e.handleUpdates() + go e.handleUpdates(ctx) return e } @@ -121,10 +126,16 @@ func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { return nil } -func (e *EventPublisher) handleUpdates() { +func (e *EventPublisher) handleUpdates(ctx context.Context) { for { - update := <-e.publishCh - e.sendEvents(update) + select { + case <-ctx.Done(): + // TODO: also close all subscriptions so the subscribers are moved + // to the new publisher? + return + case update := <-e.publishCh: + e.sendEvents(update) + } } } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 83412bdb39..0cb04ea2f9 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -1,6 +1,7 @@ package state import ( + "context" "errors" "fmt" "time" @@ -154,6 +155,10 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { return nil, fmt.Errorf("Failed setting up state store: %s", err) } + // TODO: context should be cancelled when the store is Abandoned to free + // resources. + ctx := context.TODO() + s := &Store{ schema: schema, abandonCh: make(chan struct{}), @@ -161,7 +166,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), db: &changeTrackerDB{ db: db, - publisher: NewEventPublisher(newTopicHandlers(), 10*time.Second), + publisher: NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), }, } return s, nil