state: Move change processing out of EventPublisher

EventPublisher was receiving TopicHandlers, which had a couple of
problems:

- ChangeProcessors were being grouped by Topic, but they completely
  ignored the topic and were performed on every change
- ChangeProcessors required EventPublisher to be aware of database
  changes

By moving ChangeProcesors out of EventPublisher, and having Publish
accept events instead of changes, EventPublisher no longer needs to
be aware of these things.

Handlers is now only SnapshotHandlers, which are still mapped by Topic.

Also allows us to remove the small 'db' package that had only two types.
They can now be unexported types in state.
pull/8160/head
Daniel Nephin 2020-07-06 18:44:51 -04:00
parent 48c766d2c6
commit 6fa36e3aee
11 changed files with 106 additions and 161 deletions

View File

@ -1,7 +1,6 @@
package state package state
import ( import (
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
@ -11,7 +10,7 @@ import (
// are used to unsubscribe any subscriptions which match the tokens from the events. // are used to unsubscribe any subscriptions which match the tokens from the events.
// //
// These are special events that will never be returned to a subscriber. // These are special events that will never be returned to a subscriber.
func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var secretIDs []string var secretIDs []string
for _, change := range changes.Changes { for _, change := range changes.Changes {

View File

@ -5,7 +5,6 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -128,7 +127,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
// Note we call the func under test directly rather than publishChanges so // Note we call the func under test directly rather than publishChanges so
// we can test this in isolation. // we can test this in isolation.
events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()}) events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: 100, Changes: tx.Changes()})
require.NoError(t, err) require.NoError(t, err)
require.Len(t, events, 1) require.Len(t, events, 1)

View File

@ -5,7 +5,6 @@ package state
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
) )
@ -290,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re
return tx.Get("acl-tokens", "local", false) return tx.Get("acl-tokens", "local", false)
} }
func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "policies", policy) return tx.Get("acl-tokens", "policies", policy)
} }
func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "roles", role) return tx.Get("acl-tokens", "roles", role)
} }
@ -356,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte
return tx.Get("acl-roles", "id") return tx.Get("acl-roles", "id")
} }
func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "policies", policy) return tx.Get("acl-roles", "policies", policy)
} }

View File

@ -1,17 +0,0 @@
package db
import "github.com/hashicorp/go-memdb"
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}

View File

