stream: Move EventPublisher to stream package

The EventPublisher is the central hub of the PubSub system. It is toughly coupled with much of
stream. Some stream internals were exported exclusively for EventPublisher.

The two Subscribe cases (with or without index) were also awkwardly split between two packages. By
moving EventPublisher into stream they are now both in the same package (although still in different files).
pull/8160/head
Daniel Nephin 2020-07-06 16:15:13 -04:00
parent 489876c86b
commit 4fa0fdc0e0
7 changed files with 338 additions and 267 deletions

View File

@ -55,8 +55,7 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
publish: func(changes db.Changes) error { publish: func(changes db.Changes) error {
// publish provides a new read-only Txn to PublishChanges so that // publish provides a new read-only Txn to PublishChanges so that
// events can be constructed from the current state at the time of // events can be constructed from the current state at the time of
// Commit, and so that operations can be performed in a goroutine // Commit.
// after this WriteTxn is committed.
return c.publisher.PublishChanges(c.db.Txn(false), changes) return c.publisher.PublishChanges(c.db.Txn(false), changes)
}, },
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
) )
@ -166,7 +167,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
lockDelay: NewDelay(), lockDelay: NewDelay(),
db: &changeTrackerDB{ db: &changeTrackerDB{
db: db, db: db,
publisher: NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second),
}, },
} }
return s, nil return s, nil

View File

@ -12,225 +12,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type nextResult struct { func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
Events []stream.Event
Err error
}
func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- nextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return nil
}
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
t.Helper()
select {
case next := <-eventCh:
require.Error(t, next.Err)
return next.Err
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
return nil
}
// assertReset checks that a ResetStream event is send to the subscription
// within 100ms. If allowEOS is true it will ignore any intermediate events that
// come before the reset provided they are EndOfSnapshot events because in many
// cases it's non-deterministic whether the snapshot will complete before the
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
timeoutCh := time.After(100 * time.Millisecond)
for {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
continue
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
return
case <-timeoutCh:
t.Fatalf("no err after 100ms")
}
}
}
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler {
return map[stream.Topic]TopicHandler{
topicService: {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
}
return idx, nil
},
},
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
{ID: testRoleID_B},
},
}
token.SetHash(false)
// If we subscribe immediately after we create a token we race with the
// publisher that is publishing the ACL token event for the token we just
// created. That means that the subscription we create right after will often
// be immediately reset. The most reliable way to avoid that without just
// sleeping for some arbitrary time is to pre-subscribe using the token before
// it actually exists (which works because the publisher doesn't check tokens
// it assumes something lower down did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := testRunSub(sub)
// Create the ACL token to be used in the subscription.
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
// Wait for the pre-subscription to be reset
assertReset(t, eventCh, true)
return token
}
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
t.Parallel()
require := require.New(t)
store, err := NewStateStore(nil)
require.NoError(err)
reg := structs.TestRegisterRequest(t)
reg.Service.ID = "web1"
require.NoError(store.EnsureRegistration(1, reg))
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: reg.Service.Service,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0)
store.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
eventCh := testRunSub(sub)
// Stream should get the instance and then EndOfSnapshot
e := assertEvent(t, eventCh)
srv := e.Payload.(*structs.ServiceNode)
require.Equal(srv.ServiceID, "web1")
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoEvent(t, eventCh)
// Add a new instance of service on a different node
reg.Node = "node2"
require.NoError(store.EnsureRegistration(1, reg))
// Subscriber should see registration
e = assertEvent(t, eventCh)
srv = e.Payload.(*structs.ServiceNode)
require.Equal(srv.Node, "node2")
}
func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
s := testACLTokensStateStore(t) s := testACLTokensStateStore(t)
@ -247,7 +29,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -311,7 +93,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
require.Equal(stream.ErrSubscriptionReload, err) require.Equal(stream.ErrSubscriptionReload, err)
} }
func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) { func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
s := testACLTokensStateStore(t) s := testACLTokensStateStore(t)
@ -328,7 +110,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -425,7 +207,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
assertReset(t, eventCh, true) assertReset(t, eventCh, true)
} }
func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) { func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
s := testACLTokensStateStore(t) s := testACLTokensStateStore(t)
@ -442,7 +224,7 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0) publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription) sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err) require.NoError(err)
@ -504,3 +286,177 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
// Ensure the reload event was sent. // Ensure the reload event was sent.
assertReset(t, eventCh, false) assertReset(t, eventCh, false)
} }
type nextResult struct {
Events []stream.Event
Err error
}
func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- nextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return nil
}
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
t.Helper()
select {
case next := <-eventCh:
require.Error(t, next.Err)
return next.Err
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
return nil
}
// assertReset checks that a ResetStream event is send to the subscription
// within 100ms. If allowEOS is true it will ignore any intermediate events that
// come before the reset provided they are EndOfSnapshot events because in many
// cases it's non-deterministic whether the snapshot will complete before the
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
timeoutCh := time.After(100 * time.Millisecond)
for {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
continue
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
return
case <-timeoutCh:
t.Fatalf("no err after 100ms")
}
}
}
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
topicService: {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
}
return idx, nil
},
},
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
{ID: testRoleID_B},
},
}
token.SetHash(false)
// If we subscribe immediately after we create a token we race with the
// publisher that is publishing the ACL token event for the token we just
// created. That means that the subscription we create right after will often
// be immediately reset. The most reliable way to avoid that without just
// sleeping for some arbitrary time is to pre-subscribe using the token before
// it actually exists (which works because the publisher doesn't check tokens
// it assumes something lower down did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := testRunSub(sub)
// Create the ACL token to be used in the subscription.
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
// Wait for the pre-subscription to be reset
assertReset(t, eventCh, true)
return token
}

