mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
7.2 KiB
283 lines
7.2 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: BUSL-1.1 |
|
|
|
package health |
|
|
|
import ( |
|
"context" |
|
"testing" |
|
"time" |
|
|
|
"github.com/hashicorp/consul/agent/rpcclient" |
|
"github.com/stretchr/testify/require" |
|
|
|
"github.com/hashicorp/consul/agent/cache" |
|
"github.com/hashicorp/consul/agent/config" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/agent/submatview" |
|
) |
|
|
|
func TestClient_ServiceNodes_BackendRouting(t *testing.T) { |
|
type testCase struct { |
|
name string |
|
req structs.ServiceSpecificRequest |
|
expected func(t *testing.T, c *Client) |
|
} |
|
|
|
run := func(t *testing.T, tc testCase) { |
|
c := &Client{ |
|
Client: rpcclient.Client{ |
|
NetRPC: &fakeNetRPC{}, |
|
Cache: &fakeCache{}, |
|
ViewStore: &fakeViewStore{}, |
|
CacheName: "cache-no-streaming", |
|
UseStreamingBackend: true, |
|
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}), |
|
}, |
|
} |
|
|
|
_, _, err := c.ServiceNodes(context.Background(), tc.req) |
|
require.NoError(t, err) |
|
tc.expected(t, c) |
|
} |
|
|
|
var testCases = []testCase{ |
|
{ |
|
name: "rpc by default", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
}, |
|
expected: useRPC, |
|
}, |
|
{ |
|
name: "use streaming instead of cache", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
QueryOptions: structs.QueryOptions{UseCache: true}, |
|
}, |
|
expected: useStreaming, |
|
}, |
|
{ |
|
name: "use streaming for MinQueryIndex", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, |
|
}, |
|
expected: useStreaming, |
|
}, |
|
{ |
|
name: "use cache for ingress request", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
QueryOptions: structs.QueryOptions{UseCache: true}, |
|
Ingress: true, |
|
}, |
|
expected: useCache, |
|
}, |
|
{ |
|
name: "use cache for near request", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
QueryOptions: structs.QueryOptions{UseCache: true}, |
|
Source: structs.QuerySource{Node: "node1"}, |
|
}, |
|
expected: useCache, |
|
}, |
|
{ |
|
name: "rpc if merge-central-config", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
MergeCentralConfig: true, |
|
QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, |
|
}, |
|
expected: useRPC, |
|
}, |
|
} |
|
|
|
for _, tc := range testCases { |
|
t.Run(tc.name, func(t *testing.T) { |
|
run(t, tc) |
|
}) |
|
} |
|
} |
|
|
|
func useRPC(t *testing.T, c *Client) { |
|
t.Helper() |
|
|
|
rpc, ok := c.NetRPC.(*fakeNetRPC) |
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) |
|
|
|
cache, ok := c.Cache.(*fakeCache) |
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) |
|
|
|
store, ok := c.ViewStore.(*fakeViewStore) |
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) |
|
|
|
require.Len(t, cache.calls, 0) |
|
require.Len(t, store.calls, 0) |
|
require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls) |
|
} |
|
|
|
func useStreaming(t *testing.T, c *Client) { |
|
t.Helper() |
|
|
|
rpc, ok := c.NetRPC.(*fakeNetRPC) |
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) |
|
|
|
cache, ok := c.Cache.(*fakeCache) |
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) |
|
|
|
store, ok := c.ViewStore.(*fakeViewStore) |
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) |
|
|
|
require.Len(t, cache.calls, 0) |
|
require.Len(t, rpc.calls, 0) |
|
require.Len(t, store.calls, 1) |
|
} |
|
|
|
func useCache(t *testing.T, c *Client) { |
|
t.Helper() |
|
|
|
rpc, ok := c.NetRPC.(*fakeNetRPC) |
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) |
|
|
|
cache, ok := c.Cache.(*fakeCache) |
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) |
|
|
|
store, ok := c.ViewStore.(*fakeViewStore) |
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) |
|
|
|
require.Len(t, rpc.calls, 0) |
|
require.Len(t, store.calls, 0) |
|
require.Equal(t, []string{"cache-no-streaming"}, cache.calls) |
|
} |
|
|
|
type fakeCache struct { |
|
calls []string |
|
} |
|
|
|
func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) { |
|
f.calls = append(f.calls, t) |
|
result := &structs.IndexedCheckServiceNodes{} |
|
return result, cache.ResultMeta{}, nil |
|
} |
|
|
|
func (f *fakeCache) NotifyCallback(_ context.Context, t string, _ cache.Request, _ string, _ cache.Callback) error { |
|
f.calls = append(f.calls, t) |
|
return nil |
|
} |
|
|
|
type fakeNetRPC struct { |
|
calls []string |
|
} |
|
|
|
func (f *fakeNetRPC) RPC(ctx context.Context, method string, _ interface{}, _ interface{}) error { |
|
f.calls = append(f.calls, method) |
|
return nil |
|
} |
|
|
|
type fakeViewStore struct { |
|
calls []submatview.Request |
|
} |
|
|
|
func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) { |
|
f.calls = append(f.calls, req) |
|
return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil |
|
} |
|
|
|
func (f *fakeViewStore) NotifyCallback(_ context.Context, req submatview.Request, _ string, _ cache.Callback) error { |
|
f.calls = append(f.calls, req) |
|
return nil |
|
} |
|
|
|
func TestClient_Notify_BackendRouting(t *testing.T) { |
|
type testCase struct { |
|
name string |
|
req structs.ServiceSpecificRequest |
|
expected func(t *testing.T, c *Client) |
|
} |
|
|
|
run := func(t *testing.T, tc testCase) { |
|
c := &Client{ |
|
Client: rpcclient.Client{ |
|
NetRPC: &fakeNetRPC{}, |
|
Cache: &fakeCache{}, |
|
ViewStore: &fakeViewStore{}, |
|
CacheName: "cache-no-streaming", |
|
UseStreamingBackend: true, |
|
}, |
|
} |
|
|
|
err := c.Notify(context.Background(), tc.req, "cid", nil) |
|
require.NoError(t, err) |
|
tc.expected(t, c) |
|
} |
|
|
|
var testCases = []testCase{ |
|
{ |
|
name: "streaming by default", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
}, |
|
expected: useStreaming, |
|
}, |
|
{ |
|
name: "use cache for ingress request", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
Ingress: true, |
|
}, |
|
expected: useCache, |
|
}, |
|
{ |
|
name: "use cache for near request", |
|
req: structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
Source: structs.QuerySource{Node: "node1"}, |
|
}, |
|
expected: useCache, |
|
}, |
|
} |
|
|
|
for _, tc := range testCases { |
|
t.Run(tc.name, func(t *testing.T) { |
|
run(t, tc) |
|
}) |
|
} |
|
} |
|
|
|
func TestClient_ServiceNodes_SetsDefaults(t *testing.T) { |
|
store := &fakeViewStore{} |
|
c := &Client{ |
|
Client: rpcclient.Client{ |
|
ViewStore: store, |
|
CacheName: "cache-no-streaming", |
|
UseStreamingBackend: true, |
|
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{ |
|
MaxQueryTime: 200 * time.Second, |
|
DefaultQueryTime: 100 * time.Second, |
|
}), |
|
}, |
|
} |
|
|
|
req := structs.ServiceSpecificRequest{ |
|
Datacenter: "dc1", |
|
ServiceName: "web1", |
|
QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, |
|
} |
|
|
|
_, _, err := c.ServiceNodes(context.Background(), req) |
|
require.NoError(t, err) |
|
|
|
require.Len(t, store.calls, 1) |
|
require.Equal(t, 100*time.Second, store.calls[0].CacheInfo().Timeout) |
|
}
|
|
|