submatview: remove method receiver from handlers

pull/8809/head
Daniel Nephin 2020-10-05 15:08:03 -04:00
parent 3fa08beecf
commit 5eab1d8cef
2 changed files with 31 additions and 20 deletions

View File

@ -10,37 +10,45 @@ import (
// If eventHandler fails to handle the events it may return an error. If an // If eventHandler fails to handle the events it may return an error. If an
// error is returned the next eventHandler will be ignored. // error is returned the next eventHandler will be ignored.
// eventHandler is used to implement a very simple finite-state machine. // eventHandler is used to implement a very simple finite-state machine.
type eventHandler func(events *pbsubscribe.Event) (next eventHandler, err error) type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error)
func (m *Materializer) initialHandler(index uint64) eventHandler { type viewState interface {
updateView(events []*pbsubscribe.Event, index uint64) error
reset()
}
func initialHandler(index uint64) eventHandler {
if index == 0 { if index == 0 {
return newSnapshotHandler(m) return newSnapshotHandler()
} }
return m.resumeStreamHandler return resumeStreamHandler
} }
// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
// it updates the view, and then returns eventStreamHandler to handle new events.
type snapshotHandler struct { type snapshotHandler struct {
material *Materializer events []*pbsubscribe.Event
events []*pbsubscribe.Event
} }
func newSnapshotHandler(m *Materializer) eventHandler { func newSnapshotHandler() eventHandler {
return (&snapshotHandler{material: m}).handle return (&snapshotHandler{}).handle
} }
func (h *snapshotHandler) handle(event *pbsubscribe.Event) (eventHandler, error) { func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
if event.GetEndOfSnapshot() { if event.GetEndOfSnapshot() {
err := h.material.updateView(h.events, event.Index) err := state.updateView(h.events, event.Index)
return h.material.eventStreamHandler, err return eventStreamHandler, err
} }
h.events = append(h.events, eventsFromEvent(event)...) h.events = append(h.events, eventsFromEvent(event)...)
return h.handle, nil return h.handle, nil
} }
func (m *Materializer) eventStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { // eventStreamHandler handles events by updating the view. It always returns
err := m.updateView(eventsFromEvent(event), event.Index) // itself as the next handler.
return m.eventStreamHandler, err func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
err := state.updateView(eventsFromEvent(event), event.Index)
return eventStreamHandler, err
} }
func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event { func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
@ -50,10 +58,13 @@ func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
return []*pbsubscribe.Event{event} return []*pbsubscribe.Event{event}
} }
func (m *Materializer) resumeStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { // resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it
// is it resets the view and returns a snapshotHandler to handle the next event.
// Otherwise it uses eventStreamHandler to handle events.
func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
if event.GetNewSnapshotToFollow() { if event.GetNewSnapshotToFollow() {
m.reset() state.reset()
return newSnapshotHandler(m), nil return newSnapshotHandler(), nil
} }
return m.eventStreamHandler(event) return eventStreamHandler(state, event)
} }

View File

@ -129,7 +129,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
m.handler = m.initialHandler(req.Index) m.handler = initialHandler(req.Index)
s, err := m.deps.Client.Subscribe(ctx, &req) s, err := m.deps.Client.Subscribe(ctx, &req)
if err != nil { if err != nil {
@ -146,7 +146,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs
return err return err
} }
m.handler, err = m.handler(event) m.handler, err = m.handler(m, event)
if err != nil { if err != nil {
m.reset() m.reset()
return err return err