diff --git a/agent/submatview/handler.go b/agent/submatview/handler.go index 900a085de8..8f03fbfd4f 100644 --- a/agent/submatview/handler.go +++ b/agent/submatview/handler.go @@ -10,37 +10,45 @@ import ( // If eventHandler fails to handle the events it may return an error. If an // error is returned the next eventHandler will be ignored. // eventHandler is used to implement a very simple finite-state machine. -type eventHandler func(events *pbsubscribe.Event) (next eventHandler, err error) +type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error) -func (m *Materializer) initialHandler(index uint64) eventHandler { +type viewState interface { + updateView(events []*pbsubscribe.Event, index uint64) error + reset() +} + +func initialHandler(index uint64) eventHandler { if index == 0 { - return newSnapshotHandler(m) + return newSnapshotHandler() } - return m.resumeStreamHandler + return resumeStreamHandler } +// snapshotHandler accumulates events. When it receives an EndOfSnapshot event +// it updates the view, and then returns eventStreamHandler to handle new events. type snapshotHandler struct { - material *Materializer - events []*pbsubscribe.Event + events []*pbsubscribe.Event } -func newSnapshotHandler(m *Materializer) eventHandler { - return (&snapshotHandler{material: m}).handle +func newSnapshotHandler() eventHandler { + return (&snapshotHandler{}).handle } -func (h *snapshotHandler) handle(event *pbsubscribe.Event) (eventHandler, error) { +func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) { if event.GetEndOfSnapshot() { - err := h.material.updateView(h.events, event.Index) - return h.material.eventStreamHandler, err + err := state.updateView(h.events, event.Index) + return eventStreamHandler, err } h.events = append(h.events, eventsFromEvent(event)...) return h.handle, nil } -func (m *Materializer) eventStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { - err := m.updateView(eventsFromEvent(event), event.Index) - return m.eventStreamHandler, err +// eventStreamHandler handles events by updating the view. It always returns +// itself as the next handler. +func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) { + err := state.updateView(eventsFromEvent(event), event.Index) + return eventStreamHandler, err } func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event { @@ -50,10 +58,13 @@ func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event { return []*pbsubscribe.Event{event} } -func (m *Materializer) resumeStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { +// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it +// is it resets the view and returns a snapshotHandler to handle the next event. +// Otherwise it uses eventStreamHandler to handle events. +func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) { if event.GetNewSnapshotToFollow() { - m.reset() - return newSnapshotHandler(m), nil + state.reset() + return newSnapshotHandler(), nil } - return m.eventStreamHandler(event) + return eventStreamHandler(state, event) } diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 3a6d3d1753..c052febebe 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -129,7 +129,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs ctx, cancel := context.WithCancel(ctx) defer cancel() - m.handler = m.initialHandler(req.Index) + m.handler = initialHandler(req.Index) s, err := m.deps.Client.Subscribe(ctx, &req) if err != nil { @@ -146,7 +146,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs return err } - m.handler, err = m.handler(event) + m.handler, err = m.handler(m, event) if err != nil { m.reset() return err