From 4fa0fdc0e0f94e56f2e187ed432503a55d43a9c8 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 6 Jul 2020 16:15:13 -0400 Subject: [PATCH] stream: Move EventPublisher to stream package The EventPublisher is the central hub of the PubSub system. It is toughly coupled with much of stream. Some stream internals were exported exclusively for EventPublisher. The two Subscribe cases (with or without index) were also awkwardly split between two packages. By moving EventPublisher into stream they are now both in the same package (although still in different files). --- agent/consul/state/memdb.go | 3 +- agent/consul/state/state_store.go | 3 +- ...sher_test.go => store_integration_test.go} | 404 ++++++++---------- agent/consul/state/stream_topics.go | 4 +- agent/consul/stream/event.go | 1 + .../{state => stream}/event_publisher.go | 75 ++-- agent/consul/stream/event_publisher_test.go | 115 +++++ 7 files changed, 338 insertions(+), 267 deletions(-) rename agent/consul/state/{event_publisher_test.go => store_integration_test.go} (86%) rename agent/consul/{state => stream}/event_publisher.go (80%) create mode 100644 agent/consul/stream/event_publisher_test.go diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 864694b532..d607e9bc23 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -55,8 +55,7 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { publish: func(changes db.Changes) error { // publish provides a new read-only Txn to PublishChanges so that // events can be constructed from the current state at the time of - // Commit, and so that operations can be performed in a goroutine - // after this WriteTxn is committed. + // Commit. return c.publisher.PublishChanges(c.db.Txn(false), changes) }, } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 0cb04ea2f9..0a26311a70 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -166,7 +167,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), db: &changeTrackerDB{ db: db, - publisher: NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), + publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), }, } return s, nil diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/store_integration_test.go similarity index 86% rename from agent/consul/state/event_publisher_test.go rename to agent/consul/state/store_integration_test.go index d685b6fb66..0dd56b61b1 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/store_integration_test.go @@ -12,225 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -type nextResult struct { - Events []stream.Event - Err error -} - -func testRunSub(sub *stream.Subscription) <-chan nextResult { - eventCh := make(chan nextResult, 1) - go func() { - for { - es, err := sub.Next() - eventCh <- nextResult{ - Events: es, - Err: err, - } - if err != nil { - return - } - } - }() - return eventCh -} - -func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { - t.Helper() - select { - case next := <-eventCh: - require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) - case <-time.After(100 * time.Millisecond): - } -} - -func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { - t.Helper() - select { - case next := <-eventCh: - require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - return &next.Events[0] - case <-time.After(100 * time.Millisecond): - t.Fatalf("no event after 100ms") - } - return nil -} - -func assertErr(t *testing.T, eventCh <-chan nextResult) error { - t.Helper() - select { - case next := <-eventCh: - require.Error(t, next.Err) - return next.Err - case <-time.After(100 * time.Millisecond): - t.Fatalf("no err after 100ms") - } - return nil -} - -// assertReset checks that a ResetStream event is send to the subscription -// within 100ms. If allowEOS is true it will ignore any intermediate events that -// come before the reset provided they are EndOfSnapshot events because in many -// cases it's non-deterministic whether the snapshot will complete before the -// acl reset is handled. -func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { - t.Helper() - timeoutCh := time.After(100 * time.Millisecond) - for { - select { - case next := <-eventCh: - if allowEOS { - if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { - continue - } - } - require.Error(t, next.Err) - require.Equal(t, stream.ErrSubscriptionReload, next.Err) - return - case <-timeoutCh: - t.Fatalf("no err after 100ms") - } - } -} - -var topicService stream.Topic = 901 - -func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler { - return map[stream.Topic]TopicHandler{ - topicService: { - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { - var events []stream.Event - for _, change := range changes.Changes { - if change.Table == "services" { - service := change.After.(*structs.ServiceNode) - events = append(events, stream.Event{ - Topic: topicService, - Key: service.ServiceName, - Index: changes.Index, - Payload: service, - }) - } - } - return events, nil - }, - Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { - idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) - if err != nil { - return idx, err - } - - for _, node := range nodes { - event := stream.Event{ - Topic: req.Topic, - Key: req.Key, - Index: node.ModifyIndex, - Payload: node, - } - buffer.Append([]stream.Event{event}) - } - return idx, nil - }, - }, - stream.TopicInternal: { - ProcessChanges: aclChangeUnsubscribeEvent, - }, - } -} - -func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { - token := &structs.ACLToken{ - AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", - SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", - Description: "something", - Policies: []structs.ACLTokenPolicyLink{ - {ID: testPolicyID_A}, - }, - Roles: []structs.ACLTokenRoleLink{ - {ID: testRoleID_B}, - }, - } - token.SetHash(false) - - // If we subscribe immediately after we create a token we race with the - // publisher that is publishing the ACL token event for the token we just - // created. That means that the subscription we create right after will often - // be immediately reset. The most reliable way to avoid that without just - // sleeping for some arbitrary time is to pre-subscribe using the token before - // it actually exists (which works because the publisher doesn't check tokens - // it assumes something lower down did that) and then wait for it to be reset - // so we know the initial token write event has been sent out before - // continuing... - subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, - } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) - s.db.publisher = publisher - sub, err := publisher.Subscribe(ctx, subscription) - require.NoError(t, err) - - eventCh := testRunSub(sub) - - // Create the ACL token to be used in the subscription. - require.NoError(t, s.ACLTokenSet(2, token.Clone(), false)) - - // Wait for the pre-subscription to be reset - assertReset(t, eventCh, true) - - return token -} - -func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { - t.Parallel() - require := require.New(t) - store, err := NewStateStore(nil) - require.NoError(err) - - reg := structs.TestRegisterRequest(t) - reg.Service.ID = "web1" - require.NoError(store.EnsureRegistration(1, reg)) - - // Register the subscription. - subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: reg.Service.Service, - } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0) - store.db.publisher = publisher - sub, err := publisher.Subscribe(ctx, subscription) - require.NoError(err) - - eventCh := testRunSub(sub) - - // Stream should get the instance and then EndOfSnapshot - e := assertEvent(t, eventCh) - srv := e.Payload.(*structs.ServiceNode) - require.Equal(srv.ServiceID, "web1") - e = assertEvent(t, eventCh) - require.True(e.IsEndOfSnapshot()) - - // Now subscriber should block waiting for updates - assertNoEvent(t, eventCh) - - // Add a new instance of service on a different node - reg.Node = "node2" - require.NoError(store.EnsureRegistration(1, reg)) - - // Subscriber should see registration - e = assertEvent(t, eventCh) - srv = e.Payload.(*structs.ServiceNode) - require.Equal(srv.Node, "node2") -} - -func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -247,7 +29,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -311,7 +93,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { require.Equal(stream.ErrSubscriptionReload, err) } -func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -328,7 +110,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -425,7 +207,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { assertReset(t, eventCh, true) } -func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -442,7 +224,7 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -504,3 +286,177 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { // Ensure the reload event was sent. assertReset(t, eventCh, false) } + +type nextResult struct { + Events []stream.Event + Err error +} + +func testRunSub(sub *stream.Subscription) <-chan nextResult { + eventCh := make(chan nextResult, 1) + go func() { + for { + es, err := sub.Next() + eventCh <- nextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) + case <-time.After(100 * time.Millisecond): + } +} + +func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + return &next.Events[0] + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return nil +} + +func assertErr(t *testing.T, eventCh <-chan nextResult) error { + t.Helper() + select { + case next := <-eventCh: + require.Error(t, next.Err) + return next.Err + case <-time.After(100 * time.Millisecond): + t.Fatalf("no err after 100ms") + } + return nil +} + +// assertReset checks that a ResetStream event is send to the subscription +// within 100ms. If allowEOS is true it will ignore any intermediate events that +// come before the reset provided they are EndOfSnapshot events because in many +// cases it's non-deterministic whether the snapshot will complete before the +// acl reset is handled. +func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { + t.Helper() + timeoutCh := time.After(100 * time.Millisecond) + for { + select { + case next := <-eventCh: + if allowEOS { + if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { + continue + } + } + require.Error(t, next.Err) + require.Equal(t, stream.ErrSubscriptionReload, next.Err) + return + case <-timeoutCh: + t.Fatalf("no err after 100ms") + } + } +} + +var topicService stream.Topic = 901 + +func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { + return map[stream.Topic]stream.TopicHandler{ + topicService: { + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + if change.Table == "services" { + service := change.After.(*structs.ServiceNode) + events = append(events, stream.Event{ + Topic: topicService, + Key: service.ServiceName, + Index: changes.Index, + Payload: service, + }) + } + } + return events, nil + }, + Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { + idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + if err != nil { + return idx, err + } + + for _, node := range nodes { + event := stream.Event{ + Topic: req.Topic, + Key: req.Key, + Index: node.ModifyIndex, + Payload: node, + } + buffer.Append([]stream.Event{event}) + } + return idx, nil + }, + }, + stream.TopicInternal: { + ProcessChanges: aclChangeUnsubscribeEvent, + }, + } +} + +func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { + token := &structs.ACLToken{ + AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", + SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", + Description: "something", + Policies: []structs.ACLTokenPolicyLink{ + {ID: testPolicyID_A}, + }, + Roles: []structs.ACLTokenRoleLink{ + {ID: testRoleID_B}, + }, + } + token.SetHash(false) + + // If we subscribe immediately after we create a token we race with the + // publisher that is publishing the ACL token event for the token we just + // created. That means that the subscription we create right after will often + // be immediately reset. The most reliable way to avoid that without just + // sleeping for some arbitrary time is to pre-subscribe using the token before + // it actually exists (which works because the publisher doesn't check tokens + // it assumes something lower down did that) and then wait for it to be reset + // so we know the initial token write event has been sent out before + // continuing... + subscription := &stream.SubscribeRequest{ + Topic: topicService, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher + sub, err := publisher.Subscribe(ctx, subscription) + require.NoError(t, err) + + eventCh := testRunSub(sub) + + // Create the ACL token to be used in the subscription. + require.NoError(t, s.ACLTokenSet(2, token.Clone(), false)) + + // Wait for the pre-subscription to be reset + assertReset(t, eventCh, true) + + return token +} diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index d7d84fcbb4..effcaa8c5e 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -5,8 +5,8 @@ import ( ) // newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]TopicHandler { - return map[stream.Topic]TopicHandler{ +func newTopicHandlers() map[stream.Topic]stream.TopicHandler { + return map[stream.Topic]stream.TopicHandler{ // TopicInternal is a special case for processors that handle events that are // not for subscribers. They are used by the stream package. stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index d05537488e..ef6265a565 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,6 +4,7 @@ type Topic int32 // TODO: remove underscores // TODO: type string instead of int? +// TODO: define non-internal topics in state package? const ( TopicInternal Topic = 0 Topic_ServiceHealth Topic = 1 diff --git a/agent/consul/state/event_publisher.go b/agent/consul/stream/event_publisher.go similarity index 80% rename from agent/consul/state/event_publisher.go rename to agent/consul/stream/event_publisher.go index b1fb22960d..e56612d350 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -1,4 +1,4 @@ -package state +package stream import ( "context" @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/consul/agent/consul/state/db" - "github.com/hashicorp/consul/agent/consul/stream" ) // EventPublisher receives changes events from Publish, and sends them to all @@ -34,11 +33,11 @@ type EventPublisher struct { // topicBuffers stores the head of the linked-list buffer to publish events to // for a topic. - topicBuffers map[stream.Topic]*stream.EventBuffer + topicBuffers map[Topic]*EventBuffer // snapCache if a cache of EventSnapshots indexed by topic and key. // TODO: new struct for snapCache and snapFns and snapCacheTTL - snapCache map[stream.Topic]map[string]*stream.EventSnapshot + snapCache map[Topic]map[string]*EventSnapshot subscriptions *subscriptions @@ -47,7 +46,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan changeEvents - handlers map[stream.Topic]TopicHandler + handlers map[Topic]TopicHandler } type subscriptions struct { @@ -60,21 +59,21 @@ type subscriptions struct { // When the token is modified all subscriptions under that token will be // reloaded. // A subscription may be unsubscribed by using the pointer to the request. - byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription + byToken map[string]map[*SubscribeRequest]*Subscription } type changeEvents struct { - events []stream.Event + events []Event } // TopicHandler provides functions which create stream.Events for a topic. type TopicHandler struct { // Snapshot creates the necessary events to reproduce the current state and // appends them to the EventBuffer. - Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) + Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error) // ProcessChanges accepts a slice of Changes, and builds a slice of events for // those changes. - ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error) + ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error) } // NewEventPublisher returns an EventPublisher for publishing change events. @@ -82,14 +81,14 @@ type TopicHandler struct { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[stream.Topic]*stream.EventBuffer), - snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), + topicBuffers: make(map[Topic]*EventBuffer), + snapCache: make(map[Topic]map[string]*EventSnapshot), publishCh: make(chan changeEvents, 64), subscriptions: &subscriptions{ - byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), + byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, handlers: handlers, } @@ -99,13 +98,13 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl return e } -// PublishChanges to all subscribers. tx is a read-only transaction that may be -// used from a goroutine. The caller should never use the tx once it has been -// passed to PublishChanged. +// PublishChanges to all subscribers. tx is a read-only transaction that captures +// the state at the time the change happened. The caller must never use the tx once +// it has been passed to PublishChanged. func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { defer tx.Abort() - var events []stream.Event + var events []Event for topic, handler := range e.handlers { if handler.ProcessChanges != nil { es, err := handler.ProcessChanges(tx, changes) @@ -137,14 +136,14 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { // as any ACL update events to cause affected listeners to reset their stream. func (e *EventPublisher) sendEvents(update changeEvents) { for _, event := range update.events { - if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok { + if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok { e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs) } } - eventsByTopic := make(map[stream.Topic][]stream.Event) + eventsByTopic := make(map[Topic][]Event) for _, event := range update.events { - if event.Topic == stream.TopicInternal { + if event.Topic == TopicInternal { continue } eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) @@ -161,10 +160,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) { // already exist. // // EventPublisher.lock must be held to call this method. -func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer { +func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer { buf, ok := e.topicBuffers[topic] if !ok { - buf = stream.NewEventBuffer() + buf = NewEventBuffer() e.topicBuffers[topic] = buf } return buf @@ -181,11 +180,11 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer // call Subscription.Unsubscribe to free ACL tracking resources. func (e *EventPublisher) Subscribe( ctx context.Context, - req *stream.SubscribeRequest, -) (*stream.Subscription, error) { + req *SubscribeRequest, +) (*Subscription, error) { // Ensure we know how to make a snapshot for this topic _, ok := e.handlers[req.Topic] - if !ok || req.Topic == stream.TopicInternal { + if !ok || req.Topic == TopicInternal { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -198,26 +197,26 @@ func (e *EventPublisher) Subscribe( // See if we need a snapshot topicHead := buf.Head() - var sub *stream.Subscription + var sub *Subscription if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { // No need for a snapshot, send the "resume stream" message to signal to // client it's cache is still good. (note that this can be distinguished // from a legitimate empty snapshot due to the index matching the one the // client sent), then follow along from here in the topic. - e := stream.Event{ + e := Event{ Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: stream.ResumeStream{}, + Payload: ResumeStream{}, } // Make a new buffer to send to the client containing the resume. - buf := stream.NewEventBuffer() + buf := NewEventBuffer() // Store the head of that buffer before we append to it to give as the // starting point for the subscription. subHead := buf.Head() - buf.Append([]stream.Event{e}) + buf.Append([]Event{e}) // Now splice the rest of the topic buffer on so the subscription will // continue to see future updates in the topic buffer. @@ -227,13 +226,13 @@ func (e *EventPublisher) Subscribe( } buf.AppendBuffer(follow) - sub = stream.NewSubscription(ctx, req, subHead) + sub = NewSubscription(ctx, req, subHead) } else { snap, err := e.getSnapshotLocked(req, topicHead) if err != nil { return nil, err } - sub = stream.NewSubscription(ctx, req, snap.Snap) + sub = NewSubscription(ctx, req, snap.Snap) } e.subscriptions.add(req, sub) @@ -246,13 +245,13 @@ func (e *EventPublisher) Subscribe( return sub, nil } -func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) { +func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { s.lock.Lock() defer s.lock.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { - subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) + subsByToken = make(map[*SubscribeRequest]*Subscription) s.byToken[req.Token] = subsByToken } subsByToken[req] = sub @@ -275,7 +274,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { // subscription to free resources monitoring changes in it's ACL token. // // req MUST be the same pointer that was used to register the subscription. -func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) { +func (s *subscriptions) unsubscribe(req *SubscribeRequest) { s.lock.Lock() defer s.lock.Unlock() @@ -289,11 +288,11 @@ func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) { } } -func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { +func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) { // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { - topicSnaps = make(map[string]*stream.EventSnapshot) + topicSnaps = make(map[string]*EventSnapshot) e.snapCache[req.Topic] = topicSnaps } @@ -308,7 +307,7 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot) + snap = NewEventSnapshot(req, topicHead, handler.Snapshot) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go new file mode 100644 index 0000000000..5f9df33bab --- /dev/null +++ b/agent/consul/stream/event_publisher_test.go @@ -0,0 +1,115 @@ +package stream + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/state/db" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +var testTopic Topic = 999 + +func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { + subscription := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0) + sub, err := publisher.Subscribe(ctx, subscription) + require.NoError(t, err) + eventCh := consumeSubscription(sub) + + result := nextResult(t, eventCh) + require.NoError(t, result.Err) + expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}} + require.Equal(t, expected, result.Events) + + result = nextResult(t, eventCh) + require.Len(t, result.Events, 1) + require.True(t, result.Events[0].IsEndOfSnapshot()) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{}) + require.NoError(t, err) + + // Subscriber should see the published event + result = nextResult(t, eventCh) + require.NoError(t, result.Err) + expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}} + require.Equal(t, expected, result.Events) +} + +func newTestTopicHandlers() map[Topic]TopicHandler { + return map[Topic]TopicHandler{ + testTopic: { + Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) + return 1, nil + }, + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) { + events := []Event{{ + Topic: testTopic, + Key: "sub-key", + Payload: "the-published-event-payload", + }} + return events, nil + }, + }, + } +} + +func consumeSubscription(sub *Subscription) <-chan subNextResult { + eventCh := make(chan subNextResult, 1) + go func() { + for { + es, err := sub.Next() + eventCh <- subNextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +type subNextResult struct { + Events []Event + Err error +} + +func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { + t.Helper() + select { + case next := <-eventCh: + return next + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return subNextResult{} +} + +func assertNoResult(t *testing.T, eventCh <-chan subNextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("received unexpected event: %#v", next.Events[0].Payload) + case <-time.After(100 * time.Millisecond): + } +}