Merge pull request #8799 from hashicorp/streaming/rename-framing-events

stream: remove EndOfEmptySnapshot, add NewSnapshotToFollow
pull/8838/head
Daniel Nephin 2020-10-06 12:42:58 -04:00 committed by GitHub
commit 364f6589c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 487 additions and 243 deletions

View File

@ -20,22 +20,22 @@ type Event struct {
} }
// IsEndOfSnapshot returns true if this is a framing event that indicates the // IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Future events from Subscription.Next will be // snapshot has completed. Subsequent events from Subscription.Next will be
// change events. // streamed as they occur.
func (e Event) IsEndOfSnapshot() bool { func (e Event) IsEndOfSnapshot() bool {
return e.Payload == endOfSnapshot{} return e.Payload == endOfSnapshot{}
} }
// IsEndOfEmptySnapshot returns true if this is a framing event that indicates // IsNewSnapshotToFollow returns true if this is a framing event that indicates
// there is no snapshot. Future events from Subscription.Next will be // that the clients view is stale, and must be reset. Subsequent events from
// change events. // Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
func (e Event) IsEndOfEmptySnapshot() bool { func (e Event) IsNewSnapshotToFollow() bool {
return e.Payload == endOfEmptySnapshot{} return e.Payload == newSnapshotToFollow{}
} }
type endOfSnapshot struct{} type endOfSnapshot struct{}
type endOfEmptySnapshot struct{} type newSnapshotToFollow struct{}
type closeSubscriptionPayload struct { type closeSubscriptionPayload struct {
tokensSecretIDs []string tokensSecretIDs []string

View File

@ -216,3 +216,9 @@ func (i *bufferItem) NextLink() *bufferItem {
} }
return next return next
} }
// HasEventIndex returns true if index matches the Event.Index of this item. Returns
// false if there are no events stored in the item, or the index does not match.
func (i *bufferItem) HasEventIndex(index uint64) bool {
return len(i.Events) > 0 && i.Events[0].Index == index
}

View File

