stream: fix a bug with creating a snapshot

The head of the topic buffer was being ignored when creating a snapshot. This commit fixes
the bug by ensuring that the head of the topic buffer is included in the snapshot
before handing it off to the subscription.
pull/10355/head
Daniel Nephin 4 years ago
parent 3ebb65ea60
commit 5ef8a045f3

@ -49,34 +49,33 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) { func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
item := topicBufferHead item := topicBufferHead
for { for {
next := item.NextNoBlock()
switch { switch {
case next == nil: case item.Err != nil:
// This is the head of the topic buffer (or was just now which is after
// the snapshot completed). We don't want any of the events (if any) in
// the snapshot buffer as they came before the snapshot but we do need to
// wait for the next update.
s.buffer.AppendItem(item.NextLink())
return
case next.Err != nil:
// This case is not currently possible because errors can only come // This case is not currently possible because errors can only come
// from a snapshot func, and this is consuming events from a topic // from a snapshot func, and this is consuming events from a topic
// buffer which does not contain a snapshot. // buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places // Handle this case anyway in case errors can come from other places
// in the future. // in the future.
s.buffer.AppendItem(next) s.buffer.AppendItem(item)
return return
case len(next.Events) > 0 && next.Events[0].Index > idx: case len(item.Events) > 0 && item.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our // We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers // snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it. // can continue to read this and others after it.
s.buffer.AppendItem(next) s.buffer.AppendItem(item)
return return
} }
// We don't need this item, continue to next next := item.NextNoBlock()
if next == nil {
// We reached the head of the topic buffer. We don't want any of the
// events in the topic buffer as they came before the snapshot.
// Append a link to any future items.
s.buffer.AppendItem(item.NextLink())
return
}
// Proceed to the next item in the topic buffer
item = next item = next
} }
} }

Loading…
Cancel
Save