View File

@ -5,8 +5,8 @@ import (
) )
// newTopicHandlers returns the default handlers for state change events. // newTopicHandlers returns the default handlers for state change events.
func newTopicHandlers() map[stream.Topic]TopicHandler { func newTopicHandlers() map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]TopicHandler{ return map[stream.Topic]stream.TopicHandler{
// TopicInternal is a special case for processors that handle events that are // TopicInternal is a special case for processors that handle events that are
// not for subscribers. They are used by the stream package. // not for subscribers. They are used by the stream package.
stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent},

View File

@ -4,6 +4,7 @@ type Topic int32
// TODO: remove underscores // TODO: remove underscores
// TODO: type string instead of int? // TODO: type string instead of int?
// TODO: define non-internal topics in state package?
const ( const (
TopicInternal Topic = 0 TopicInternal Topic = 0
Topic_ServiceHealth Topic = 1 Topic_ServiceHealth Topic = 1

View File

@ -1,4 +1,4 @@
package state package stream
import ( import (
"context" "context"
@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream"
) )
// EventPublisher receives changes events from Publish, and sends them to all // EventPublisher receives changes events from Publish, and sends them to all
@ -34,11 +33,11 @@ type EventPublisher struct {
// topicBuffers stores the head of the linked-list buffer to publish events to // topicBuffers stores the head of the linked-list buffer to publish events to
// for a topic. // for a topic.
topicBuffers map[stream.Topic]*stream.EventBuffer topicBuffers map[Topic]*EventBuffer
// snapCache if a cache of EventSnapshots indexed by topic and key. // snapCache if a cache of EventSnapshots indexed by topic and key.
// TODO: new struct for snapCache and snapFns and snapCacheTTL // TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot snapCache map[Topic]map[string]*EventSnapshot
subscriptions *subscriptions subscriptions *subscriptions
@ -47,7 +46,7 @@ type EventPublisher struct {
// the Commit call in the FSM hot path. // the Commit call in the FSM hot path.
publishCh chan changeEvents publishCh chan changeEvents
handlers map[stream.Topic]TopicHandler handlers map[Topic]TopicHandler
} }
type subscriptions struct { type subscriptions struct {
@ -60,21 +59,21 @@ type subscriptions struct {
// When the token is modified all subscriptions under that token will be // When the token is modified all subscriptions under that token will be
// reloaded. // reloaded.
// A subscription may be unsubscribed by using the pointer to the request. // A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription byToken map[string]map[*SubscribeRequest]*Subscription
} }
type changeEvents struct { type changeEvents struct {
events []stream.Event events []Event
} }
// TopicHandler provides functions which create stream.Events for a topic. // TopicHandler provides functions which create stream.Events for a topic.
type TopicHandler struct { type TopicHandler struct {
// Snapshot creates the necessary events to reproduce the current state and // Snapshot creates the necessary events to reproduce the current state and
// appends them to the EventBuffer. // appends them to the EventBuffer.
Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error) Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for // ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes. // those changes.
ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error) ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
} }
// NewEventPublisher returns an EventPublisher for publishing change events. // NewEventPublisher returns an EventPublisher for publishing change events.
@ -82,14 +81,14 @@ type TopicHandler struct {
// A goroutine is run in the background to publish events to all subscribes. // A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources, // Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing. // and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{ e := &EventPublisher{
snapCacheTTL: snapCacheTTL, snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[stream.Topic]*stream.EventBuffer), topicBuffers: make(map[Topic]*EventBuffer),
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), snapCache: make(map[Topic]map[string]*EventSnapshot),
publishCh: make(chan changeEvents, 64), publishCh: make(chan changeEvents, 64),
subscriptions: &subscriptions{ subscriptions: &subscriptions{
byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), byToken: make(map[string]map[*SubscribeRequest]*Subscription),
}, },
handlers: handlers, handlers: handlers,
} }
@ -99,13 +98,13 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl
return e return e
} }
// PublishChanges to all subscribers. tx is a read-only transaction that may be // PublishChanges to all subscribers. tx is a read-only transaction that captures
// used from a goroutine. The caller should never use the tx once it has been // the state at the time the change happened. The caller must never use the tx once
// passed to PublishChanged. // it has been passed to PublishChanged.
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error {
defer tx.Abort() defer tx.Abort()
var events []stream.Event var events []Event
for topic, handler := range e.handlers { for topic, handler := range e.handlers {
if handler.ProcessChanges != nil { if handler.ProcessChanges != nil {
es, err := handler.ProcessChanges(tx, changes) es, err := handler.ProcessChanges(tx, changes)
@ -137,14 +136,14 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
// as any ACL update events to cause affected listeners to reset their stream. // as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) { func (e *EventPublisher) sendEvents(update changeEvents) {
for _, event := range update.events { for _, event := range update.events {
if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok { if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs) e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs)
} }
} }
eventsByTopic := make(map[stream.Topic][]stream.Event) eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events { for _, event := range update.events {
if event.Topic == stream.TopicInternal { if event.Topic == TopicInternal {
continue continue
} }
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
@ -161,10 +160,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
// already exist. // already exist.
// //
// EventPublisher.lock must be held to call this method. // EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer { func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer {
buf, ok := e.topicBuffers[topic] buf, ok := e.topicBuffers[topic]
if !ok { if !ok {
buf = stream.NewEventBuffer() buf = NewEventBuffer()
e.topicBuffers[topic] = buf e.topicBuffers[topic] = buf
} }
return buf return buf
@ -181,11 +180,11 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer
// call Subscription.Unsubscribe to free ACL tracking resources. // call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe( func (e *EventPublisher) Subscribe(
ctx context.Context, ctx context.Context,
req *stream.SubscribeRequest, req *SubscribeRequest,
) (*stream.Subscription, error) { ) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic // Ensure we know how to make a snapshot for this topic
_, ok := e.handlers[req.Topic] _, ok := e.handlers[req.Topic]
if !ok || req.Topic == stream.TopicInternal { if !ok || req.Topic == TopicInternal {
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %d", req.Topic)
} }
@ -198,26 +197,26 @@ func (e *EventPublisher) Subscribe(
// See if we need a snapshot // See if we need a snapshot
topicHead := buf.Head() topicHead := buf.Head()
var sub *stream.Subscription var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "resume stream" message to signal to // No need for a snapshot, send the "resume stream" message to signal to
// client it's cache is still good. (note that this can be distinguished // client it's cache is still good. (note that this can be distinguished
// from a legitimate empty snapshot due to the index matching the one the // from a legitimate empty snapshot due to the index matching the one the
// client sent), then follow along from here in the topic. // client sent), then follow along from here in the topic.
e := stream.Event{ e := Event{
Index: req.Index, Index: req.Index,
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Payload: stream.ResumeStream{}, Payload: ResumeStream{},
} }
// Make a new buffer to send to the client containing the resume. // Make a new buffer to send to the client containing the resume.
buf := stream.NewEventBuffer() buf := NewEventBuffer()
// Store the head of that buffer before we append to it to give as the // Store the head of that buffer before we append to it to give as the
// starting point for the subscription. // starting point for the subscription.
subHead := buf.Head() subHead := buf.Head()
buf.Append([]stream.Event{e}) buf.Append([]Event{e})
// Now splice the rest of the topic buffer on so the subscription will // Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer. // continue to see future updates in the topic buffer.
@ -227,13 +226,13 @@ func (e *EventPublisher) Subscribe(
} }
buf.AppendBuffer(follow) buf.AppendBuffer(follow)
sub = stream.NewSubscription(ctx, req, subHead) sub = NewSubscription(ctx, req, subHead)
} else { } else {
snap, err := e.getSnapshotLocked(req, topicHead) snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub = stream.NewSubscription(ctx, req, snap.Snap) sub = NewSubscription(ctx, req, snap.Snap)
} }
e.subscriptions.add(req, sub) e.subscriptions.add(req, sub)
@ -246,13 +245,13 @@ func (e *EventPublisher) Subscribe(
return sub, nil return sub, nil
} }
func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) { func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token] subsByToken, ok := s.byToken[req.Token]
if !ok { if !ok {
subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken s.byToken[req.Token] = subsByToken
} }
subsByToken[req] = sub subsByToken[req] = sub
@ -275,7 +274,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
// subscription to free resources monitoring changes in it's ACL token. // subscription to free resources monitoring changes in it's ACL token.
// //
// req MUST be the same pointer that was used to register the subscription. // req MUST be the same pointer that was used to register the subscription.
func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) { func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -289,11 +288,11 @@ func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) {
} }
} }
func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) {
// See if there is a cached snapshot // See if there is a cached snapshot
topicSnaps, ok := e.snapCache[req.Topic] topicSnaps, ok := e.snapCache[req.Topic]
if !ok { if !ok {
topicSnaps = make(map[string]*stream.EventSnapshot) topicSnaps = make(map[string]*EventSnapshot)
e.snapCache[req.Topic] = topicSnaps e.snapCache[req.Topic] = topicSnaps
} }
@ -308,7 +307,7 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %d", req.Topic)
} }
snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot) snap = NewEventSnapshot(req, topicHead, handler.Snapshot)
if e.snapCacheTTL > 0 { if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap topicSnaps[req.Key] = snap

View File

@ -0,0 +1,115 @@
package stream
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
var testTopic Topic = 999
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0)
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := consumeSubscription(sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
require.Equal(t, expected, result.Events)
result = nextResult(t, eventCh)
require.Len(t, result.Events, 1)
require.True(t, result.Events[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{})
require.NoError(t, err)
// Subscriber should see the published event
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events)
}
func newTestTopicHandlers() map[Topic]TopicHandler {
return map[Topic]TopicHandler{
testTopic: {
Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
return 1, nil
},
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
return events, nil
},
},
}
}
func consumeSubscription(sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- subNextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
type subNextResult struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
t.Helper()
select {
case next := <-eventCh:
return next
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}