mirror of https://github.com/hashicorp/consul
stream: Add forceClose and refactor subscription filtering
Move the subscription context to Next. context.Context should generally never be stored in a struct because it makes that struct only valid while the context is valid. This is rarely obvious from the caller. Adds a forceClosed channel in place of the old context, and uses the new context as a way for the caller to stop the Subscription blocking. Remove some recursion out of bufferImte.Next. The caller is already looping so we can continue in that loop instead of recursing. This ensures currentItem is updated immediately (which probably does not matter in practice), and also removes the chance that we overflow the stack. NextNoBlock and FollowAfter do not need to handle bufferItem.Err, the caller already handles it. Moves filter to a method to simplify Next, and more explicitly separate filtering from looping. Also improve some godoc Only unwrap itemBuffer.Err when necessarypull/8160/head
parent
f19f8e99bb
commit
c07acbeb6b
|
@ -31,7 +31,7 @@ type changeTrackerDB struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventPublisher interface {
|
type eventPublisher interface {
|
||||||
PublishEvents(events []stream.Event)
|
Publish(events []stream.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
||||||
|
@ -83,7 +83,7 @@ func (c *changeTrackerDB) publish(changes Changes) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed generating events from changes: %v", err)
|
return fmt.Errorf("failed generating events from changes: %v", err)
|
||||||
}
|
}
|
||||||
c.publisher.PublishEvents(events)
|
c.publisher.Publish(events)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(ctx, subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
eventCh := testRunSub(sub)
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
@ -69,8 +70,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
|
||||||
Key: "nope",
|
Key: "nope",
|
||||||
Token: token.SecretID,
|
Token: token.SecretID,
|
||||||
}
|
}
|
||||||
sub2, err := publisher.Subscribe(ctx, subscription2)
|
sub2, err := publisher.Subscribe(subscription2)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
eventCh2 := testRunSub(sub2)
|
eventCh2 := testRunSub(sub2)
|
||||||
|
|
||||||
|
@ -111,8 +113,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(ctx, subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
eventCh := testRunSub(sub)
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
@ -154,7 +157,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||||
Key: "nope",
|
Key: "nope",
|
||||||
Token: token.SecretID,
|
Token: token.SecretID,
|
||||||
}
|
}
|
||||||
sub, err = publisher.Subscribe(ctx, subscription2)
|
sub, err = publisher.Subscribe(subscription2)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
eventCh = testRunSub(sub)
|
eventCh = testRunSub(sub)
|
||||||
|
@ -182,8 +185,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||||
Key: "nope",
|
Key: "nope",
|
||||||
Token: token.SecretID,
|
Token: token.SecretID,
|
||||||
}
|
}
|
||||||
sub, err = publisher.Subscribe(ctx, subscription3)
|
sub, err = publisher.Subscribe(subscription3)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
eventCh = testRunSub(sub)
|
eventCh = testRunSub(sub)
|
||||||
|
|
||||||
|
@ -225,7 +229,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(ctx, subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
eventCh := testRunSub(sub)
|
eventCh := testRunSub(sub)
|
||||||
|
@ -264,7 +268,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
|
||||||
Key: "nope",
|
Key: "nope",
|
||||||
Token: token.SecretID,
|
Token: token.SecretID,
|
||||||
}
|
}
|
||||||
sub, err = publisher.Subscribe(ctx, subscription2)
|
sub, err = publisher.Subscribe(subscription2)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
eventCh = testRunSub(sub)
|
eventCh = testRunSub(sub)
|
||||||
|
@ -295,7 +299,7 @@ func testRunSub(sub *stream.Subscription) <-chan nextResult {
|
||||||
eventCh := make(chan nextResult, 1)
|
eventCh := make(chan nextResult, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
es, err := sub.Next()
|
es, err := sub.Next(context.TODO())
|
||||||
eventCh <- nextResult{
|
eventCh <- nextResult{
|
||||||
Events: es,
|
Events: es,
|
||||||
Err: err,
|
Err: err,
|
||||||
|
@ -351,7 +355,6 @@ func assertErr(t *testing.T, eventCh <-chan nextResult) error {
|
||||||
// acl reset is handled.
|
// acl reset is handled.
|
||||||
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
|
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
timeoutCh := time.After(100 * time.Millisecond)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case next := <-eventCh:
|
case next := <-eventCh:
|
||||||
|
@ -363,7 +366,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
|
||||||
require.Error(t, next.Err)
|
require.Error(t, next.Err)
|
||||||
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
|
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
|
||||||
return
|
return
|
||||||
case <-timeoutCh:
|
case <-time.After(100 * time.Millisecond):
|
||||||
t.Fatalf("no err after 100ms")
|
t.Fatalf("no err after 100ms")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -416,7 +419,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
|
||||||
// it assumes something lower down did that) and then wait for it to be reset
|
// it assumes something lower down did that) and then wait for it to be reset
|
||||||
// so we know the initial token write event has been sent out before
|
// so we know the initial token write event has been sent out before
|
||||||
// continuing...
|
// continuing...
|
||||||
subscription := &stream.SubscribeRequest{
|
req := &stream.SubscribeRequest{
|
||||||
Topic: topicService,
|
Topic: topicService,
|
||||||
Key: "nope",
|
Key: "nope",
|
||||||
Token: token.SecretID,
|
Token: token.SecretID,
|
||||||
|
@ -426,8 +429,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(ctx, subscription)
|
sub, err := publisher.Subscribe(req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
eventCh := testRunSub(sub)
|
eventCh := testRunSub(sub)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package stream
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -13,17 +14,17 @@ import (
|
||||||
// specific design has several important features that significantly simplify a
|
// specific design has several important features that significantly simplify a
|
||||||
// lot of our PubSub machinery.
|
// lot of our PubSub machinery.
|
||||||
//
|
//
|
||||||
// The Buffer itself only ever tracks the most recent set of events published so
|
// The eventBuffer only tracks the most recent set of published events, so
|
||||||
// if there are no consumers older events are automatically garbage collected.
|
// if there are no consumers, older events are automatically garbage collected.
|
||||||
// Notification of new events is done by closing a channel on the previous head
|
// Consumers are notified of new events by closing a channel on the previous head
|
||||||
// allowing efficient broadcast to many watchers without having to run multiple
|
// allowing efficient broadcast to many watchers without having to run multiple
|
||||||
// goroutines or deliver to O(N) separate channels.
|
// goroutines or deliver to O(N) separate channels.
|
||||||
//
|
//
|
||||||
// Because it's a linked list with atomically updated pointers, readers don't
|
// Because eventBuffer is a linked list with atomically updated pointers, readers don't
|
||||||
// have to take a lock and can consume at their own pace. We also don't need a
|
// have to take a lock and can consume at their own pace. We also don't need a
|
||||||
// fixed limit on the number of items which avoids needing to configure
|
// fixed limit on the number of items, which avoids needing to configure
|
||||||
// buffer length to balance wasting lots of memory all the time against being able to
|
// buffer length to balance wasting memory, against being able to tolerate
|
||||||
// tolerate occasional slow readers.
|
// occasionally slow readers.
|
||||||
//
|
//
|
||||||
// The buffer is used to deliver all messages broadcast to a topic for active
|
// The buffer is used to deliver all messages broadcast to a topic for active
|
||||||
// subscribers to consume, but it is also an effective way to both deliver and
|
// subscribers to consume, but it is also an effective way to both deliver and
|
||||||
|
@ -50,8 +51,8 @@ import (
|
||||||
// Events array. This enables subscribers to start watching for the next update
|
// Events array. This enables subscribers to start watching for the next update
|
||||||
// immediately.
|
// immediately.
|
||||||
//
|
//
|
||||||
// The zero value eventBuffer is _not_ a usable type since it has not been
|
// The zero value eventBuffer is _not_ usable, as it has not been
|
||||||
// initialized with an empty bufferItem so can't be used to wait for the first
|
// initialized with an empty bufferItem so can not be used to wait for the first
|
||||||
// published event. Call newEventBuffer to construct a new buffer.
|
// published event. Call newEventBuffer to construct a new buffer.
|
||||||
//
|
//
|
||||||
// Calls to Append or AppendBuffer that mutate the head must be externally
|
// Calls to Append or AppendBuffer that mutate the head must be externally
|
||||||
|
@ -65,7 +66,7 @@ type eventBuffer struct {
|
||||||
// newEventBuffer creates an eventBuffer ready for use.
|
// newEventBuffer creates an eventBuffer ready for use.
|
||||||
func newEventBuffer() *eventBuffer {
|
func newEventBuffer() *eventBuffer {
|
||||||
b := &eventBuffer{}
|
b := &eventBuffer{}
|
||||||
b.head.Store(newBufferItem())
|
b.head.Store(newBufferItem(nil))
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,10 +78,7 @@ func newEventBuffer() *eventBuffer {
|
||||||
// goroutines. Append only supports a single concurrent caller and must be
|
// goroutines. Append only supports a single concurrent caller and must be
|
||||||
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
|
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||||
func (b *eventBuffer) Append(events []Event) {
|
func (b *eventBuffer) Append(events []Event) {
|
||||||
// Push events to the head
|
b.AppendItem(newBufferItem(events))
|
||||||
it := newBufferItem()
|
|
||||||
it.Events = events
|
|
||||||
b.AppendBuffer(it)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendBuffer joins another buffer which may be the tail of a separate buffer
|
// AppendBuffer joins another buffer which may be the tail of a separate buffer
|
||||||
|
@ -92,7 +90,7 @@ func (b *eventBuffer) Append(events []Event) {
|
||||||
//
|
//
|
||||||
// AppendBuffer only supports a single concurrent caller and must be externally
|
// AppendBuffer only supports a single concurrent caller and must be externally
|
||||||
// synchronized with other Append, AppendBuffer or AppendErr calls.
|
// synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||||
func (b *eventBuffer) AppendBuffer(item *bufferItem) {
|
func (b *eventBuffer) AppendItem(item *bufferItem) {
|
||||||
// First store it as the next node for the old head this ensures once it's
|
// First store it as the next node for the old head this ensures once it's
|
||||||
// visible to new searchers the linked list is already valid. Not sure it
|
// visible to new searchers the linked list is already valid. Not sure it
|
||||||
// matters but this seems nicer.
|
// matters but this seems nicer.
|
||||||
|
@ -105,15 +103,6 @@ func (b *eventBuffer) AppendBuffer(item *bufferItem) {
|
||||||
// don't set chan to nil since that will race with readers accessing it.
|
// don't set chan to nil since that will race with readers accessing it.
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendErr publishes an error result to the end of the buffer. This is
|
|
||||||
// considered terminal and will cause all subscribers to end their current
|
|
||||||
// streaming subscription and return the error. AppendErr only supports a
|
|
||||||
// single concurrent caller and must be externally synchronized with other
|
|
||||||
// Append, AppendBuffer or AppendErr calls.
|
|
||||||
func (b *eventBuffer) AppendErr(err error) {
|
|
||||||
b.AppendBuffer(&bufferItem{Err: err})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head returns the current head of the buffer. It will always exist but it may
|
// Head returns the current head of the buffer. It will always exist but it may
|
||||||
// be a "sentinel" empty item with a nil Events slice to allow consumers to
|
// be a "sentinel" empty item with a nil Events slice to allow consumers to
|
||||||
// watch for the next update. Consumers should always check for empty Events and
|
// watch for the next update. Consumers should always check for empty Events and
|
||||||
|
@ -172,22 +161,23 @@ type bufferLink struct {
|
||||||
|
|
||||||
// newBufferItem returns a blank buffer item with a link and chan ready to have
|
// newBufferItem returns a blank buffer item with a link and chan ready to have
|
||||||
// the fields set and be appended to a buffer.
|
// the fields set and be appended to a buffer.
|
||||||
func newBufferItem() *bufferItem {
|
func newBufferItem(events []Event) *bufferItem {
|
||||||
return &bufferItem{
|
return &bufferItem{
|
||||||
link: &bufferLink{
|
link: &bufferLink{ch: make(chan struct{})},
|
||||||
ch: make(chan struct{}),
|
Events: events,
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next return the next buffer item in the buffer. It may block until ctx is
|
// Next return the next buffer item in the buffer. It may block until ctx is
|
||||||
// cancelled or until the next item is published.
|
// cancelled or until the next item is published.
|
||||||
func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
|
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
|
||||||
// See if there is already a next value, block if so. Note we don't rely on
|
// See if there is already a next value, block if so. Note we don't rely on
|
||||||
// state change (chan nil) as that's not threadsafe but detecting close is.
|
// state change (chan nil) as that's not threadsafe but detecting close is.
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
|
case <-forceClose:
|
||||||
|
return nil, fmt.Errorf("subscription closed")
|
||||||
case <-i.link.ch:
|
case <-i.link.ch:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,45 +191,28 @@ func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
|
||||||
if next.Err != nil {
|
if next.Err != nil {
|
||||||
return nil, next.Err
|
return nil, next.Err
|
||||||
}
|
}
|
||||||
if len(next.Events) == 0 {
|
|
||||||
// Skip this event
|
|
||||||
return next.Next(ctx)
|
|
||||||
}
|
|
||||||
return next, nil
|
return next, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextNoBlock returns the next item in the buffer without blocking. If it
|
// NextNoBlock returns the next item in the buffer without blocking. If it
|
||||||
// reaches the most recent item it will return nil and no error.
|
// reaches the most recent item it will return nil.
|
||||||
func (i *bufferItem) NextNoBlock() (*bufferItem, error) {
|
func (i *bufferItem) NextNoBlock() *bufferItem {
|
||||||
nextRaw := i.link.next.Load()
|
nextRaw := i.link.next.Load()
|
||||||
if nextRaw == nil {
|
if nextRaw == nil {
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
next := nextRaw.(*bufferItem)
|
return nextRaw.(*bufferItem)
|
||||||
if next.Err != nil {
|
|
||||||
return nil, next.Err
|
|
||||||
}
|
|
||||||
if len(next.Events) == 0 {
|
|
||||||
// Skip this event
|
|
||||||
return next.NextNoBlock()
|
|
||||||
}
|
|
||||||
return next, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FollowAfter returns either the next item in the buffer if there is already
|
// NextLink returns either the next item in the buffer if there is one, or
|
||||||
// one, or if not it returns an empty item (that will be ignored by subscribers)
|
// an empty item (that will be ignored by subscribers) that has a pointer to
|
||||||
// that has the same link as the current buffer so that it will be notified of
|
// the same link as this bufferItem (but none of the bufferItem content).
|
||||||
// future updates in the buffer without including the current item.
|
// When the link.ch is closed, subscriptions will be notified of the next item.
|
||||||
func (i *bufferItem) FollowAfter() (*bufferItem, error) {
|
func (i *bufferItem) NextLink() *bufferItem {
|
||||||
next, err := i.NextNoBlock()
|
next := i.NextNoBlock()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if next == nil {
|
if next == nil {
|
||||||
// Return an empty item that can be followed to the next item published.
|
// Return an empty item that can be followed to the next item published.
|
||||||
item := &bufferItem{}
|
return &bufferItem{link: i.link}
|
||||||
item.link = i.link
|
|
||||||
return item, nil
|
|
||||||
}
|
}
|
||||||
return next, nil
|
return next
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ func TestEventBufferFuzz(t *testing.T) {
|
||||||
item := head
|
item := head
|
||||||
var err error
|
var err error
|
||||||
for {
|
for {
|
||||||
item, err = item.Next(context.Background())
|
item, err = item.Next(context.Background(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
||||||
expect, err)
|
expect, err)
|
||||||
|
|
|
@ -7,8 +7,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EventPublisher receives changes events from Publish, and sends them to all
|
// EventPublisher receives change events from Publish, and sends the events to
|
||||||
// registered subscribers.
|
// all subscribers of the event Topic.
|
||||||
type EventPublisher struct {
|
type EventPublisher struct {
|
||||||
// snapCacheTTL controls how long we keep snapshots in our cache before
|
// snapCacheTTL controls how long we keep snapshots in our cache before
|
||||||
// allowing them to be garbage collected and a new one made for subsequent
|
// allowing them to be garbage collected and a new one made for subsequent
|
||||||
|
@ -60,6 +60,7 @@ type changeEvents struct {
|
||||||
|
|
||||||
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
||||||
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
||||||
|
// The nil Topic is reserved and should not be used.
|
||||||
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
||||||
|
|
||||||
// SnapshotAppender appends groups of events to create a Snapshot of state.
|
// SnapshotAppender appends groups of events to create a Snapshot of state.
|
||||||
|
@ -91,10 +92,8 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishEvents to all subscribers. tx is a read-only transaction that captures
|
// Publish events to all subscribers of the event Topic.
|
||||||
// the state at the time the change happened. The caller must never use the tx once
|
func (e *EventPublisher) Publish(events []Event) {
|
||||||
// it has been passed to PublishChanged.
|
|
||||||
func (e *EventPublisher) PublishEvents(events []Event) {
|
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
e.publishCh <- changeEvents{events: events}
|
e.publishCh <- changeEvents{events: events}
|
||||||
}
|
}
|
||||||
|
@ -146,22 +145,18 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe returns a new stream.Subscription for the given request. A
|
// Subscribe returns a new Subscription for the given request. A subscription
|
||||||
// subscription will stream an initial snapshot of events matching the request
|
// will receive an initial snapshot of events matching the request if req.Index > 0.
|
||||||
// if required and then block until new events that modify the request occur, or
|
// After the snapshot, events will be streamed as they are created.
|
||||||
// the context is cancelled. Subscriptions may be forced to reset if the server
|
// Subscriptions may be closed, forcing the client to resubscribe (for example if
|
||||||
// decides it can no longer maintain correct operation for example if ACL
|
// ACL policies changed or the state store is abandoned).
|
||||||
// policies changed or the state store was restored.
|
|
||||||
//
|
//
|
||||||
// When the caller is finished with the subscription for any reason, it must
|
// When the caller is finished with the subscription for any reason, it must
|
||||||
// call Subscription.Unsubscribe to free ACL tracking resources.
|
// call Subscription.Unsubscribe to free ACL tracking resources.
|
||||||
func (e *EventPublisher) Subscribe(
|
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
|
||||||
ctx context.Context,
|
|
||||||
req *SubscribeRequest,
|
|
||||||
) (*Subscription, error) {
|
|
||||||
// Ensure we know how to make a snapshot for this topic
|
// Ensure we know how to make a snapshot for this topic
|
||||||
_, ok := e.snapshotHandlers[req.Topic]
|
_, ok := e.snapshotHandlers[req.Topic]
|
||||||
if !ok {
|
if !ok || req.Topic == nil {
|
||||||
return nil, fmt.Errorf("unknown topic %v", req.Topic)
|
return nil, fmt.Errorf("unknown topic %v", req.Topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,47 +171,35 @@ func (e *EventPublisher) Subscribe(
|
||||||
topicHead := buf.Head()
|
topicHead := buf.Head()
|
||||||
var sub *Subscription
|
var sub *Subscription
|
||||||
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
|
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
|
||||||
// No need for a snapshot, send the "resume stream" message to signal to
|
// No need for a snapshot, send the "end of empty snapshot" message to signal to
|
||||||
// client its cache is still good, then follow along from here in the topic.
|
// client its cache is still good, then follow along from here in the topic.
|
||||||
e := Event{
|
|
||||||
Index: req.Index,
|
|
||||||
Topic: req.Topic,
|
|
||||||
Key: req.Key,
|
|
||||||
Payload: endOfEmptySnapshot{},
|
|
||||||
}
|
|
||||||
// Make a new buffer to send to the client containing the resume.
|
|
||||||
buf := newEventBuffer()
|
buf := newEventBuffer()
|
||||||
|
|
||||||
// Store the head of that buffer before we append to it to give as the
|
// Store the head of that buffer before we append to it to give as the
|
||||||
// starting point for the subscription.
|
// starting point for the subscription.
|
||||||
subHead := buf.Head()
|
subHead := buf.Head()
|
||||||
|
|
||||||
buf.Append([]Event{e})
|
buf.Append([]Event{{
|
||||||
|
Index: req.Index,
|
||||||
|
Topic: req.Topic,
|
||||||
|
Key: req.Key,
|
||||||
|
Payload: endOfEmptySnapshot{},
|
||||||
|
}})
|
||||||
|
|
||||||
// Now splice the rest of the topic buffer on so the subscription will
|
// Now splice the rest of the topic buffer on so the subscription will
|
||||||
// continue to see future updates in the topic buffer.
|
// continue to see future updates in the topic buffer.
|
||||||
follow, err := topicHead.FollowAfter()
|
buf.AppendItem(topicHead.NextLink())
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
buf.AppendBuffer(follow)
|
|
||||||
|
|
||||||
sub = newSubscription(ctx, req, subHead)
|
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
|
||||||
} else {
|
} else {
|
||||||
snap, err := e.getSnapshotLocked(req, topicHead)
|
snap, err := e.getSnapshotLocked(req, topicHead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
sub = newSubscription(ctx, req, snap.Snap)
|
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
|
||||||
}
|
}
|
||||||
|
|
||||||
e.subscriptions.add(req, sub)
|
e.subscriptions.add(req, sub)
|
||||||
// Set unsubscribe so that the caller doesn't need to keep track of the
|
|
||||||
// SubscriptionRequest, and can not accidentally call unsubscribe with the
|
|
||||||
// wrong value.
|
|
||||||
sub.Unsubscribe = func() {
|
|
||||||
e.subscriptions.unsubscribe(req)
|
|
||||||
}
|
|
||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,27 +222,30 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
|
||||||
for _, secretID := range tokenSecretIDs {
|
for _, secretID := range tokenSecretIDs {
|
||||||
if subs, ok := s.byToken[secretID]; ok {
|
if subs, ok := s.byToken[secretID]; ok {
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
sub.Close()
|
sub.forceClose()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// unsubscribe must be called when a client is no longer interested in a
|
// unsubscribe returns a function that the subscription will call to remove
|
||||||
// subscription to free resources monitoring changes in it's ACL token.
|
// itself from the subsByToken.
|
||||||
//
|
// This function is returned as a closure so that the caller doesn't need to keep
|
||||||
// req MUST be the same pointer that was used to register the subscription.
|
// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the
|
||||||
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
|
// wrong pointer.
|
||||||
s.lock.Lock()
|
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
|
||||||
defer s.lock.Unlock()
|
return func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
subsByToken, ok := s.byToken[req.Token]
|
subsByToken, ok := s.byToken[req.Token]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(subsByToken, req)
|
delete(subsByToken, req)
|
||||||
if len(subsByToken) == 0 {
|
if len(subsByToken) == 0 {
|
||||||
delete(s.byToken, req.Token)
|
delete(s.byToken, req.Token)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,9 +26,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
|
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
|
||||||
sub, err := publisher.Subscribe(ctx, subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
eventCh := consumeSubscription(sub)
|
eventCh := consumeSubscription(ctx, sub)
|
||||||
|
|
||||||
result := nextResult(t, eventCh)
|
result := nextResult(t, eventCh)
|
||||||
require.NoError(t, result.Err)
|
require.NoError(t, result.Err)
|
||||||
|
@ -47,7 +47,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
||||||
Key: "sub-key",
|
Key: "sub-key",
|
||||||
Payload: "the-published-event-payload",
|
Payload: "the-published-event-payload",
|
||||||
}}
|
}}
|
||||||
publisher.PublishEvents(events)
|
publisher.Publish(events)
|
||||||
|
|
||||||
// Subscriber should see the published event
|
// Subscriber should see the published event
|
||||||
result = nextResult(t, eventCh)
|
result = nextResult(t, eventCh)
|
||||||
|
@ -68,11 +68,11 @@ func newTestSnapshotHandlers() SnapshotHandlers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func consumeSubscription(sub *Subscription) <-chan subNextResult {
|
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
|
||||||
eventCh := make(chan subNextResult, 1)
|
eventCh := make(chan subNextResult, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
es, err := sub.Next()
|
es, err := sub.Next(ctx)
|
||||||
eventCh <- subNextResult{
|
eventCh <- subNextResult{
|
||||||
Events: es,
|
Events: es,
|
||||||
Err: err,
|
Err: err,
|
||||||
|
|
|
@ -9,10 +9,10 @@ package stream
|
||||||
// collected automatically by Go's runtime. This simplifies snapshot and buffer
|
// collected automatically by Go's runtime. This simplifies snapshot and buffer
|
||||||
// management dramatically.
|
// management dramatically.
|
||||||
type eventSnapshot struct {
|
type eventSnapshot struct {
|
||||||
// Snap is the first item in the buffer containing the snapshot. Once the
|
// Head is the first item in the buffer containing the snapshot. Once the
|
||||||
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
|
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
|
||||||
// so that subscribers receive all the events from the same buffer.
|
// so that subscribers receive all the events from the same buffer.
|
||||||
Snap *bufferItem
|
Head *bufferItem
|
||||||
|
|
||||||
// snapBuffer is the Head of the snapshot buffer the fn should write to.
|
// snapBuffer is the Head of the snapshot buffer the fn should write to.
|
||||||
snapBuffer *eventBuffer
|
snapBuffer *eventBuffer
|
||||||
|
@ -30,14 +30,14 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
|
||||||
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
|
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
|
||||||
buf := newEventBuffer()
|
buf := newEventBuffer()
|
||||||
s := &eventSnapshot{
|
s := &eventSnapshot{
|
||||||
Snap: buf.Head(),
|
Head: buf.Head(),
|
||||||
snapBuffer: buf,
|
snapBuffer: buf,
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
idx, err := fn(req, s.snapBuffer)
|
idx, err := fn(req, s.snapBuffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.snapBuffer.AppendErr(err)
|
s.snapBuffer.AppendItem(&bufferItem{Err: err})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
|
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
|
||||||
|
@ -57,44 +57,33 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
|
||||||
// find the first event after the current snapshot.
|
// find the first event after the current snapshot.
|
||||||
item := topicBufferHead
|
item := topicBufferHead
|
||||||
for {
|
for {
|
||||||
// Find the next item that we should include.
|
next := item.NextNoBlock()
|
||||||
next, err := item.NextNoBlock()
|
switch {
|
||||||
if err != nil {
|
case next == nil:
|
||||||
// Append an error result to signal to subscribers that this snapshot is
|
|
||||||
// no good.
|
|
||||||
s.snapBuffer.AppendErr(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if next == nil {
|
|
||||||
// This is the head of the topic buffer (or was just now which is after
|
// This is the head of the topic buffer (or was just now which is after
|
||||||
// the snapshot completed). We don't want any of the events (if any) in
|
// the snapshot completed). We don't want any of the events (if any) in
|
||||||
// the snapshot buffer as they came before the snapshot but we do need to
|
// the snapshot buffer as they came before the snapshot but we do need to
|
||||||
// wait for the next update.
|
// wait for the next update.
|
||||||
follow, err := item.FollowAfter()
|
s.snapBuffer.AppendItem(item.NextLink())
|
||||||
if err != nil {
|
|
||||||
s.snapBuffer.AppendErr(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.snapBuffer.AppendBuffer(follow)
|
|
||||||
// We are done, subscribers will now follow future updates to the topic
|
|
||||||
// after reading the snapshot events.
|
|
||||||
return
|
return
|
||||||
}
|
|
||||||
|
|
||||||
if next.Err != nil {
|
case next.Err != nil:
|
||||||
s.snapBuffer.AppendErr(next.Err)
|
// This case is not currently possible because errors can only come
|
||||||
|
// from a snapshot func, and this is consuming events from a topic
|
||||||
|
// buffer which does not contain a snapshot.
|
||||||
|
// Handle this case anyway in case errors can come from other places
|
||||||
|
// in the future.
|
||||||
|
s.snapBuffer.AppendItem(next)
|
||||||
return
|
return
|
||||||
}
|
|
||||||
|
|
||||||
if len(next.Events) > 0 && next.Events[0].Index > idx {
|
case len(next.Events) > 0 && next.Events[0].Index > idx:
|
||||||
// We've found an update in the topic buffer that happened after our
|
// We've found an update in the topic buffer that happened after our
|
||||||
// snapshot was taken, splice it into the snapshot buffer so subscribers
|
// snapshot was taken, splice it into the snapshot buffer so subscribers
|
||||||
// can continue to read this and others after it.
|
// can continue to read this and others after it.
|
||||||
s.snapBuffer.AppendBuffer(next)
|
s.snapBuffer.AppendItem(next)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't need this item, continue to next
|
// We don't need this item, continue to next
|
||||||
item = next
|
item = next
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,11 +112,11 @@ func TestEventSnapshot(t *testing.T) {
|
||||||
snapIDs := make([]string, 0, tc.snapshotSize)
|
snapIDs := make([]string, 0, tc.snapshotSize)
|
||||||
updateIDs := make([]string, 0, tc.updatesAfterSnap)
|
updateIDs := make([]string, 0, tc.updatesAfterSnap)
|
||||||
snapDone := false
|
snapDone := false
|
||||||
curItem := es.Snap
|
curItem := es.Head
|
||||||
var err error
|
var err error
|
||||||
RECV:
|
RECV:
|
||||||
for {
|
for {
|
||||||
curItem, err = curItem.Next(ctx)
|
curItem, err = curItem.Next(ctx, nil)
|
||||||
// This error is typically timeout so dump the state to aid debugging.
|
// This error is typically timeout so dump the state to aid debugging.
|
||||||
require.NoError(t, err,
|
require.NoError(t, err,
|
||||||
"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
|
"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEvent_IsEndOfSnapshot(t *testing.T) {
|
||||||
|
e := Event{Payload: endOfSnapshot{}}
|
||||||
|
require.True(t, e.IsEndOfSnapshot())
|
||||||
|
|
||||||
|
t.Run("not EndOfSnapshot", func(t *testing.T) {
|
||||||
|
e := Event{Payload: endOfEmptySnapshot{}}
|
||||||
|
require.False(t, e.IsEndOfSnapshot())
|
||||||
|
})
|
||||||
|
}
|
|
@ -19,11 +19,10 @@ const (
|
||||||
|
|
||||||
// ErrSubscriptionClosed is a error signalling the subscription has been
|
// ErrSubscriptionClosed is a error signalling the subscription has been
|
||||||
// closed. The client should Unsubscribe, then re-Subscribe.
|
// closed. The client should Unsubscribe, then re-Subscribe.
|
||||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry")
|
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
|
||||||
|
|
||||||
// Subscription holds state about a single Subscribe call. Subscribe clients
|
// Subscription provides events on a Topic. Events may be filtered by Key.
|
||||||
// access their next event by calling Next(). This may initially include the
|
// Events are returned by Next(), and may start with a Snapshot of events.
|
||||||
// snapshot events to catch them up if they are new or behind.
|
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
// state is accessed atomically 0 means open, 1 means closed with reload
|
// state is accessed atomically 0 means open, 1 means closed with reload
|
||||||
state uint32
|
state uint32
|
||||||
|
@ -35,17 +34,15 @@ type Subscription struct {
|
||||||
// is mutated by calls to Next.
|
// is mutated by calls to Next.
|
||||||
currentItem *bufferItem
|
currentItem *bufferItem
|
||||||
|
|
||||||
// ctx is the Subscription context that wraps the context of the streaming RPC
|
// forceClosed is closed when forceClose is called. It is used by
|
||||||
// handler call.
|
// EventPublisher to cancel Next().
|
||||||
ctx context.Context
|
forceClosed chan struct{}
|
||||||
|
|
||||||
// cancelFn stores the context cancel function that will wake up the
|
// unsub is a function set by EventPublisher that is called to free resources
|
||||||
// in-progress Next call on a server-initiated state change e.g. Reload.
|
// when the subscription is no longer needed.
|
||||||
cancelFn func()
|
// It must be safe to call the function from multiple goroutines and the function
|
||||||
|
// must be idempotent.
|
||||||
// Unsubscribe is a function set by EventPublisher that is called to
|
unsub func()
|
||||||
// free resources when the subscription is no longer needed.
|
|
||||||
Unsubscribe func()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeRequest identifies the types of events the subscriber would like to
|
// SubscribeRequest identifies the types of events the subscriber would like to
|
||||||
|
@ -59,74 +56,81 @@ type SubscribeRequest struct {
|
||||||
|
|
||||||
// newSubscription return a new subscription. The caller is responsible for
|
// newSubscription return a new subscription. The caller is responsible for
|
||||||
// calling Unsubscribe when it is done with the subscription, to free resources.
|
// calling Unsubscribe when it is done with the subscription, to free resources.
|
||||||
func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription {
|
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
|
||||||
subCtx, cancel := context.WithCancel(ctx)
|
|
||||||
return &Subscription{
|
return &Subscription{
|
||||||
ctx: subCtx,
|
forceClosed: make(chan struct{}),
|
||||||
cancelFn: cancel,
|
|
||||||
req: req,
|
req: req,
|
||||||
currentItem: item,
|
currentItem: item,
|
||||||
|
unsub: unsub,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next set of events to deliver. It must only be called from a
|
// Next returns the next set of events to deliver. It must only be called from a
|
||||||
// single goroutine concurrently as it mutates the Subscription.
|
// single goroutine concurrently as it mutates the Subscription.
|
||||||
func (s *Subscription) Next() ([]Event, error) {
|
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
|
||||||
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
||||||
return nil, ErrSubscriptionClosed
|
return nil, ErrSubscriptionClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
next, err := s.currentItem.Next(s.ctx)
|
next, err := s.currentItem.Next(ctx, s.forceClosed)
|
||||||
if err != nil {
|
switch {
|
||||||
// Check we didn't return because of a state change cancelling the context
|
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
|
||||||
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
return nil, ErrSubscriptionClosed
|
||||||
return nil, ErrSubscriptionClosed
|
case err != nil:
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Advance our cursor for next loop or next call
|
|
||||||
s.currentItem = next
|
s.currentItem = next
|
||||||
|
|
||||||
// Assume happy path where all events (or none) are relevant.
|
events := s.filter(next.Events)
|
||||||
allMatch := true
|
if len(events) == 0 {
|
||||||
|
continue
|
||||||
// If there is a specific key, see if we need to filter any events
|
|
||||||
if s.req.Key != "" {
|
|
||||||
for _, e := range next.Events {
|
|
||||||
if s.req.Key != e.Key {
|
|
||||||
allMatch = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return events, nil
|
||||||
// Only if we need to filter events should we bother allocating a new slice
|
|
||||||
// as this is a hot loop.
|
|
||||||
events := next.Events
|
|
||||||
if !allMatch {
|
|
||||||
events = make([]Event, 0, len(next.Events))
|
|
||||||
for _, e := range next.Events {
|
|
||||||
// Only return it if the key matches.
|
|
||||||
if s.req.Key == "" || s.req.Key == e.Key {
|
|
||||||
events = append(events, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(events) > 0 {
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
// Keep looping until we find some events we are interested in.
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: test cases for this method
|
||||||
|
func (s *Subscription) filter(events []Event) []Event {
|
||||||
|
if s.req.Key == "" || len(events) == 0 {
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
allMatch := true
|
||||||
|
for _, e := range events {
|
||||||
|
if s.req.Key != e.Key {
|
||||||
|
allMatch = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only allocate a new slice if some events need to be filtered out
|
||||||
|
if allMatch {
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: this will over-allocate. We could get a count from the previous range
|
||||||
|
// over events.
|
||||||
|
events = make([]Event, 0, len(events))
|
||||||
|
for _, e := range events {
|
||||||
|
if s.req.Key == e.Key {
|
||||||
|
events = append(events, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
// Close the subscription. Subscribers will receive an error when they call Next,
|
// Close the subscription. Subscribers will receive an error when they call Next,
|
||||||
// and will need to perform a new Subscribe request.
|
// and will need to perform a new Subscribe request.
|
||||||
// It is safe to call from any goroutine.
|
// It is safe to call from any goroutine.
|
||||||
func (s *Subscription) Close() {
|
func (s *Subscription) forceClose() {
|
||||||
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
|
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
|
||||||
if swapped {
|
if swapped {
|
||||||
s.cancelFn()
|
close(s.forceClosed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unsubscribe the subscription, freeing resources.
|
||||||
|
func (s *Subscription) Unsubscribe() {
|
||||||
|
s.unsub()
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func noopUnSub() {}
|
||||||
|
|
||||||
func TestSubscription(t *testing.T) {
|
func TestSubscription(t *testing.T) {
|
||||||
eb := newEventBuffer()
|
eb := newEventBuffer()
|
||||||
|
|
||||||
|
@ -26,11 +28,11 @@ func TestSubscription(t *testing.T) {
|
||||||
Topic: testTopic,
|
Topic: testTopic,
|
||||||
Key: "test",
|
Key: "test",
|
||||||
}
|
}
|
||||||
sub := newSubscription(ctx, req, startHead)
|
sub := newSubscription(req, startHead, noopUnSub)
|
||||||
|
|
||||||
// First call to sub.Next should return our published event immediately
|
// First call to sub.Next should return our published event immediately
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
got, err := sub.Next()
|
got, err := sub.Next(ctx)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, elapsed < 200*time.Millisecond,
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
@ -46,7 +48,7 @@ func TestSubscription(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Next call should block until event is delivered
|
// Next call should block until event is delivered
|
||||||
got, err = sub.Next()
|
got, err = sub.Next(ctx)
|
||||||
elapsed = time.Since(start)
|
elapsed = time.Since(start)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, elapsed > 200*time.Millisecond,
|
require.True(t, elapsed > 200*time.Millisecond,
|
||||||
|
@ -64,7 +66,7 @@ func TestSubscription(t *testing.T) {
|
||||||
publishTestEvent(index, eb, "test")
|
publishTestEvent(index, eb, "test")
|
||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
got, err = sub.Next()
|
got, err = sub.Next(ctx)
|
||||||
elapsed = time.Since(start)
|
elapsed = time.Since(start)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, elapsed < 200*time.Millisecond,
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
@ -79,7 +81,7 @@ func TestSubscription(t *testing.T) {
|
||||||
cancel()
|
cancel()
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err = sub.Next()
|
_, err = sub.Next(ctx)
|
||||||
elapsed = time.Since(start)
|
elapsed = time.Since(start)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.True(t, elapsed > 200*time.Millisecond,
|
require.True(t, elapsed > 200*time.Millisecond,
|
||||||
|
@ -106,11 +108,11 @@ func TestSubscription_Close(t *testing.T) {
|
||||||
Topic: testTopic,
|
Topic: testTopic,
|
||||||
Key: "test",
|
Key: "test",
|
||||||
}
|
}
|
||||||
sub := newSubscription(ctx, req, startHead)
|
sub := newSubscription(req, startHead, noopUnSub)
|
||||||
|
|
||||||
// First call to sub.Next should return our published event immediately
|
// First call to sub.Next should return our published event immediately
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
got, err := sub.Next()
|
got, err := sub.Next(ctx)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, elapsed < 200*time.Millisecond,
|
require.True(t, elapsed < 200*time.Millisecond,
|
||||||
|
@ -122,10 +124,10 @@ func TestSubscription_Close(t *testing.T) {
|
||||||
// needs to reset (e.g. on ACL perm change).
|
// needs to reset (e.g. on ACL perm change).
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
time.AfterFunc(200*time.Millisecond, func() {
|
time.AfterFunc(200*time.Millisecond, func() {
|
||||||
sub.Close()
|
sub.forceClose()
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err = sub.Next()
|
_, err = sub.Next(ctx)
|
||||||
elapsed = time.Since(start)
|
elapsed = time.Since(start)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, ErrSubscriptionClosed, err)
|
require.Equal(t, ErrSubscriptionClosed, err)
|
||||||
|
|
Loading…
Reference in New Issue