diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 864694b532..d607e9bc23 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -55,8 +55,7 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { publish: func(changes db.Changes) error { // publish provides a new read-only Txn to PublishChanges so that // events can be constructed from the current state at the time of - // Commit, and so that operations can be performed in a goroutine - // after this WriteTxn is committed. + // Commit. return c.publisher.PublishChanges(c.db.Txn(false), changes) }, } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 0cb04ea2f9..0a26311a70 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -166,7 +167,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), db: &changeTrackerDB{ db: db, - publisher: NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), + publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), }, } return s, nil diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/store_integration_test.go similarity index 86% rename from agent/consul/state/event_publisher_test.go rename to agent/consul/state/store_integration_test.go index d685b6fb66..0dd56b61b1 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/store_integration_test.go @@ -12,225 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -type nextResult struct { - Events []stream.Event - Err error -} - -func testRunSub(sub *stream.Subscription) <-chan nextResult { - eventCh := make(chan nextResult, 1) - go func() { - for { - es, err := sub.Next() - eventCh <- nextResult{ - Events: es, - Err: err, - } - if err != nil { - return - } - } - }() - return eventCh -} - -func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { - t.Helper() - select { - case next := <-eventCh: - require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) - case <-time.After(100 * time.Millisecond): - } -} - -func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { - t.Helper() - select { - case next := <-eventCh: - require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - return &next.Events[0] - case <-time.After(100 * time.Millisecond): - t.Fatalf("no event after 100ms") - } - return nil -} - -func assertErr(t *testing.T, eventCh <-chan nextResult) error { - t.Helper() - select { - case next := <-eventCh: - require.Error(t, next.Err) - return next.Err - case <-time.After(100 * time.Millisecond): - t.Fatalf("no err after 100ms") - } - return nil -} - -// assertReset checks that a ResetStream event is send to the subscription -// within 100ms. If allowEOS is true it will ignore any intermediate events that -// come before the reset provided they are EndOfSnapshot events because in many -// cases it's non-deterministic whether the snapshot will complete before the -// acl reset is handled. -func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { - t.Helper() - timeoutCh := time.After(100 * time.Millisecond) - for { - select { - case next := <-eventCh: - if allowEOS { - if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { - continue - } - } - require.Error(t, next.Err) - require.Equal(t, stream.ErrSubscriptionReload, next.Err) - return - case <-timeoutCh: - t.Fatalf("no err after 100ms") - } - } -} - -var topicService stream.Topic = 901 - -func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler { - return map[stream.Topic]TopicHandler{ - topicService: { - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { - var events []stream.Event - for _, change := range changes.Changes { - if change.Table == "services" { - service := change.After.(*structs.ServiceNode) - events = append(events, stream.Event{ - Topic: topicService, - Key: service.ServiceName, - Index: changes.Index, - Payload: service, - }) - } - } - return events, nil - }, - Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { - idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) - if err != nil { - return idx, err - } - - for _, node := range nodes { - event := stream.Event{ - Topic: req.Topic, - Key: req.Key, - Index: node.ModifyIndex, - Payload: node, - } - buffer.Append([]stream.Event{event}) - } - return idx, nil - }, - }, - stream.TopicInternal: { - ProcessChanges: aclChangeUnsubscribeEvent, - }, - } -} - -func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { - token := &structs.ACLToken{ - AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", - SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", - Description: "something", - Policies: []structs.ACLTokenPolicyLink{ - {ID: testPolicyID_A}, - }, - Roles: []structs.ACLTokenRoleLink{ - {ID: testRoleID_B}, - }, - } - token.SetHash(false) - - // If we subscribe immediately after we create a token we race with the - // publisher that is publishing the ACL token event for the token we just - // created. That means that the subscription we create right after will often - // be immediately reset. The most reliable way to avoid that without just - // sleeping for some arbitrary time is to pre-subscribe using the token before - // it actually exists (which works because the publisher doesn't check tokens - // it assumes something lower down did that) and then wait for it to be reset - // so we know the initial token write event has been sent out before - // continuing... - subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, - } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) - s.db.publisher = publisher - sub, err := publisher.Subscribe(ctx, subscription) - require.NoError(t, err) - - eventCh := testRunSub(sub) - - // Create the ACL token to be used in the subscription. - require.NoError(t, s.ACLTokenSet(2, token.Clone(), false)) - - // Wait for the pre-subscription to be reset - assertReset(t, eventCh, true) - - return token -} - -func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { - t.Parallel() - require := require.New(t) - store, err := NewStateStore(nil) - require.NoError(err) - - reg := structs.TestRegisterRequest(t) - reg.Service.ID = "web1" - require.NoError(store.EnsureRegistration(1, reg)) - - // Register the subscription. - subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: reg.Service.Service, - } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0) - store.db.publisher = publisher - sub, err := publisher.Subscribe(ctx, subscription) - require.NoError(err) - - eventCh := testRunSub(sub) - - // Stream should get the instance and then EndOfSnapshot - e := assertEvent(t, eventCh) - srv := e.Payload.(*structs.ServiceNode) - require.Equal(srv.ServiceID, "web1") - e = assertEvent(t, eventCh) - require.True(e.IsEndOfSnapshot()) - - // Now subscriber should block waiting for updates - assertNoEvent(t, eventCh) - - // Add a new instance of service on a different node - reg.Node = "node2" - require.NoError(store.EnsureRegistration(1, reg)) - - // Subscriber should see registration - e = assertEvent(t, eventCh) - srv = e.Payload.(*structs.ServiceNode) - require.Equal(srv.Node, "node2") -} - -func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -247,7 +29,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -311,7 +93,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) { require.Equal(stream.ErrSubscriptionReload, err) } -func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -328,7 +110,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -425,7 +207,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { assertReset(t, eventCh, true) } -func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { +func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) @@ -442,7 +224,7 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -504,3 +286,177 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { // Ensure the reload event was sent. assertReset(t, eventCh, false) } + +type nextResult struct { + Events []stream.Event + Err error +} + +func testRunSub(sub *stream.Subscription) <-chan nextResult { + eventCh := make(chan nextResult, 1) + go func() { + for { + es, err := sub.Next() + eventCh <- nextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) + case <-time.After(100 * time.Millisecond): + } +} + +func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + return &next.Events[0] + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return nil +} + +func assertErr(t *testing.T, eventCh <-chan nextResult) error { + t.Helper() + select { + case next := <-eventCh: + require.Error(t, next.Err) + return next.Err + case <-time.After(100 * time.Millisecond): + t.Fatalf("no err after 100ms") + } + return nil +} + +// assertReset checks that a ResetStream event is send to the subscription +// within 100ms. If allowEOS is true it will ignore any intermediate events that +// come before the reset provided they are EndOfSnapshot events because in many +// cases it's non-deterministic whether the snapshot will complete before the +// acl reset is handled. +func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { + t.Helper() + timeoutCh := time.After(100 * time.Millisecond) + for { + select { + case next := <-eventCh: + if allowEOS { + if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { + continue + } + } + require.Error(t, next.Err) + require.Equal(t, stream.ErrSubscriptionReload, next.Err) + return + case <-timeoutCh: + t.Fatalf("no err after 100ms") + } + } +} + +var topicService stream.Topic = 901 + +func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { + return map[stream.Topic]stream.TopicHandler{ + topicService: { + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + if change.Table == "services" { + service := change.After.(*structs.ServiceNode) + events = append(events, stream.Event{ + Topic: topicService, + Key: service.ServiceName, + Index: changes.Index, + Payload: service, + }) + } + } + return events, nil + }, + Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { + idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + if err != nil { + return idx, err + } + + for _, node := range nodes { + event := stream.Event{ + Topic: req.Topic, + Key: req.Key, + Index: node.ModifyIndex, + Payload: node, + } + buffer.Append([]stream.Event{event}) + } + return idx, nil + }, + }, + stream.TopicInternal: { + ProcessChanges: aclChangeUnsubscribeEvent, + }, + } +} + +func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { + token := &structs.ACLToken{ + AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", + SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", + Description: "something", + Policies: []structs.ACLTokenPolicyLink{ + {ID: testPolicyID_A}, + }, + Roles: []structs.ACLTokenRoleLink{ + {ID: testRoleID_B}, + }, + } + token.SetHash(false) + + // If we subscribe immediately after we create a token we race with the + // publisher that is publishing the ACL token event for the token we just + // created. That means that the subscription we create right after will often + // be immediately reset. The most reliable way to avoid that without just + // sleeping for some arbitrary time is to pre-subscribe using the token before + // it actually exists (which works because the publisher doesn't check tokens + // it assumes something lower down did that) and then wait for it to be reset + // so we know the initial token write event has been sent out before + // continuing... + subscription := &stream.SubscribeRequest{ + Topic: topicService, + Key: "nope", + Token: token.SecretID, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + s.db.publisher = publisher + sub, err := publisher.Subscribe(ctx, subscription) + require.NoError(t, err) + + eventCh := testRunSub(sub) + + // Create the ACL token to be used in the subscription. + require.NoError(t, s.ACLTokenSet(2, token.Clone(), false)) + + // Wait for the pre-subscription to be reset + assertReset(t, eventCh, true) + + return token +} diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index d7d84fcbb4..effcaa8c5e 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -5,8 +5,8 @@ import ( ) // newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]TopicHandler { - return map[stream.Topic]TopicHandler{ +func newTopicHandlers() map[stream.Topic]stream.TopicHandler { + return map[stream.Topic]stream.TopicHandler{ // TopicInternal is a special case for processors that handle events that are // not for subscribers. They are used by the stream package. stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index d05537488e..ef6265a565 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,6 +4,7 @@ type Topic int32 // TODO: remove underscores // TODO: type string instead of int? +// TODO: define non-internal topics in state package? const ( TopicInternal Topic = 0 Topic_ServiceHealth Topic = 1 diff --git a/agent/consul/state/event_publisher.go b/agent/consul/stream/event_publisher.go similarity index 80% rename from agent/consul/state/event_publisher.go rename to agent/consul/stream/event_publisher.go index b1fb22960d..e56612d350 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -1,4 +1,4 @@ -package state +package stream import ( "context" @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/consul/agent/consul/state/db" - "github.com/hashicorp/consul/agent/consul/stream" ) // EventPublisher receives changes events from Publish, and sends them to all @@ -34,11 +33,11 @@ type EventPublisher struct { // topicBuffers stores the head of the linked-list buffer to publish events to // for a topic. - topicBuffers map[stream.Topic]*stream.EventBuffer + topicBuffers map[Topic]*EventBuffer // snapCache if a cache of EventSnapshots indexed by topic and key. // TODO: new struct for snapCache and snapFns and snapCacheTTL - snapCache map[stream.Topic]map[string]*stream.EventSnapshot + snapCache map[Topic]map[string]*EventSnapshot subscriptions *subscriptions @@ -47,7 +46,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan changeEvents - handlers map[stream.Topic]TopicHandler + handlers map[Topic]TopicHandler } type subscriptions struct { @@ -60,21 +59,21 @@ type subscriptions struct { // When the token is modified all subscriptions under that token will be // reloaded. // A subscription may be unsubscribed by using the pointer to the request. - byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription + byToken map[string]map[*SubscribeRequest]*Subscription } type changeEvents struct { - events []stream.Event + events []Event } // TopicHandler provides functions which create stream.Events for a topic. type TopicHandler struct { // Snapshot creates the necessary events to reproduce the current state and // appends them to the EventBuffer. - Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) + Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error) // ProcessChanges accepts a slice of Changes, and builds a slice of events for // those changes. - ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error) + ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error) } // NewEventPublisher returns an EventPublisher for publishing change events. @@ -82,14 +81,14 @@ type TopicHandler struct { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[stream.Topic]*stream.EventBuffer), - snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), + topicBuffers: make(map[Topic]*EventBuffer), + snapCache: make(map[Topic]map[string]*EventSnapshot), publishCh: make(chan changeEvents, 64), subscriptions: &subscriptions{ - byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), + byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, handlers: handlers, } @@ -99,13 +98,13 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl return e } -// PublishChanges to all subscribers. tx is a read-only transaction that may be -// used from a goroutine. The caller should never use the tx once it has been -// passed to PublishChanged. +// PublishChanges to all subscribers. tx is a read-only transaction that captures +// the state at the time the change happened. The caller must never use the tx once +// it has been passed to PublishChanged. func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { defer tx.Abort() - var events []stream.Event + var events []Event for topic, handler := range e.handlers { if handler.ProcessChanges != nil { es, err := handler.ProcessChanges(tx, changes) @@ -137,14 +136,14 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { // as any ACL update events to cause affected listeners to reset their stream. func (e *EventPublisher) sendEvents(update changeEvents) { for _, event := range update.events { - if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok { + if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok { e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs) } } - eventsByTopic := make(map[stream.Topic][]stream.Event) + eventsByTopic := make(map[Topic][]Event) for _, event := range update.events { - if event.Topic == stream.TopicInternal { + if event.Topic == TopicInternal { continue } eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) @@ -161,10 +160,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) { // already exist. // // EventPublisher.lock must be held to call this method. -func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer { +func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer { buf, ok := e.topicBuffers[topic] if !ok { - buf = stream.NewEventBuffer() + buf = NewEventBuffer() e.topicBuffers[topic] = buf } return buf @@ -181,11 +180,11 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer // call Subscription.Unsubscribe to free ACL tracking resources. func (e *EventPublisher) Subscribe( ctx context.Context, - req *stream.SubscribeRequest, -) (*stream.Subscription, error) { + req *SubscribeRequest, +) (*Subscription, error) { // Ensure we know how to make a snapshot for this topic _, ok := e.handlers[req.Topic] - if !ok || req.Topic == stream.TopicInternal { + if !ok || req.Topic == TopicInternal { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -198,26 +197,26 @@ func (e *EventPublisher) Subscribe( // See if we need a snapshot topicHead := buf.Head() - var sub *stream.Subscription + var sub *Subscription if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { // No need for a snapshot, send the "resume stream" message to signal to // client it's cache is still good. (note that this can be distinguished // from a legitimate empty snapshot due to the index matching the one the // client sent), then follow along from here in the topic. - e := stream.Event{ + e := Event{ Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: stream.ResumeStream{}, + Payload: ResumeStream{}, } // Make a new buffer to send to the client containing the resume. - buf := stream.NewEventBuffer() + buf := NewEventBuffer() // Store the head of that buffer before we append to it to give as the // starting point for the subscription. subHead := buf.Head() - buf.Append([]stream.Event{e}) + buf.Append([]Event{e}) // Now splice the rest of the topic buffer on so the subscription will // continue to see future updates in the topic buffer. @@ -227,13 +226,13 @@ func (e *EventPublisher) Subscribe( } buf.AppendBuffer(follow) - sub = stream.NewSubscription(ctx, req, subHead) + sub = NewSubscription(ctx, req, subHead) } else { snap, err := e.getSnapshotLocked(req, topicHead) if err != nil { return nil, err } - sub = stream.NewSubscription(ctx, req, snap.Snap) + sub = NewSubscription(ctx, req, snap.Snap) } e.subscriptions.add(req, sub) @@ -246,13 +245,13 @@ func (e *EventPublisher) Subscribe( return sub, nil } -func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) { +func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { s.lock.Lock() defer s.lock.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { - subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) + subsByToken = make(map[*SubscribeRequest]*Subscription) s.byToken[req.Token] = subsByToken } subsByToken[req] = sub @@ -275,7 +274,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { // subscription to free resources monitoring changes in it's ACL token. // // req MUST be the same pointer that was used to register the subscription. -func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) { +func (s *subscriptions) unsubscribe(req *SubscribeRequest) { s.lock.Lock() defer s.lock.Unlock() @@ -289,11 +288,11 @@ func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) { } } -func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { +func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) { // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { - topicSnaps = make(map[string]*stream.EventSnapshot) + topicSnaps = make(map[string]*EventSnapshot) e.snapCache[req.Topic] = topicSnaps } @@ -308,7 +307,7 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot) + snap = NewEventSnapshot(req, topicHead, handler.Snapshot) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go new file mode 100644 index 0000000000..5f9df33bab --- /dev/null +++ b/agent/consul/stream/event_publisher_test.go @@ -0,0 +1,115 @@ +package stream + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/state/db" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +var testTopic Topic = 999 + +func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { + subscription := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0) + sub, err := publisher.Subscribe(ctx, subscription) + require.NoError(t, err) + eventCh := consumeSubscription(sub) + + result := nextResult(t, eventCh) + require.NoError(t, result.Err) + expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}} + require.Equal(t, expected, result.Events) + + result = nextResult(t, eventCh) + require.Len(t, result.Events, 1) + require.True(t, result.Events[0].IsEndOfSnapshot()) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{}) + require.NoError(t, err) + + // Subscriber should see the published event + result = nextResult(t, eventCh) + require.NoError(t, result.Err) + expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}} + require.Equal(t, expected, result.Events) +} + +func newTestTopicHandlers() map[Topic]TopicHandler { + return map[Topic]TopicHandler{ + testTopic: { + Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) + return 1, nil + }, + ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) { + events := []Event{{ + Topic: testTopic, + Key: "sub-key", + Payload: "the-published-event-payload", + }} + return events, nil + }, + }, + } +} + +func consumeSubscription(sub *Subscription) <-chan subNextResult { + eventCh := make(chan subNextResult, 1) + go func() { + for { + es, err := sub.Next() + eventCh <- subNextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +type subNextResult struct { + Events []Event + Err error +} + +func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { + t.Helper() + select { + case next := <-eventCh: + return next + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return subNextResult{} +} + +func assertNoResult(t *testing.T, eventCh <-chan subNextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("received unexpected event: %#v", next.Events[0].Payload) + case <-time.After(100 * time.Millisecond): + } +}