@ -1,20 +1,37 @@
package state package state
import ( import (
"github.com/hashicorp/consul/agent/consul/state/db" "fmt"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are // all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events. // sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct { type changeTrackerDB struct {
db *memdb.MemDB db *memdb.MemDB
publisher changePublisher publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
} }
type changePublisher interface { type eventPublisher interface {
PublishChanges(tx db.ReadTxn, changes db.Changes) error PublishEvents(events []stream.Event)
} }
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@ -50,19 +67,26 @@ func (c *changeTrackerDB) ReadTxn() *txn {
// data directly into the DB. These cases may use WriteTxnRestore. // data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{ t := &txn{
Txn: c.db.Txn(true), Txn: c.db.Txn(true),
Index: idx, Index: idx,
publish: func(changes db.Changes) error { publish: c.publish,
// publish provides a new read-only Txn to PublishChanges so that
// events can be constructed from the current state at the time of
// Commit.
return c.publisher.PublishChanges(c.db.Txn(false), changes)
},
} }
t.Txn.TrackChanges() t.Txn.TrackChanges()
return t return t
} }
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.PublishEvents(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change // WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to // tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes. // replace the entire contents of the Store without a need to track the changes.
@ -89,7 +113,7 @@ type txn struct {
// Index is stored so that it may be passed along to any subscribers as part // Index is stored so that it may be passed along to any subscribers as part
// of a change event. // of a change event.
Index uint64 Index uint64
publish func(changes db.Changes) error publish func(changes Changes) error
} }
// Commit first pushes changes to EventPublisher, then calls Commit on the // Commit first pushes changes to EventPublisher, then calls Commit on the
@ -103,7 +127,7 @@ func (tx *txn) Commit() error {
// In those cases changes should also be empty, and there will be nothing // In those cases changes should also be empty, and there will be nothing
// to publish. // to publish.
if tx.publish != nil { if tx.publish != nil {
changes := db.Changes{ changes := Changes{
Index: tx.Index, Index: tx.Index,
Changes: tx.Txn.Changes(), Changes: tx.Txn.Changes(),
} }
@ -115,3 +139,12 @@ func (tx *txn) Commit() error {
tx.Txn.Commit() tx.Txn.Commit()
return nil return nil
} }
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add other table handlers here.
return aclChangeUnsubscribeEvent(tx, changes)
}
func newSnapshotHandlers() stream.SnapshotHandlers {
return stream.SnapshotHandlers{}
}

View File

@ -167,8 +167,9 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
kvsGraveyard: NewGraveyard(gc), kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(), lockDelay: NewDelay(),
db: &changeTrackerDB{ db: &changeTrackerDB{
db: db, db: db,
publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
processChanges: processDBChanges,
}, },
stopEventPublisher: cancel, stopEventPublisher: cancel,
} }

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -29,7 +28,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -110,7 +109,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -224,7 +223,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -372,44 +371,24 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
var topicService stream.Topic = 901 var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return map[stream.Topic]stream.TopicHandler{ return stream.SnapshotHandlers{
topicService: { topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
var events []stream.Event if err != nil {
for _, change := range changes.Changes { return idx, err
if change.Table == "services" { }
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes { for _, node := range nodes {
event := stream.Event{ event := stream.Event{
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Index: node.ModifyIndex, Index: node.ModifyIndex,
Payload: node, Payload: node,
}
snap.Append([]stream.Event{event})
} }
return idx, nil snap.Append([]stream.Event{event})
}, }
}, return idx, nil
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
}, },
} }
} }
@ -445,7 +424,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err) require.NoError(t, err)

View File

@ -1,14 +0,0 @@
package state
import (
"github.com/hashicorp/consul/agent/consul/stream"
)
// newTopicHandlers returns the default handlers for state change events.
func newTopicHandlers() map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
// TopicInternal is a special case for processors that handle events that are
// not for subscribers. They are used by the stream package.
stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent},
}
}

View File

@ -8,9 +8,8 @@ type Topic int32
// TODO: remove underscores // TODO: remove underscores
// TODO: type string instead of int? // TODO: type string instead of int?
// TODO: define non-internal topics in state package? // TODO: move topics to state package?
const ( const (
TopicInternal Topic = 0
Topic_ServiceHealth Topic = 1 Topic_ServiceHealth Topic = 1
Topic_ServiceHealthConnect Topic = 2 Topic_ServiceHealthConnect Topic = 2
) )

View File

