consul/agent/rpcclient/health/view_test.go

980 lines
29 KiB
Go
Raw Normal View History

2021-02-25 23:11:21 +00:00
package health
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
2022-03-28 21:17:50 +00:00
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
// TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse hands
// sortCheckServiceNodes a shuffled set of fixtures and asserts the order they
// come back in. The fixtures are expected in ascending node name, and for the
// two instances sharing "node1", in ascending service ID.
func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
	const raftIdx uint64 = 42

	// makeNode builds a CheckServiceNode with a freshly generated (upper-cased)
	// node ID, so the only stable distinguishing fields between fixtures are
	// the node name and the service ID.
	makeNode := func(name, svcID string) structs.CheckServiceNode {
		rawID, err := uuid.GenerateUUID()
		require.NoError(t, err)

		node := &structs.Node{
			ID:         types.NodeID(strings.ToUpper(rawID)),
			Node:       name,
			Address:    name,
			Datacenter: "dc1",
			RaftIndex: structs.RaftIndex{
				CreateIndex: raftIdx,
				ModifyIndex: raftIdx,
			},
		}
		service := &structs.NodeService{
			ID:      svcID,
			Service: "testService",
			Port:    8080,
			Weights: &structs.Weights{
				Passing: 1,
				Warning: 1,
			},
			RaftIndex: structs.RaftIndex{
				CreateIndex: raftIdx,
				ModifyIndex: raftIdx,
			},
			EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
		}

		return structs.CheckServiceNode{
			Node:    node,
			Service: service,
			Checks:  []*structs.HealthCheck{},
		}
	}

	first := makeNode("a-zero-node", "testService:1")
	second := makeNode("node1", "testService:1")
	third := makeNode("node1", "testService:2")
	fourth := makeNode("node2", "testService")

	// Deliberately start out of order.
	got := structs.IndexedCheckServiceNodes{
		Nodes:     structs.CheckServiceNodes{fourth, third, first, second},
		QueryMeta: structs.QueryMeta{Index: raftIdx},
	}
	sortCheckServiceNodes(&got)

	want := structs.CheckServiceNodes{first, second, third, fourth}
	require.Equal(t, want, got.Nodes)
}
func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithEmptySnapshot(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithEmptySnapshot(t, "my-peer")
})
}
func testHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T, peerName string) {
2022-03-28 21:17:50 +00:00
namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
streamClient := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
go store.Run(ctx)
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
streamClient.QueueEvents(newEndOfSnapshotEvent(1))
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
PeerName: peerName,
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
},
},
streamClient: streamClient,
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
Backend: structs.QueryBackendStreaming,
},
}
testutil.RunStep(t, "empty snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
req.QueryOptions.MinQueryIndex = result.Index
})
testutil.RunStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
})
var lastResultValue structs.CheckServiceNodes
testutil.RunStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web", peerName))
}()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 1,
"result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index
})
testutil.RunStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
streamClient.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
"result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
"result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web", peerName))
start = time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 2,
"result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
require.Equal(t, peerName, lastResultValue[1].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index
})
testutil.RunStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web", peerName))
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 3,
"result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
require.Equal(t, peerName, lastResultValue[1].Service.PeerName)
require.Equal(t, peerName, lastResultValue[2].Node.PeerName)
require.Equal(t, peerName, lastResultValue[2].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index
})
}
// tempError is a string-backed error that always reports itself as temporary.
// The tests queue it on the fake stream client to simulate a transient
// connection failure.
type tempError string

// Error returns the underlying string unchanged.
func (te tempError) Error() string { return string(te) }

