From c5d57c9f0777cd2b775bb328d4b5e67ca7177025 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 8 Oct 2020 18:35:56 -0400 Subject: [PATCH] subscribe: add test cases for newEventFromStreamEvent --- agent/rpc/subscribe/subscribe.go | 8 +- agent/rpc/subscribe/subscribe_test.go | 164 +++++++++++++++++++++++++- 2 files changed, 164 insertions(+), 8 deletions(-) diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index bfc1835499..934819e2ec 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } elog.Trace(event) - e := newEventFromStreamEvent(req, event) + e := newEventFromStreamEvent(req.Topic, event) if err := serverStream.Send(e); err != nil { return err } @@ -139,10 +139,10 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) return event.Filter(fn) } -func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event { +func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event { e := &pbsubscribe.Event{ - Topic: req.Topic, - Key: req.Key, + Topic: topic, + Key: event.Key, Index: event.Index, } switch { diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 52fff8ab32..60d73b3367 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -274,9 +275,9 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { return nil } -func assertDeepEqual(t *testing.T, x, y interface{}) { +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() - if diff := cmp.Diff(x, y); diff != "" { + if diff := cmp.Diff(x, y, opts...); diff != "" { t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) } } @@ -594,8 +595,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }) } -// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events - func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) { if testing.Short() { t.Skip("too slow for -short run") @@ -898,3 +897,160 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } + +func TestNewEventFromSteamEvent(t *testing.T) { + type testCase struct { + name string + event stream.Event + expected pbsubscribe.Event + } + + testTopic := pbsubscribe.Topic_ServiceHealthConnect + fn := func(t *testing.T, tc testCase) { + expected := tc.expected + expected.Topic = testTopic + actual := newEventFromStreamEvent(testTopic, tc.event) + assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty()) + } + + var testCases = []testCase{ + { + name: "end of snapshot", + event: newEventFromSubscription(t, 0), + expected: pbsubscribe.Event{ + Index: 1, + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + }, + }, + { + name: "new snapshot to follow", + event: newEventFromSubscription(t, 22), + expected: pbsubscribe.Event{ + Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, + }, + }, + { + name: "event batch", + event: stream.Event{ + Key: "web1", + Index: 2002, + Payload: []stream.Event{ + { + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + { + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node2"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + expected: pbsubscribe.Event{ + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{ + Events: []*pbsubscribe.Event{ + { + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node1"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + { + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Deregister, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node2"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "event payload CheckServiceNode", + event: stream.Event{ + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + expected: pbsubscribe.Event{ + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node1"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +// newEventFromSubscription is used to return framing events. EndOfSnapshot and +// NewSnapshotToFollow are not exported, but we can get them from a subscription. +func newEventFromSubscription(t *testing.T, index uint64) stream.Event { + t.Helper() + + handlers := map[stream.Topic]stream.SnapshotFunc{ + pbsubscribe.Topic_ServiceHealthConnect: func(stream.SubscribeRequest, stream.SnapshotAppender) (index uint64, err error) { + return 1, nil + }, + } + ep := stream.NewEventPublisher(handlers, 0) + req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index} + + sub, err := ep.Subscribe(req) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + event, err := sub.Next(ctx) + require.NoError(t, err) + return event +}