From 9b3c6da9df06237f53c62ae3d9eea21c2aeffebf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 16 Feb 2021 11:54:51 -0500 Subject: [PATCH] stream: test the snapshot cache is saved correctly when the cache entry is created from resuming a stream. --- agent/consul/stream/event_publisher_test.go | 75 +++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 576d4ccc35..2967ef8d3d 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin }) } +func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { + req := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + Index: 1, + } + + nextEvent := Event{ + Topic: testTopic, + Index: 3, + Payload: simplePayload{key: "sub-key", value: "event-3"}, + } + + handlers := SnapshotHandlers{ + testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{testSnapshotEvent}) + buf.Append([]Event{nextEvent}) + return 3, nil + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) + // Include the same events in the topicBuffer + publisher.publishEvent([]Event{testSnapshotEvent}) + publisher.publishEvent([]Event{nextEvent}) + + runStep(t, "start a subscription and unsub", func(t *testing.T) { + sub, err := publisher.Subscribe(req) + require.NoError(t, err) + defer sub.Unsubscribe() + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.True(t, next.IsNewSnapshotToFollow(), next) + + next = getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot(), next) + require.Equal(t, uint64(3), next.Index) + }) + + publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { + return 0, fmt.Errorf("error should not be seen, cache should have been used") + } + + runStep(t, "resume the subscription", func(t *testing.T) { + newReq := *req + newReq.Index = 0 + sub, err := publisher.Subscribe(&newReq) + require.NoError(t, err) + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + }) +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) {