diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go index 4019ee8f6b..f1fe9ce7a1 100644 --- a/agent/cache-types/streaming_events_test.go +++ b/agent/cache-types/streaming_events_test.go @@ -17,11 +17,11 @@ func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.E } } -func newEndOfEmptySnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { +func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { return &pbsubscribe.Event{ Topic: topic, Index: index, - Payload: &pbsubscribe.Event_EndOfEmptySnapshot{EndOfEmptySnapshot: true}, + Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, } } diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index e6e014802e..addbbc0593 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -37,46 +37,59 @@ func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices } type MaterializerDeps struct { - Client submatview.StreamingClient + Client submatview.StreamClient Logger hclog.Logger } // Fetch implements cache.Type func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { if opts.LastResult != nil && opts.LastResult.State != nil { - return opts.LastResult.State.(*submatview.Materializer).Fetch(opts) + state := opts.LastResult.State.(*streamingHealthState) + return state.materializer.Fetch(state.done, opts) } srvReq := req.(*structs.ServiceSpecificRequest) - subReq := pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: srvReq.ServiceName, - Token: srvReq.Token, - Index: srvReq.MinQueryIndex, - Datacenter: srvReq.Datacenter, - } - if srvReq.Connect { - subReq.Topic = pbsubscribe.Topic_ServiceHealthConnect + newReqFn := func(index uint64) pbsubscribe.SubscribeRequest { + req := pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: srvReq.ServiceName, + Token: srvReq.Token, + Datacenter: srvReq.Datacenter, + Index: index, + } + if srvReq.Connect { + req.Topic = pbsubscribe.Topic_ServiceHealthConnect + } + return req } - view, err := newMaterializer(c.deps, subReq, srvReq.Filter) + + m, err := newMaterializer(c.deps, newReqFn, srvReq.Filter) if err != nil { return cache.FetchResult{}, err } - return view.Fetch(opts) + ctx, cancel := context.WithCancel(context.TODO()) + go m.Run(ctx) + + result, err := m.Fetch(ctx.Done(), opts) + result.State = &streamingHealthState{ + materializer: m, + done: ctx.Done(), + cancel: cancel, + } + return result, err } func newMaterializer( d MaterializerDeps, - r pbsubscribe.SubscribeRequest, + r func(uint64) pbsubscribe.SubscribeRequest, filter string, ) (*submatview.Materializer, error) { - state, err := newHealthViewState(filter) + view, err := newHealthViewState(filter) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.TODO()) - view := submatview.NewMaterializer(submatview.ViewDeps{ - State: state, + return submatview.NewMaterializer(submatview.Deps{ + View: view, Client: d.Client, Logger: d.Logger, Waiter: &retry.Waiter{ @@ -86,15 +99,22 @@ func newMaterializer( Jitter: retry.NewJitter(100), }, Request: r, - Stop: cancel, - Done: ctx.Done(), - }) - go view.Run(ctx) - return view, nil + }), nil +} + +type streamingHealthState struct { + materializer *submatview.Materializer + done <-chan struct{} + cancel func() +} 
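For context on the Fetch rewrite above: the first call now owns the materializer's whole lifecycle. It builds the request factory, starts Run in its own goroutine, and hands the cancel func back to the cache inside result.State, so later fetches reuse the same stream and the Close method that follows can stop it on eviction. A minimal sketch of that ownership pattern, using toy stand-in types rather than the real agent/cache or submatview APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchResult stands in for cache.FetchResult: State carries whatever the
// cache should hand back on the next Fetch and close on eviction.
type fetchResult struct {
	Value interface{}
	State interface{}
}

// streamState mirrors the shape of streamingHealthState above: it bundles
// the running worker with the cancel func that stops it.
type streamState struct {
	done   <-chan struct{}
	cancel context.CancelFunc
}

// Close is what the cache calls when it evicts the entry; it stops the
// background goroutine started by the first fetch.
func (s *streamState) Close() error {
	s.cancel()
	return nil
}

// fetch models the "first call starts the worker, later calls reuse it" flow.
func fetch(last *fetchResult) fetchResult {
	if last != nil && last.State != nil {
		// Subsequent fetches reuse the already-running worker.
		return fetchResult{Value: "from existing stream", State: last.State}
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() { // stands in for Materializer.Run
		<-ctx.Done()
		fmt.Println("worker stopped")
	}()
	return fetchResult{
		Value: "from new stream",
		State: &streamState{done: ctx.Done(), cancel: cancel},
	}
}

func main() {
	first := fetch(nil)
	second := fetch(&first)
	fmt.Println(first.Value, "/", second.Value)

	// The cache would call Close on eviction; do it by hand here.
	second.State.(*streamState).Close()
	time.Sleep(10 * time.Millisecond)
}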
+ +func (c *streamingHealthState) Close() error { + c.cancel() + return nil } func newHealthViewState(filterExpr string) (submatview.View, error) { - s := &healthViewState{state: make(map[string]structs.CheckServiceNode)} + s := &healthView{state: make(map[string]structs.CheckServiceNode)} // We apply filtering to the raw CheckServiceNodes before we are done mutating // state in Update to save from storing stuff in memory we'll only filter @@ -105,19 +125,19 @@ func newHealthViewState(filterExpr string) (submatview.View, error) { return s, err } -// healthViewState implements View for storing the view state +// healthView implements submatview.View for storing the view state // of a service health result. We store it as a map to make updates and // deletions a little easier but we could just store a result type // (IndexedCheckServiceNodes) and update it in place for each event - that // involves re-sorting each time etc. though. -type healthViewState struct { +type healthView struct { state map[string]structs.CheckServiceNode // TODO: test case with filter filter *bexpr.Filter } // Update implements View -func (s *healthViewState) Update(events []*pbsubscribe.Event) error { +func (s *healthView) Update(events []*pbsubscribe.Event) error { for _, event := range events { serviceHealth := event.GetServiceHealth() if serviceHealth == nil { @@ -147,7 +167,7 @@ func (s *healthViewState) Update(events []*pbsubscribe.Event) error { } // Result implements View -func (s *healthViewState) Result(index uint64) (interface{}, error) { +func (s *healthView) Result(index uint64) (interface{}, error) { var result structs.IndexedCheckServiceNodes // Avoid a nil slice if there are no results in the view // TODO: why this ^ @@ -159,6 +179,6 @@ func (s *healthViewState) Result(index uint64) (interface{}, error) { return &result, nil } -func (s *healthViewState) Reset() { +func (s *healthView) Reset() { s.state = make(map[string]structs.CheckServiceNode) } diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 2118ee1f48..2eae5c93a9 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -28,10 +28,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // EndOfSnapshot message immediately with index of 1. client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1)) - // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ MinIndex: 0, - Timeout: 1 * time.Second, + Timeout: time.Second, } req := &structs.ServiceSpecificRequest{ Datacenter: "dc1", @@ -111,7 +110,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // After the error the view should re-subscribe with same index so will get // a "resume stream". - client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) + client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) // Next fetch will continue to block until timeout and receive the same // result. @@ -157,7 +156,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // After the error the view should re-subscribe with same index so will get // a "resume stream". 
- client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) + client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) }() // Next fetch should return the error diff --git a/agent/submatview/handler.go b/agent/submatview/handler.go new file mode 100644 index 0000000000..7484e4dd33 --- /dev/null +++ b/agent/submatview/handler.go @@ -0,0 +1,51 @@ +package submatview + +import "github.com/hashicorp/consul/proto/pbsubscribe" + +type eventHandler func(events *pbsubscribe.Event) (eventHandler, error) + +func (m *Materializer) initialHandler(index uint64) eventHandler { + if index == 0 { + return newSnapshotHandler(m) + } + return m.resumeStreamHandler +} + +type snapshotHandler struct { + material *Materializer + events []*pbsubscribe.Event +} + +func newSnapshotHandler(m *Materializer) eventHandler { + return (&snapshotHandler{material: m}).handle +} + +func (h *snapshotHandler) handle(event *pbsubscribe.Event) (eventHandler, error) { + if event.GetEndOfSnapshot() { + err := h.material.updateView(h.events, event.Index) + return h.material.eventStreamHandler, err + } + + h.events = append(h.events, eventsFromEvent(event)...) + return h.handle, nil +} + +func (m *Materializer) eventStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { + err := m.updateView(eventsFromEvent(event), event.Index) + return m.eventStreamHandler, err +} + +func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event { + if batch := event.GetEventBatch(); batch != nil { + return batch.Events + } + return []*pbsubscribe.Event{event} +} + +func (m *Materializer) resumeStreamHandler(event *pbsubscribe.Event) (eventHandler, error) { + if event.GetNewSnapshotToFollow() { + m.reset() + return newSnapshotHandler(m), nil + } + return m.eventStreamHandler(event) +} diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 4c599edf89..090793a1db 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -2,7 +2,6 @@ package submatview import ( "context" - "errors" "sync" "time" @@ -16,8 +15,9 @@ import ( "github.com/hashicorp/consul/proto/pbsubscribe" ) -// View is the interface used to manage they type-specific -// materialized view logic. +// View receives events from, and return results to, Materializer. A view is +// responsible for converting the pbsubscribe.Event.Payload into the local +// type, and storing it so that it can be returned by Result(). type View interface { // Update is called when one or more events are received. The first call will // include _all_ events in the initial snapshot which may be an empty set. @@ -39,132 +39,85 @@ type View interface { Reset() } -type Filter func(seq interface{}) (interface{}, error) - -// resetErr represents a server request to reset the subscription, it's typed so -// we can mark it as temporary and so attempt to retry first time without -// notifying clients. -type resetErr string - -// Temporary Implements the internal Temporary interface -func (e resetErr) Temporary() bool { - return true -} - -// Error implements error -func (e resetErr) Error() string { - return string(e) -} - -// TODO: update godoc -// Materializer is a partial view of the state on servers, maintained via -// streaming subscriptions. It is specialized for different cache types by -// providing a View that encapsulates the logic to update the -// state and format it as the correct result type. 
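The new agent/submatview/handler.go above expresses the stream framing as a small state machine: each handler consumes one event and returns the handler for the next one, so the snapshot-collection, live-stream, and resume states need no separate enum or flags. A self-contained sketch of the same pattern over a toy event type (none of these names are the real pbsubscribe types):

package main

import "fmt"

// event is a toy stand-in for pbsubscribe.Event.
type event struct {
	endOfSnapshot bool
	payload       string
}

// handler processes one event and returns the handler for the next one.
type handler func(e event) (handler, error)

type view struct{ items []string }

// snapshotHandler buffers events until end-of-snapshot, then hands off to
// the live-stream handler, mirroring snapshotHandler above.
func snapshotHandler(v *view, buffered []string) handler {
	return func(e event) (handler, error) {
		if e.endOfSnapshot {
			v.items = buffered // apply the whole snapshot at once
			return streamHandler(v), nil
		}
		return snapshotHandler(v, append(buffered, e.payload)), nil
	}
}

// streamHandler applies each live event as it arrives.
func streamHandler(v *view) handler {
	return func(e event) (handler, error) {
		v.items = append(v.items, e.payload)
		return streamHandler(v), nil
	}
}

func main() {
	v := &view{}
	h := snapshotHandler(v, nil)

	events := []event{
		{payload: "snap-1"},
		{payload: "snap-2"},
		{endOfSnapshot: true},
		{payload: "live-1"},
	}
	var err error
	for _, e := range events {
		if h, err = h(e); err != nil {
			fmt.Println("stream error:", err)
			return
		}
	}
	fmt.Println(v.items) // [snap-1 snap-2 live-1]
}

Returning the next handler from each call is what lets the diff delete the old snapshotDone bookkeeping from runSubscription: the current handler value is the state.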
+// Materializer consumes the event stream, handling any framing events, and +// sends the events to View as they are received. // -// The Materializer object becomes the cache.Result.State for a streaming +// Materializer is used as the cache.Result.State for a streaming // cache type and manages the actual streaming RPC call to the servers behind // the scenes until the cache result is discarded when TTL expires. type Materializer struct { - // Properties above the lock are immutable after the view is constructed in - // NewMaterializer and must not be modified. - deps ViewDeps - - // l protects the mutable state - all fields below it must only be accessed - // while holding l. - l sync.Mutex - index uint64 - view View - snapshotDone bool - updateCh chan struct{} - retryWaiter *retry.Waiter - err error + deps Deps + retryWaiter *retry.Waiter + handler eventHandler + + // lock protects the mutable state - all fields below it must only be accessed + // while holding lock. + lock sync.Mutex + index uint64 + view View + updateCh chan struct{} + err error } -// TODO: rename -type ViewDeps struct { - State View - Client StreamingClient +type Deps struct { + View View + Client StreamClient Logger hclog.Logger Waiter *retry.Waiter - Request pbsubscribe.SubscribeRequest + Request func(index uint64) pbsubscribe.SubscribeRequest Stop func() - Done <-chan struct{} } -// StreamingClient is the interface we need from the gRPC client stub. Separate -// interface simplifies testing. -type StreamingClient interface { +// StreamClient provides a subscription to state change events. +type StreamClient interface { Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) } -// NewMaterializer retrieves an existing view from the cache result -// state if one exists, otherwise creates a new one. Note that the returned view -// MUST have Close called eventually to avoid leaking resources. Typically this -// is done automatically if the view is returned in a cache.Result.State when -// the cache evicts the result. If the view is not returned in a result state -// though Close must be called some other way to avoid leaking the goroutine and -// memory. -func NewMaterializer(deps ViewDeps) *Materializer { +// NewMaterializer returns a new Materializer. Run must be called to start it. +func NewMaterializer(deps Deps) *Materializer { v := &Materializer{ deps: deps, - view: deps.State, + view: deps.View, retryWaiter: deps.Waiter, } v.reset() return v } -// Close implements io.Close and discards view state and stops background view -// maintenance. -func (v *Materializer) Close() error { - v.l.Lock() - defer v.l.Unlock() - v.deps.Stop() - return nil -} +// Run receives events from the StreamClient and sends them to the View. It runs +// until ctx is cancelled, so it is expected to be run in a goroutine. +func (m *Materializer) Run(ctx context.Context) { + for { + req := m.deps.Request(m.index) + err := m.runSubscription(ctx, req) + if ctx.Err() != nil { + return + } -func (v *Materializer) Run(ctx context.Context) { - if ctx.Err() != nil { - return - } + m.lock.Lock() + // TODO: move this into a func + // If this is a temporary error and it's the first consecutive failure, + // retry to see if we can get a result without erroring back to clients. + // If it's non-temporary or a repeated failure return to clients while we + // retry to get back in a good state. 
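The retry branch that follows leans on the same behaviour-interface idiom as net.Error: rather than matching a concrete type, Run asks whether the error advertises Temporary() bool, which is how resetErr earns one quiet retry before blocked clients are told anything. A stripped-down illustration of the idiom (the shouldNotify helper is illustrative, not code from the diff):

package main

import (
	"errors"
	"fmt"
)

// temporary is the behaviour we probe for, matching the unexported
// interface in materializer.go.
type temporary interface {
	Temporary() bool
}

// resetErr is a string-backed error that marks itself as temporary,
// like the resetErr type in the diff.
type resetErr string

func (e resetErr) Error() string   { return string(e) }
func (e resetErr) Temporary() bool { return true }

// shouldNotify reports whether an error should be surfaced to blocked
// callers immediately, or swallowed while we retry once quietly.
func shouldNotify(err error, failures int) bool {
	_, isTemporary := err.(temporary)
	return !isTemporary || failures > 0
}

func main() {
	fmt.Println(shouldNotify(resetErr("stream reset requested"), 0)) // false: retry quietly
	fmt.Println(shouldNotify(resetErr("stream reset requested"), 2)) // true: repeated failure
	fmt.Println(shouldNotify(errors.New("permission denied"), 0))    // true: not temporary
}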
+ if _, ok := err.(temporary); !ok || m.retryWaiter.Failures() > 0 { + m.notifyUpdateLocked(err) + } + waitCh := m.retryWaiter.Failed() + failures := m.retryWaiter.Failures() + m.lock.Unlock() - // Loop in case stream resets and we need to start over - for { - err := v.runSubscription(ctx) - if err != nil { - if ctx.Err() != nil { - // Err doesn't matter and is likely just context cancelled - return - } + m.deps.Logger.Error("subscribe call failed", + "err", err, + "topic", req.Topic, + "key", req.Key, + "failure_count", failures) - v.l.Lock() - // If this is a temporary error and it's the first consecutive failure, - // retry to see if we can get a result without erroring back to clients. - // If it's non-temporary or a repeated failure return to clients while we - // retry to get back in a good state. - if _, ok := err.(temporary); !ok || v.retryWaiter.Failures() > 0 { - // Report error to blocked fetchers - v.err = err - v.notifyUpdateLocked() - } - waitCh := v.retryWaiter.Failed() - failures := v.retryWaiter.Failures() - v.l.Unlock() - - v.deps.Logger.Error("subscribe call failed", - "err", err, - "topic", v.deps.Request.Topic, - "key", v.deps.Request.Key, - "failure_count", failures) - - select { - case <-ctx.Done(): - return - case <-waitCh: - } + select { + case <-ctx.Done(): + return + case <-waitCh: } - // Loop and keep trying to resume subscription after error } } @@ -176,114 +129,31 @@ type temporary interface { // runSubscription opens a new subscribe streaming call to the servers and runs // for it's lifetime or until the view is closed. -func (v *Materializer) runSubscription(ctx context.Context) error { +func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.SubscribeRequest) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - // Copy the request template - req := v.deps.Request - - v.l.Lock() + m.handler = m.initialHandler(req.Index) - // Update request index to be the current view index in case we are resuming a - // broken stream. - req.Index = v.index - - // Make local copy so we don't have to read with a lock for every event. We - // are the only goroutine that can update so we know it won't change without - // us knowing but we do need lock to protect external readers when we update. - snapshotDone := v.snapshotDone - - v.l.Unlock() - - s, err := v.deps.Client.Subscribe(ctx, &req) + s, err := m.deps.Client.Subscribe(ctx, &req) if err != nil { return err } - snapshotEvents := make([]*pbsubscribe.Event, 0) - for { event, err := s.Recv() switch { case isGrpcStatus(err, codes.Aborted): - v.reset() + m.reset() return resetErr("stream reset requested") case err != nil: return err } - if event.GetEndOfSnapshot() { - // Hold lock while mutating view state so implementer doesn't need to - // worry about synchronization. - v.l.Lock() - - // Deliver snapshot events to the View state - if err := v.view.Update(snapshotEvents); err != nil { - v.l.Unlock() - // This error is kinda fatal to the view - we didn't apply some events - // the server sent us which means our view is now not in sync. The only - // thing we can do is start over and hope for a better outcome. - v.reset() - return err - } - // Done collecting these now - snapshotEvents = nil - v.snapshotDone = true - // update our local copy so we can read it without lock. 
- snapshotDone = true - v.index = event.Index - // We have a good result, reset the error flag - v.err = nil - v.retryWaiter.Reset() - // Notify watchers of the update to the view - v.notifyUpdateLocked() - v.l.Unlock() - continue - } - - if event.GetEndOfEmptySnapshot() { - // We've opened a new subscribe with a non-zero index to resume a - // connection and the server confirms it's not sending a new snapshot. - if !snapshotDone { - // We've somehow got into a bad state here - the server thinks we have - // an up-to-date snapshot but we don't think we do. Reset and start - // over. - v.reset() - return errors.New("stream resume sent but no local snapshot") - } - // Just continue on as we were! - continue - } - - // We have an event for the topic - events := []*pbsubscribe.Event{event} - - // If the event is a batch, unwrap and deliver the raw events - if batch := event.GetEventBatch(); batch != nil { - events = batch.Events - } - - if snapshotDone { - // We've already got a snapshot, this is an update, deliver it right away. - v.l.Lock() - if err := v.view.Update(events); err != nil { - v.l.Unlock() - // This error is kinda fatal to the view - we didn't apply some events - // the server sent us which means our view is now not in sync. The only - // thing we can do is start over and hope for a better outcome. - v.reset() - return err - } - // Notify watchers of the update to the view - v.index = event.Index - // We have a good result, reset the error flag - v.err = nil - v.retryWaiter.Reset() - v.notifyUpdateLocked() - v.l.Unlock() - } else { - snapshotEvents = append(snapshotEvents, events...) + m.handler, err = m.handler(event) + if err != nil { + m.reset() + return err } } } @@ -293,42 +163,67 @@ func isGrpcStatus(err error, code codes.Code) bool { return ok && s.Code() == code } +// resetErr represents a server request to reset the subscription, it's typed so +// we can mark it as temporary and so attempt to retry first time without +// notifying clients. +type resetErr string + +// Temporary Implements the internal Temporary interface +func (e resetErr) Temporary() bool { + return true +} + +// Error implements error +func (e resetErr) Error() string { + return string(e) +} + // reset clears the state ready to start a new stream from scratch. -func (v *Materializer) reset() { - v.l.Lock() - defer v.l.Unlock() - - v.view.Reset() - v.notifyUpdateLocked() - // Always start from zero when we have a new state so we load a snapshot from - // the servers. - v.index = 0 - v.snapshotDone = false - v.err = nil - v.retryWaiter.Reset() +func (m *Materializer) reset() { + m.lock.Lock() + defer m.lock.Unlock() + + m.view.Reset() + m.index = 0 + m.notifyUpdateLocked(nil) + m.retryWaiter.Reset() +} + +func (m *Materializer) updateView(events []*pbsubscribe.Event, index uint64) error { + m.lock.Lock() + defer m.lock.Unlock() + + if err := m.view.Update(events); err != nil { + return err + } + m.index = index + m.notifyUpdateLocked(nil) + m.retryWaiter.Reset() + return nil } // notifyUpdateLocked closes the current update channel and recreates a new -// one. It must be called while holding the s.l lock. -func (v *Materializer) notifyUpdateLocked() { - if v.updateCh != nil { - close(v.updateCh) +// one. It must be called while holding the s.lock lock. 
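notifyUpdateLocked, just below, relies on the close-to-broadcast idiom: closing a channel wakes every goroutine currently waiting on it, and a fresh channel is swapped in for the next round of waiters. A self-contained illustration of that idiom, without the error bookkeeping the real method now carries:

package main

import (
	"fmt"
	"sync"
	"time"
)

type notifier struct {
	mu       sync.Mutex
	updateCh chan struct{}
}

func newNotifier() *notifier {
	return &notifier{updateCh: make(chan struct{})}
}

// wait returns the channel a caller should block on for the next update.
func (n *notifier) wait() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.updateCh
}

// notify wakes every waiter by closing the current channel, then swaps in
// a new one for anyone who arrives afterwards.
func (n *notifier) notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	close(n.updateCh)
	n.updateCh = make(chan struct{})
}

func main() {
	n := newNotifier()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-n.wait()
			fmt.Println("waiter", id, "woke up")
		}(i)
	}
	time.Sleep(50 * time.Millisecond) // let the waiters block
	n.notify()                        // one close wakes all of them
	wg.Wait()
}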
+func (m *Materializer) notifyUpdateLocked(err error) { + m.err = err + if m.updateCh != nil { + close(m.updateCh) } - v.updateCh = make(chan struct{}) + m.updateCh = make(chan struct{}) } // Fetch implements the logic a StreamingCacheType will need during it's Fetch // call. Cache types that use streaming should just be able to proxy to this // once they have a subscription object and return it's results directly. -func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) { +func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) { var result cache.FetchResult // Get current view Result and index - v.l.Lock() - index := v.index - val, err := v.view.Result(v.index) - updateCh := v.updateCh - v.l.Unlock() + m.lock.Lock() + index := m.index + val, err := m.view.Result(m.index) + updateCh := m.updateCh + m.lock.Unlock() if err != nil { return result, err @@ -336,7 +231,6 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) result.Index = index result.Value = val - result.State = v // If our index is > req.Index return right away. If index is zero then we // haven't loaded a snapshot at all yet which means we should wait for one on @@ -355,18 +249,18 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) select { case <-updateCh: // View updated, return the new result - v.l.Lock() - result.Index = v.index + m.lock.Lock() + result.Index = m.index // Grab the new updateCh in case we need to keep waiting for the next // update. - updateCh = v.updateCh - fetchErr := v.err + updateCh = m.updateCh + fetchErr := m.err if fetchErr == nil { // Only generate a new result if there was no error to avoid pointless // work potentially shuffling the same data around. - result.Value, err = v.view.Result(v.index) + result.Value, err = m.view.Result(m.index) } - v.l.Unlock() + m.lock.Unlock() // If there was a non-transient error return it if fetchErr != nil { @@ -391,7 +285,7 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) // Just return whatever we got originally, might still be empty return result, nil - case <-v.deps.Done: + case <-done: return result, context.Canceled } }
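Fetch, above, amounts to a blocking query built on the update channel: return immediately once the view's index passes MinIndex, otherwise wait for an update notification, the timeout, or the done channel. A compact standalone sketch of that wait loop, with stand-in types and none of Consul's cache plumbing:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type store struct {
	mu       sync.Mutex
	index    uint64
	value    string
	updateCh chan struct{}
}

// get returns the current result plus the channel that will be closed on
// the next update, so callers can block without holding the lock.
func (s *store) get() (string, uint64, <-chan struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value, s.index, s.updateCh
}

// set publishes a new result and wakes every blocked fetch.
func (s *store) set(value string, index uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.value, s.index = value, index
	close(s.updateCh)
	s.updateCh = make(chan struct{})
}

// fetch blocks until the index exceeds minIndex, the timeout fires, or done
// is closed, mirroring the shape of Materializer.Fetch.
func (s *store) fetch(done <-chan struct{}, minIndex uint64, timeout time.Duration) (string, uint64, error) {
	timeoutCh := time.After(timeout)
	for {
		value, index, updateCh := s.get()
		if index > minIndex {
			return value, index, nil // already newer than the caller's view
		}
		select {
		case <-updateCh:
			// View updated; loop and re-read the result.
		case <-timeoutCh:
			// Blocking timeout: return whatever we have, even if unchanged.
			return value, index, nil
		case <-done:
			return value, index, context.Canceled
		}
	}
}

func main() {
	s := &store{index: 1, value: "v1", updateCh: make(chan struct{})}

	go func() {
		time.Sleep(20 * time.Millisecond)
		s.set("v2", 2)
	}()

	val, idx, err := s.fetch(make(chan struct{}), 1, time.Second)
	fmt.Println(val, idx, err) // v2 2 <nil>
}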