@ -36,7 +36,7 @@ type EventPublisher struct {
// publishCh is used to send messages from an active txn to a goroutine which // publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from // publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path. // the Commit call in the FSM hot path.
publishCh chan changeEvents publishCh chan []Event
snapshotHandlers SnapshotHandlers snapshotHandlers SnapshotHandlers
} }
@ -54,10 +54,6 @@ type subscriptions struct {
byToken map[string]map[*SubscribeRequest]*Subscription byToken map[string]map[*SubscribeRequest]*Subscription
} }
type changeEvents struct {
events []Event
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used. // The nil Topic is reserved and should not be used.
@ -84,7 +80,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
snapCacheTTL: snapCacheTTL, snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer), topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot), snapCache: make(map[Topic]map[string]*eventSnapshot),
publishCh: make(chan changeEvents, 64), publishCh: make(chan []Event, 64),
subscriptions: &subscriptions{ subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription), byToken: make(map[string]map[*SubscribeRequest]*Subscription),
}, },
@ -97,7 +93,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
// Publish events to all subscribers of the event Topic. // Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) { func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 { if len(events) > 0 {
e.publishCh <- changeEvents{events: events} e.publishCh <- events
} }
} }
@ -110,16 +106,16 @@ func (e *EventPublisher) Run(ctx context.Context) {
e.subscriptions.closeAll() e.subscriptions.closeAll()
return return
case update := <-e.publishCh: case update := <-e.publishCh:
e.sendEvents(update) e.publishEvent(update)
} }
} }
} }
// sendEvents sends the given events to any applicable topic listeners, as well // publishEvent appends the events to any applicable topic buffers. It handles
// as any ACL update events to cause affected listeners to reset their stream. // any closeSubscriptionPayload events by closing associated subscriptions.
func (e *EventPublisher) sendEvents(update changeEvents) { func (e *EventPublisher) publishEvent(events []Event) {
eventsByTopic := make(map[Topic][]Event) eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events { for _, event := range events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok { if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs) e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
continue continue
@ -157,8 +153,7 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
// When the caller is finished with the subscription for any reason, it must // When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources. // call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) { func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic handler, ok := e.snapshotHandlers[req.Topic]
_, ok := e.snapshotHandlers[req.Topic]
if !ok || req.Topic == nil { if !ok || req.Topic == nil {
return nil, fmt.Errorf("unknown topic %v", req.Topic) return nil, fmt.Errorf("unknown topic %v", req.Topic)
} }
@ -166,47 +161,47 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
// Ensure there is a topic buffer for that topic so we start capturing any topicHead := e.getTopicBuffer(req.Topic).Head()
// future published events.
buf := e.getTopicBuffer(req.Topic)
// See if we need a snapshot // If the client view is fresh, resume the stream.
topicHead := buf.Head() if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "end of empty snapshot" message to signal to
// client its cache is still good, then follow along from here in the topic.
buf := newEventBuffer() buf := newEventBuffer()
subscriptionHead := buf.Head()
// Store the head of that buffer before we append to it to give as the // splice the rest of the topic buffer onto the subscription buffer so
// starting point for the subscription. // the subscription will receive new events.
subHead := buf.Head()
buf.Append([]Event{{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: endOfEmptySnapshot{},
}})
// Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer.
buf.AppendItem(topicHead.NextLink()) buf.AppendItem(topicHead.NextLink())
return e.subscriptions.add(req, subscriptionHead), nil
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
} }
e.subscriptions.add(req, sub) snapFromCache := e.getCachedSnapshotLocked(req)
return sub, nil if req.Index == 0 && snapFromCache != nil {
return e.subscriptions.add(req, snapFromCache.First), nil
}
snap := newEventSnapshot()
// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Payload: newSnapshotToFollow{},
}})
if snapFromCache != nil {
snap.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, snap.First), nil
}
}
snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
return e.subscriptions.add(req, snap.First), nil
} }
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
sub := newSubscription(*req, head, s.unsubscribe(req))
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -216,6 +211,7 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.byToken[req.Token] = subsByToken s.byToken[req.Token] = subsByToken
} }
subsByToken[req] = sub subsByToken[req] = sub
return sub
} }
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
@ -263,7 +259,8 @@ func (s *subscriptions) closeAll() {
} }
} }
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { // EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
topicSnaps, ok := e.snapCache[req.Topic] topicSnaps, ok := e.snapCache[req.Topic]
if !ok { if !ok {
topicSnaps = make(map[string]*eventSnapshot) topicSnaps = make(map[string]*eventSnapshot)
@ -272,25 +269,22 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
snap, ok := topicSnaps[req.Key] snap, ok := topicSnaps[req.Key]
if ok && snap.err() == nil { if ok && snap.err() == nil {
return snap, nil return snap
} }
return nil
handler, ok := e.snapshotHandlers[req.Topic] }
if !ok {
return nil, fmt.Errorf("unknown topic %v", req.Topic) // EventPublisher.lock must be held to call this method.
} func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *eventSnapshot) {
if e.snapCacheTTL == 0 {
snap = newEventSnapshot(req, topicHead, handler) return
if e.snapCacheTTL > 0 { }
topicSnaps[req.Key] = snap e.snapCache[req.Topic][req.Key] = snap
// Trigger a clearout after TTL // Setup a cache eviction
time.AfterFunc(e.snapCacheTTL, func() { time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
delete(topicSnaps, req.Key) delete(e.snapCache[req.Topic], req.Key)
}) })
}
return snap, nil
} }

View File

