diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index a223733a5a..654174d7b4 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -28,7 +28,7 @@ type EventPublisher struct { // not that expensive, but having a cache for a few seconds can help // de-duplicate building the same snapshot over and over again when a // thundering herd of watchers all subscribe to the same topic within a few - // seconds. TODO + // seconds. snapCacheTTL time.Duration // This lock protects the topicBuffers, snapCache and subsByToken maps. @@ -38,8 +38,8 @@ type EventPublisher struct { // for a topic. topicBuffers map[stream.Topic]*stream.EventBuffer - // snapCache stores the head of any snapshot buffers still in cache if caching - // is enabled. + // snapCache if a cache of EventSnapshots indexed by topic and key. + // TODO: new struct for snapCache and snapFns and snapCacheTTL snapCache map[stream.Topic]map[string]*stream.EventSnapshot // snapFns is the set of snapshot functions that were registered bound to the @@ -51,9 +51,10 @@ type EventPublisher struct { // ACL permissions change. subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription - // commitCh decouples the Commit call in the FSM hot path from distributing - // the resulting events. - commitCh chan commitUpdate + // publishCh is used to send messages from an active txn to a goroutine which + // publishes events, so that publishing can happen asynchronously from + // the Commit call in the FSM hot path. + publishCh chan commitUpdate } type commitUpdate struct { @@ -70,7 +71,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), snapFns: make(map[stream.Topic]stream.SnapFn), subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), - commitCh: make(chan commitUpdate, 64), + publishCh: make(chan commitUpdate, 64), } // create a local handler table @@ -98,7 +99,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { events = append(events, es...) } } - e.commitCh <- commitUpdate{ + e.publishCh <- commitUpdate{ // TODO: document why it must be created here, and not in the new thread // // Create a new transaction since it's going to be used from a different @@ -113,7 +114,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { func (e *EventPublisher) handleUpdates() { for { - update := <-e.commitCh + update := <-e.publishCh e.sendEvents(update) } } @@ -158,15 +159,22 @@ func (e *EventPublisher) sendEvents(update commitUpdate) { eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } - // Deliver events for topic, events := range eventsByTopic { - buf, ok := e.topicBuffers[topic] - if !ok { - buf = stream.NewEventBuffer() - e.topicBuffers[topic] = buf - } - buf.Append(events) + e.getTopicBuffer(topic).Append(events) + } +} + +// getTopicBuffer for the topic. Creates a new event buffer if one does not +// already exist. +// +// EventPublisher.lock must be held to call this method. +func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer { + buf, ok := e.topicBuffers[topic] + if !ok { + buf = stream.NewEventBuffer() + e.topicBuffers[topic] = buf } + return buf } // handleACLUpdate handles an ACL token/policy/role update. This method assumes @@ -250,11 +258,7 @@ func (e *EventPublisher) Subscribe( // Ensure there is a topic buffer for that topic so we start capturing any // future published events. - buf, ok := e.topicBuffers[req.Topic] - if !ok { - buf = stream.NewEventBuffer() - e.topicBuffers[req.Topic] = buf - } + buf := e.getTopicBuffer(req.Topic) // See if we need a snapshot topicHead := buf.Head()