From fc1c2ae412b273b868c311fb785ebd4e030f4c72 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 6 Jul 2020 20:04:24 -0400 Subject: [PATCH] stream: change Topic to an interface Consumers of the package can decide on which type to use for the Topic. In the future we may use a gRPC type for the topic. --- agent/consul/state/memdb.go | 7 +++++++ agent/consul/state/store_integration_test.go | 2 +- agent/consul/stream/event.go | 15 ++++++--------- agent/consul/stream/event_buffer_test.go | 2 +- agent/consul/stream/event_publisher.go | 4 ++-- agent/consul/stream/event_publisher_test.go | 8 +++++++- agent/consul/stream/event_snapshot_test.go | 2 +- agent/consul/stream/subscription_test.go | 6 +++--- 8 files changed, 28 insertions(+), 18 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index b0dc217173..5ec7e44bac 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -140,6 +140,13 @@ func (tx *txn) Commit() error { return nil } +// TODO: may be replaced by a gRPC type. +type topic string + +func (t topic) String() string { + return string(t) +} + func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { // TODO: add other table handlers here. return aclChangeUnsubscribeEvent(tx, changes) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 857a3a1e18..2482cde535 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -369,7 +369,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } -var topicService stream.Topic = 901 +var topicService stream.Topic = topic("test-topic-service") func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 2bd11fd5a1..d78c29e43c 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,17 +4,14 @@ to the state store. */ package stream -type Topic int32 +import "fmt" -// TODO: remove underscores -// TODO: type string instead of int? -// TODO: move topics to state package? -const ( - Topic_ServiceHealth Topic = 1 - Topic_ServiceHealthConnect Topic = 2 -) +// Topic is an identifier that partitions events. A subscription will only receive +// events which match the Topic. +type Topic fmt.Stringer -// TODO: +// Event is a structure with identifiers and a payload. Events are Published to +// EventPublisher and returned to Subscribers. type Event struct { Topic Topic Key string diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go index f8ca7b7b7d..bb27b91a40 100644 --- a/agent/consul/stream/event_buffer_test.go +++ b/agent/consul/stream/event_buffer_test.go @@ -38,7 +38,7 @@ func TestEventBufferFuzz(t *testing.T) { // streaming - here we only care about the semantics of the buffer. e := Event{ Index: uint64(i), // Indexes should be contiguous - Topic: Topic_ServiceHealth, + Topic: testTopic, } b.Append([]Event{e}) // Sleep sometimes for a while to let some subscribers catch up diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index bc83ce8eb0..0b3e7c2a7a 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -169,7 +169,7 @@ func (e *EventPublisher) Subscribe( // Ensure we know how to make a snapshot for this topic _, ok := e.snapshotHandlers[req.Topic] if !ok { - return nil, fmt.Errorf("unknown topic %d", req.Topic) + return nil, fmt.Errorf("unknown topic %v", req.Topic) } e.lock.Lock() @@ -286,7 +286,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf handler, ok := e.snapshotHandlers[req.Topic] if !ok { - return nil, fmt.Errorf("unknown topic %d", req.Topic) + return nil, fmt.Errorf("unknown topic %v", req.Topic) } snap = newEventSnapshot(req, topicHead, handler) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 8988562f1e..4bf4bd27ee 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -9,7 +9,13 @@ import ( "github.com/stretchr/testify/require" ) -var testTopic Topic = 999 +type intTopic int + +func (i intTopic) String() string { + return fmt.Sprintf("%d", i) +} + +var testTopic Topic = intTopic(999) func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { subscription := &SubscribeRequest{ diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 8b2185de2b..923e864ce3 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -170,7 +170,7 @@ func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc { func newDefaultHealthEvent(index uint64, n int) Event { return Event{ Index: index, - Topic: Topic_ServiceHealth, + Topic: testTopic, Payload: fmt.Sprintf("test-event-%03d", n), } } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 2a5eb0654c..9e4ab5269b 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -23,7 +23,7 @@ func TestSubscription(t *testing.T) { // Create a subscription req := &SubscribeRequest{ - Topic: Topic_ServiceHealth, + Topic: testTopic, Key: "test", } sub := newSubscription(ctx, req, startHead) @@ -103,7 +103,7 @@ func TestSubscription_Close(t *testing.T) { // Create a subscription req := &SubscribeRequest{ - Topic: Topic_ServiceHealth, + Topic: testTopic, Key: "test", } sub := newSubscription(ctx, req, startHead) @@ -141,7 +141,7 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { // but enough to test subscription mechanics. e := Event{ Index: index, - Topic: Topic_ServiceHealth, + Topic: testTopic, Key: key, } b.Append([]Event{e})