@ -5,8 +5,6 @@ import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/agent/consul/state/db"
) )
// EventPublisher receives changes events from Publish, and sends them to all // EventPublisher receives changes events from Publish, and sends them to all
@ -46,7 +44,7 @@ type EventPublisher struct {
// the Commit call in the FSM hot path. // the Commit call in the FSM hot path.
publishCh chan changeEvents publishCh chan changeEvents
handlers map[Topic]TopicHandler snapshotHandlers SnapshotHandlers
} }
type subscriptions struct { type subscriptions struct {
@ -66,15 +64,9 @@ type changeEvents struct {
events []Event events []Event
} }
// TopicHandler provides functions which create stream.Events for a topic. // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
type TopicHandler struct { // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// Snapshot creates the necessary events to reproduce the current state and type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// appends them to the eventBuffer.
Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes.
ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
}
// SnapshotAppender appends groups of events to create a Snapshot of state. // SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface { type SnapshotAppender interface {
@ -88,7 +80,7 @@ type SnapshotAppender interface {
// A goroutine is run in the background to publish events to all subscribes. // A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources, // Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing. // and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{ e := &EventPublisher{
snapCacheTTL: snapCacheTTL, snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer), topicBuffers: make(map[Topic]*eventBuffer),
@ -97,7 +89,7 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna
subscriptions: &subscriptions{ subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription), byToken: make(map[string]map[*SubscribeRequest]*Subscription),
}, },
handlers: handlers, snapshotHandlers: handlers,
} }
go e.handleUpdates(ctx) go e.handleUpdates(ctx)
@ -105,25 +97,13 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna
return e return e
} }
// PublishChanges to all subscribers. tx is a read-only transaction that captures // PublishEvents to all subscribers. tx is a read-only transaction that captures
// the state at the time the change happened. The caller must never use the tx once // the state at the time the change happened. The caller must never use the tx once
// it has been passed to PublishChanged. // it has been passed to PublishChanged.
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { func (e *EventPublisher) PublishEvents(events []Event) {
defer tx.Abort() if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
var events []Event
for topic, handler := range e.handlers {
if handler.ProcessChanges != nil {
es, err := handler.ProcessChanges(tx, changes)
if err != nil {
return fmt.Errorf("failed generating events for topic %q: %s", topic, err)
}
events = append(events, es...)
}
} }
e.publishCh <- changeEvents{events: events}
return nil
} }
func (e *EventPublisher) handleUpdates(ctx context.Context) { func (e *EventPublisher) handleUpdates(ctx context.Context) {
@ -150,9 +130,6 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
eventsByTopic := make(map[Topic][]Event) eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events { for _, event := range update.events {
if event.Topic == TopicInternal {
continue
}
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
} }
@ -190,8 +167,8 @@ func (e *EventPublisher) Subscribe(
req *SubscribeRequest, req *SubscribeRequest,
) (*Subscription, error) { ) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic // Ensure we know how to make a snapshot for this topic
_, ok := e.handlers[req.Topic] _, ok := e.snapshotHandlers[req.Topic]
if !ok || req.Topic == TopicInternal { if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %d", req.Topic)
} }
@ -296,7 +273,6 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
} }
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
// See if there is a cached snapshot
topicSnaps, ok := e.snapCache[req.Topic] topicSnaps, ok := e.snapCache[req.Topic]
if !ok { if !ok {
topicSnaps = make(map[string]*eventSnapshot) topicSnaps = make(map[string]*eventSnapshot)
@ -308,13 +284,12 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
return snap, nil return snap, nil
} }
// No snap or errored snap in cache, create a new one handler, ok := e.snapshotHandlers[req.Topic]
handler, ok := e.handlers[req.Topic]
if !ok { if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %d", req.Topic)
} }
snap = newEventSnapshot(req, topicHead, handler.Snapshot) snap = newEventSnapshot(req, topicHead, handler)
if e.snapCacheTTL > 0 { if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap topicSnaps[req.Key] = snap

View File

@ -6,8 +6,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -21,7 +19,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0) publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err) require.NoError(t, err)
eventCh := consumeSubscription(sub) eventCh := consumeSubscription(sub)
@ -38,8 +36,12 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
// Now subscriber should block waiting for updates // Now subscriber should block waiting for updates
assertNoResult(t, eventCh) assertNoResult(t, eventCh)
err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{}) events := []Event{{
require.NoError(t, err) Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
publisher.PublishEvents(events)
// Subscriber should see the published event // Subscriber should see the published event
result = nextResult(t, eventCh) result = nextResult(t, eventCh)
@ -48,24 +50,14 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
require.Equal(t, expected, result.Events) require.Equal(t, expected, result.Events)
} }
func newTestTopicHandlers() map[Topic]TopicHandler { func newTestSnapshotHandlers() SnapshotHandlers {
return map[Topic]TopicHandler{ return SnapshotHandlers{
testTopic: { testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { if req.Topic != testTopic {
if req.Topic != testTopic { return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
return 0, fmt.Errorf("unexpected topic: %v", req.Topic) }
} buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) return 1, nil
return 1, nil
},
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
return events, nil
},
}, },
} }
} }