stream: unexport identifiers

Now that EventPublisher is part of the stream package, a lot of its internals can be hidden
pull/8160/head
Daniel Nephin 4 years ago
parent 4fa0fdc0e0
commit 889c57fd2d

@ -52,7 +52,7 @@ func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Even
}
}
// TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent
return []stream.Event{stream.NewUnsubscribeEvent(secretIDs)}, nil
return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil
}
// changeObject returns the object before it was deleted if the change was a delete,

@ -23,7 +23,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
Mutate: func(s *Store, tx *txn) error {
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "token update",
@ -37,7 +37,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "token delete",
@ -48,13 +48,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
token := newACLToken(1)
return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "policy create",
Mutate: newACLPolicyWithSingleToken,
// two identical tokens, because Mutate has two changes
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
},
{
Name: "policy update",
@ -64,7 +64,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
policy.Rules = `operator = "write"`
return s.aclPolicySetTxn(tx, tx.Index, policy)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "policy delete",
@ -73,13 +73,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
policy := newACLPolicy(1)
return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "role create",
Mutate: newACLRoleWithSingleToken,
// Two tokens with the same ID, because there are two changes in Mutate
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
},
{
Name: "role update",
@ -93,7 +93,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
})
return s.aclRoleSetTxn(tx, tx.Index, role, true)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
{
Name: "role delete",
@ -102,7 +102,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
role := newACLRole(1, newACLRolePolicyLink(1))
return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil)
},
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
},
}

@ -62,7 +62,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Ensure the reset event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionReload, err)
require.Equal(stream.ErrSubscriptionClosed, err)
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
@ -90,7 +90,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Ensure the reset event was sent.
err = assertErr(t, eventCh2)
require.Equal(stream.ErrSubscriptionReload, err)
require.Equal(stream.ErrSubscriptionClosed, err)
}
func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
@ -175,7 +175,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Ensure the reload event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionReload, err)
require.Equal(stream.ErrSubscriptionClosed, err)
// Register another subscription.
subscription3 := &stream.SubscribeRequest{
@ -362,7 +362,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
return
case <-timeoutCh:
t.Fatalf("no err after 100ms")
@ -390,7 +390,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
@ -403,7 +403,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
snap.Append([]stream.Event{event})
}
return idx, nil
},

@ -1,3 +1,7 @@
/*
Package stream provides a publish/subscribe system for events produced by changes
to the state store.
*/
package stream
type Topic int32
@ -24,21 +28,21 @@ func (e Event) IsEndOfSnapshot() bool {
}
func (e Event) IsResumeStream() bool {
return e.Payload == ResumeStream{}
return e.Payload == resumeStream{}
}
type endOfSnapshot struct{}
type ResumeStream struct{}
type resumeStream struct{}
// TODO: unexport once EventPublisher is in stream package
type UnsubscribePayload struct {
TokensSecretIDs []string
type closeSubscriptionPayload struct {
tokensSecretIDs []string
}
// NewUnsubscribeEvent returns a special Event that is handled by the
// stream package, and is never sent to subscribers. It results in any subscriptions
// which match any of the TokenSecretIDs to be unsubscribed.
func NewUnsubscribeEvent(tokenSecretIDs []string) Event {
return Event{Payload: UnsubscribePayload{TokensSecretIDs: tokenSecretIDs}}
// NewCloseSubscriptionEvent returns a special Event that is handled by the
// stream package, and is never sent to subscribers. EventPublisher handles
// these events, and closes any subscriptions which were created using a token
// which matches any of the tokenSecretIDs.
func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event {
return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}}
}
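
As a usage illustration (not part of this commit), a state-store change handler builds the special event the same way aclChangeUnsubscribeEvent does above. A minimal sketch, assuming the stream package's import path, with a hypothetical helper name:

package state

import (
	"github.com/hashicorp/consul/agent/consul/stream" // import path assumed
)

// tokenInvalidationEvents is a hypothetical helper: it wraps the secret IDs
// of every token invalidated by a change into the single special event. The
// publisher intercepts this payload and closes matching subscriptions; it is
// never delivered to subscribers.
func tokenInvalidationEvents(deletedSecretIDs []string) []stream.Event {
	return []stream.Event{stream.NewCloseSubscriptionEvent(deletedSecretIDs)}
}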

@ -6,7 +6,7 @@ import (
"sync/atomic"
)
// EventBuffer is a single-writer, multiple-reader, unlimited length concurrent
// eventBuffer is a single-writer, multiple-reader, unlimited length concurrent
// buffer of events that have been published on a topic. The buffer is
// effectively just the head of an atomically updated single-linked list. Atomic
// accesses are usually to be suspected as premature optimization but this
@ -27,7 +27,7 @@ import (
//
// The buffer is used to deliver all messages broadcast to a topic for active
// subscribers to consume, but it is also an effective way to both deliver and
// optionally cache snapshots per topic and key. By using an EventBuffer,
// optionally cache snapshots per topic and key. By using an eventBuffer,
// snapshot functions don't have to read the whole snapshot into memory before
// delivery - they can stream from memdb. However simply by storing a pointer to
// the first event in the buffer, we can cache the buffered events for future
@ -46,26 +46,26 @@ import (
// automatically keep the events we need to make that work for exactly the
// optimal amount of time and no longer.
//
// A new buffer is constructed with a sentinel "empty" BufferItem that has a nil
// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
// Events array. This enables subscribers to start watching for the next update
// immediately.
//
// The zero value EventBuffer is _not_ a usable type since it has not been
// The zero value eventBuffer is _not_ a usable type since it has not been
// initialized with an empty bufferItem so can't be used to wait for the first
// published event. Call NewEventBuffer to construct a new buffer.
// published event. Call newEventBuffer to construct a new buffer.
//
// Calls to Append or AppendBuffer that mutate the head must be externally
// synchronized. This allows systems that already serialize writes to append
// without lock overhead (e.g. a snapshot goroutine appending thousands of
// events).
type EventBuffer struct {
type eventBuffer struct {
head atomic.Value
}
// NewEventBuffer creates an EventBuffer ready for use.
func NewEventBuffer() *EventBuffer {
b := &EventBuffer{}
b.head.Store(NewBufferItem())
// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer() *eventBuffer {
b := &eventBuffer{}
b.head.Store(newBufferItem())
return b
}
@ -76,9 +76,9 @@ func NewEventBuffer() *EventBuffer {
// mutations to the events as they may have been exposed to subscribers in other
// goroutines. Append only supports a single concurrent caller and must be
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *EventBuffer) Append(events []Event) {
func (b *eventBuffer) Append(events []Event) {
// Push events to the head
it := NewBufferItem()
it := newBufferItem()
it.Events = events
b.AppendBuffer(it)
}
@ -92,7 +92,7 @@ func (b *EventBuffer) Append(events []Event) {
//
// AppendBuffer only supports a single concurrent caller and must be externally
// synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *EventBuffer) AppendBuffer(item *BufferItem) {
func (b *eventBuffer) AppendBuffer(item *bufferItem) {
// First store it as the next node for the old head; this ensures once it's
// visible to new searchers the linked list is already valid. Not sure it
// matters but this seems nicer.
@ -110,20 +110,20 @@ func (b *EventBuffer) AppendBuffer(item *BufferItem) {
// streaming subscription and return the error. AppendErr only supports a
// single concurrent caller and must be externally synchronized with other
// Append, AppendBuffer or AppendErr calls.
func (b *EventBuffer) AppendErr(err error) {
b.AppendBuffer(&BufferItem{Err: err})
func (b *eventBuffer) AppendErr(err error) {
b.AppendBuffer(&bufferItem{Err: err})
}
// Head returns the current head of the buffer. It will always exist but it may
// be a "sentinel" empty item with a nil Events slice to allow consumers to
// watch for the next update. Consumers should always check for empty Events and
// treat them as no-ops. Will panic if EventBuffer was not initialized correctly
// with NewEventBuffer.
func (b *EventBuffer) Head() *BufferItem {
return b.head.Load().(*BufferItem)
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
// with newEventBuffer.
func (b *eventBuffer) Head() *bufferItem {
return b.head.Load().(*bufferItem)
}
// BufferItem represents a set of events published by a single raft operation.
// bufferItem represents a set of events published by a single raft operation.
// The first item returned by a newly constructed buffer will have nil Events.
// It is a sentinel value which is used to wait on the next events via Next.
//
@ -135,9 +135,9 @@ func (b *EventBuffer) Head() *BufferItem {
// they have been delivered except where it's intentional to maintain a cache or
// trailing store of events for performance reasons.
//
// Subscribers must not mutate the BufferItem or the Events or Encoded payloads
// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
// inside as these are shared between all readers.
type BufferItem struct {
type bufferItem struct {
// Events is the set of events published at one raft index. This may be nil as
// a sentinel value to allow watching for the first event in a buffer. Callers
// should check and skip nil Events at any point in the buffer. It will also
@ -170,10 +170,10 @@ type bufferLink struct {
ch chan struct{}
}
// NewBufferItem returns a blank buffer item with a link and chan ready to have
// newBufferItem returns a blank buffer item with a link and chan ready to have
// the fields set and be appended to a buffer.
func NewBufferItem() *BufferItem {
return &BufferItem{
func newBufferItem() *bufferItem {
return &bufferItem{
link: &bufferLink{
ch: make(chan struct{}),
},
@ -182,7 +182,7 @@ func NewBufferItem() *BufferItem {
// Next returns the next buffer item in the buffer. It may block until ctx is
// cancelled or until the next item is published.
func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
// See if there is already a next value, block if so. Note we don't rely on
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
@ -197,7 +197,7 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
// shouldn't be possible
return nil, errors.New("invalid next item")
}
next := nextRaw.(*BufferItem)
next := nextRaw.(*bufferItem)
if next.Err != nil {
return nil, next.Err
}
@ -210,12 +210,12 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
// NextNoBlock returns the next item in the buffer without blocking. If it
// reaches the most recent item it will return nil and no error.
func (i *BufferItem) NextNoBlock() (*BufferItem, error) {
func (i *bufferItem) NextNoBlock() (*bufferItem, error) {
nextRaw := i.link.next.Load()
if nextRaw == nil {
return nil, nil
}
next := nextRaw.(*BufferItem)
next := nextRaw.(*bufferItem)
if next.Err != nil {
return nil, next.Err
}
@ -230,14 +230,14 @@ func (i *BufferItem) NextNoBlock() (*BufferItem, error) {
// one, or if not it returns an empty item (that will be ignored by subscribers)
// that has the same link as the current buffer so that it will be notified of
// future updates in the buffer without including the current item.
func (i *BufferItem) FollowAfter() (*BufferItem, error) {
func (i *bufferItem) FollowAfter() (*bufferItem, error) {
next, err := i.NextNoBlock()
if err != nil {
return nil, err
}
if next == nil {
// Return an empty item that can be followed to the next item published.
item := &BufferItem{}
item := &bufferItem{}
item.link = i.link
return item, nil
}
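
To make the single-writer, multi-reader contract concrete, here is a package-internal sketch (assumed code, not from this commit; it also assumes a "context" import):

// exampleBufferFlow captures the sentinel head, appends from a single
// writer, then follows the linked list from a reader.
func exampleBufferFlow(ctx context.Context) ([]Event, error) {
	buf := newEventBuffer()
	head := buf.Head() // sentinel item with nil Events

	// Only one goroutine may call Append/AppendBuffer/AppendErr at a time.
	buf.Append([]Event{{Index: 1}})

	// Readers block in Next until the writer publishes the next item.
	item, err := head.Next(ctx)
	if err != nil {
		return nil, err
	}
	// item.Events is shared by all readers and must not be mutated.
	return item.Events, nil
}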

@ -20,7 +20,7 @@ func TestEventBufferFuzz(t *testing.T) {
nReaders := 1000
nMessages := 1000
b := NewEventBuffer()
b := newEventBuffer()
// Start a write goroutine that will publish 1000 messages with sequential
// indexes and some jitter in timing (to allow clients to "catch up" and block

@ -33,11 +33,11 @@ type EventPublisher struct {
// topicBuffers stores the head of the linked-list buffer to publish events to
// for a topic.
topicBuffers map[Topic]*EventBuffer
topicBuffers map[Topic]*eventBuffer
// snapCache is a cache of EventSnapshots indexed by topic and key.
// TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[Topic]map[string]*EventSnapshot
snapCache map[Topic]map[string]*eventSnapshot
subscriptions *subscriptions
@ -69,13 +69,20 @@ type changeEvents struct {
// TopicHandler provides functions which create stream.Events for a topic.
type TopicHandler struct {
// Snapshot creates the necessary events to reproduce the current state and
// appends them to the EventBuffer.
Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error)
// appends them to the eventBuffer.
Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes.
ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
}
// SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface {
// Append events to the snapshot.
// TODO: document why parameter is a slice instead of a single Event
Append(events []Event)
}
// NewEventPublisher returns an EventPublisher for publishing change events.
// Handlers are used to convert the memDB changes into events.
// A goroutine is run in the background to publish events to all subscribers.
@ -84,8 +91,8 @@ type TopicHandler struct {
func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*EventBuffer),
snapCache: make(map[Topic]map[string]*EventSnapshot),
topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot),
publishCh: make(chan changeEvents, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
@ -136,8 +143,8 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
// as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) {
for _, event := range update.events {
if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs)
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
}
}
@ -160,10 +167,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer {
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
buf, ok := e.topicBuffers[topic]
if !ok {
buf = NewEventBuffer()
buf = newEventBuffer()
e.topicBuffers[topic] = buf
}
return buf
@ -207,10 +214,10 @@ func (e *EventPublisher) Subscribe(
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: ResumeStream{},
Payload: resumeStream{},
}
// Make a new buffer to send to the client containing the resume.
buf := NewEventBuffer()
buf := newEventBuffer()
// Store the head of that buffer before we append to it to give as the
// starting point for the subscription.
@ -226,13 +233,13 @@ func (e *EventPublisher) Subscribe(
}
buf.AppendBuffer(follow)
sub = NewSubscription(ctx, req, subHead)
sub = newSubscription(ctx, req, subHead)
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = NewSubscription(ctx, req, snap.Snap)
sub = newSubscription(ctx, req, snap.Snap)
}
e.subscriptions.add(req, sub)
@ -288,16 +295,16 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
}
}
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) {
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
// See if there is a cached snapshot
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*EventSnapshot)
topicSnaps = make(map[string]*eventSnapshot)
e.snapCache[req.Topic] = topicSnaps
}
snap, ok := topicSnaps[req.Key]
if ok && snap.Err() == nil {
if ok && snap.err() == nil {
return snap, nil
}
@ -307,7 +314,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *Buf
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
snap = NewEventSnapshot(req, topicHead, handler.Snapshot)
snap = newEventSnapshot(req, topicHead, handler.Snapshot)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap
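
Putting the surviving exported surface together, callers wire the publisher roughly as below. This is a sketch written as if inside the package's tests; the topic value, key, snapshot index, the Subscribe signature (ctx plus *SubscribeRequest), and Subscription.Unsubscribe are assumptions based on the code and docs in this commit, and "context" and "time" imports are assumed:

func exampleWiring(ctx context.Context, topic Topic) error {
	handlers := map[Topic]TopicHandler{
		topic: {
			// Handlers now see only the SnapshotAppender interface, not the
			// unexported eventBuffer backing it.
			Snapshot: func(req *SubscribeRequest, snap SnapshotAppender) (uint64, error) {
				snap.Append([]Event{{Topic: req.Topic, Key: req.Key, Index: 1}})
				return 1, nil // index the snapshot is valid at
			},
		},
	}
	publisher := NewEventPublisher(ctx, handlers, time.Second)
	sub, err := publisher.Subscribe(ctx, &SubscribeRequest{Topic: topic, Key: "web"})
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	events, err := sub.Next() // the first delivery replays the snapshot
	_ = events
	return err
}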

@ -51,7 +51,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
func newTestTopicHandlers() map[Topic]TopicHandler {
return map[Topic]TopicHandler{
testTopic: {
Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}

@ -1,37 +1,35 @@
package stream
// EventSnapshot represents the state of memdb for a given topic and key at some
// eventSnapshot represents the state of memdb for a given topic and key at some
// point in time. It is modelled as a buffer of events so that snapshots can be
// streamed to possibly multiple subscribers concurrently, and can be trivially
// cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot
// cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot
// is dropped from memory, any subscribers still reading from it may do so by following
// their pointers. When the last subscriber unsubscribes, the snapshot is garbage
// collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type EventSnapshot struct {
type eventSnapshot struct {
// Snap is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// so that subscribers receive all the events from the same buffer.
Snap *BufferItem
Snap *bufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *EventBuffer
snapBuffer *eventBuffer
}
// SnapFn is the type of function needed to generate a snapshot for a topic and
// key.
type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error)
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
// NewEventSnapshot creates a snapshot buffer based on the subscription request.
// newEventSnapshot creates a snapshot buffer based on the subscription request.
// The current buffer head for the topic in question is passed so that once the
// snapshot is complete and has been delivered into the buffer, any events
// published during snapshotting can be immediately appended and won't be
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
// snapshot buffer so that subscribers will naturally follow from the snapshot
// to wait for any subsequent updates.
func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
buf := NewEventBuffer()
s := &EventSnapshot{
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Snap: buf.Head(),
snapBuffer: buf,
}
@ -54,7 +52,7 @@ func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn Sna
return s
}
func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx uint64) {
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
item := topicBufferHead
@ -102,10 +100,10 @@ func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx u
}
}
// Err returns an error if the snapshot func has failed, or nil otherwise.
// err returns an error if the snapshot func has failed, or nil otherwise.
// Nil doesn't necessarily mean there won't be an error, only that there
// hasn't been one yet.
func (s *EventSnapshot) Err() error {
func (s *eventSnapshot) err() error {
// Fetch the head of the buffer; this is atomic. If the snapshot func errored
// then the last event will be an error.
head := s.snapBuffer.Head()
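
The lifecycle described above fits in a few lines of package-internal code. A hedged sketch (not from this commit; the request, buffer, and snapshot function are stand-ins):

// exampleSnapshotLifecycle captures the topic head *before* constructing the
// snapshot, so events published while fn runs are spliced on afterwards
// rather than lost, then hands the snapshot's first item to a subscription.
func exampleSnapshotLifecycle(ctx context.Context, req *SubscribeRequest, topicBuf *eventBuffer, fn snapFunc) *Subscription {
	topicHead := topicBuf.Head()
	snap := newEventSnapshot(req, topicHead, fn)
	// Subscribers drain the snapshot events first, then follow the splice
	// onto the live topic buffer without missing or duplicating events.
	return newSubscription(ctx, req, snap.Snap)
}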

@ -70,10 +70,10 @@ func TestEventSnapshot(t *testing.T) {
snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
// Create a topic buffer for updates
tb := NewEventBuffer()
tb := newEventBuffer()
// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
// and are seen by the EventSnapshot once it completes the snap.
// and are seen by the eventSnapshot once it completes the snap.
tbHead := tb.Head()
// Deliver any pre-snapshot events simulating updates that occur after the
@ -87,9 +87,9 @@ func TestEventSnapshot(t *testing.T) {
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
}
// Create EventSnapshot, (will call snFn in another goroutine). The
// Request is ignored by the SnapFn so doesn't matter for now.
es := NewEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
// Create eventSnapshot, (will call snFn in another goroutine). The
// Request is ignored by the snapFunc so doesn't matter for now.
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
// Deliver any post-snapshot events simulating updates that occur
// logically after snapshot. It doesn't matter that these might actually
@ -155,8 +155,8 @@ func genSequentialIDs(start, end int) []string {
return ids
}
func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn {
return func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
for i := 0; i < size; i++ {
// Event content is arbitrary; we are just using Health because it's the
// first type defined. We just want a set of things with consecutive

@ -17,11 +17,9 @@ const (
subscriptionStateClosed uint32 = 1
)
var (
// ErrSubscriptionReload is an error signalling a reload event should be sent to
// the client and the server should close.
ErrSubscriptionReload = errors.New("subscription closed by server, client should retry")
)
// ErrSubscriptionClosed is an error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry")
// Subscription holds state about a single Subscribe call. Subscribe clients
// access their next event by calling Next(). This may initially include the
@ -35,7 +33,7 @@ type Subscription struct {
// currentItem stores the current snapshot or topic buffer item we are on. It
// is mutated by calls to Next.
currentItem *BufferItem
currentItem *bufferItem
// ctx is the Subscription context that wraps the context of the streaming RPC
// handler call.
@ -59,9 +57,9 @@ type SubscribeRequest struct {
Index uint64
}
// NewSubscription returns a new subscription. The caller is responsible for
// newSubscription returns a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription {
func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription {
subCtx, cancel := context.WithCancel(ctx)
return &Subscription{
ctx: subCtx,
@ -75,7 +73,7 @@ func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferIte
// single goroutine concurrently as it mutates the Subscription.
func (s *Subscription) Next() ([]Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionReload
return nil, ErrSubscriptionClosed
}
for {
@ -83,7 +81,7 @@ func (s *Subscription) Next() ([]Event, error) {
if err != nil {
// Check we didn't return because of a state change cancelling the context
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionReload
return nil, ErrSubscriptionClosed
}
return nil, err
}
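
For callers the renamed error keeps the same contract: tear the subscription down and subscribe again. A hedged sketch of a consumer loop (the handleEvents callback is hypothetical):

func consume(sub *Subscription, handleEvents func([]Event)) error {
	defer sub.Unsubscribe()
	for {
		events, err := sub.Next()
		if err == ErrSubscriptionClosed {
			// The server force-closed the subscription (for example because
			// the ACL token was invalidated); the caller should re-Subscribe.
			return err
		}
		if err != nil {
			return err
		}
		handleEvents(events)
	}
}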

@ -9,7 +9,7 @@ import (
)
func TestSubscription(t *testing.T) {
eb := NewEventBuffer()
eb := newEventBuffer()
index := uint64(100)
@ -26,7 +26,7 @@ func TestSubscription(t *testing.T) {
Topic: Topic_ServiceHealth,
Key: "test",
}
sub := NewSubscription(ctx, req, startHead)
sub := newSubscription(ctx, req, startHead)
// First call to sub.Next should return our published event immediately
start := time.Now()
@ -89,7 +89,7 @@ func TestSubscription(t *testing.T) {
}
func TestSubscription_Close(t *testing.T) {
eb := NewEventBuffer()
eb := newEventBuffer()
index := uint64(100)
@ -106,7 +106,7 @@ func TestSubscription_Close(t *testing.T) {
Topic: Topic_ServiceHealth,
Key: "test",
}
sub := NewSubscription(ctx, req, startHead)
sub := newSubscription(ctx, req, startHead)
// First call to sub.Next should return our published event immediately
start := time.Now()
@ -128,14 +128,14 @@ func TestSubscription_Close(t *testing.T) {
_, err = sub.Next()
elapsed = time.Since(start)
require.Error(t, err)
require.Equal(t, ErrSubscriptionReload, err)
require.Equal(t, ErrSubscriptionClosed, err)
require.True(t, elapsed > 200*time.Millisecond,
"Reload should have happened after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Reload should have been delivered after short time, took %s", elapsed)
}
func publishTestEvent(index uint64, b *EventBuffer, key string) {
func publishTestEvent(index uint64, b *eventBuffer, key string) {
// Don't care about the event payload for now just the semantics of publishing
// something. This is not a valid stream in the end-to-end streaming protocol
// but enough to test subscription mechanics.