// Temporary always reports true.
func (tempError) Temporary() bool { return true }
func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithFullSnapshot(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithFullSnapshot(t, "my-peer")
})
}
// testHealthView_IntegrationWithStore_WithFullSnapshot starts from a
// three-instance snapshot of service "web" and checks what store.Get returns
// after a deregistration, after the stream is aborted and a fresh snapshot is
// queued, and after a temporary error followed by a NewSnapshotToFollow event.
//
// Results are compared by node name only (see cmpCheckServiceNodeNames).
func testHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T, peerName string) {
	namespace := getNamespace("ns2")
	stream := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))

	// registerWeb builds a registration event for "web" on node<nodeNum>.
	registerWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
		return newEventServiceHealthRegister(index, nodeNum, "web", peerName)
	}

	// wantNodes builds the expected result for the given index and node names.
	wantNodes := func(index uint64, names ...string) *structs.IndexedCheckServiceNodes {
		want := newExpectedNodesInPeer(peerName, names...)
		want.Index = index
		return want
	}

	// Create an initial snapshot of 3 instances on different nodes
	stream.QueueEvents(
		registerWeb(5, 1),
		registerWeb(5, 2),
		registerWeb(5, 3),
		newEndOfSnapshotEvent(5),
	)

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
			},
		},
		streamClient: stream,
	}

	testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
		got, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), got.Index)
		prototest.AssertDeepEqual(t, wantNodes(5, "node1", "node2", "node3"), got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})

	testutil.RunStep(t, "blocks until deregistration", func(t *testing.T) {
		// Issue a blocking query with a one second budget and deliver the
		// deregistration of node1 200ms into it.
		began := time.Now()
		go func() {
			time.Sleep(200 * time.Millisecond)

			// Deregister instance on node1
			stream.QueueEvents(newEventServiceHealthDeregister(20, 1, "web", peerName))
		}()

		req.QueryOptions.MaxQueryTime = time.Second
		got, err := store.Get(ctx, req)
		require.NoError(t, err)
		took := time.Since(began)
		require.True(t, took >= 200*time.Millisecond,
			"Fetch should have blocked until the event was delivered")
		require.True(t, took < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(20), got.Index)
		prototest.AssertDeepEqual(t, wantNodes(20, "node2", "node3"), got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})

	testutil.RunStep(t, "server reload is respected", func(t *testing.T) {
		// Simulates the server noticing the request's ACL token privs changing. To
		// detect this we'll queue up the new snapshot as a different set of nodes
		// to the first.
		stream.QueueErr(status.Error(codes.Aborted, "reset by server"))

		stream.QueueEvents(
			registerWeb(50, 3), // overlap existing node
			registerWeb(50, 4),
			registerWeb(50, 5),
			newEndOfSnapshotEvent(50),
		)

		// Make another blocking query with THE SAME index. It should immediately
		// return the new snapshot.
		began := time.Now()
		req.QueryOptions.MaxQueryTime = time.Second
		got, err := store.Get(ctx, req)
		require.NoError(t, err)
		took := time.Since(began)
		require.True(t, took < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), got.Index)
		prototest.AssertDeepEqual(t, wantNodes(50, "node3", "node4", "node5"), got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})

	testutil.RunStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
		stream.QueueErr(tempError("temporary connection error"))

		stream.QueueEvents(
			newNewSnapshotToFollowEvent(),
			registerWeb(50, 3), // overlap existing node
			registerWeb(50, 4),
			registerWeb(50, 5),
			newEndOfSnapshotEvent(50),
		)

		began := time.Now()
		// Query from one index below the snapshot that is about to be replayed.
		req.QueryOptions.MinQueryIndex = 49
		req.QueryOptions.MaxQueryTime = time.Second
		got, err := store.Get(ctx, req)
		require.NoError(t, err)
		took := time.Since(began)
		require.True(t, took < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), got.Index)
		prototest.AssertDeepEqual(t, wantNodes(50, "node3", "node4", "node5"), got.Value, cmpCheckServiceNodeNames)
	})
}
// newExpectedNodesInPeer builds the expected streaming-backend result holding
// one CheckServiceNode per given node name, each tagged with peerName. Only
// Node.Node and Node.PeerName are populated, and the Index is left at zero for
// the caller to set. With no names, Nodes stays nil.
func newExpectedNodesInPeer(peerName string, nodes ...string) *structs.IndexedCheckServiceNodes {
	expected := new(structs.IndexedCheckServiceNodes)
	expected.QueryMeta.Backend = structs.QueryBackendStreaming

	for i := range nodes {
		entry := structs.CheckServiceNode{
			Node: &structs.Node{Node: nodes[i], PeerName: peerName},
		}
		expected.Nodes = append(expected.Nodes, entry)
	}
	return expected
}
// cmpCheckServiceNodeNames makes go-cmp treat two structs.CheckServiceNode
// values as equal whenever their Node.Node names match; every other field is
// ignored. Both values must have a non-nil Node.
var cmpCheckServiceNodeNames = cmp.Options{
	cmp.Comparer(func(a, b structs.CheckServiceNode) bool {
		nameA, nameB := a.Node.Node, b.Node.Node
		return nameA == nameB
	}),
}
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_EventBatches(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_EventBatches(t, "my-peer")
})
}
// testHealthView_IntegrationWithStore_EventBatches checks that events wrapped
// in a single EventBatch are applied both when they make up the initial
// snapshot and when they arrive later as an update.
//
// Results are compared by node name only (see cmpCheckServiceNodeNames).
func testHealthView_IntegrationWithStore_EventBatches(t *testing.T, peerName string) {
	namespace := getNamespace("ns3")
	stream := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))

	// Create an initial snapshot of 3 instances but in a single event batch
	snapshot := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web", peerName),
		newEventServiceHealthRegister(5, 2, "web", peerName),
		newEventServiceHealthRegister(5, 3, "web", peerName),
	)
	stream.QueueEvents(snapshot, newEndOfSnapshotEvent(5))

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
			},
		},
		streamClient: stream,
	}

	testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
		got, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), got.Index)

		want := newExpectedNodesInPeer(peerName, "node1", "node2", "node3")
		want.Index = 5
		prototest.AssertDeepEqual(t, want, got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})

	testutil.RunStep(t, "batched updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (so all have same
		// index)
		update := newEventBatchWithEvents(
			// Deregister an existing node
			newEventServiceHealthDeregister(20, 1, "web", peerName),
			// Register another
			newEventServiceHealthRegister(20, 4, "web", peerName),
		)
		stream.QueueEvents(update)

		req.QueryOptions.MaxQueryTime = time.Second
		got, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), got.Index)

		want := newExpectedNodesInPeer(peerName, "node2", "node3", "node4")
		want.Index = 20
		prototest.AssertDeepEqual(t, want, got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})
}
func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_Filtering(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_Filtering(t, "my-peer")
})
}
// testHealthView_IntegrationWithStore_Filtering issues a request carrying the
// filter `Node.Node == "node2"` and asserts that only node2 shows up in the
// result, both for the initial snapshot and after a later batch that
// deregisters node1 and registers node4.
//
// Results are compared by node name only (see cmpCheckServiceNodeNames).
func testHealthView_IntegrationWithStore_Filtering(t *testing.T, peerName string) {
	namespace := getNamespace("ns3")
	stream := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))
	go store.Run(ctx)

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions: structs.QueryOptions{
					Filter:       `Node.Node == "node2"`,
					MaxQueryTime: time.Second,
				},
			},
		},
		streamClient: stream,
	}

	// Create an initial snapshot of 3 instances but in a single event batch
	snapshot := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web", peerName),
		newEventServiceHealthRegister(5, 2, "web", peerName),
		newEventServiceHealthRegister(5, 3, "web", peerName),
	)
	stream.QueueEvents(snapshot, newEndOfSnapshotEvent(5))

	testutil.RunStep(t, "filtered snapshot returned", func(t *testing.T) {
		got, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), got.Index)

		want := newExpectedNodesInPeer(peerName, "node2")
		want.Index = 5
		prototest.AssertDeepEqual(t, want, got.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = got.Index
	})

	testutil.RunStep(t, "filtered updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (all have same index)
		update := newEventBatchWithEvents(
			// Deregister an existing node
			newEventServiceHealthDeregister(20, 1, "web", peerName),
			// Register another
			newEventServiceHealthRegister(20, 4, "web", peerName),
		)
		stream.QueueEvents(update)

		got, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), got.Index)

		want := newExpectedNodesInPeer(peerName, "node2")
		want.Index = 20
		prototest.AssertDeepEqual(t, want, got.Value, cmpCheckServiceNodeNames)
	})
}
// serviceRequestStub wraps a serviceRequest and overrides NewMaterializer so
// that tests can use a fake StreamClient: the materializer it builds reads from
// the streamClient field.
type serviceRequestStub struct {
	// serviceRequest is embedded; its promoted fields (for example
	// QueryOptions and ServiceSpecificRequest) are accessed directly on the stub.
	serviceRequest
	// streamClient is the stream implementation handed to the materializer.
	streamClient submatview.StreamClient
}
// NewMaterializer builds a health view from the embedded
// ServiceSpecificRequest and wires it into an RPC materializer that reads from
// the stub's streamClient. It returns the error from NewHealthView, if any.
func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) {
	healthView, err := NewHealthView(r.ServiceSpecificRequest)
	if err != nil {
		return nil, err
	}

	return submatview.NewRPCMaterializer(r.streamClient, submatview.Deps{
		View:    healthView,
		Logger:  hclog.New(nil),
		Request: NewMaterializerRequest(r.ServiceSpecificRequest),
	}), nil
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
2022-03-23 16:10:03 +00:00
ID: string(nodeID),
Node: node,
Address: addr,
Datacenter: "dc1",
PeerName: peerName,
2022-03-23 16:10:03 +00:00
RaftIndex: &pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
PeerName: peerName,
Port: 8080,
2022-03-23 16:10:03 +00:00
RaftIndex: &pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
PeerName: peerName,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
2022-03-23 16:10:03 +00:00
RaftIndex: &pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
// newEventBatchWithEvents wraps first followed by evs, in that order, into a
// single EventBatch event. The batch event takes its Index from first, so
// first must be non-nil.
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
	batch := make([]*pbsubscribe.Event, 0, len(evs)+1)
	batch = append(batch, first)
	batch = append(batch, evs...)

	payload := &pbsubscribe.Event_EventBatch{
		EventBatch: &pbsubscribe.EventBatch{Events: batch},
	}
	return &pbsubscribe.Event{
		Index:   first.Index,
		Payload: payload,
	}
}
// newEndOfSnapshotEvent returns an event at the given index whose payload is
// the EndOfSnapshot marker.
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
	ev := new(pbsubscribe.Event)
	ev.Index = index
	ev.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
	return ev
}
// newNewSnapshotToFollowEvent returns an event, with no index set, whose
// payload is the NewSnapshotToFollow marker.
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
	ev := new(pbsubscribe.Event)
	ev.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
	return ev
}
// getNamespace passes ns through an EnterpriseMeta in the default partition
// and returns whatever NamespaceOrEmpty reports for it. Per the original
// authors' note, this yields the namespace when namespace support exists and
// the empty string otherwise, so the same tests run unchanged in both oss and
// ent builds.
func getNamespace(ns string) string {
	entMeta := structs.NewEnterpriseMetaInDefaultPartition(ns)
	resolved := entMeta.NamespaceOrEmpty()
	return resolved
}
// validateNamespace returns a request validator that accepts a
// SubscribeRequest only when the namespace of its named subject equals ns, and
// otherwise returns an error describing the mismatch.
func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
	return func(req *pbsubscribe.SubscribeRequest) error {
		got := req.GetNamedSubject().GetNamespace()
		if got == ns {
			return nil
		}
		return fmt.Errorf("expected request.NamedSubject.Namespace %v, got %v", ns, got)
	}
}
// TestNewFilterEvaluator is a table test for newFilterEvaluator. Each case
// builds an evaluator from a request, evaluates it against one
// CheckServiceNode, and asserts the boolean verdict. The cases cover service
// tags (including the deprecated single ServiceTag), node metadata filters,
// the QueryOptions.Filter expression, and all three kinds of criteria set on
// one request.
func TestNewFilterEvaluator(t *testing.T) {
	type testCase struct {
		name  string
		req   structs.ServiceSpecificRequest
		input structs.CheckServiceNode
		want  bool
	}

	// run builds the evaluator for one case and checks its verdict; neither
	// construction nor evaluation may return an error.
	run := func(t *testing.T, tc testCase) {
		evaluator, err := newFilterEvaluator(tc.req)
		require.NoError(t, err)

		got, err := evaluator.Evaluate(tc.input)
		require.NoError(t, err)
		require.Equal(t, tc.want, got)
	}

	cases := []testCase{
		{
			name: "single ServiceTags match",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match"},
				TagFilter:   true,
			},
			input: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			want: true,
		},
		{
			name: "single deprecated ServiceTag match",
			req: structs.ServiceSpecificRequest{
				ServiceTag: "match",
				TagFilter:  true,
			},
			input: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			want: true,
		},
		{
			name: "single ServiceTags mismatch",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"other"},
				TagFilter:   true,
			},
			input: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			want: false,
		},
		{
			name: "multiple ServiceTags match",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match", "second"},
				TagFilter:   true,
			},
			input: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match", "second"},
				},
			},
			want: true,
		},
		{
			name: "multiple ServiceTags mismatch",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match", "not"},
				TagFilter:   true,
			},
			input: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			want: false,
		},
		{
			name: "single NodeMetaFilter match",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{"meta1": "match"},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "match",
						"extra": "some",
					},
				},
			},
			want: true,
		},
		{
			name: "single NodeMetaFilter mismatch",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{
					"meta1": "match",
				},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "other",
						"extra": "some",
					},
				},
			},
			want: false,
		},
		{
			name: "multiple NodeMetaFilter match",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "match",
						"meta2": "a",
						"extra": "some",
					},
				},
			},
			want: true,
		},
		{
			name: "multiple NodeMetaFilter mismatch",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{
					"meta1": "match",
					"meta2": "beta",
				},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "other",
						"meta2": "gamma",
					},
				},
			},
			want: false,
		},
		{
			name: "QueryOptions.Filter match",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node3"`,
				},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{Node: "node3"},
			},
			want: true,
		},
		{
			name: "QueryOptions.Filter mismatch",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node2"`,
				},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{Node: "node3"},
			},
			want: false,
		},
		{
			name: "all match",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node3"`,
				},
				ServiceTags: []string{"tag1", "tag2"},
				NodeMetaFilters: map[string]string{
					"meta1": "match1",
					"meta2": "match2",
				},
			},
			input: structs.CheckServiceNode{
				Node: &structs.Node{
					Node: "node3",
					Meta: map[string]string{
						"meta1": "match1",
						"meta2": "match2",
						"extra": "other",
					},
				},
				Service: &structs.NodeService{
					Tags: []string{"tag1", "tag2", "extra"},
				},
			},
			want: true,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			run(t, tc)
		})
	}
}
func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) {
view, err := NewHealthView(structs.ServiceSpecificRequest{
ServiceName: "name",
Connect: true,
QueryOptions: structs.QueryOptions{
Filter: "Service.Meta.version == \"v1\"",
},
})
require.NoError(t, err)
err = view.Update([]*pbsubscribe.Event{{
Index: 1,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Service: &pbservice.NodeService{
Kind: structs.TerminatingGateway,
Service: "name",
Address: "127.0.0.1",
Port: 8443,
},
},
},
},
}})
require.NoError(t, err)
node, ok := (view.Result(1)).(*structs.IndexedCheckServiceNodes)
require.True(t, ok)
require.Len(t, node.Nodes, 1)
require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address)
require.Equal(t, 8443, node.Nodes[0].Service.Port)
}