stream: change Topic to an interface

Consumers of the package can decide on which type to use for the Topic. In the future we may
use a gRPC type for the topic.
pull/8160/head
Daniel Nephin 2020-07-06 20:04:24 -04:00
parent 6fa36e3aee
commit fc1c2ae412
8 changed files with 28 additions and 18 deletions

View File

@ -140,6 +140,13 @@ func (tx *txn) Commit() error {
return nil
}
// TODO: may be replaced by a gRPC type.
type topic string
func (t topic) String() string {
return string(t)
}
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add other table handlers here.
return aclChangeUnsubscribeEvent(tx, changes)

View File

@ -369,7 +369,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
}
}
var topicService stream.Topic = 901
var topicService stream.Topic = topic("test-topic-service")
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{

View File

@ -4,17 +4,14 @@ to the state store.
*/
package stream
type Topic int32
import "fmt"
// TODO: remove underscores
// TODO: type string instead of int?
// TODO: move topics to state package?
const (
Topic_ServiceHealth Topic = 1
Topic_ServiceHealthConnect Topic = 2
)
// Topic is an identifier that partitions events. A subscription will only receive
// events which match the Topic.
type Topic fmt.Stringer
// TODO:
// Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.
type Event struct {
Topic Topic
Key string

View File

@ -38,7 +38,7 @@ func TestEventBufferFuzz(t *testing.T) {
// streaming - here we only care about the semantics of the buffer.
e := Event{
Index: uint64(i), // Indexes should be contiguous
Topic: Topic_ServiceHealth,
Topic: testTopic,
}
b.Append([]Event{e})
// Sleep sometimes for a while to let some subscribers catch up

View File

@ -169,7 +169,7 @@ func (e *EventPublisher) Subscribe(
// Ensure we know how to make a snapshot for this topic
_, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
e.lock.Lock()
@ -286,7 +286,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
handler, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
snap = newEventSnapshot(req, topicHead, handler)

View File

@ -9,7 +9,13 @@ import (
"github.com/stretchr/testify/require"
)
var testTopic Topic = 999
type intTopic int
func (i intTopic) String() string {
return fmt.Sprintf("%d", i)
}
var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{

View File

@ -170,7 +170,7 @@ func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
func newDefaultHealthEvent(index uint64, n int) Event {
return Event{
Index: index,
Topic: Topic_ServiceHealth,
Topic: testTopic,
Payload: fmt.Sprintf("test-event-%03d", n),
}
}

View File

@ -23,7 +23,7 @@ func TestSubscription(t *testing.T) {
// Create a subscription
req := &SubscribeRequest{
Topic: Topic_ServiceHealth,
Topic: testTopic,
Key: "test",
}
sub := newSubscription(ctx, req, startHead)
@ -103,7 +103,7 @@ func TestSubscription_Close(t *testing.T) {
// Create a subscription
req := &SubscribeRequest{
Topic: Topic_ServiceHealth,
Topic: testTopic,
Key: "test",
}
sub := newSubscription(ctx, req, startHead)
@ -141,7 +141,7 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
// but enough to test subscription mechanics.
e := Event{
Index: index,
Topic: Topic_ServiceHealth,
Topic: testTopic,
Key: key,
}
b.Append([]Event{e})