@ -17,8 +17,8 @@ func (i intTopic) String() string {
var testTopic Topic = intTopic(999) var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
subscription := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Key: "sub-key",
} }
@ -28,20 +28,18 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0) publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx) go publisher.Run(ctx)
sub, err := publisher.Subscribe(subscription) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub) eventCh := runSubscription(ctx, sub)
result := nextResult(t, eventCh) next := getNextEvents(t, eventCh)
require.NoError(t, result.Err) expected := []Event{testSnapshotEvent}
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}} require.Equal(t, expected, next)
require.Equal(t, expected, result.Events)
result = nextResult(t, eventCh) next = getNextEvents(t, eventCh)
require.Len(t, result.Events, 1) require.Len(t, next, 1)
require.True(t, result.Events[0].IsEndOfSnapshot()) require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh) assertNoResult(t, eventCh)
events := []Event{{ events := []Event{{
@ -52,10 +50,16 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher.Publish(events) publisher.Publish(events)
// Subscriber should see the published event // Subscriber should see the published event
result = nextResult(t, eventCh) next = getNextEvents(t, eventCh)
require.NoError(t, result.Err)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}} expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events) require.Equal(t, expected, next)
}
var testSnapshotEvent = Event{
Topic: testTopic,
Payload: "snapshot-event-payload",
Key: "sub-key",
Index: 1,
} }
func newTestSnapshotHandlers() SnapshotHandlers { func newTestSnapshotHandlers() SnapshotHandlers {
@ -64,18 +68,18 @@ func newTestSnapshotHandlers() SnapshotHandlers {
if req.Topic != testTopic { if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic) return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
} }
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) buf.Append([]Event{testSnapshotEvent})
return 1, nil return 1, nil
}, },
} }
} }
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
eventCh := make(chan subNextResult, 1) eventCh := make(chan eventOrErr, 1)
go func() { go func() {
for { for {
es, err := sub.Next(ctx) es, err := sub.Next(ctx)
eventCh <- subNextResult{ eventCh <- eventOrErr{
Events: es, Events: es,
Err: err, Err: err,
} }
@ -87,30 +91,31 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR
return eventCh return eventCh
} }
type subNextResult struct { type eventOrErr struct {
Events []Event Events []Event
Err error Err error
} }
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event {
t.Helper() t.Helper()
select { select {
case next := <-eventCh: case next := <-eventCh:
return next require.NoError(t, next.Err)
return next.Events
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms") t.Fatalf("timeout waiting for event from subscription")
return nil
} }
return subNextResult{}
} }
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) { func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) {
t.Helper() t.Helper()
select { select {
case next := <-eventCh: case next := <-eventCh:
require.NoError(t, next.Err) require.NoError(t, next.Err)
require.Len(t, next.Events, 1) require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload) t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond): case <-time.After(25 * time.Millisecond):
} }
} }
@ -156,3 +161,232 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
} }
} }
} }
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
_, err := publisher.Subscribe(req)
require.NoError(t, err)
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Index: 3,
}}
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvents(t, eventCh)
expected = []Event{events[0]}
require.Equal(t, expected, next)
}
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
assertNoResult(t, eventCh)
expected := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
publisher.publishEvent([]Event{expected})
next := getNextEvents(t, eventCh)
require.Equal(t, []Event{expected}, next)
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Equal(t, nextEvent, next[0])
})
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
if !t.Run(name, fn) {
t.FailNow()
}
}

View File

