|
|
|
@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) {
|
|
|
|
|
req := &SubscribeRequest{
|
|
|
|
|
Topic: testTopic,
|
|
|
|
|
Key: "sub-key",
|
|
|
|
|
Index: 1,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nextEvent := Event{
|
|
|
|
|
Topic: testTopic,
|
|
|
|
|
Index: 3,
|
|
|
|
|
Payload: simplePayload{key: "sub-key", value: "event-3"},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
handlers := SnapshotHandlers{
|
|
|
|
|
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
|
|
|
|
if req.Topic != testTopic {
|
|
|
|
|
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
|
|
|
|
|
}
|
|
|
|
|
buf.Append([]Event{testSnapshotEvent})
|
|
|
|
|
buf.Append([]Event{nextEvent})
|
|
|
|
|
return 3, nil
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
publisher := NewEventPublisher(handlers, time.Second)
|
|
|
|
|
go publisher.Run(ctx)
|
|
|
|
|
// Include the same events in the topicBuffer
|
|
|
|
|
publisher.publishEvent([]Event{testSnapshotEvent})
|
|
|
|
|
publisher.publishEvent([]Event{nextEvent})
|
|
|
|
|
|
|
|
|
|
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
|
|
|
|
sub, err := publisher.Subscribe(req)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
defer sub.Unsubscribe()
|
|
|
|
|
|
|
|
|
|
eventCh := runSubscription(ctx, sub)
|
|
|
|
|
next := getNextEvent(t, eventCh)
|
|
|
|
|
require.True(t, next.IsNewSnapshotToFollow(), next)
|
|
|
|
|
|
|
|
|
|
next = getNextEvent(t, eventCh)
|
|
|
|
|
require.Equal(t, testSnapshotEvent, next)
|
|
|
|
|
|
|
|
|
|
next = getNextEvent(t, eventCh)
|
|
|
|
|
require.Equal(t, nextEvent, next)
|
|
|
|
|
|
|
|
|
|
next = getNextEvent(t, eventCh)
|
|
|
|
|
require.True(t, next.IsEndOfSnapshot(), next)
|
|
|
|
|
require.Equal(t, uint64(3), next.Index)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
|
|
|
|
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
runStep(t, "resume the subscription", func(t *testing.T) {
|
|
|
|
|
newReq := *req
|
|
|
|
|
newReq.Index = 0
|
|
|
|
|
sub, err := publisher.Subscribe(&newReq)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
eventCh := runSubscription(ctx, sub)
|
|
|
|
|
next := getNextEvent(t, eventCh)
|
|
|
|
|
require.Equal(t, testSnapshotEvent, next)
|
|
|
|
|
|
|
|
|
|
next = getNextEvent(t, eventCh)
|
|
|
|
|
require.Equal(t, nextEvent, next)
|
|
|
|
|
|
|
|
|
|
next = getNextEvent(t, eventCh)
|
|
|
|
|
require.True(t, next.IsEndOfSnapshot())
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|
|
|
|
t.Helper()
|
|
|
|
|
if !t.Run(name, fn) {
|
|
|
|
|