From e849f6d7ac66aa21ff093614c1a666fe8fe3e6fa Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 1 Oct 2020 02:34:43 -0400 Subject: [PATCH] submatview: Move the 'use materialize from result.State' logic No need to do all this other work if we have one already. This logic moved closer to this call site 3 times during the process of refactoring. --- .../cache-types/streaming_health_services.go | 79 +++++++------------ .../streaming_health_services_test.go | 32 ++++---- agent/submatview/materializer.go | 11 +-- 3 files changed, 47 insertions(+), 75 deletions(-) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index 42f58741d3..e6e014802e 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -26,65 +26,59 @@ const ( // StreamingHealthServices supports fetching discovering service instances via the // catalog using the streaming gRPC endpoint. type StreamingHealthServices struct { - client submatview.StreamingClient - logger hclog.Logger + RegisterOptionsBlockingRefresh + deps MaterializerDeps } // NewStreamingHealthServices creates a cache-type for watching for service // health results via streaming updates. -func NewStreamingHealthServices(client submatview.StreamingClient, logger hclog.Logger) *StreamingHealthServices { - return &StreamingHealthServices{ - client: client, - logger: logger, - } +func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices { + return &StreamingHealthServices{deps: deps} +} + +type MaterializerDeps struct { + Client submatview.StreamingClient + Logger hclog.Logger } // Fetch implements cache.Type func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { - // The request should be a ServiceSpecificRequest. - reqReal, ok := req.(*structs.ServiceSpecificRequest) - if !ok { - return cache.FetchResult{}, fmt.Errorf( - "Internal cache failure: request wrong type: %T", req) + if opts.LastResult != nil && opts.LastResult.State != nil { + return opts.LastResult.State.(*submatview.Materializer).Fetch(opts) } - r := submatview.Request{ - SubscribeRequest: pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: reqReal.ServiceName, - Token: reqReal.Token, - Index: reqReal.MinQueryIndex, - Datacenter: reqReal.Datacenter, - }, - Filter: reqReal.Filter, + srvReq := req.(*structs.ServiceSpecificRequest) + subReq := pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: srvReq.ServiceName, + Token: srvReq.Token, + Index: srvReq.MinQueryIndex, + Datacenter: srvReq.Datacenter, } - - // Connect requests need a different topic - if reqReal.Connect { - r.Topic = pbsubscribe.Topic_ServiceHealthConnect + if srvReq.Connect { + subReq.Topic = pbsubscribe.Topic_ServiceHealthConnect } - - view, err := c.getMaterializedView(opts, r) + view, err := newMaterializer(c.deps, subReq, srvReq.Filter) if err != nil { return cache.FetchResult{}, err } return view.Fetch(opts) } -func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r submatview.Request) (*submatview.Materializer, error) { - if opts.LastResult != nil && opts.LastResult.State != nil { - return opts.LastResult.State.(*submatview.Materializer), nil - } - - state, err := newHealthViewState(r.Filter) +func newMaterializer( + d MaterializerDeps, + r pbsubscribe.SubscribeRequest, + filter string, +) (*submatview.Materializer, error) { + state, err := newHealthViewState(filter) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.TODO()) view := submatview.NewMaterializer(submatview.ViewDeps{ State: state, - Client: c.client, - Logger: c.logger, + Client: d.Client, + Logger: d.Logger, Waiter: &retry.Waiter{ MinFailures: 1, MinWait: 0, @@ -99,11 +93,6 @@ func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r return view, nil } -// SupportsBlocking implements cache.Type -func (c *StreamingHealthServices) SupportsBlocking() bool { - return true -} - func newHealthViewState(filterExpr string) (submatview.View, error) { s := &healthViewState{state: make(map[string]structs.CheckServiceNode)} @@ -116,16 +105,6 @@ func newHealthViewState(filterExpr string) (submatview.View, error) { return s, err } -// StreamingClient implements StreamingCacheType -func (c *StreamingHealthServices) StreamingClient() submatview.StreamingClient { - return c.client -} - -// Logger implements StreamingCacheType -func (c *StreamingHealthServices) Logger() hclog.Logger { - return c.logger -} - // healthViewState implements View for storing the view state // of a service health result. We store it as a map to make updates and // deletions a little easier but we could just store a result type diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 84e8342f3d..2118ee1f48 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -19,10 +19,10 @@ import ( func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { client := NewTestStreamingClient() - typ := StreamingHealthServices{ - client: client, - logger: hclog.Default(), - } + typ := StreamingHealthServices{deps: MaterializerDeps{ + Client: client, + Logger: hclog.Default(), + }} // Initially there are no services registered. Server should send an // EndOfSnapshot message immediately with index of 1. @@ -233,10 +233,10 @@ func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNode func TestStreamingHealthServices_FullSnapshot(t *testing.T) { client := NewTestStreamingClient() - typ := StreamingHealthServices{ - client: client, - logger: hclog.Default(), - } + typ := StreamingHealthServices{deps: MaterializerDeps{ + Client: client, + Logger: hclog.Default(), + }} // Create an initial snapshot of 3 instances on different nodes makeReg := func(index uint64, nodeNum int) *pbsubscribe.Event { @@ -341,10 +341,10 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { func TestStreamingHealthServices_EventBatches(t *testing.T) { client := NewTestStreamingClient() - typ := StreamingHealthServices{ - client: client, - logger: hclog.Default(), - } + typ := StreamingHealthServices{deps: MaterializerDeps{ + Client: client, + Logger: hclog.Default(), + }} // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( @@ -411,10 +411,10 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { func TestStreamingHealthServices_Filtering(t *testing.T) { client := NewTestStreamingClient() - typ := StreamingHealthServices{ - client: client, - logger: hclog.Default(), - } + typ := StreamingHealthServices{deps: MaterializerDeps{ + Client: client, + Logger: hclog.Default(), + }} // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index c75126b27c..4c599edf89 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -56,13 +56,6 @@ func (e resetErr) Error() string { return string(e) } -type Request struct { - pbsubscribe.SubscribeRequest - // Filter is a bexpr filter expression that is used to filter events on the - // client side. - Filter string -} - // TODO: update godoc // Materializer is a partial view of the state on servers, maintained via // streaming subscriptions. It is specialized for different cache types by @@ -94,7 +87,7 @@ type ViewDeps struct { Client StreamingClient Logger hclog.Logger Waiter *retry.Waiter - Request Request + Request pbsubscribe.SubscribeRequest Stop func() Done <-chan struct{} } @@ -203,7 +196,7 @@ func (v *Materializer) runSubscription(ctx context.Context) error { v.l.Unlock() - s, err := v.deps.Client.Subscribe(ctx, &req.SubscribeRequest) + s, err := v.deps.Client.Subscribe(ctx, &req) if err != nil { return err }