diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 6afd705281..d8d5c65f20 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,9 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress && args.Source.Node == "" - args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) - out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) if err != nil { return nil, err diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index f376264983..a5712b86f0 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -8,16 +8,14 @@ import ( "github.com/hashicorp/consul/agent/submatview" ) +// TODO: godoc type Client struct { - NetRPC NetRPC - Cache CacheGetter - ViewStore MaterializedViewStore - MaterializerDeps MaterializerDeps - // CacheName to use for service health. - CacheName string - // CacheNameNotStreaming is the name of the cache type to use for any requests - // that are not supported by the streaming backend (ex: Ingress=true). - CacheNameNotStreaming string + NetRPC NetRPC + Cache CacheGetter + ViewStore MaterializedViewStore + MaterializerDeps MaterializerDeps + CacheName string + UseStreamingBackend bool } type NetRPC interface { @@ -38,6 +36,15 @@ func (c *Client) ServiceNodes( ctx context.Context, req structs.ServiceSpecificRequest, ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { + if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) { + result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req)) + if err != nil { + return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err + } + // TODO: can we store non-pointer + return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err + } + out, md, err := c.getServiceNodes(ctx, req) if err != nil { return out, md, err @@ -58,34 +65,12 @@ func (c *Client) getServiceNodes( req structs.ServiceSpecificRequest, ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { var out structs.IndexedCheckServiceNodes - - // TODO: if UseStreaming, elif !UseCache, else cache - if !req.QueryOptions.UseCache { err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out) return out, cache.ResultMeta{}, err } - if req.Source.Node == "" { - sr := serviceRequest{ - ServiceSpecificRequest: req, - deps: c.MaterializerDeps, - } - - result, err := c.ViewStore.Get(ctx, sr) - if err != nil { - return out, cache.ResultMeta{}, err - } - // TODO: can we store non-pointer - return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err - } - - cacheName := c.CacheName - if req.Ingress || req.Source.Node != "" { - cacheName = c.CacheNameNotStreaming - } - - raw, md, err := c.Cache.Get(ctx, cacheName, &req) + raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) if err != nil { return out, md, err } @@ -104,11 +89,23 @@ func (c *Client) Notify( correlationID string, ch chan<- cache.UpdateEvent, ) error { - cacheName := c.CacheName - if req.Ingress || req.Source.Node != "" { - cacheName = c.CacheNameNotStreaming + if c.useStreaming(req) { + sr := c.newServiceRequest(req) + return c.ViewStore.Notify(ctx, sr, correlationID, ch) + } + + return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) +} + +func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { + return c.UseStreamingBackend && !req.Ingress && req.Source.Node == "" +} + +func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest { + return serviceRequest{ + ServiceSpecificRequest: req, + deps: c.MaterializerDeps, } - return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch) } type serviceRequest struct { diff --git a/agent/rpcclient/health/health_test.go b/agent/rpcclient/health/health_test.go new file mode 100644 index 0000000000..09da967bde --- /dev/null +++ b/agent/rpcclient/health/health_test.go @@ -0,0 +1,235 @@ +package health + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" +) + +func TestClient_ServiceNodes_BackendRouting(t *testing.T) { + type testCase struct { + name string + req structs.ServiceSpecificRequest + expected func(t *testing.T, c *Client) + } + + run := func(t *testing.T, tc testCase) { + c := &Client{ + NetRPC: &fakeNetRPC{}, + Cache: &fakeCache{}, + ViewStore: &fakeViewStore{}, + CacheName: "cache-no-streaming", + UseStreamingBackend: true, + } + + _, _, err := c.ServiceNodes(context.Background(), tc.req) + require.NoError(t, err) + tc.expected(t, c) + } + + var testCases = []testCase{ + { + name: "rpc by default", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + }, + expected: useRPC, + }, + { + name: "use streaming instead of cache", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + }, + expected: useStreaming, + }, + { + name: "use streaming for MinQueryIndex", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, + }, + expected: useStreaming, + }, + { + name: "use cache for ingress request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + Ingress: true, + }, + expected: useCache, + }, + { + name: "use cache for near request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + Source: structs.QuerySource{Node: "node1"}, + }, + expected: useCache, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func useRPC(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, cache.calls, 0) + require.Len(t, store.calls, 0) + require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls) +} + +func useStreaming(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, cache.calls, 0) + require.Len(t, rpc.calls, 0) + require.Len(t, store.calls, 1) +} + +func useCache(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, rpc.calls, 0) + require.Len(t, store.calls, 0) + require.Equal(t, []string{"cache-no-streaming"}, cache.calls) +} + +type fakeCache struct { + calls []string +} + +func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) { + f.calls = append(f.calls, t) + result := &structs.IndexedCheckServiceNodes{} + return result, cache.ResultMeta{}, nil +} + +func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { + f.calls = append(f.calls, t) + return nil +} + +type fakeNetRPC struct { + calls []string +} + +func (f *fakeNetRPC) RPC(method string, _ interface{}, _ interface{}) error { + f.calls = append(f.calls, method) + return nil +} + +type fakeViewStore struct { + calls []submatview.Request +} + +func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) { + f.calls = append(f.calls, req) + return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil +} + +func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error { + f.calls = append(f.calls, req) + return nil +} + +func TestClient_Notify_BackendRouting(t *testing.T) { + type testCase struct { + name string + req structs.ServiceSpecificRequest + expected func(t *testing.T, c *Client) + } + + run := func(t *testing.T, tc testCase) { + c := &Client{ + NetRPC: &fakeNetRPC{}, + Cache: &fakeCache{}, + ViewStore: &fakeViewStore{}, + CacheName: "cache-no-streaming", + UseStreamingBackend: true, + } + + err := c.Notify(context.Background(), tc.req, "cid", nil) + require.NoError(t, err) + tc.expected(t, c) + } + + var testCases = []testCase{ + { + name: "streaming by default", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + }, + expected: useStreaming, + }, + { + name: "use cache for ingress request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + Ingress: true, + }, + expected: useCache, + }, + { + name: "use cache for near request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + Source: structs.QuerySource{Node: "node1"}, + }, + expected: useCache, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +}