@ -3,30 +3,19 @@ package stream
// EventSnapshot represents the state of memdb for a given topic and key at some
// EventSnapshot represents the state of memdb for a given topic and key at some
// point in time. It is modelled as a buffer of events so that snapshots can be
// point in time. It is modelled as a buffer of events so that snapshots can be
// streamed to possibly multiple subscribers concurrently, and can be trivially
// streamed to possibly multiple subscribers concurrently, and can be trivially
// cached by just keeping the Snapshot around. Once the EventSnapshot is dropped
// cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot
// from memory, any subscribers still reading from it may do so by following
// is dropped from memory, any subscribers still reading from it may do so by following
// their pointers but eventually the snapshot is garbage collected automatically
// their pointers. When the last subscribe unsubscribes the snapshot is garbage
// by Go's runtime, simplifying snapshot and buffer management dramatically.
// collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type EventSnapshot struct {
type EventSnapshot struct {
// Request that this snapshot satisfies.
Request * SubscribeRequest
// Snap is the first item in the buffer containing the snapshot. Once the
// Snap is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent update's BufferItems are appended such
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// that subscribers just need to follow this buffer for the duration of their
// so that subscribers receive all the events from the same buffer.
// subscription stream.
Snap * BufferItem
Snap * BufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to.
// snapBuffer is the Head of the snapshot buffer the fn should write to.
snapBuffer * EventBuffer
snapBuffer * EventBuffer
// topicBufferHead stored the current most-recent published item from before
// the snapshot was taken such that anything published during snapshot
// publishing can be captured.
topicBufferHead * BufferItem
// SnapFn is the function that will make the snapshot for this request.
fn SnapFn
}
}
// SnapFn is the type of function needed to generate a snapshot for a topic and
// SnapFn is the type of function needed to generate a snapshot for a topic and
@ -43,39 +32,32 @@ type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error)
func NewEventSnapshot ( req * SubscribeRequest , topicBufferHead * BufferItem , fn SnapFn ) * EventSnapshot {
func NewEventSnapshot ( req * SubscribeRequest , topicBufferHead * BufferItem , fn SnapFn ) * EventSnapshot {
buf := NewEventBuffer ( )
buf := NewEventBuffer ( )
s := & EventSnapshot {
s := & EventSnapshot {
Request : req ,
Snap : buf . Head ( ) ,
Snap : buf . Head ( ) ,
snapBuffer : buf ,
snapBuffer : buf ,
topicBufferHead : topicBufferHead ,
fn : fn ,
}
}
go s . doSnapshot ( )
go func ( ) {
idx , err := fn ( req , s . snapBuffer )
if err != nil {
s . snapBuffer . AppendErr ( err )
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
s . snapBuffer . Append ( [ ] Event { {
Topic : req . Topic ,
Key : req . Key ,
Index : idx ,
Payload : endOfSnapshot { } ,
} } )
s . spliceFromTopicBuffer ( topicBufferHead , idx )
} ( )
return s
return s
}
}
func ( s * EventSnapshot ) doSnapshot ( ) {
func ( s * EventSnapshot ) spliceFromTopicBuffer ( topicBufferHead * BufferItem , idx uint64 ) {
// Call snapshot func
idx , err := s . fn ( s . Request , s . snapBuffer )
if err != nil {
// Append an error result to signal to subscribers that this snapshot is no
// good.
s . snapBuffer . AppendErr ( err )
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
s . snapBuffer . Append ( [ ] Event { {
Topic : s . Request . Topic ,
Key : s . Request . Key ,
Index : idx ,
Payload : & agentpb . Event_EndOfSnapshot {
EndOfSnapshot : true ,
} ,
} } )
// Now splice on the topic buffer. We need to iterate through the buffer to
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
// find the first event after the current snapshot.
item := s. topicBufferHead
item := topicBufferHead
for {
for {
// Find the next item that we should include.
// Find the next item that we should include.
next , err := item . NextNoBlock ( )
next , err := item . NextNoBlock ( )
@ -108,14 +90,12 @@ func (s *EventSnapshot) doSnapshot() {
return
return
}
}
if len ( next . Events ) > 0 {
if len ( next . Events ) > 0 && next . Events [ 0 ] . Index > idx {
if next . Events [ 0 ] . Index > idx {
// We've found an update in the topic buffer that happened after our
// We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers
// snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it.
// can continue to read this and others after it.
s . snapBuffer . AppendBuffer ( next )
s . snapBuffer . AppendBuffer ( next )
return
return
}
}
}
// We don't need this item, continue to next
// We don't need this item, continue to next
item = next
item = next