diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go
index 601f765bf7..b3e7ae2e75 100644
--- a/agent/consul/state/memdb.go
+++ b/agent/consul/state/memdb.go
@@ -31,7 +31,7 @@ type changeTrackerDB struct {
 }
 
 type eventPublisher interface {
-	PublishEvents(events []stream.Event)
+	Publish(events []stream.Event)
 }
 
 // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@@ -83,7 +83,7 @@ func (c *changeTrackerDB) publish(changes Changes) error {
 	if err != nil {
 		return fmt.Errorf("failed generating events from changes: %v", err)
 	}
-	c.publisher.PublishEvents(events)
+	c.publisher.Publish(events)
 	return nil
 }
 
diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go
index 2482cde535..83a978bb0a 100644
--- a/agent/consul/state/store_integration_test.go
+++ b/agent/consul/state/store_integration_test.go
@@ -30,8 +30,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
 	publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
 	s.db.publisher = publisher
 
-	sub, err := publisher.Subscribe(ctx, subscription)
+	sub, err := publisher.Subscribe(subscription)
 	require.NoError(err)
+	defer sub.Unsubscribe()
 
 	eventCh := testRunSub(sub)
 
@@ -69,8 +70,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
 		Key:   "nope",
 		Token: token.SecretID,
 	}
-	sub2, err := publisher.Subscribe(ctx, subscription2)
+	sub2, err := publisher.Subscribe(subscription2)
 	require.NoError(err)
+	defer sub2.Unsubscribe()
 
 	eventCh2 := testRunSub(sub2)
 
@@ -111,8 +113,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
 	publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
 	s.db.publisher = publisher
 
-	sub, err := publisher.Subscribe(ctx, subscription)
+	sub, err := publisher.Subscribe(subscription)
 	require.NoError(err)
+	defer sub.Unsubscribe()
 
 	eventCh := testRunSub(sub)
 
@@ -154,7 +157,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
 		Key:   "nope",
 		Token: token.SecretID,
 	}
-	sub, err = publisher.Subscribe(ctx, subscription2)
+	sub, err = publisher.Subscribe(subscription2)
 	require.NoError(err)
 
 	eventCh = testRunSub(sub)
@@ -182,8 +185,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
 		Key:   "nope",
 		Token: token.SecretID,
 	}
-	sub, err = publisher.Subscribe(ctx, subscription3)
+	sub, err = publisher.Subscribe(subscription3)
 	require.NoError(err)
+	defer sub.Unsubscribe()
 
 	eventCh = testRunSub(sub)
 
@@ -225,7 +229,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
 	publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
 	s.db.publisher = publisher
 
-	sub, err := publisher.Subscribe(ctx, subscription)
+	sub, err := publisher.Subscribe(subscription)
 	require.NoError(err)
 
 	eventCh := testRunSub(sub)
@@ -264,7 +268,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
 		Key:   "nope",
 		Token: token.SecretID,
 	}
-	sub, err = publisher.Subscribe(ctx, subscription2)
+	sub, err = publisher.Subscribe(subscription2)
 	require.NoError(err)
 
 	eventCh = testRunSub(sub)
@@ -295,7 +299,7 @@ func testRunSub(sub *stream.Subscription) <-chan nextResult {
 	eventCh := make(chan nextResult, 1)
 	go func() {
 		for {
-			es, err := sub.Next()
+			es, err := sub.Next(context.TODO())
 			eventCh <- nextResult{
 				Events: es,
 				Err:    err,
@@ -351,7 +355,6 @@ func assertErr(t *testing.T, eventCh <-chan nextResult) error {
 // acl reset is handled.
 func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
 	t.Helper()
-	timeoutCh := time.After(100 * time.Millisecond)
 	for {
 		select {
 		case next := <-eventCh:
@@ -363,7 +366,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
 			require.Error(t, next.Err)
 			require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
 			return
-		case <-timeoutCh:
+		case <-time.After(100 * time.Millisecond):
 			t.Fatalf("no err after 100ms")
 		}
 	}
@@ -416,7 +419,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
 	// it assumes something lower down did that) and then wait for it to be reset
 	// so we know the initial token write event has been sent out before
 	// continuing...
-	subscription := &stream.SubscribeRequest{
+	req := &stream.SubscribeRequest{
 		Topic: topicService,
 		Key:   "nope",
 		Token: token.SecretID,
@@ -426,8 +429,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
 	publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
 	s.db.publisher = publisher
 
-	sub, err := publisher.Subscribe(ctx, subscription)
+	sub, err := publisher.Subscribe(req)
 	require.NoError(t, err)
+	defer sub.Unsubscribe()
 
 	eventCh := testRunSub(sub)
 
diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go
index f908f10fcb..7701ab44b2 100644
--- a/agent/consul/stream/event_buffer.go
+++ b/agent/consul/stream/event_buffer.go
@@ -3,6 +3,7 @@ package stream
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sync/atomic"
 )
 
@@ -13,17 +14,17 @@ import (
 // specific design has several important features that significantly simplify a
 // lot of our PubSub machinery.
 //
-// The Buffer itself only ever tracks the most recent set of events published so
-// if there are no consumers older events are automatically garbage collected.
-// Notification of new events is done by closing a channel on the previous head
+// The eventBuffer only tracks the most recent set of published events, so
+// if there are no consumers, older events are automatically garbage collected.
+// Consumers are notified of new events by closing a channel on the previous head
 // allowing efficient broadcast to many watchers without having to run multiple
 // goroutines or deliver to O(N) separate channels.
 //
-// Because it's a linked list with atomically updated pointers, readers don't
+// Because eventBuffer is a linked list with atomically updated pointers, readers don't
 // have to take a lock and can consume at their own pace. We also don't need a
-// fixed limit on the number of items which avoids needing to configure
-// buffer length to balance wasting lots of memory all the time against being able to
-// tolerate occasional slow readers.
+// fixed limit on the number of items, which avoids needing to configure
+// buffer length to balance wasting memory against being able to tolerate
+// occasionally slow readers.
 //
 // The buffer is used to deliver all messages broadcast to a topic for active
 // subscribers to consume, but it is also an effective way to both deliver and
@@ -50,8 +51,8 @@ import (
 // Events array. This enables subscribers to start watching for the next update
 // immediately.
 //
-// The zero value eventBuffer is _not_ a usable type since it has not been
-// initialized with an empty bufferItem so can't be used to wait for the first
+// The zero value eventBuffer is _not_ usable, as it has not been
+// initialized with an empty bufferItem so cannot be used to wait for the first
 // published event. Call newEventBuffer to construct a new buffer.
 //
 // Calls to Append or AppendBuffer that mutate the head must be externally
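The close-on-previous-head pattern described in the doc comment above is the core of this design. A minimal, self-contained sketch of the idea, using toy types rather than the code in this diff:

```go
package main

import "fmt"

// item is a toy stand-in for bufferItem: next is set before ch is
// closed, and closing ch wakes every reader waiting on this head.
type item struct {
	value string
	next  *item
	ch    chan struct{}
}

func main() {
	head := &item{ch: make(chan struct{})}

	done := make(chan struct{})
	go func() {
		<-head.ch                    // wait for the head to be superseded
		fmt.Println(head.next.value) // safe: next was set before close
		close(done)
	}()

	// Publish: link the new item first, then close the old head's channel
	// to broadcast to all waiting readers at once.
	head.next = &item{value: "event-1", ch: make(chan struct{})}
	close(head.ch)
	<-done
}
```

Closing a channel wakes every receiver simultaneously, which is why no per-subscriber goroutines or channels are needed. The real implementation stores `next` in an `atomic.Value` so unsynchronized readers can follow the list safely.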
@@ -65,7 +66,7 @@ type eventBuffer struct {
 
 // newEventBuffer creates an eventBuffer ready for use.
 func newEventBuffer() *eventBuffer {
 	b := &eventBuffer{}
-	b.head.Store(newBufferItem())
+	b.head.Store(newBufferItem(nil))
 	return b
 }
 
@@ -77,10 +78,7 @@ func newEventBuffer() *eventBuffer {
 // goroutines. Append only supports a single concurrent caller and must be
 // externally synchronized with other Append, AppendBuffer or AppendErr calls.
 func (b *eventBuffer) Append(events []Event) {
-	// Push events to the head
-	it := newBufferItem()
-	it.Events = events
-	b.AppendBuffer(it)
+	b.AppendItem(newBufferItem(events))
 }
 
 // AppendBuffer joins another buffer which may be the tail of a separate buffer
@@ -92,7 +90,7 @@ func (b *eventBuffer) Append(events []Event) {
 //
 // AppendBuffer only supports a single concurrent caller and must be externally
 // synchronized with other Append, AppendBuffer or AppendErr calls.
-func (b *eventBuffer) AppendBuffer(item *bufferItem) {
+func (b *eventBuffer) AppendItem(item *bufferItem) {
 	// First store it as the next node for the old head this ensures once it's
 	// visible to new searchers the linked list is already valid. Not sure it
 	// matters but this seems nicer.
@@ -105,15 +103,6 @@ func (b *eventBuffer) AppendBuffer(item *bufferItem) {
 	// don't set chan to nil since that will race with readers accessing it.
 }
 
-// AppendErr publishes an error result to the end of the buffer. This is
-// considered terminal and will cause all subscribers to end their current
-// streaming subscription and return the error. AppendErr only supports a
-// single concurrent caller and must be externally synchronized with other
-// Append, AppendBuffer or AppendErr calls.
-func (b *eventBuffer) AppendErr(err error) {
-	b.AppendBuffer(&bufferItem{Err: err})
-}
-
 // Head returns the current head of the buffer. It will always exist but it may
 // be a "sentinel" empty item with a nil Events slice to allow consumers to
 // watch for the next update. Consumers should always check for empty Events and
@@ -172,22 +161,23 @@ type bufferLink struct {
 // newBufferItem returns a blank buffer item with a link and chan ready to have
 // the fields set and be appended to a buffer.
-func newBufferItem() *bufferItem {
+func newBufferItem(events []Event) *bufferItem {
 	return &bufferItem{
-		link: &bufferLink{
-			ch: make(chan struct{}),
-		},
+		link:   &bufferLink{ch: make(chan struct{})},
+		Events: events,
 	}
 }
 
 // Next return the next buffer item in the buffer. It may block until ctx is
 // cancelled or until the next item is published.
-func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
+func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
 	// See if there is already a next value, block if so. Note we don't rely on
 	// state change (chan nil) as that's not threadsafe but detecting close is.
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
+	case <-forceClose:
+		return nil, fmt.Errorf("subscription closed")
 	case <-i.link.ch:
 	}
 
@@ -201,45 +191,28 @@ func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
 	if next.Err != nil {
 		return nil, next.Err
 	}
-	if len(next.Events) == 0 {
-		// Skip this event
-		return next.Next(ctx)
-	}
 	return next, nil
 }
 
 // NextNoBlock returns the next item in the buffer without blocking. If it
-// reaches the most recent item it will return nil and no error.
-func (i *bufferItem) NextNoBlock() (*bufferItem, error) {
+// reaches the most recent item it will return nil.
+func (i *bufferItem) NextNoBlock() *bufferItem {
 	nextRaw := i.link.next.Load()
 	if nextRaw == nil {
-		return nil, nil
+		return nil
 	}
-	next := nextRaw.(*bufferItem)
-	if next.Err != nil {
-		return nil, next.Err
-	}
-	if len(next.Events) == 0 {
-		// Skip this event
-		return next.NextNoBlock()
-	}
-	return next, nil
+	return nextRaw.(*bufferItem)
 }
 
-// FollowAfter returns either the next item in the buffer if there is already
-// one, or if not it returns an empty item (that will be ignored by subscribers)
-// that has the same link as the current buffer so that it will be notified of
-// future updates in the buffer without including the current item.
-func (i *bufferItem) FollowAfter() (*bufferItem, error) {
-	next, err := i.NextNoBlock()
-	if err != nil {
-		return nil, err
-	}
+// NextLink returns either the next item in the buffer if there is one, or
+// an empty item (that will be ignored by subscribers) that has a pointer to
+// the same link as this bufferItem (but none of the bufferItem content).
+// When the link.ch is closed, subscriptions will be notified of the next item.
+func (i *bufferItem) NextLink() *bufferItem {
	next := i.NextNoBlock()
 	if next == nil {
 		// Return an empty item that can be followed to the next item published.
-		item := &bufferItem{}
-		item.link = i.link
-		return item, nil
+		return &bufferItem{link: i.link}
 	}
-	return next, nil
+	return next
 }
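With the error and empty-item skipping removed from `bufferItem.Next` and `NextNoBlock`, callers now see sentinel items and errors directly. A hedged sketch of a consumer loop under the new signatures (`readLoop` and `handle` are illustrative, not part of this change):

```go
// readLoop follows the buffer from item until ctx is cancelled or the
// subscription is force-closed. It approximates what Subscription.Next
// does with each item it receives.
func readLoop(ctx context.Context, item *bufferItem, forceClose <-chan struct{}, handle func([]Event)) error {
	for {
		next, err := item.Next(ctx, forceClose)
		if err != nil {
			// err is ctx.Err(), the forceClose error, or a bufferItem.Err.
			return err
		}
		// Next no longer skips empty sentinel items (such as those
		// produced by NextLink), so the caller must skip them.
		if len(next.Events) > 0 {
			handle(next.Events)
		}
		item = next
	}
}
```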
diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go
index 6f491e62f4..7a91347202 100644
--- a/agent/consul/stream/event_buffer_test.go
+++ b/agent/consul/stream/event_buffer_test.go
@@ -60,7 +60,7 @@ func TestEventBufferFuzz(t *testing.T) {
 			item := head
 			var err error
 			for {
-				item, err = item.Next(context.Background())
+				item, err = item.Next(context.Background(), nil)
 				if err != nil {
 					errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
 						expect, err)
diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go
index ce17fbe048..444b117c53 100644
--- a/agent/consul/stream/event_publisher.go
+++ b/agent/consul/stream/event_publisher.go
@@ -7,8 +7,8 @@ import (
 	"time"
 )
 
-// EventPublisher receives changes events from Publish, and sends them to all
-// registered subscribers.
+// EventPublisher receives change events from Publish, and sends the events to
+// all subscribers of the event Topic.
 type EventPublisher struct {
 	// snapCacheTTL controls how long we keep snapshots in our cache before
 	// allowing them to be garbage collected and a new one made for subsequent
@@ -60,6 +60,7 @@ type changeEvents struct {
 
 // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
 // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
+// The nil Topic is reserved and should not be used.
 type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
 
 // SnapshotAppender appends groups of events to create a Snapshot of state.
@@ -91,10 +92,8 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
 	return e
 }
 
-// PublishEvents to all subscribers. tx is a read-only transaction that captures
-// the state at the time the change happened. The caller must never use the tx once
-// it has been passed to PublishChanged.
-func (e *EventPublisher) PublishEvents(events []Event) {
+// Publish events to all subscribers of the event Topic.
+func (e *EventPublisher) Publish(events []Event) {
 	if len(events) > 0 {
 		e.publishCh <- changeEvents{events: events}
 	}
 }
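Since the `SnapshotHandlers` comment above now reserves the nil Topic, it may help to show the handler shape. A sketch mirroring `newTestSnapshotHandlers` from the test diff (`testTopic` is the test helper's topic; the payload value is made up):

```go
handlers := SnapshotHandlers{
	testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
		// Write the current state matching req into the snapshot buffer,
		// then return the index the snapshot was captured at.
		buf.Append([]Event{{
			Topic:   req.Topic,
			Key:     req.Key,
			Index:   1,
			Payload: "initial-state", // hypothetical payload
		}})
		return 1, nil
	},
}
```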
@@ -146,22 +145,18 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
 	return buf
 }
 
-// Subscribe returns a new stream.Subscription for the given request. A
-// subscription will stream an initial snapshot of events matching the request
-// if required and then block until new events that modify the request occur, or
-// the context is cancelled. Subscriptions may be forced to reset if the server
-// decides it can no longer maintain correct operation for example if ACL
-// policies changed or the state store was restored.
+// Subscribe returns a new Subscription for the given request. A subscription
+// receives an initial snapshot of events matching the request, unless req.Index
+// matches the index of the newest events for the topic. After the snapshot,
+// events are streamed as they are created. Subscriptions may be closed, forcing
+// the client to resubscribe (for example if ACL policies changed or the state store is abandoned).
 //
 // When the caller is finished with the subscription for any reason, it must
 // call Subscription.Unsubscribe to free ACL tracking resources.
-func (e *EventPublisher) Subscribe(
-	ctx context.Context,
-	req *SubscribeRequest,
-) (*Subscription, error) {
+func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
 	// Ensure we know how to make a snapshot for this topic
 	_, ok := e.snapshotHandlers[req.Topic]
-	if !ok {
+	if !ok || req.Topic == nil {
 		return nil, fmt.Errorf("unknown topic %v", req.Topic)
 	}
 
@@ -176,47 +171,35 @@ func (e *EventPublisher) Subscribe(
 	topicHead := buf.Head()
 	var sub *Subscription
 	if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
-		// No need for a snapshot, send the "resume stream" message to signal to
+		// No need for a snapshot, send the "end of empty snapshot" message to signal to
 		// client its cache is still good, then follow along from here in the topic.
-		e := Event{
-			Index:   req.Index,
-			Topic:   req.Topic,
-			Key:     req.Key,
-			Payload: endOfEmptySnapshot{},
-		}
 		// Make a new buffer to send to the client containing the resume.
 		buf := newEventBuffer()
 
 		// Store the head of that buffer before we append to it to give as the
 		// starting point for the subscription.
 		subHead := buf.Head()
 
-		buf.Append([]Event{e})
+		buf.Append([]Event{{
+			Index:   req.Index,
+			Topic:   req.Topic,
+			Key:     req.Key,
+			Payload: endOfEmptySnapshot{},
+		}})
 
 		// Now splice the rest of the topic buffer on so the subscription will
 		// continue to see future updates in the topic buffer.
-		follow, err := topicHead.FollowAfter()
-		if err != nil {
-			return nil, err
-		}
-		buf.AppendBuffer(follow)
+		buf.AppendItem(topicHead.NextLink())
 
-		sub = newSubscription(ctx, req, subHead)
+		sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
 	} else {
 		snap, err := e.getSnapshotLocked(req, topicHead)
 		if err != nil {
 			return nil, err
 		}
-		sub = newSubscription(ctx, req, snap.Snap)
+		sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
 	}
 
 	e.subscriptions.add(req, sub)
-	// Set unsubscribe so that the caller doesn't need to keep track of the
-	// SubscriptionRequest, and can not accidentally call unsubscribe with the
-	// wrong value.
-	sub.Unsubscribe = func() {
-		e.subscriptions.unsubscribe(req)
-	}
 	return sub, nil
 }
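The new calling convention, end to end, as a hedged sketch (`consume` and `handle` are hypothetical; compare `consumeSubscription` in the test diff below):

```go
func consume(ctx context.Context, pub *EventPublisher, req *SubscribeRequest, handle func([]Event)) error {
	// Subscribe no longer takes a context; cancellation moved to Next.
	sub, err := pub.Subscribe(req)
	if err != nil {
		return err
	}
	// Unsubscribe must always be called to free ACL tracking resources.
	defer sub.Unsubscribe()

	for {
		events, err := sub.Next(ctx)
		if err != nil {
			// ErrSubscriptionClosed means the server forced a reset;
			// the caller should resubscribe with a fresh request.
			return err
		}
		handle(events)
	}
}
```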
@@ -239,27 +222,30 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
 	for _, secretID := range tokenSecretIDs {
 		if subs, ok := s.byToken[secretID]; ok {
 			for _, sub := range subs {
-				sub.Close()
+				sub.forceClose()
 			}
 		}
 	}
 }
 
-// unsubscribe must be called when a client is no longer interested in a
-// subscription to free resources monitoring changes in it's ACL token.
-//
-// req MUST be the same pointer that was used to register the subscription.
-func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
+// unsubscribe returns a function that the subscription will call to remove
+// itself from the subsByToken map.
+// This function is returned as a closure so that the caller doesn't need to keep
+// track of the SubscribeRequest, and cannot accidentally call unsubscribe with the
+// wrong pointer.
+func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
+	return func() {
+		s.lock.Lock()
+		defer s.lock.Unlock()
 
-	subsByToken, ok := s.byToken[req.Token]
-	if !ok {
-		return
-	}
-	delete(subsByToken, req)
-	if len(subsByToken) == 0 {
-		delete(s.byToken, req.Token)
+		subsByToken, ok := s.byToken[req.Token]
+		if !ok {
+			return
+		}
+		delete(subsByToken, req)
+		if len(subsByToken) == 0 {
+			delete(s.byToken, req.Token)
+		}
 	}
 }
diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go
index 4bf4bd27ee..4a8c6542ef 100644
--- a/agent/consul/stream/event_publisher_test.go
+++ b/agent/consul/stream/event_publisher_test.go
@@ -26,9 +26,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
 	defer cancel()
 	publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
 
-	sub, err := publisher.Subscribe(ctx, subscription)
+	sub, err := publisher.Subscribe(subscription)
 	require.NoError(t, err)
-	eventCh := consumeSubscription(sub)
+	eventCh := consumeSubscription(ctx, sub)
 
 	result := nextResult(t, eventCh)
 	require.NoError(t, result.Err)
@@ -47,7 +47,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
 		Key:     "sub-key",
 		Payload: "the-published-event-payload",
 	}}
-	publisher.PublishEvents(events)
+	publisher.Publish(events)
 
 	// Subscriber should see the published event
 	result = nextResult(t, eventCh)
@@ -68,11 +68,11 @@ func newTestSnapshotHandlers() SnapshotHandlers {
 	}
 }
 
-func consumeSubscription(sub *Subscription) <-chan subNextResult {
+func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
 	eventCh := make(chan subNextResult, 1)
 	go func() {
 		for {
-			es, err := sub.Next()
+			es, err := sub.Next(ctx)
 			eventCh <- subNextResult{
 				Events: es,
 				Err:    err,
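For context on what the returned closure mutates: from the lookups above (`s.byToken[req.Token]`, `delete(subsByToken, req)`) the subscription index can be inferred to be a two-level map, roughly:

```go
// Inferred shape of the subscriptions index (not shown in this diff).
// Keying the inner map by the *SubscribeRequest pointer is what makes
// the closure approach attractive: callers can no longer pass the
// wrong pointer to unsubscribe.
type subscriptions struct {
	lock    sync.RWMutex
	byToken map[string]map[*SubscribeRequest]*Subscription
}
```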
diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go
index 0ca24f8088..12a52ea37b 100644
--- a/agent/consul/stream/event_snapshot.go
+++ b/agent/consul/stream/event_snapshot.go
@@ -9,10 +9,10 @@ package stream
 // collected automatically by Go's runtime. This simplifies snapshot and buffer
 // management dramatically.
 type eventSnapshot struct {
-	// Snap is the first item in the buffer containing the snapshot. Once the
+	// Head is the first item in the buffer containing the snapshot. Once the
 	// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
 	// so that subscribers receive all the events from the same buffer.
-	Snap *bufferItem
+	Head *bufferItem
 
 	// snapBuffer is the Head of the snapshot buffer the fn should write to.
 	snapBuffer *eventBuffer
@@ -30,14 +30,14 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
 func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
 	buf := newEventBuffer()
 	s := &eventSnapshot{
-		Snap:       buf.Head(),
+		Head:       buf.Head(),
 		snapBuffer: buf,
 	}
 
 	go func() {
 		idx, err := fn(req, s.snapBuffer)
 		if err != nil {
-			s.snapBuffer.AppendErr(err)
+			s.snapBuffer.AppendItem(&bufferItem{Err: err})
 			return
 		}
 		// We wrote the snapshot events to the buffer, send the "end of snapshot" event
@@ -57,44 +57,33 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
 	// find the first event after the current snapshot.
 	item := topicBufferHead
 	for {
-		// Find the next item that we should include.
-		next, err := item.NextNoBlock()
-		if err != nil {
-			// Append an error result to signal to subscribers that this snapshot is
-			// no good.
-			s.snapBuffer.AppendErr(err)
-			return
-		}
-
-		if next == nil {
+		next := item.NextNoBlock()
+		switch {
+		case next == nil:
 			// This is the head of the topic buffer (or was just now which is after
 			// the snapshot completed). We don't want any of the events (if any) in
 			// the snapshot buffer as they came before the snapshot but we do need to
 			// wait for the next update.
-			follow, err := item.FollowAfter()
-			if err != nil {
-				s.snapBuffer.AppendErr(err)
-				return
-			}
-
-			s.snapBuffer.AppendBuffer(follow)
-			// We are done, subscribers will now follow future updates to the topic
-			// after reading the snapshot events.
+			s.snapBuffer.AppendItem(item.NextLink())
 			return
-		}
 
-		if next.Err != nil {
-			s.snapBuffer.AppendErr(next.Err)
+		case next.Err != nil:
+			// This case is not currently possible because errors can only come
+			// from a snapshot func, and this is consuming events from a topic
+			// buffer which does not contain a snapshot.
+			// Handle this case anyway in case errors can come from other places
+			// in the future.
+			s.snapBuffer.AppendItem(next)
 			return
-		}
 
-		if len(next.Events) > 0 && next.Events[0].Index > idx {
+		case len(next.Events) > 0 && next.Events[0].Index > idx:
 			// We've found an update in the topic buffer that happened after our
 			// snapshot was taken, splice it into the snapshot buffer so subscribers
 			// can continue to read this and others after it.
-			s.snapBuffer.AppendBuffer(next)
+			s.snapBuffer.AppendItem(next)
 			return
 		}
+
+		// We don't need this item, continue to next
 		item = next
 	}
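How the renamed Head field and the splice fit together, sketched below. The wiring approximates what getSnapshotLocked does (getSnapshotLocked itself is not part of this diff):

```go
// A snapshot is written into its own buffer by the topic's snapshot
// func, then the live topic buffer is spliced on by spliceFromTopicBuffer.
snap := newEventSnapshot(req, topicHead, handlers[req.Topic])

// Subscribers start at snap.Head and see, in order: the snapshot
// events, an endOfSnapshot event, then live updates from the topic.
sub := newSubscription(req, snap.Head, unsub)
```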
diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go
index 923e864ce3..a11be78551 100644
--- a/agent/consul/stream/event_snapshot_test.go
+++ b/agent/consul/stream/event_snapshot_test.go
@@ -112,11 +112,11 @@ func TestEventSnapshot(t *testing.T) {
 			snapIDs := make([]string, 0, tc.snapshotSize)
 			updateIDs := make([]string, 0, tc.updatesAfterSnap)
 			snapDone := false
-			curItem := es.Snap
+			curItem := es.Head
 			var err error
 		RECV:
 			for {
-				curItem, err = curItem.Next(ctx)
+				curItem, err = curItem.Next(ctx, nil)
 				// This error is typically timeout so dump the state to aid debugging.
 				require.NoError(t, err,
 					"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go
new file mode 100644
index 0000000000..aff938a678
--- /dev/null
+++ b/agent/consul/stream/event_test.go
@@ -0,0 +1,17 @@
+package stream
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestEvent_IsEndOfSnapshot(t *testing.T) {
+	e := Event{Payload: endOfSnapshot{}}
+	require.True(t, e.IsEndOfSnapshot())
+
+	t.Run("not EndOfSnapshot", func(t *testing.T) {
+		e := Event{Payload: endOfEmptySnapshot{}}
+		require.False(t, e.IsEndOfSnapshot())
+	})
+}
diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go
index c04f80d53d..c2177468ff 100644
--- a/agent/consul/stream/subscription.go
+++ b/agent/consul/stream/subscription.go
@@ -19,11 +19,10 @@ const (
 
 // ErrSubscriptionClosed is a error signalling the subscription has been
 // closed. The client should Unsubscribe, then re-Subscribe.
-var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry")
+var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
 
-// Subscription holds state about a single Subscribe call. Subscribe clients
-// access their next event by calling Next(). This may initially include the
-// snapshot events to catch them up if they are new or behind.
+// Subscription provides events on a Topic. Events may be filtered by Key.
+// Events are returned by Next(), and may start with a Snapshot of events.
 type Subscription struct {
 	// state is accessed atomically 0 means open, 1 means closed with reload
 	state uint32
@@ -35,17 +34,15 @@ type Subscription struct {
 	// is mutated by calls to Next.
 	currentItem *bufferItem
 
-	// ctx is the Subscription context that wraps the context of the streaming RPC
-	// handler call.
-	ctx context.Context
+	// forceClosed is closed when forceClose is called. It is used by
+	// EventPublisher to cancel Next().
+	forceClosed chan struct{}
 
-	// cancelFn stores the context cancel function that will wake up the
-	// in-progress Next call on a server-initiated state change e.g. Reload.
-	cancelFn func()
-
-	// Unsubscribe is a function set by EventPublisher that is called to
-	// free resources when the subscription is no longer needed.
-	Unsubscribe func()
+	// unsub is a function set by EventPublisher that is called to free resources
+	// when the subscription is no longer needed.
+	// It must be safe to call the function from multiple goroutines and the function
+	// must be idempotent.
+	unsub func()
 }
 
 // SubscribeRequest identifies the types of events the subscriber would like to
@@ -59,74 +56,81 @@ type SubscribeRequest struct {
 
 // newSubscription return a new subscription. The caller is responsible for
 // calling Unsubscribe when it is done with the subscription, to free resources.
-func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription {
-	subCtx, cancel := context.WithCancel(ctx)
+func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
 	return &Subscription{
-		ctx:         subCtx,
-		cancelFn:    cancel,
+		forceClosed: make(chan struct{}),
 		req:         req,
 		currentItem: item,
+		unsub:       unsub,
 	}
 }
 
 // Next returns the next set of events to deliver. It must only be called from a
 // single goroutine concurrently as it mutates the Subscription.
-func (s *Subscription) Next() ([]Event, error) {
+func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
 	if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
 		return nil, ErrSubscriptionClosed
 	}
 
 	for {
-		next, err := s.currentItem.Next(s.ctx)
-		if err != nil {
-			// Check we didn't return because of a state change cancelling the context
-			if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
-				return nil, ErrSubscriptionClosed
-			}
+		next, err := s.currentItem.Next(ctx, s.forceClosed)
+		switch {
+		case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
+			return nil, ErrSubscriptionClosed
+		case err != nil:
 			return nil, err
 		}
-		// Advance our cursor for next loop or next call
 		s.currentItem = next
 
-		// Assume happy path where all events (or none) are relevant.
-		allMatch := true
-
-		// If there is a specific key, see if we need to filter any events
-		if s.req.Key != "" {
-			for _, e := range next.Events {
-				if s.req.Key != e.Key {
-					allMatch = false
-					break
-				}
-			}
+		events := s.filter(next.Events)
+		if len(events) == 0 {
+			continue
 		}
-
-		// Only if we need to filter events should we bother allocating a new slice
-		// as this is a hot loop.
-		events := next.Events
-		if !allMatch {
-			events = make([]Event, 0, len(next.Events))
-			for _, e := range next.Events {
-				// Only return it if the key matches.
-				if s.req.Key == "" || s.req.Key == e.Key {
-					events = append(events, e)
-				}
-			}
-		}
-
-		if len(events) > 0 {
-			return events, nil
-		}
-		// Keep looping until we find some events we are interested in.
+		return events, nil
 	}
 }
 
+// TODO: test cases for this method
+func (s *Subscription) filter(events []Event) []Event {
+	if s.req.Key == "" || len(events) == 0 {
+		return events
+	}
+
+	allMatch := true
+	for _, e := range events {
+		if s.req.Key != e.Key {
+			allMatch = false
+			break
+		}
+	}
+
+	// Only allocate a new slice if some events need to be filtered out
+	if allMatch {
+		return events
+	}
+
+	// FIXME: this will over-allocate. We could get a count from the previous range
+	// over events.
+	result := make([]Event, 0, len(events))
+	for _, e := range events {
+		if s.req.Key == e.Key {
+			result = append(result, e)
+		}
+	}
+	return result
+}
+
 // Close the subscription. Subscribers will receive an error when they call Next,
 // and will need to perform a new Subscribe request.
 // It is safe to call from any goroutine.
-func (s *Subscription) Close() {
+func (s *Subscription) forceClose() {
 	swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
 	if swapped {
-		s.cancelFn()
+		close(s.forceClosed)
 	}
 }
+
+// Unsubscribe the subscription, freeing resources.
+func (s *Subscription) Unsubscribe() {
+	s.unsub()
+}
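Note the filtered-copy branch originally reassigned `events` before ranging over it, which would always produce an empty result; the version above ranges over the input and appends to a new `result` slice. On the TODO above, a possible starting point for filter test cases (hypothetical test, asserting the semantics Next relies on):

```go
func TestSubscription_Filter(t *testing.T) {
	sub := &Subscription{req: &SubscribeRequest{Key: "key1"}}

	// Mixed batch: only matching events are returned.
	got := sub.filter([]Event{{Key: "key1"}, {Key: "key2"}, {Key: "key1"}})
	require.Len(t, got, 2)

	// Fully matching batch: returned as-is, no reallocation needed.
	events := []Event{{Key: "key1"}}
	require.Equal(t, events, sub.filter(events))

	// No key on the request: nothing is filtered.
	sub = &Subscription{req: &SubscribeRequest{}}
	require.Len(t, sub.filter([]Event{{Key: "other"}}), 1)
}
```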
diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go
index 9e4ab5269b..36a60dc482 100644
--- a/agent/consul/stream/subscription_test.go
+++ b/agent/consul/stream/subscription_test.go
@@ -8,6 +8,8 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func noopUnSub() {}
+
 func TestSubscription(t *testing.T) {
 	eb := newEventBuffer()
 
@@ -26,11 +28,11 @@ func TestSubscription(t *testing.T) {
 		Topic: testTopic,
 		Key:   "test",
 	}
-	sub := newSubscription(ctx, req, startHead)
+	sub := newSubscription(req, startHead, noopUnSub)
 
 	// First call to sub.Next should return our published event immediately
 	start := time.Now()
-	got, err := sub.Next()
+	got, err := sub.Next(ctx)
 	elapsed := time.Since(start)
 	require.NoError(t, err)
 	require.True(t, elapsed < 200*time.Millisecond,
@@ -46,7 +48,7 @@ func TestSubscription(t *testing.T) {
 	})
 
 	// Next call should block until event is delivered
-	got, err = sub.Next()
+	got, err = sub.Next(ctx)
 	elapsed = time.Since(start)
 	require.NoError(t, err)
 	require.True(t, elapsed > 200*time.Millisecond,
@@ -64,7 +66,7 @@ func TestSubscription(t *testing.T) {
 	publishTestEvent(index, eb, "test")
 
 	start = time.Now()
-	got, err = sub.Next()
+	got, err = sub.Next(ctx)
 	elapsed = time.Since(start)
 	require.NoError(t, err)
 	require.True(t, elapsed < 200*time.Millisecond,
@@ -79,7 +81,7 @@ func TestSubscription(t *testing.T) {
 		cancel()
 	})
 
-	_, err = sub.Next()
+	_, err = sub.Next(ctx)
 	elapsed = time.Since(start)
 	require.Error(t, err)
 	require.True(t, elapsed > 200*time.Millisecond,
@@ -106,11 +108,11 @@ func TestSubscription_Close(t *testing.T) {
 		Topic: testTopic,
 		Key:   "test",
 	}
-	sub := newSubscription(ctx, req, startHead)
+	sub := newSubscription(req, startHead, noopUnSub)
 
 	// First call to sub.Next should return our published event immediately
 	start := time.Now()
-	got, err := sub.Next()
+	got, err := sub.Next(ctx)
 	elapsed := time.Since(start)
 	require.NoError(t, err)
 	require.True(t, elapsed < 200*time.Millisecond,
@@ -122,10 +124,10 @@ func TestSubscription_Close(t *testing.T) {
 	// needs to reset (e.g. on ACL perm change).
 	start = time.Now()
 	time.AfterFunc(200*time.Millisecond, func() {
-		sub.Close()
+		sub.forceClose()
 	})
 
-	_, err = sub.Next()
+	_, err = sub.Next(ctx)
 	elapsed = time.Since(start)
 	require.Error(t, err)
 	require.Equal(t, ErrSubscriptionClosed, err)