package health

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-uuid"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/submatview"
	"github.com/hashicorp/consul/proto/pbcommon"
	"github.com/hashicorp/consul/proto/pbservice"
	"github.com/hashicorp/consul/proto/pbsubscribe"
	"github.com/hashicorp/consul/proto/prototest"
	"github.com/hashicorp/consul/sdk/testutil"
	"github.com/hashicorp/consul/types"
)
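
// TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse verifies that
// sortCheckServiceNodes orders results by node name first and then by service
// ID within a node, matching the ordering of the server's RPC response.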
func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
	index := uint64(42)
	buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
		newID, err := uuid.GenerateUUID()
		require.NoError(t, err)
		return structs.CheckServiceNode{
			Node: &structs.Node{
				ID:         types.NodeID(strings.ToUpper(newID)),
				Node:       nodeName,
				Address:    nodeName,
				Datacenter: "dc1",
				RaftIndex: structs.RaftIndex{
					CreateIndex: index,
					ModifyIndex: index,
				},
			},
			Service: &structs.NodeService{
				ID:      serviceID,
				Service: "testService",
				Port:    8080,
				Weights: &structs.Weights{
					Passing: 1,
					Warning: 1,
				},
				RaftIndex: structs.RaftIndex{
					CreateIndex: index,
					ModifyIndex: index,
				},
				EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
			},
			Checks: []*structs.HealthCheck{},
		}
	}
	zero := buildTestNode("a-zero-node", "testService:1")
	one := buildTestNode("node1", "testService:1")
	two := buildTestNode("node1", "testService:2")
	three := buildTestNode("node2", "testService")
	result := structs.IndexedCheckServiceNodes{
		Nodes:     structs.CheckServiceNodes{three, two, zero, one},
		QueryMeta: structs.QueryMeta{Index: index},
	}
	sortCheckServiceNodes(&result)
	expected := structs.CheckServiceNodes{zero, one, two, three}
	require.Equal(t, expected, result.Nodes)
}
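
// TestHealthView_IntegrationWithStore_WithEmptySnapshot drives a materialized
// health view through submatview.Store starting from an empty snapshot: the
// initial result, blocking until a timeout, blocking until an update,
// recovery from a temporary stream error, and surfacing of a non-temporary
// error.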
func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Run("local data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_WithEmptySnapshot(t, structs.DefaultPeerKeyword)
	})

	t.Run("peered data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_WithEmptySnapshot(t, "my-peer")
	})
}

func testHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T, peerName string) {
	namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
	streamClient := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))
	go store.Run(ctx)

	// Initially there are no services registered. The server should send an
	// EndOfSnapshot message immediately, with an index of 1.
	streamClient.QueueEvents(newEndOfSnapshotEvent(1))

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
			},
		},
		streamClient: streamClient,
	}
	empty := &structs.IndexedCheckServiceNodes{
		Nodes: structs.CheckServiceNodes{},
		QueryMeta: structs.QueryMeta{
			Index:   1,
			Backend: structs.QueryBackendStreaming,
		},
	}

	testutil.RunStep(t, "empty snapshot returned", func(t *testing.T) {
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(1), result.Index)
		require.Equal(t, empty, result.Value)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "blocks for timeout", func(t *testing.T) {
		// The subsequent fetch should block for the full timeout.
		start := time.Now()
		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until timeout")

		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
		require.Equal(t, empty, result.Value, "result value should not have changed")

		req.QueryOptions.MinQueryIndex = result.Index
	})

	var lastResultValue structs.CheckServiceNodes

	testutil.RunStep(t, "blocks until update", func(t *testing.T) {
		// Make another blocking query with a longer timeout and trigger an update
		// event partway through.
		start := time.Now()
		go func() {
			time.Sleep(200 * time.Millisecond)
			streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web", peerName))
		}()

		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until the event was delivered")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(4), result.Index, "result index should match the event index")
		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
		require.Len(t, lastResultValue, 1,
			"result value should contain the new registration")

		require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[0].Service.PeerName)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
		streamClient.QueueErr(tempError("broken pipe"))

		// The next fetch should continue to block until the timeout and receive
		// the same result.
		start := time.Now()
		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until timeout")

		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
			"result index should not have changed")
		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
			"result value should not have changed")

		req.QueryOptions.MinQueryIndex = result.Index

		// But an update should still be noticed, due to reconnection.
		streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web", peerName))

		start = time.Now()
		req.QueryOptions.MaxQueryTime = time.Second
		result, err = store.Get(ctx, req)
		require.NoError(t, err)
		elapsed = time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(10), result.Index, "result index should match the event index")
		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
		require.Len(t, lastResultValue, 2,
			"result value should contain the new registration")

		require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
		require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[1].Service.PeerName)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
		// Wait, then send the error while the fetcher is blocked.
		go func() {
			time.Sleep(200 * time.Millisecond)
			streamClient.QueueErr(errors.New("invalid request"))
		}()

		// The next fetch should return the error.
		start := time.Now()
		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.Error(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until error was sent")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)

		req.QueryOptions.MinQueryIndex = result.Index

		// But an update should still be noticed, due to reconnection.
		streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web", peerName))

		req.QueryOptions.MaxQueryTime = time.Second
		result, err = store.Get(ctx, req)
		require.NoError(t, err)
		elapsed = time.Since(start)
		require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")

		require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should match the event index")
		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
		require.Len(t, lastResultValue, 3,
			"result value should contain the new registration")

		require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
		require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[1].Service.PeerName)
		require.Equal(t, peerName, lastResultValue[2].Node.PeerName)
		require.Equal(t, peerName, lastResultValue[2].Service.PeerName)

		req.QueryOptions.MinQueryIndex = result.Index
	})
}
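
// tempError is an error that reports itself as temporary. The "reconnects and
// resumes after temporary error" steps above rely on errors exposing a
// Temporary() bool method being retried by the materializer rather than being
// surfaced to the caller.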
type tempError string

func (e tempError) Error() string {
	return string(e)
}

func (e tempError) Temporary() bool {
	return true
}
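
// TestHealthView_IntegrationWithStore_WithFullSnapshot covers a view built
// from a non-empty snapshot: the initial result, blocking until a
// deregistration, a server-initiated reset (codes.Aborted), and a reconnect
// that is told to expect a fresh snapshot.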
func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Run("local data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_WithFullSnapshot(t, structs.DefaultPeerKeyword)
	})

	t.Run("peered data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_WithFullSnapshot(t, "my-peer")
	})
}

func testHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T, peerName string) {
	namespace := getNamespace("ns2")
	client := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))

	// Create an initial snapshot of 3 instances on different nodes.
	registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
		return newEventServiceHealthRegister(index, nodeNum, "web", peerName)
	}
	client.QueueEvents(
		registerServiceWeb(5, 1),
		registerServiceWeb(5, 2),
		registerServiceWeb(5, 3),
		newEndOfSnapshotEvent(5))

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
			},
		},
		streamClient: client,
	}

	testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3")
		expected.Index = 5
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "blocks until deregistration", func(t *testing.T) {
		// Make another blocking query with a longer timeout and trigger an update
		// event partway through.
		start := time.Now()
		go func() {
			time.Sleep(200 * time.Millisecond)

			// Deregister the instance on node1.
			client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web", peerName))
		}()

		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until the event was delivered")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(20), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node2", "node3")
		expected.Index = 20
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "server reload is respected", func(t *testing.T) {
		// Simulates the server noticing that the request's ACL token privileges
		// have changed. To detect this, we queue up a new snapshot with a
		// different set of nodes from the first.
		client.QueueErr(status.Error(codes.Aborted, "reset by server"))

		client.QueueEvents(
			registerServiceWeb(50, 3), // overlaps an existing node
			registerServiceWeb(50, 4),
			registerServiceWeb(50, 5),
			newEndOfSnapshotEvent(50))

		// Make another blocking query with THE SAME index. It should immediately
		// return the new snapshot.
		start := time.Now()
		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5")
		expected.Index = 50
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
		client.QueueErr(tempError("temporary connection error"))

		client.QueueEvents(
			newNewSnapshotToFollowEvent(),
			registerServiceWeb(50, 3), // overlaps an existing node
			registerServiceWeb(50, 4),
			registerServiceWeb(50, 5),
			newEndOfSnapshotEvent(50))

		start := time.Now()
		req.QueryOptions.MinQueryIndex = 49
		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5")
		expected.Index = 50
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
	})
}
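
// newExpectedNodesInPeer builds the expected streaming query result: one
// CheckServiceNode per node name, in the given peer, with QueryMeta marked as
// served by the streaming backend. Only the node name and peer are populated
// because results are compared with cmpCheckServiceNodeNames.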
func newExpectedNodesInPeer(peerName string, nodes ...string) *structs.IndexedCheckServiceNodes {
	result := &structs.IndexedCheckServiceNodes{}
	result.QueryMeta.Backend = structs.QueryBackendStreaming
	for _, node := range nodes {
		result.Nodes = append(result.Nodes, structs.CheckServiceNode{
			Node: &structs.Node{
				Node:     node,
				PeerName: peerName,
			},
		})
	}
	return result
}

// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
// by Node name.
var cmpCheckServiceNodeNames = cmp.Options{
	cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
		return x.Node.Node == y.Node.Node
	}),
}
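
// TestHealthView_IntegrationWithStore_EventBatches verifies that snapshots
// and updates delivered as a single EventBatch are applied to the view the
// same way as individually delivered events.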
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
	t.Run("local data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_EventBatches(t, structs.DefaultPeerKeyword)
	})

	t.Run("peered data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_EventBatches(t, "my-peer")
	})
}

func testHealthView_IntegrationWithStore_EventBatches(t *testing.T, peerName string) {
	namespace := getNamespace("ns3")
	client := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))

	// Create an initial snapshot of 3 instances, delivered in a single event
	// batch.
	batchEv := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web", peerName),
		newEventServiceHealthRegister(5, 2, "web", peerName),
		newEventServiceHealthRegister(5, 3, "web", peerName))
	client.QueueEvents(
		batchEv,
		newEndOfSnapshotEvent(5))

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
			},
		},
		streamClient: client,
	}

	testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)

		expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3")
		expected.Index = 5
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "batched updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (so all share the
		// same index).
		batchEv := newEventBatchWithEvents(
			// Deregister an existing node.
			newEventServiceHealthDeregister(20, 1, "web", peerName),
			// Register another.
			newEventServiceHealthRegister(20, 4, "web", peerName),
		)
		client.QueueEvents(batchEv)
		req.QueryOptions.MaxQueryTime = time.Second
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node2", "node3", "node4")
		expected.Index = 20
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = result.Index
	})
}
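
// TestHealthView_IntegrationWithStore_Filtering verifies that a
// QueryOptions.Filter expression is applied both to the initial snapshot and
// to subsequent update batches.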
func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
	t.Run("local data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_Filtering(t, structs.DefaultPeerKeyword)
	})

	t.Run("peered data", func(t *testing.T) {
		testHealthView_IntegrationWithStore_Filtering(t, "my-peer")
	})
}

func testHealthView_IntegrationWithStore_Filtering(t *testing.T, peerName string) {
	namespace := getNamespace("ns3")
	streamClient := newStreamClient(validateNamespace(namespace))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := submatview.NewStore(hclog.New(nil))
	go store.Run(ctx)

	req := serviceRequestStub{
		serviceRequest: serviceRequest{
			ServiceSpecificRequest: structs.ServiceSpecificRequest{
				PeerName:       peerName,
				Datacenter:     "dc1",
				ServiceName:    "web",
				EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
				QueryOptions: structs.QueryOptions{
					Filter:       `Node.Node == "node2"`,
					MaxQueryTime: time.Second,
				},
			},
		},
		streamClient: streamClient,
	}

	// Create an initial snapshot of 3 instances, delivered in a single event
	// batch.
	batchEv := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web", peerName),
		newEventServiceHealthRegister(5, 2, "web", peerName),
		newEventServiceHealthRegister(5, 3, "web", peerName))
	streamClient.QueueEvents(
		batchEv,
		newEndOfSnapshotEvent(5))

	testutil.RunStep(t, "filtered snapshot returned", func(t *testing.T) {
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node2")
		expected.Index = 5
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)

		req.QueryOptions.MinQueryIndex = result.Index
	})

	testutil.RunStep(t, "filtered updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (all share the
		// same index).
		batchEv := newEventBatchWithEvents(
			// Deregister an existing node.
			newEventServiceHealthDeregister(20, 1, "web", peerName),
			// Register another.
			newEventServiceHealthRegister(20, 4, "web", peerName),
		)
		streamClient.QueueEvents(batchEv)
		result, err := store.Get(ctx, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), result.Index)
		expected := newExpectedNodesInPeer(peerName, "node2")
		expected.Index = 20
		prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
	})
}

// serviceRequestStub overrides NewMaterializer so that the tests can use a
// fake StreamClient.
type serviceRequestStub struct {
	serviceRequest
	streamClient submatview.StreamClient
}
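
// NewMaterializer overrides serviceRequest.NewMaterializer, wiring the health
// view to an RPC materializer that consumes events from the stub's fake
// StreamClient instead of a live gRPC subscription.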
func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) {
	view, err := NewHealthView(r.ServiceSpecificRequest)
	if err != nil {
		return nil, err
	}
	deps := submatview.Deps{
		View:    view,
		Logger:  hclog.New(nil),
		Request: NewMaterializerRequest(r.ServiceSpecificRequest),
	}
	return submatview.NewRPCMaterializer(r.streamClient, deps), nil
}
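
// newEventServiceHealthRegister returns a stream event that registers an
// instance of svc on node<nodeNum> in the given peer, with a NodeID and
// address derived deterministically from nodeNum.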
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
	node := fmt.Sprintf("node%d", nodeNum)
	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)

	return &pbsubscribe.Event{
		Index: index,
		Payload: &pbsubscribe.Event_ServiceHealth{
			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
				Op: pbsubscribe.CatalogOp_Register,
				CheckServiceNode: &pbservice.CheckServiceNode{
					Node: &pbservice.Node{
						ID:         string(nodeID),
						Node:       node,
						Address:    addr,
						Datacenter: "dc1",
						PeerName:   peerName,
						RaftIndex: &pbcommon.RaftIndex{
							CreateIndex: index,
							ModifyIndex: index,
						},
					},
					Service: &pbservice.NodeService{
						ID:       svc,
						Service:  svc,
						PeerName: peerName,
						Port:     8080,
						RaftIndex: &pbcommon.RaftIndex{
							CreateIndex: index,
							ModifyIndex: index,
						},
					},
				},
			},
		},
	}
}
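
// newEventServiceHealthDeregister returns a stream event that deregisters the
// instance of svc on node<nodeNum> in the given peer.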
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
	node := fmt.Sprintf("node%d", nodeNum)

	return &pbsubscribe.Event{
		Index: index,
		Payload: &pbsubscribe.Event_ServiceHealth{
			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
				Op: pbsubscribe.CatalogOp_Deregister,
				CheckServiceNode: &pbservice.CheckServiceNode{
					Node: &pbservice.Node{
						Node:     node,
						PeerName: peerName,
					},
					Service: &pbservice.NodeService{
						ID:       svc,
						Service:  svc,
						PeerName: peerName,
						Port:     8080,
						Weights: &pbservice.Weights{
							Passing: 1,
							Warning: 1,
						},
						RaftIndex: &pbcommon.RaftIndex{
							// Use the original insertion index, since a delete does not
							// update it. This magic value comes from the state store
							// tests, which set up at index 10 and mutate at index 100.
							// Callers can modify it afterwards; defaulting it here avoids
							// yet another argument in the common case.
							CreateIndex: 10,
							ModifyIndex: 10,
						},
					},
				},
			},
		},
	}
}
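
// newEventBatchWithEvents wraps the given events in a single EventBatch event
// carrying the index of the first event, simulating events committed in the
// same transaction being delivered together.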
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
	events := make([]*pbsubscribe.Event, len(evs)+1)
	events[0] = first
	for i := range evs {
		events[i+1] = evs[i]
	}
	return &pbsubscribe.Event{
		Index: first.Index,
		Payload: &pbsubscribe.Event_EventBatch{
			EventBatch: &pbsubscribe.EventBatch{Events: events},
		},
	}
}
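
// newEndOfSnapshotEvent returns the event a server sends after it has
// delivered every event in the initial snapshot; the tests above use its
// index as the first index of the materialized view.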
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
	return &pbsubscribe.Event{
		Index:   index,
		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
	}
}
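
// newNewSnapshotToFollowEvent returns the event a server sends when the
// existing view must be discarded and rebuilt from a fresh snapshot, as in
// the "reconnects and receives new snapshot when server state has changed"
// step above.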
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
	return &pbsubscribe.Event{
		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
	}
}

// getNamespace returns a namespace if namespace support exists, otherwise
// returns the empty string. It allows the same tests to work in both OSS and
// Enterprise without duplicating them.
func getNamespace(ns string) string {
	meta := structs.NewEnterpriseMetaInDefaultPartition(ns)
	return meta.NamespaceOrEmpty()
}
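
// validateNamespace returns a request-validation hook for the fake stream
// client that fails the subscription if the request's NamedSubject does not
// carry the expected namespace. Typical use:
//
//	client := newStreamClient(validateNamespace(namespace))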
func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
	return func(request *pbsubscribe.SubscribeRequest) error {
		if got := request.GetNamedSubject().GetNamespace(); got != ns {
			return fmt.Errorf("expected request.NamedSubject.Namespace %v, got %v", ns, got)
		}
		return nil
	}
}
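
// TestNewFilterEvaluator verifies that newFilterEvaluator combines the
// deprecated ServiceTag field, ServiceTags, NodeMetaFilters, and
// QueryOptions.Filter, and that a node must satisfy every supplied criterion
// to pass.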
func TestNewFilterEvaluator(t *testing.T) {
	type testCase struct {
		name     string
		req      structs.ServiceSpecificRequest
		data     structs.CheckServiceNode
		expected bool
	}

	fn := func(t *testing.T, tc testCase) {
		e, err := newFilterEvaluator(tc.req)
		require.NoError(t, err)
		actual, err := e.Evaluate(tc.data)
		require.NoError(t, err)
		require.Equal(t, tc.expected, actual)
	}

	var testCases = []testCase{
		{
			name: "single ServiceTags match",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match"},
				TagFilter:   true,
			},
			data: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			expected: true,
		},
		{
			name: "single deprecated ServiceTag match",
			req: structs.ServiceSpecificRequest{
				ServiceTag: "match",
				TagFilter:  true,
			},
			data: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			expected: true,
		},
		{
			name: "single ServiceTags mismatch",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"other"},
				TagFilter:   true,
			},
			data: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			expected: false,
		},
		{
			name: "multiple ServiceTags match",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match", "second"},
				TagFilter:   true,
			},
			data: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match", "second"},
				},
			},
			expected: true,
		},
		{
			name: "multiple ServiceTags mismatch",
			req: structs.ServiceSpecificRequest{
				ServiceTags: []string{"match", "not"},
				TagFilter:   true,
			},
			data: structs.CheckServiceNode{
				Service: &structs.NodeService{
					Tags: []string{"extra", "match"},
				},
			},
			expected: false,
		},
		{
			name: "single NodeMetaFilter match",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{"meta1": "match"},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "match",
						"extra": "some",
					},
				},
			},
			expected: true,
		},
		{
			name: "single NodeMetaFilter mismatch",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{
					"meta1": "match",
				},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "other",
						"extra": "some",
					},
				},
			},
			expected: false,
		},
		{
			name: "multiple NodeMetaFilter match",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "match",
						"meta2": "a",
						"extra": "some",
					},
				},
			},
			expected: true,
		},
		{
			name: "multiple NodeMetaFilter mismatch",
			req: structs.ServiceSpecificRequest{
				NodeMetaFilters: map[string]string{
					"meta1": "match",
					"meta2": "beta",
				},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{
					Meta: map[string]string{
						"meta1": "other",
						"meta2": "gamma",
					},
				},
			},
			expected: false,
		},
		{
			name: "QueryOptions.Filter match",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node3"`,
				},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{Node: "node3"},
			},
			expected: true,
		},
		{
			name: "QueryOptions.Filter mismatch",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node2"`,
				},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{Node: "node3"},
			},
			expected: false,
		},
		{
			name: "all match",
			req: structs.ServiceSpecificRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Node.Node == "node3"`,
				},
				ServiceTags: []string{"tag1", "tag2"},
				NodeMetaFilters: map[string]string{
					"meta1": "match1",
					"meta2": "match2",
				},
			},
			data: structs.CheckServiceNode{
				Node: &structs.Node{
					Node: "node3",
					Meta: map[string]string{
						"meta1": "match1",
						"meta2": "match2",
						"extra": "other",
					},
				},
				Service: &structs.NodeService{
					Tags: []string{"tag1", "tag2", "extra"},
				},
			},
			expected: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			fn(t, tc)
		})
	}
}