From effab15131459912783d5b45489d3e3e98e9372d Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Jun 2020 19:07:58 -0400 Subject: [PATCH] stream.EventSnapshot: reduce the fields on the struct Many of the fields are only needed in one place, and by using a closure they can be removed from the struct. This reduces the scope of the variables making it esier to see how they are used. --- agent/consul/stream/event_snapshot.go | 86 ++++++++++----------------- 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 520ac8e606..59359a925a 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -3,30 +3,19 @@ package stream // EventSnapshot represents the state of memdb for a given topic and key at some // point in time. It is modelled as a buffer of events so that snapshots can be // streamed to possibly multiple subscribers concurrently, and can be trivially -// cached by just keeping the Snapshot around. Once the EventSnapshot is dropped -// from memory, any subscribers still reading from it may do so by following -// their pointers but eventually the snapshot is garbage collected automatically -// by Go's runtime, simplifying snapshot and buffer management dramatically. +// cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot +// is dropped from memory, any subscribers still reading from it may do so by following +// their pointers. When the last subscribe unsubscribes the snapshot is garbage +// collected automatically by Go's runtime. This simplifies snapshot and buffer +// management dramatically. type EventSnapshot struct { - // Request that this snapshot satisfies. - Request *SubscribeRequest - // Snap is the first item in the buffer containing the snapshot. Once the - // snapshot is complete, subsequent update's BufferItems are appended such - // that subscribers just need to follow this buffer for the duration of their - // subscription stream. + // snapshot is complete, subsequent BufferItems are appended to snapBuffer, + // so that subscribers receive all the events from the same buffer. Snap *BufferItem // snapBuffer is the Head of the snapshot buffer the fn should write to. snapBuffer *EventBuffer - - // topicBufferHead stored the current most-recent published item from before - // the snapshot was taken such that anything published during snapshot - // publishing can be captured. - topicBufferHead *BufferItem - - // SnapFn is the function that will make the snapshot for this request. - fn SnapFn } // SnapFn is the type of function needed to generate a snapshot for a topic and @@ -43,39 +32,32 @@ type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { buf := NewEventBuffer() s := &EventSnapshot{ - Request: req, - Snap: buf.Head(), - snapBuffer: buf, - topicBufferHead: topicBufferHead, - fn: fn, + Snap: buf.Head(), + snapBuffer: buf, } - go s.doSnapshot() + + go func() { + idx, err := fn(req, s.snapBuffer) + if err != nil { + s.snapBuffer.AppendErr(err) + return + } + // We wrote the snapshot events to the buffer, send the "end of snapshot" event + s.snapBuffer.Append([]Event{{ + Topic: req.Topic, + Key: req.Key, + Index: idx, + Payload: endOfSnapshot{}, + }}) + s.spliceFromTopicBuffer(topicBufferHead, idx) + }() return s } -func (s *EventSnapshot) doSnapshot() { - // Call snapshot func - idx, err := s.fn(s.Request, s.snapBuffer) - if err != nil { - // Append an error result to signal to subscribers that this snapshot is no - // good. - s.snapBuffer.AppendErr(err) - return - } - - // We wrote the snapshot events to the buffer, send the "end of snapshot" event - s.snapBuffer.Append([]Event{{ - Topic: s.Request.Topic, - Key: s.Request.Key, - Index: idx, - Payload: &agentpb.Event_EndOfSnapshot{ - EndOfSnapshot: true, - }, - }}) - +func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx uint64) { // Now splice on the topic buffer. We need to iterate through the buffer to // find the first event after the current snapshot. - item := s.topicBufferHead + item := topicBufferHead for { // Find the next item that we should include. next, err := item.NextNoBlock() @@ -108,14 +90,12 @@ func (s *EventSnapshot) doSnapshot() { return } - if len(next.Events) > 0 { - if next.Events[0].Index > idx { - // We've found an update in the topic buffer that happened after our - // snapshot was taken, splice it into the snapshot buffer so subscribers - // can continue to read this and others after it. - s.snapBuffer.AppendBuffer(next) - return - } + if len(next.Events) > 0 && next.Events[0].Index > idx { + // We've found an update in the topic buffer that happened after our + // snapshot was taken, splice it into the snapshot buffer so subscribers + // can continue to read this and others after it. + s.snapBuffer.AppendBuffer(next) + return } // We don't need this item, continue to next item = next