@ -9,50 +9,45 @@ package stream
// collected automatically by Go's runtime. This simplifies snapshot and buffer // collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically. // management dramatically.
type eventSnapshot struct { type eventSnapshot struct {
// Head is the first item in the buffer containing the snapshot. Once the // First item in the buffer. Used as the Head of a subscription, or to
// snapshot is complete, subsequent BufferItems are appended to snapBuffer, // splice this snapshot onto another one.
// so that subscribers receive all the events from the same buffer. First *bufferItem
Head *bufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to. // buffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *eventBuffer buffer *eventBuffer
} }
// newEventSnapshot creates a snapshot buffer based on the subscription request. // newEventSnapshot creates an empty snapshot buffer.
// The current buffer head for the topic requested is passed so that once the func newEventSnapshot() *eventSnapshot {
// snapshot is complete and has been delivered into the buffer, any events snapBuffer := newEventBuffer()
// published during snapshotting can be immediately appended and won't be return &eventSnapshot{
// missed. Once the snapshot is delivered the topic buffer is spliced onto the First: snapBuffer.Head(),
// snapshot buffer so that subscribers will naturally follow from the snapshot buffer: snapBuffer,
// to wait for any subsequent updates.
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Head: buf.Head(),
snapBuffer: buf,
} }
go func() {
idx, err := fn(*req, s.snapBuffer)
if err != nil {
s.snapBuffer.AppendItem(&bufferItem{Err: err})
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
s.snapBuffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}()
return s
} }
// appendAndSlice populates the snapshot buffer by calling the SnapshotFunc,
// then adding an endOfSnapshot framing event, and finally by splicing in
// events from the topicBuffer.
func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, topicBufferHead *bufferItem) {
idx, err := fn(req, s.buffer)
if err != nil {
s.buffer.AppendItem(&bufferItem{Err: err})
return
}
s.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}
// spliceFromTopicBuffer traverses the topicBuffer looking for the last item
// in the buffer, or the first item where the index is greater than idx. Once
// the item is found it is appended to the snapshot buffer.
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) { func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
item := topicBufferHead item := topicBufferHead
for { for {
next := item.NextNoBlock() next := item.NextNoBlock()
@ -62,7 +57,7 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// the snapshot completed). We don't want any of the events (if any) in // the snapshot completed). We don't want any of the events (if any) in
// the snapshot buffer as they came before the snapshot but we do need to // the snapshot buffer as they came before the snapshot but we do need to
// wait for the next update. // wait for the next update.
s.snapBuffer.AppendItem(item.NextLink()) s.buffer.AppendItem(item.NextLink())
return return
case next.Err != nil: case next.Err != nil:
@ -71,14 +66,14 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// buffer which does not contain a snapshot. // buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places // Handle this case anyway in case errors can come from other places
// in the future. // in the future.
s.snapBuffer.AppendItem(next) s.buffer.AppendItem(next)
return return
case len(next.Events) > 0 && next.Events[0].Index > idx: case len(next.Events) > 0 && next.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our // We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers // snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it. // can continue to read this and others after it.
s.snapBuffer.AppendItem(next) s.buffer.AppendItem(next)
return return
} }
@ -93,6 +88,6 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
func (s *eventSnapshot) err() error { func (s *eventSnapshot) err() error {
// Fetch the head of the buffer, this is atomic. If the snapshot func errored // Fetch the head of the buffer, this is atomic. If the snapshot func errored
// then the last event will be an error. // then the last event will be an error.
head := s.snapBuffer.Head() head := s.buffer.Head()
return head.Err return head.Err
} }

View File

@ -87,9 +87,8 @@ func TestEventSnapshot(t *testing.T) {
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)}) tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
} }
// Create eventSnapshot, (will call snFn in another goroutine). The es := newEventSnapshot()
// Request is ignored by the snapFunc so doesn't matter for now. es.appendAndSplice(SubscribeRequest{}, snFn, tbHead)
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
// Deliver any post-snapshot events simulating updates that occur // Deliver any post-snapshot events simulating updates that occur
// logically after snapshot. It doesn't matter that these might actually // logically after snapshot. It doesn't matter that these might actually
@ -112,7 +111,7 @@ func TestEventSnapshot(t *testing.T) {
snapIDs := make([]string, 0, tc.snapshotSize) snapIDs := make([]string, 0, tc.snapshotSize)
updateIDs := make([]string, 0, tc.updatesAfterSnap) updateIDs := make([]string, 0, tc.updatesAfterSnap)
snapDone := false snapDone := false
curItem := es.Head curItem := es.First
var err error var err error
RECV: RECV:
for { for {

View File

@ -11,7 +11,7 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
require.True(t, e.IsEndOfSnapshot()) require.True(t, e.IsEndOfSnapshot())
t.Run("not EndOfSnapshot", func(t *testing.T) { t.Run("not EndOfSnapshot", func(t *testing.T) {
e := Event{Payload: endOfEmptySnapshot{}} e := Event{Payload: newSnapshotToFollow{}}
require.False(t, e.IsEndOfSnapshot()) require.False(t, e.IsEndOfSnapshot())
}) })
} }

View File

@ -28,7 +28,7 @@ type Subscription struct {
state uint32 state uint32
// req is the requests that we are responding to // req is the requests that we are responding to
req *SubscribeRequest req SubscribeRequest
// currentItem stores the current snapshot or topic buffer item we are on. It // currentItem stores the current snapshot or topic buffer item we are on. It
// is mutated by calls to Next. // is mutated by calls to Next.
@ -56,7 +56,7 @@ type SubscribeRequest struct {
// newSubscription return a new subscription. The caller is responsible for // newSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources. // calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{ return &Subscription{
forceClosed: make(chan struct{}), forceClosed: make(chan struct{}),
req: req, req: req,

View File

@ -23,8 +23,7 @@ func TestSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// Create a subscription req := SubscribeRequest{
req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Key: "test",
} }
@ -103,8 +102,7 @@ func TestSubscription_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// Create a subscription req := SubscribeRequest{
req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Key: "test",
} }

