From 5de4d5bbe38f01fc2e7574e9df22002c9e2624ac Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 14 Jul 2020 19:23:44 -0400 Subject: [PATCH] stream: have SnapshotFunc accept a non-pointer SubscribeRequest The value is not expected to be modified. Passing a value makes that explicit. --- agent/consul/state/store_integration_test.go | 2 +- agent/consul/stream/event_publisher.go | 6 +++++- agent/consul/stream/event_publisher_test.go | 4 ++-- agent/consul/stream/event_snapshot.go | 6 ++---- agent/consul/stream/event_snapshot_test.go | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 83a978bb0a..6b2e9d1fe6 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -376,7 +376,7 @@ var topicService stream.Topic = topic("test-topic-service") func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { + topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) if err != nil { return idx, err diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 9dfb8bf9e5..815a68a261 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -61,7 +61,11 @@ type changeEvents struct { // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. // The nil Topic is reserved and should not be used. -type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) +type SnapshotHandlers map[Topic]SnapshotFunc + +// SnapshotFunc builds a snapshot for the subscription request, and appends the +// events to the Snapshot using SnapshotAppender. +type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error) // SnapshotAppender appends groups of events to create a Snapshot of state. type SnapshotAppender interface { diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 4deeb1503e..4448e68454 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -58,7 +58,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ - testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { + testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { if req.Topic != testTopic { return 0, fmt.Errorf("unexpected topic: %v", req.Topic) } @@ -117,7 +117,7 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { t.Cleanup(cancel) handlers := newTestSnapshotHandlers() - fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { + fn := func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { return 0, nil } handlers[intTopic(22)] = fn diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 12a52ea37b..2f0d276f78 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -18,8 +18,6 @@ type eventSnapshot struct { snapBuffer *eventBuffer } -type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) - // newEventSnapshot creates a snapshot buffer based on the subscription request. // The current buffer head for the topic requested is passed so that once the // snapshot is complete and has been delivered into the buffer, any events @@ -27,7 +25,7 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) // missed. Once the snapshot is delivered the topic buffer is spliced onto the // snapshot buffer so that subscribers will naturally follow from the snapshot // to wait for any subsequent updates. -func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot { +func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot { buf := newEventBuffer() s := &eventSnapshot{ Head: buf.Head(), @@ -35,7 +33,7 @@ func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn sna } go func() { - idx, err := fn(req, s.snapBuffer) + idx, err := fn(*req, s.snapBuffer) if err != nil { s.snapBuffer.AppendItem(&bufferItem{Err: err}) return diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 5e62e7f94f..c888e844ab 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -161,8 +161,8 @@ func genSequentialIDs(start, end int) []string { return ids } -func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc { - return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { +func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc { + return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { for i := 0; i < size; i++ { // Event content is arbitrary we are just using Health because it's the // first type defined. We just want a set of things with consecutive