View File

@ -200,7 +200,7 @@ type Event struct {
// //
// Types that are valid to be assigned to Payload: // Types that are valid to be assigned to Payload:
// *Event_EndOfSnapshot // *Event_EndOfSnapshot
// *Event_EndOfEmptySnapshot // *Event_NewSnapshotToFollow
// *Event_EventBatch // *Event_EventBatch
// *Event_ServiceHealth // *Event_ServiceHealth
Payload isEvent_Payload `protobuf_oneof:"Payload"` Payload isEvent_Payload `protobuf_oneof:"Payload"`
@ -251,8 +251,8 @@ type isEvent_Payload interface {
type Event_EndOfSnapshot struct { type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"` EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"`
} }
type Event_EndOfEmptySnapshot struct { type Event_NewSnapshotToFollow struct {
EndOfEmptySnapshot bool `protobuf:"varint,6,opt,name=EndOfEmptySnapshot,proto3,oneof"` NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"`
} }
type Event_EventBatch struct { type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"` EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"`
@ -261,10 +261,10 @@ type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
} }
func (*Event_EndOfSnapshot) isEvent_Payload() {} func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_EndOfEmptySnapshot) isEvent_Payload() {} func (*Event_NewSnapshotToFollow) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {} func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {} func (*Event_ServiceHealth) isEvent_Payload() {}
func (m *Event) GetPayload() isEvent_Payload { func (m *Event) GetPayload() isEvent_Payload {
if m != nil { if m != nil {
@ -301,9 +301,9 @@ func (m *Event) GetEndOfSnapshot() bool {
return false return false
} }
func (m *Event) GetEndOfEmptySnapshot() bool { func (m *Event) GetNewSnapshotToFollow() bool {
if x, ok := m.GetPayload().(*Event_EndOfEmptySnapshot); ok { if x, ok := m.GetPayload().(*Event_NewSnapshotToFollow); ok {
return x.EndOfEmptySnapshot return x.NewSnapshotToFollow
} }
return false return false
} }
@ -326,7 +326,7 @@ func (m *Event) GetServiceHealth() *ServiceHealthUpdate {
func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{ return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{
(*Event_EndOfSnapshot)(nil), (*Event_EndOfSnapshot)(nil),
(*Event_EndOfEmptySnapshot)(nil), (*Event_NewSnapshotToFollow)(nil),
(*Event_EventBatch)(nil), (*Event_EventBatch)(nil),
(*Event_ServiceHealth)(nil), (*Event_ServiceHealth)(nil),
} }
@ -343,9 +343,9 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
} }
_ = b.EncodeVarint(5<<3 | proto.WireVarint) _ = b.EncodeVarint(5<<3 | proto.WireVarint)
_ = b.EncodeVarint(t) _ = b.EncodeVarint(t)
case *Event_EndOfEmptySnapshot: case *Event_NewSnapshotToFollow:
t := uint64(0) t := uint64(0)
if x.EndOfEmptySnapshot { if x.NewSnapshotToFollow {
t = 1 t = 1
} }
_ = b.EncodeVarint(6<<3 | proto.WireVarint) _ = b.EncodeVarint(6<<3 | proto.WireVarint)
@ -377,12 +377,12 @@ func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer)
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0} m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err return true, err
case 6: // Payload.EndOfEmptySnapshot case 6: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint { if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
} }
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_EndOfEmptySnapshot{x != 0} m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err return true, err
case 7: // Payload.EventBatch case 7: // Payload.EventBatch
if wire != proto.WireBytes { if wire != proto.WireBytes {
@ -412,7 +412,7 @@ func _Event_OneofSizer(msg proto.Message) (n int) {
case *Event_EndOfSnapshot: case *Event_EndOfSnapshot:
n += 1 // tag and wire n += 1 // tag and wire
n += 1 n += 1
case *Event_EndOfEmptySnapshot: case *Event_NewSnapshotToFollow:
n += 1 // tag and wire n += 1 // tag and wire
n += 1 n += 1
case *Event_EventBatch: case *Event_EventBatch:
@ -546,40 +546,40 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{ var fileDescriptor_ab3eb8c810e315fb = []byte{
// 521 bytes of a gzipped FileDescriptorProto // 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x4f, 0x8f, 0xd2, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0xc0, 0x02, 0xcb, 0xc3, 0xdd, 0xd4, 0x11, 0x63, 0xc3, 0x26, 0x0d, 0x12, 0xb3, 0xa9, 0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75,
0x9b, 0x48, 0x37, 0x98, 0xe8, 0x4d, 0x23, 0x2c, 0x8a, 0x31, 0x11, 0x53, 0xdc, 0x83, 0xde, 0x86, 0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x49, 0x1b, 0xd8, 0x99, 0xb1, 0x1d, 0x56, 0xb9, 0xfb, 0x21, 0xf6, 0xcb, 0x78, 0xf7, 0xe8, 0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47,
0x47, 0x30, 0xf8, 0x45, 0x0c, 0x43, 0xb7, 0x5b, 0x60, 0x6f, 0xde, 0xfa, 0x7e, 0x7f, 0xe6, 0xfd, 0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf,
0xf2, 0x5e, 0x1f, 0x3c, 0x94, 0xb1, 0x50, 0xc2, 0x95, 0xe3, 0x64, 0x3e, 0x4e, 0xfc, 0x38, 0x1a, 0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e,
0xa3, 0x9b, 0x7d, 0xb5, 0x35, 0x47, 0xab, 0x19, 0xd0, 0x68, 0x64, 0x6a, 0x8c, 0x2f, 0x23, 0x1f, 0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0,
0x5d, 0x2e, 0x82, 0x54, 0xd6, 0xba, 0x22, 0x60, 0x8e, 0xae, 0x95, 0x1e, 0x7e, 0x9d, 0x63, 0xa2, 0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c,
0xe8, 0x31, 0x94, 0x3e, 0x0a, 0x19, 0xf9, 0x16, 0x69, 0x12, 0xe7, 0xb0, 0x63, 0xb6, 0x6f, 0x1e, 0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed,
0xd7, 0xb8, 0xb7, 0xa6, 0xa9, 0x09, 0xc5, 0x77, 0xb8, 0xb0, 0x0a, 0x4d, 0xe2, 0x54, 0xbd, 0xd5, 0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67,
0x27, 0xad, 0xaf, 0x9c, 0x53, 0xe4, 0x56, 0x51, 0x63, 0xeb, 0x62, 0x85, 0xbe, 0xe5, 0x01, 0x7e, 0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87,
0xb7, 0xf6, 0x9a, 0xc4, 0xd9, 0xf3, 0xd6, 0x05, 0xb5, 0x01, 0xce, 0x98, 0x62, 0x3e, 0x72, 0x85, 0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c,
0xb1, 0x55, 0xd2, 0x86, 0x1c, 0xd2, 0xfa, 0x59, 0x80, 0x52, 0xff, 0x12, 0xf9, 0x7f, 0xe6, 0x59, 0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79,
0x77, 0x2e, 0xe6, 0x3b, 0x1f, 0xc3, 0x41, 0x9f, 0x07, 0xc3, 0x2f, 0x23, 0xce, 0x64, 0x12, 0x0a, 0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2,
0xa5, 0x9b, 0xef, 0x0f, 0x0c, 0x6f, 0x13, 0xa6, 0xa7, 0x40, 0x35, 0xd0, 0xbf, 0x90, 0x6a, 0x91, 0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc,
0x89, 0xcb, 0xa9, 0xf8, 0x16, 0x8e, 0x3e, 0x07, 0xd0, 0x91, 0xbb, 0x4c, 0xf9, 0xa1, 0x55, 0x69, 0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee,
0x12, 0xa7, 0xd6, 0xb9, 0x9f, 0x8b, 0x7b, 0x43, 0x0e, 0x0c, 0x2f, 0x27, 0xa5, 0xaf, 0xe1, 0x60, 0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c,
0xb4, 0xde, 0xce, 0x00, 0xd9, 0x4c, 0x85, 0x16, 0x68, 0xaf, 0x9d, 0xf3, 0x6e, 0xf0, 0xe7, 0x32, 0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99,
0x60, 0x0a, 0x57, 0x91, 0x37, 0xe0, 0x6e, 0x15, 0x2a, 0x1f, 0xd8, 0x62, 0x26, 0x58, 0xd0, 0x7a, 0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1,
0x96, 0xcf, 0x42, 0x1d, 0x28, 0xeb, 0x2a, 0xb1, 0x48, 0xb3, 0xe8, 0xd4, 0x36, 0x86, 0xa8, 0x09, 0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1,
0x2f, 0xe5, 0x5b, 0x3f, 0x08, 0xdc, 0xbb, 0xa5, 0x17, 0x7d, 0x04, 0x85, 0xa1, 0x4c, 0x57, 0x50, 0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a,
0xcf, 0xb9, 0x7b, 0x4c, 0xb1, 0x99, 0x98, 0x0c, 0xa5, 0x57, 0x18, 0x4a, 0xfa, 0x06, 0xcc, 0x5e, 0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30,
0x88, 0xfe, 0x34, 0x7d, 0xe1, 0xbd, 0x08, 0x50, 0x2f, 0xa4, 0xd6, 0x39, 0x6a, 0x67, 0x7f, 0x60, 0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad,
0x7b, 0x5b, 0xe2, 0xed, 0x98, 0x4e, 0x5e, 0xa5, 0x4b, 0xa7, 0x35, 0xa8, 0x9c, 0xf3, 0x29, 0x17, 0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b,
0xdf, 0xb8, 0x69, 0xd0, 0xbb, 0x5b, 0x73, 0x32, 0x09, 0xb5, 0xa0, 0xbe, 0x01, 0xf5, 0x04, 0xe7, 0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50,
0xe8, 0x2b, 0xb3, 0x70, 0xf2, 0x18, 0xaa, 0x59, 0x38, 0x7a, 0x07, 0xf6, 0x3d, 0x9c, 0x44, 0x89, 0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00,
0xc2, 0xd8, 0x34, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xd7, 0x35, 0xe9, 0x7c, 0x82, 0x07, 0x23, 0xc5, 0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a,
0x14, 0xf6, 0x42, 0xc6, 0x27, 0x98, 0x5e, 0x84, 0x54, 0x91, 0xe0, 0xf4, 0x05, 0x54, 0xb3, 0x0b, 0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70,
0xa1, 0x47, 0xf9, 0x85, 0x6c, 0xdd, 0x4d, 0x63, 0x67, 0xa6, 0x2d, 0xe3, 0x94, 0x74, 0x5f, 0xfe, 0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96,
0x5a, 0xda, 0xe4, 0xf7, 0xd2, 0x26, 0x7f, 0x96, 0x36, 0xb9, 0xfa, 0x6b, 0x1b, 0x9f, 0x9f, 0x4c, 0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9,
0x22, 0x15, 0xce, 0xc7, 0x6d, 0x5f, 0x5c, 0xb8, 0x21, 0x4b, 0xc2, 0xc8, 0x17, 0xb1, 0x74, 0x7d, 0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8,
0xc1, 0x93, 0xf9, 0xcc, 0xdd, 0x39, 0xed, 0x71, 0x59, 0x43, 0x4f, 0xff, 0x05, 0x00, 0x00, 0xff, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e,
0xff, 0x7d, 0xf7, 0xca, 0x01, 0xf6, 0x03, 0x00, 0x00, 0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -595,18 +595,24 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StateChangeSubscriptionClient interface { type StateChangeSubscriptionClient interface {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error)
} }
@ -653,18 +659,24 @@ func (x *stateChangeSubscriptionSubscribeClient) Recv() (*Event, error) {
// StateChangeSubscriptionServer is the server API for StateChangeSubscription service. // StateChangeSubscriptionServer is the server API for StateChangeSubscription service.
type StateChangeSubscriptionServer interface { type StateChangeSubscriptionServer interface {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error
} }
@ -842,14 +854,14 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0x28 dAtA[i] = 0x28
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *Event_EndOfEmptySnapshot) MarshalTo(dAtA []byte) (int, error) { func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
return m.MarshalToSizedBuffer(dAtA[:m.Size()]) return m.MarshalToSizedBuffer(dAtA[:m.Size()])
} }
func (m *Event_EndOfEmptySnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA) i := len(dAtA)
i-- i--
if m.EndOfEmptySnapshot { if m.NewSnapshotToFollow {
dAtA[i] = 1 dAtA[i] = 1
} else { } else {
dAtA[i] = 0 dAtA[i] = 0
@ -1058,7 +1070,7 @@ func (m *Event_EndOfSnapshot) Size() (n int) {
n += 2 n += 2
return n return n
} }
func (m *Event_EndOfEmptySnapshot) Size() (n int) { func (m *Event_NewSnapshotToFollow) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
} }
@ -1444,7 +1456,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
m.Payload = &Event_EndOfSnapshot{b} m.Payload = &Event_EndOfSnapshot{b}
case 6: case 6:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfEmptySnapshot", wireType) return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
} }
var v int var v int
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -1462,7 +1474,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
} }
b := bool(v != 0) b := bool(v != 0)
m.Payload = &Event_EndOfEmptySnapshot{b} m.Payload = &Event_NewSnapshotToFollow{b}
case 7: case 7:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)

View File

@ -13,18 +13,24 @@ import "proto/pbservice/node.proto";
// state change events. Events are streamed as they happen. // state change events. Events are streamed as they happen.
service StateChangeSubscription { service StateChangeSubscription {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
rpc Subscribe(SubscribeRequest) returns (stream Event) {} rpc Subscribe(SubscribeRequest) returns (stream Event) {}
} }
@ -92,11 +98,11 @@ message Event {
// ended. Subsequent Events delivered will be mutations to that result. // ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5; bool EndOfSnapshot = 5;
// EndOfEmptySnapshot indicates that the client is still up-to-date. // NewSnapshotToFollow indicates that the client view is stale. The client
// The snapshot has ended, and was empty. The rest of the stream will be // must reset its view before handing any more events. Subsequent events
// individual update events. It distinguishes between "up to date, no snapshot" // in the stream will be for a new snapshot until an EndOfSnapshot event
// and "snapshot contains zero events but you should reset any old state to be blank". // is received.
bool EndOfEmptySnapshot = 6; bool NewSnapshotToFollow = 6;
// EventBatch is a set of events. This is typically used as the payload // EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft // type where multiple events are emitted in a single topic and raft