From 28de159c147986adc7b97f9652360fa0051cd8d4 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 17 Dec 2020 17:46:24 -0500 Subject: [PATCH 01/12] state: add first terminating catalog catalog event Health of a terminating gateway instance changes - Generate an event for creating/destroying this instance of the terminating gateway, duplicate it for each affected service Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog_events.go | 37 ++++++++-- agent/consul/state/catalog_events_test.go | 85 ++++++++++++++++++++++- 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 2124194441..7e591c377d 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -267,7 +267,11 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // Duplicate any events that affected connect-enabled instances (proxies or // native apps) to the relevant Connect topic. - events = append(events, serviceHealthToConnectEvents(events...)...) + connectEvents, err := serviceHealthToConnectEvents(tx, events...) + if err != nil { + return nil, err + } + events = append(events, connectEvents...) return events, nil } @@ -318,10 +322,13 @@ func changeTypeFromChange(change memdb.Change) changeType { // enabled and so of no interest to those subscribers but also involves // switching connection details to be the proxy instead of the actual instance // in case of a sidecar. -func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { +func serviceHealthToConnectEvents( + tx ReadTxn, + events ...stream.Event, +) ([]stream.Event, error) { var result []stream.Event for _, event := range events { - if event.Topic != topicServiceHealth { + if event.Topic != topicServiceHealth { // event.Topic == topicServiceHealthConnect // Skip non-health or any events already emitted to Connect topic continue } @@ -343,13 +350,33 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { connectEvent.Payload = payload result = append(result, connectEvent) + case node.Service.Kind == structs.ServiceKindTerminatingGateway: + iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta) + if err != nil { + return nil, err + } + + // similar to checkServiceNodesTxn -> serviceGatewayNodes + for obj := iter.Next(); obj != nil; obj = iter.Next() { + result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service)) + } + default: - // ServiceKindTerminatingGateway changes are handled separately. // All other cases are not relevant to the connect topic } } - return result + return result, nil +} + +func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event { + event.Topic = topicServiceHealthConnect + payload := event.Payload.(EventPayloadCheckServiceNode) + payload.key = service.Name + event.Payload = payload + // FIXME: we need payload to have an override for namespace, so that it can be filtered + // properly by EventPayloadCheckServiceNode.MatchesKey + return event } func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode { diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index b9ef8eadea..a16e1ce9f0 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1001,6 +1001,56 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), }, }, + { + Name: "terminating gateway registered with no config entry", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + }, + }, + { + Name: "terminating gateway registered after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + }, + }, } for _, tc := range cases { @@ -1037,6 +1087,39 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { } } +func regTerminatingGateway(req *structs.RegisterRequest) error { + req.Service.Service = "tgate1" + req.Service.Kind = structs.ServiceKindTerminatingGateway + req.Service.ID = "tgate1" + req.Service.Port = 22000 + return nil +} + +func evServiceTermingGateway(name string) func(e *stream.Event) error { + return func(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + + csn.Service.Kind = structs.ServiceKindTerminatingGateway + csn.Service.Port = 22000 + + // Convert the check to point to the right ID now. This isn't totally + // realistic - sidecars should have alias checks etc but this is good enough + // to test this code path. + //if len(csn.Checks) >= 2 { + // csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_terminating_gateway") + // csn.Checks[1].ServiceID = svc + "_terminating_gateway" + // csn.Checks[1].ServiceName = svc + "_terminating_gateway" + //} + + if e.Topic == topicServiceHealthConnect { + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = name + e.Payload = payload + } + return nil + } +} + func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() if diff := cmp.Diff(x, y, opts...); diff != "" { @@ -1302,7 +1385,7 @@ func evConnectNative(e *stream.Event) error { // evConnectTopic option converts the base event to the equivalent event that // should be published to the connect topic. When needed it should be applied // first as several other options (notable evSidecar) change behavior subtly -// depending on which topic they are published to and they determin this from +// depending on which topic they are published to and they determine this from // the event. func evConnectTopic(e *stream.Event) error { e.Topic = topicServiceHealthConnect From c2481ca10ff3b01de92c2c33ee11696ba0f03919 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 11 Jan 2021 14:12:51 -0800 Subject: [PATCH 02/12] state: Add terminating gateway events on updating a config entry Co-Authored-By: Daniel Nephin --- agent/consul/state/catalog_events.go | 87 ++++++++++++++++++++++- agent/consul/state/catalog_events_test.go | 66 ++++++++++++++++- 2 files changed, 148 insertions(+), 5 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 7e591c377d..bcdcf227ab 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -123,6 +123,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event var nodeChanges map[string]changeType var serviceChanges map[nodeServiceTuple]serviceChange + var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange markNode := func(node string, typ changeType) { if nodeChanges == nil { @@ -201,9 +202,45 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect) } } + case gatewayServicesTableName: + gs := changeObject(change).(*structs.GatewayService) + if gs.GatewayKind != structs.ServiceKindTerminatingGateway { + continue + } + + gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change} + if termGatewayChanges == nil { + termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange) + } + + gatewayChanges, ok := termGatewayChanges[gs.Gateway] + if !ok { + termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{} + } + + prevChange, ok := gatewayChanges[gs.Service] + if !ok { + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + continue + } + + if changeTypeFromChange(change) == changeDelete { + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + continue + } + + prevGs := changeObject(prevChange.change).(*structs.GatewayService) + if !gs.IsSame(prevGs) { + gsChange.changeType = changeUpdate + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + } else { + delete(termGatewayChanges[gs.Gateway], gs.Service) + } } } + //fmt.Printf("term gateway map: %v", termGatewayChanges) + // Now act on those marked nodes/services for node, changeType := range nodeChanges { if changeType == changeDelete { @@ -221,9 +258,6 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } for tuple, srvChange := range serviceChanges { - // change may be nil if there was a change that _affected_ the service - // like a change to checks but it didn't actually change the service - // record itself. if srvChange.changeType == changeDelete { sn := srvChange.change.Before.(*structs.ServiceNode) e := newServiceHealthEventDeregister(changes.Index, sn) @@ -265,6 +299,53 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event events = append(events, e) } + for gatewayName, serviceChanges := range termGatewayChanges { + for serviceName, gsChange := range serviceChanges { + gs := changeObject(gsChange.change).(*structs.GatewayService) + + _, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta) + if err != nil { + return nil, err + } + + // Always send deregister events for deletes/updates. + if gsChange.changeType != changeCreate { + for _, sn := range nodes { + e := newServiceHealthEventDeregister(changes.Index, sn) + + e.Topic = topicServiceHealthConnect + // todo(streaming): make namespace-aware in enterprise + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = serviceName.Name + e.Payload = payload + + events = append(events, e) + } + } + + if gsChange.changeType == changeDelete { + continue + } + + // Build service events and append them + for _, sn := range nodes { + tuple := newNodeServiceTupleFromServiceNode(sn) + e, err := newServiceHealthEventForService(tx, changes.Index, tuple) + if err != nil { + return nil, err + } + + e.Topic = topicServiceHealthConnect + // todo(streaming): make namespace-aware in enterprise + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = serviceName.Name + e.Payload = payload + + events = append(events, e) + } + } + } + // Duplicate any events that affected connect-enabled instances (proxies or // native apps) to the relevant Connect topic. connectEvents, err := serviceHealthToConnectEvents(tx, events...) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index a16e1ce9f0..17139ba82d 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -174,6 +174,9 @@ func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { } func TestServiceHealthEventsFromChanges(t *testing.T) { + setupIndex := uint64(10) + mutateIndex := uint64(100) + cases := []struct { Name string Setup func(s *Store, tx *txn) error @@ -1051,6 +1054,48 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceTermingGateway("srv2")), }, }, + { + Name: "terminating gateway config entry created after gateway exists", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }, + // terminating gateway with 2 instances + // changing config entry to add a linked service + // changing config entry to remove a linked service + // deleting a config entry + // deregistering a service behind a terminating gateway (should send no term gateway events) } for _, tc := range cases { @@ -1061,7 +1106,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { if tc.Setup != nil { // Bypass the publish mechanism for this test or we get into odd // recursive stuff... - setupTx := s.db.WriteTxn(10) + setupTx := s.db.WriteTxn(setupIndex) require.NoError(t, tc.Setup(s, setupTx)) // Commit the underlying transaction without using wrapped Commit so we // avoid the whole event publishing system for setup here. It _should_ @@ -1070,7 +1115,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { setupTx.Txn.Commit() } - tx := s.db.WriteTxn(100) + tx := s.db.WriteTxn(mutateIndex) require.NoError(t, tc.Mutate(s, tx)) // Note we call the func under test directly rather than publishChanges so @@ -1120,6 +1165,23 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { } } +func evServiceIndex(idx uint64) func(e *stream.Event) error { + return func(e *stream.Event) error { + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.Value.Node.CreateIndex = idx + payload.Value.Node.ModifyIndex = idx + payload.Value.Service.CreateIndex = idx + payload.Value.Service.ModifyIndex = idx + for _, check := range payload.Value.Checks { + check.CreateIndex = idx + check.ModifyIndex = idx + } + e.Payload = payload + + return nil + } +} + func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() if diff := cmp.Diff(x, y, opts...); diff != "" { From eb58a39738cd5a06ea4d47c907d0a2a5c763f4e9 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 20 Jan 2021 14:56:59 -0500 Subject: [PATCH 03/12] state: Include the override key in the sorting of events Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog_events.go | 4 +++- agent/consul/state/catalog_events_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index bcdcf227ab..b7fc6118d0 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -24,6 +24,7 @@ type EventPayloadCheckServiceNode struct { // events in the connect topic to specify the name of the underlying service // when the change event is for a sidecar or gateway. key string + // FIXME: we need to be able to override the namespace for some terminating gateway events } func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { @@ -454,9 +455,10 @@ func copyEventForService(event stream.Event, service structs.ServiceName) stream event.Topic = topicServiceHealthConnect payload := event.Payload.(EventPayloadCheckServiceNode) payload.key = service.Name - event.Payload = payload // FIXME: we need payload to have an override for namespace, so that it can be filtered // properly by EventPayloadCheckServiceNode.MatchesKey + // payload.enterpriseMeta = service.EnterpriseMeta + event.Payload = payload return event } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 17139ba82d..e2f776c856 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1196,7 +1196,7 @@ var cmpPartialOrderEvents = cmp.Options{ cmpopts.SortSlices(func(i, j stream.Event) bool { key := func(e stream.Event) string { csn := getPayloadCheckServiceNode(e.Payload) - return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service) + return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).key) } return key(i) < key(j) }), From 06b1c32e2513900a82464477f3af1b8ec7bf4a4b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 20 Jan 2021 15:10:48 -0500 Subject: [PATCH 04/12] state: Add two more tests for connect events with terminating gateways And expand one test case to cover more. Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog.go | 13 +-- agent/consul/state/catalog_events.go | 29 ++--- agent/consul/state/catalog_events_test.go | 126 ++++++++++++++++++++-- 3 files changed, 135 insertions(+), 33 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 71a31d2725..43655ffc16 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -900,18 +900,19 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn, // compatible destination for the given service name. This will include // both proxies and native integrations. func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - return s.serviceNodes(ws, serviceName, true, entMeta) + tx := s.db.ReadTxn() + defer tx.Abort() + return serviceNodesTxn(tx, ws, serviceName, true, entMeta) } // ServiceNodes returns the nodes associated with a given service name. func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - return s.serviceNodes(ws, serviceName, false, entMeta) + tx := s.db.ReadTxn() + defer tx.Abort() + return serviceNodesTxn(tx, ws, serviceName, false, entMeta) } -func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - tx := s.db.Txn(false) - defer tx.Abort() - +func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { // Function for lookup index := "service" if connect { diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index b7fc6118d0..4860141445 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -210,38 +210,29 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change} + if termGatewayChanges == nil { termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange) } - gatewayChanges, ok := termGatewayChanges[gs.Gateway] + _, ok := termGatewayChanges[gs.Gateway] if !ok { termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{} } - prevChange, ok := gatewayChanges[gs.Service] - if !ok { + switch gsChange.changeType { + case changeUpdate: + after := gsChange.change.After.(*structs.GatewayService) + if gsChange.change.Before.(*structs.GatewayService).IsSame(after) { + continue + } termGatewayChanges[gs.Gateway][gs.Service] = gsChange - continue - } - - if changeTypeFromChange(change) == changeDelete { + case changeDelete, changeCreate: termGatewayChanges[gs.Gateway][gs.Service] = gsChange - continue - } - - prevGs := changeObject(prevChange.change).(*structs.GatewayService) - if !gs.IsSame(prevGs) { - gsChange.changeType = changeUpdate - termGatewayChanges[gs.Gateway][gs.Service] = gsChange - } else { - delete(termGatewayChanges[gs.Gateway], gs.Service) } } } - //fmt.Printf("term gateway map: %v", termGatewayChanges) - // Now act on those marked nodes/services for node, changeType := range nodeChanges { if changeType == changeDelete { @@ -304,7 +295,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event for serviceName, gsChange := range serviceChanges { gs := changeObject(gsChange.change).(*structs.GatewayService) - _, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta) + _, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index e2f776c856..e4b3b52c37 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1037,8 +1037,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) }, Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err := s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false, + ); err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) }, WantEvents: []stream.Event{ testServiceHealthEvent(t, @@ -1052,6 +1059,20 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { "tgate1", evConnectTopic, evServiceTermingGateway("srv2")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNode2), }, }, { @@ -1091,9 +1112,100 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceIndex(setupIndex)), }, }, - // terminating gateway with 2 instances - // changing config entry to add a linked service - // changing config entry to remove a linked service + { + Name: "change the terminating gateway config entry to add a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }, + { + Name: "change the terminating gateway config entry to remove a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + }, + }, + // change the terminating gateway config entry to update a linked service (new SNI/CAFile/etc) // deleting a config entry // deregistering a service behind a terminating gateway (should send no term gateway events) } @@ -1127,15 +1239,13 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { } require.NoError(t, err) - assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents) + assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) }) } } func regTerminatingGateway(req *structs.RegisterRequest) error { - req.Service.Service = "tgate1" req.Service.Kind = structs.ServiceKindTerminatingGateway - req.Service.ID = "tgate1" req.Service.Port = 22000 return nil } From a21be5efa8c4a48748c70a86d6e93771c3b7b893 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 26 Jan 2021 14:23:02 -0800 Subject: [PATCH 05/12] Added 6 new test cases for terminating gateway events Co-Authored-By: Daniel Nephin --- agent/consul/state/catalog_events_test.go | 228 +++++++++++++++++++++- 1 file changed, 225 insertions(+), 3 deletions(-) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index e4b3b52c37..646142ffe9 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1016,6 +1016,24 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceTermingGateway("tgate1")), }, }, + { + Name: "config entry created with no terminating gateway instance", + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{}, + }, { Name: "terminating gateway registered after config entry exists", Setup: func(s *Store, tx *txn) error { @@ -1075,6 +1093,63 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evNode2), }, }, + { + Name: "terminating gateway updated after config entry exists", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNodeCheckFail, + evNodeUnchanged, + evNodeChecksMutated, + evServiceUnchanged), + }, + }, { Name: "terminating gateway config entry created after gateway exists", Setup: func(s *Store, tx *txn) error { @@ -1205,9 +1280,156 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceTermingGateway("srv1")), }, }, - // change the terminating gateway config entry to update a linked service (new SNI/CAFile/etc) - // deleting a config entry - // deregistering a service behind a terminating gateway (should send no term gateway events) + { + Name: "update a linked service within a terminating gateway config entry", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + CAFile: "foo.crt", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + }, + }, + { + Name: "delete a terminating gateway config entry with a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) + }, + Mutate: func(s *Store, tx *txn) error { + return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + }, + }, + { + Name: "create an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "srv1", evNodeUnchanged), + }, + }, + { + Name: "delete an instance of a linked service in a terminating gateway", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, "srv1"), + }, + }, } for _, tc := range cases { From 30a575dd33b0aea26730061af19d1fc00d834945 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 29 Jan 2021 15:06:41 -0500 Subject: [PATCH 06/12] state: add 2 more test cases for terminate gateway streaming events Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog_events_test.go | 134 +++++++++++++++++++++- 1 file changed, 128 insertions(+), 6 deletions(-) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 646142ffe9..f45848a94b 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -483,7 +483,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evRenameService, evServiceMutated, evNodeUnchanged, - evChecksMutated, + evServiceChecksMutated, ), testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, @@ -797,14 +797,14 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceCheckFail, evNodeUnchanged, evServiceUnchanged, - evChecksMutated, + evServiceChecksMutated, ), testServiceHealthEvent(t, "web", evSidecar, evServiceCheckFail, evNodeUnchanged, evServiceUnchanged, - evChecksMutated, + evServiceChecksMutated, ), testServiceHealthEvent(t, "web", evConnectTopic, @@ -812,7 +812,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceCheckFail, evNodeUnchanged, evServiceUnchanged, - evChecksMutated, + evServiceChecksMutated, ), }, WantErr: false, @@ -1430,6 +1430,118 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { testServiceHealthDeregistrationEvent(t, "srv1"), }, }, + { + Name: "rename a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + configEntry = &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate2", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err = ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + rename := func(req *structs.RegisterRequest) error { + req.Service.Service = "tgate2" + req.Checks[1].ServiceName = "tgate2" + return nil + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("tgate1")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNodeUnchanged, + evServiceMutated, + evServiceChecksMutated, + evTerminatingGatewayRenamed("tgate2")), + }, + }, + { + Name: "delete a terminating gateway instance", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evServiceTermingGateway("")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2")), + }, + }, } for _, tc := range cases { @@ -1528,6 +1640,7 @@ var cmpPartialOrderEvents = cmp.Options{ cmpopts.SortSlices(func(i, j stream.Event) bool { key := func(e stream.Event) string { csn := getPayloadCheckServiceNode(e.Payload) + // TODO: double check this sort key is correct. return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).key) } return key(i) < key(j) @@ -1848,12 +1961,12 @@ func evServiceMutated(e *stream.Event) error { return nil } -// evChecksMutated option alters the base event service check to set it's +// evServiceChecksMutated option alters the base event service check to set it's // CreateIndex (but not modify index) to the setup index. This expresses that we // expect the service check records originally created in setup to have been // mutated during the update. NOTE: this must be sequenced after // evServiceUnchanged if both are used. -func evChecksMutated(e *stream.Event) error { +func evServiceChecksMutated(e *stream.Event) error { getPayloadCheckServiceNode(e.Payload).Checks[1].CreateIndex = 10 getPayloadCheckServiceNode(e.Payload).Checks[1].ModifyIndex = 100 return nil @@ -1911,6 +2024,15 @@ func evRenameService(e *stream.Event) error { return nil } +func evTerminatingGatewayRenamed(newName string) func(e *stream.Event) error { + return func(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Service.Service = newName + csn.Checks[1].ServiceName = newName + return nil + } +} + // evNodeMeta option alters the base event node to add some meta data. func evNodeMeta(e *stream.Event) error { csn := getPayloadCheckServiceNode(e.Payload) From 4756ff059dff7db60c6c2a75fb720520ee1384bd Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 26 Feb 2021 13:34:14 -0500 Subject: [PATCH 07/12] state: update calls to ensureConfigEntryTxn The EnterpriseMeta paramter was removed after this code was written, but before it merged. Also the table name constant has changed. --- agent/consul/state/catalog_events.go | 2 +- agent/consul/state/catalog_events_test.go | 32 +++++++++++------------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4860141445..4a088ed8fc 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -203,7 +203,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect) } } - case gatewayServicesTableName: + case tableGatewayServices: gs := changeObject(change).(*structs.GatewayService) if gs.GatewayKind != structs.ServiceKindTerminatingGateway { continue diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index f45848a94b..83d7cd184a 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1030,7 +1030,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{}, }, @@ -1052,7 +1052,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, Mutate: func(s *Store, tx *txn) error { if err := s.ensureRegistrationTxn( @@ -1111,7 +1111,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1172,7 +1172,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ testServiceHealthEvent(t, @@ -1201,7 +1201,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1224,7 +1224,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ testServiceHealthEvent(t, @@ -1252,7 +1252,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1271,7 +1271,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ testServiceHealthDeregistrationEvent(t, @@ -1294,7 +1294,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1314,7 +1314,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ testServiceHealthDeregistrationEvent(t, @@ -1342,7 +1342,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1384,7 +1384,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1412,7 +1412,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1444,7 +1444,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1459,7 +1459,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err = ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err = ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1518,7 +1518,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + err := ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } From ae368768e54439d6b2261746dcd5a6bb8c42dfac Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 29 Jan 2021 15:40:07 -0500 Subject: [PATCH 08/12] state: Add support for override of namespace in MatchesKey also tests for MatchesKey Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog_events.go | 34 ++++----- agent/consul/state/catalog_events_oss_test.go | 7 ++ agent/consul/state/catalog_events_test.go | 69 ++++++++++++++----- 3 files changed, 76 insertions(+), 34 deletions(-) create mode 100644 agent/consul/state/catalog_events_oss_test.go diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4a088ed8fc..9be0a9daee 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -23,8 +23,8 @@ type EventPayloadCheckServiceNode struct { // key is used to override the key used to filter the payload. It is set for // events in the connect topic to specify the name of the underlying service // when the change event is for a sidecar or gateway. - key string - // FIXME: we need to be able to override the namespace for some terminating gateway events + overrideKey string + overrideNamespace string } func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { @@ -41,11 +41,15 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { } name := e.Value.Service.Service - if e.key != "" { - name = e.key + if e.overrideKey != "" { + name = e.overrideKey } ns := e.Value.Service.EnterpriseMeta.GetNamespace() - return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns) + if e.overrideNamespace != "" { + ns = e.overrideNamespace + } + return (key == "" || strings.EqualFold(key, name)) && + (namespace == "" || strings.EqualFold(namespace, ns)) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -74,7 +78,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { } if connect && n.Service.Kind == structs.ServiceKindConnectProxy { - payload.key = n.Service.Proxy.DestinationServiceName + payload.overrideKey = n.Service.Proxy.DestinationServiceName } event.Payload = payload @@ -306,9 +310,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event e := newServiceHealthEventDeregister(changes.Index, sn) e.Topic = topicServiceHealthConnect - // todo(streaming): make namespace-aware in enterprise payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = serviceName.Name + payload.overrideKey = serviceName.Name + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() e.Payload = payload events = append(events, e) @@ -328,9 +332,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } e.Topic = topicServiceHealthConnect - // todo(streaming): make namespace-aware in enterprise payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = serviceName.Name + payload.overrideKey = serviceName.Name + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() e.Payload = payload events = append(events, e) @@ -362,7 +366,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S e := newServiceHealthEventDeregister(idx, before) e.Topic = topicServiceHealthConnect payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = payload.Value.Service.Proxy.DestinationServiceName + payload.overrideKey = payload.Value.Service.Proxy.DestinationServiceName e.Payload = payload return e, true } @@ -419,7 +423,7 @@ func serviceHealthToConnectEvents( case node.Service.Kind == structs.ServiceKindConnectProxy: payload := event.Payload.(EventPayloadCheckServiceNode) - payload.key = node.Service.Proxy.DestinationServiceName + payload.overrideKey = node.Service.Proxy.DestinationServiceName connectEvent.Payload = payload result = append(result, connectEvent) @@ -445,10 +449,8 @@ func serviceHealthToConnectEvents( func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event { event.Topic = topicServiceHealthConnect payload := event.Payload.(EventPayloadCheckServiceNode) - payload.key = service.Name - // FIXME: we need payload to have an override for namespace, so that it can be filtered - // properly by EventPayloadCheckServiceNode.MatchesKey - // payload.enterpriseMeta = service.EnterpriseMeta + payload.overrideKey = service.Name + payload.overrideNamespace = service.EnterpriseMeta.GetNamespace() event.Payload = payload return event } diff --git a/agent/consul/state/catalog_events_oss_test.go b/agent/consul/state/catalog_events_oss_test.go new file mode 100644 index 0000000000..bad6fa817c --- /dev/null +++ b/agent/consul/state/catalog_events_oss_test.go @@ -0,0 +1,7 @@ +// +build !consulent + +package state + +func withServiceHealthEnterpriseCases(cases []serviceHealthTestCase) []serviceHealthTestCase { + return cases +} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 83d7cd184a..108f6eea73 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -98,7 +98,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.key = "web" + ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 1 @@ -116,7 +116,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.key = "web" + ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 4 @@ -173,17 +173,19 @@ func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { } } +type serviceHealthTestCase struct { + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + WantEvents []stream.Event + WantErr bool +} + func TestServiceHealthEventsFromChanges(t *testing.T) { setupIndex := uint64(10) mutateIndex := uint64(100) - cases := []struct { - Name string - Setup func(s *Store, tx *txn) error - Mutate func(s *Store, tx *txn) error - WantEvents []stream.Event - WantErr bool - }{ + cases := []serviceHealthTestCase{ { Name: "irrelevant events", Mutate: func(s *Store, tx *txn) error { @@ -1543,6 +1545,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, }, } + cases = withServiceHealthEnterpriseCases(cases) for _, tc := range cases { tc := tc @@ -1602,7 +1605,7 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = name + payload.overrideKey = name e.Payload = payload } return nil @@ -1641,7 +1644,7 @@ var cmpPartialOrderEvents = cmp.Options{ key := func(e stream.Event) string { csn := getPayloadCheckServiceNode(e.Payload) // TODO: double check this sort key is correct. - return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).key) + return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).overrideKey) } return key(i) < key(j) }), @@ -1929,7 +1932,7 @@ func evSidecar(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = svc + payload.overrideKey = svc e.Payload = payload } return nil @@ -2018,7 +2021,7 @@ func evRenameService(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = csn.Service.Proxy.DestinationServiceName + payload.overrideKey = csn.Service.Proxy.DestinationServiceName e.Payload = payload } return nil @@ -2268,14 +2271,42 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { }, { name: "override key match", - payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv1"), + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""), key: "srv1", namespace: "ns1", expected: true, }, { - name: "override key match", - payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv2"), + name: "override key mismatch", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""), + key: "proxy", + namespace: "ns1", + expected: false, + }, + { + name: "override namespace match", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"), + key: "proxy", + namespace: "ns2", + expected: true, + }, + { + name: "override namespace mismatch", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"), + key: "proxy", + namespace: "ns1", + expected: false, + }, + { + name: "override both key and namespace match", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"), + key: "srv1", + namespace: "ns2", + expected: true, + }, + { + name: "override both key and namespace mismatch namespace", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"), key: "proxy", namespace: "ns1", expected: false, @@ -2300,7 +2331,8 @@ func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServ } } -func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayloadCheckServiceNode { +func newPayloadCheckServiceNodeWithOverride( + service, namespace, overrideKey, overrideNamespace string) EventPayloadCheckServiceNode { return EventPayloadCheckServiceNode{ Value: &structs.CheckServiceNode{ Service: &structs.NodeService{ @@ -2308,6 +2340,7 @@ func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayl EnterpriseMeta: structs.NewEnterpriseMeta(namespace), }, }, - key: key, + overrideKey: overrideKey, + overrideNamespace: overrideNamespace, } } From 701285e4706e8c906c04bf6e2a3357e2e94027fc Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 29 Jan 2021 15:53:45 -0500 Subject: [PATCH 09/12] Start to setup enterprise tests for terminating gateway streaming events. Co-Authored-By: Kyle Havlovitz --- agent/consul/state/catalog_events.go | 13 ++++++++++--- agent/consul/state/catalog_events_test.go | 5 +++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 9be0a9daee..2fa353d7ad 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -312,7 +312,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event e.Topic = topicServiceHealthConnect payload := e.Payload.(EventPayloadCheckServiceNode) payload.overrideKey = serviceName.Name - payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() + if gatewayName.EnterpriseMeta.GetNamespace() != serviceName.EnterpriseMeta.GetNamespace() { + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() + } e.Payload = payload events = append(events, e) @@ -334,7 +336,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event e.Topic = topicServiceHealthConnect payload := e.Payload.(EventPayloadCheckServiceNode) payload.overrideKey = serviceName.Name - payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() + if gatewayName.EnterpriseMeta.GetNamespace() != serviceName.EnterpriseMeta.GetNamespace() { + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() + } e.Payload = payload events = append(events, e) @@ -450,7 +454,10 @@ func copyEventForService(event stream.Event, service structs.ServiceName) stream event.Topic = topicServiceHealthConnect payload := event.Payload.(EventPayloadCheckServiceNode) payload.overrideKey = service.Name - payload.overrideNamespace = service.EnterpriseMeta.GetNamespace() + if payload.Value.Service.EnterpriseMeta.GetNamespace() != service.EnterpriseMeta.GetNamespace() { + payload.overrideNamespace = service.EnterpriseMeta.GetNamespace() + } + event.Payload = payload return event } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 108f6eea73..5257e2108e 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1717,8 +1717,9 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea csn.Node.Address = "10.10.10.10" for _, opt := range opts { - err := opt(&e) - require.NoError(t, err) + if err := opt(&e); err != nil { + t.Fatalf("expected no error, got %v", err) + } } return e } From db572aca5909c1a99d8381076f5490bb0a38dc14 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 3 Feb 2021 12:49:14 -0800 Subject: [PATCH 10/12] Add remaining terminating gateway tests for namespaces Co-Authored-By: Daniel Nephin --- agent/consul/state/catalog_events_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 5257e2108e..8b02a18e62 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1699,7 +1699,9 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc }) for _, opt := range opts { err := opt(r) - require.NoError(t, err) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } } return r } @@ -1727,8 +1729,9 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) stream.Event { e := newTestEventServiceHealthDeregister(100, 1, svc) for _, opt := range opts { - err := opt(&e) - require.NoError(t, err) + if err := opt(&e); err != nil { + t.Fatalf("expected no error, got %v", err) + } } return e } From 68ec20f66a17752dafde7e52b7f47731dbe93081 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 26 Feb 2021 19:39:05 -0500 Subject: [PATCH 11/12] state: handle terminating gateway events properly in snapshot Refactored out a function that can be used for both the snapshot and stream of events to translate an event into an appropriate connect event. Previously terminating gateway events would have used the wrong key in the snapshot, which would have caused them to be filtered out later on. Also removed an unused function, and some commented out code. --- agent/consul/state/catalog_events.go | 98 +++++++++++++---------- agent/consul/state/catalog_events_test.go | 64 +++++++++------ 2 files changed, 96 insertions(+), 66 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 2fa353d7ad..9c9b4de988 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -71,21 +71,24 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { event := stream.Event{ Index: idx, Topic: topic, - } - payload := EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &n, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, + }, } - if connect && n.Service.Kind == structs.ServiceKindConnectProxy { - payload.overrideKey = n.Service.Proxy.DestinationServiceName + if !connect { + // append each event as a separate item so that they can be serialized + // separately, to prevent the encoding of one massive message. + buf.Append([]stream.Event{event}) + continue } - event.Payload = payload - - // append each event as a separate item so that they can be serialized - // separately, to prevent the encoding of one massive message. - buf.Append([]stream.Event{event}) + events, err := connectEventsByServiceKind(tx, event) + if err != nil { + return idx, err + } + buf.Append(events) } return idx, err @@ -413,43 +416,56 @@ func serviceHealthToConnectEvents( // Skip non-health or any events already emitted to Connect topic continue } - node := getPayloadCheckServiceNode(event.Payload) - if node.Service == nil { - continue + + connectEvents, err := connectEventsByServiceKind(tx, event) + if err != nil { + return nil, err } - connectEvent := event - connectEvent.Topic = topicServiceHealthConnect - - switch { - case node.Service.Connect.Native: - result = append(result, connectEvent) - - case node.Service.Kind == structs.ServiceKindConnectProxy: - payload := event.Payload.(EventPayloadCheckServiceNode) - payload.overrideKey = node.Service.Proxy.DestinationServiceName - connectEvent.Payload = payload - result = append(result, connectEvent) - - case node.Service.Kind == structs.ServiceKindTerminatingGateway: - iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta) - if err != nil { - return nil, err - } - - // similar to checkServiceNodesTxn -> serviceGatewayNodes - for obj := iter.Next(); obj != nil; obj = iter.Next() { - result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service)) - } - - default: - // All other cases are not relevant to the connect topic - } + result = append(result, connectEvents...) } return result, nil } +func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Event, error) { + node := getPayloadCheckServiceNode(origEvent.Payload) + if node.Service == nil { + return nil, nil + } + + event := origEvent // shallow copy the event + event.Topic = topicServiceHealthConnect + + if node.Service.Connect.Native { + return []stream.Event{event}, nil + } + + switch node.Service.Kind { + case structs.ServiceKindConnectProxy: + payload := event.Payload.(EventPayloadCheckServiceNode) + payload.overrideKey = node.Service.Proxy.DestinationServiceName + event.Payload = payload + return []stream.Event{event}, nil + + case structs.ServiceKindTerminatingGateway: + var result []stream.Event + iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta) + if err != nil { + return nil, err + } + + // similar to checkServiceNodesTxn -> serviceGatewayNodes + for obj := iter.Next(); obj != nil; obj = iter.Next() { + result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service)) + } + return result, nil + default: + // All other cases are not relevant to the connect topic + } + return nil, nil +} + func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event { event.Topic = topicServiceHealthConnect payload := event.Payload.(EventPayloadCheckServiceNode) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 8b02a18e62..d012f81e1c 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -85,6 +85,23 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar)) require.NoError(t, err) + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "web", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err = store.EnsureConfigEntry(counter.Next(), configEntry) + require.NoError(t, err) + + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "tgate1", regTerminatingGateway)) + require.NoError(t, err) + fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect) buf := &snapshotAppender{} req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect} @@ -95,10 +112,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { expected := [][]stream.Event{ { - testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error { + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 1 @@ -113,10 +129,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { }), }, { - testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error { + testServiceHealthEvent(t, "web", evConnectTopic, evNode2, evSidecar, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 4 @@ -130,6 +145,26 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { return nil }), }, + { + testServiceHealthEvent(t, "tgate1", + evConnectTopic, + evServiceTermingGateway("web"), + func(e *stream.Event) error { + e.Index = counter.Last() + ep := e.Payload.(EventPayloadCheckServiceNode) + e.Payload = ep + csn := ep.Value + csn.Node.CreateIndex = 1 + csn.Node.ModifyIndex = 1 + csn.Service.CreateIndex = 7 + csn.Service.ModifyIndex = 7 + csn.Checks[0].CreateIndex = 1 + csn.Checks[0].ModifyIndex = 1 + csn.Checks[1].CreateIndex = 7 + csn.Checks[1].ModifyIndex = 7 + return nil + }), + }, } assertDeepEqual(t, expected, buf.events, cmpEvents) } @@ -161,18 +196,6 @@ func newIndexCounter() *indexCounter { var _ stream.SnapshotAppender = (*snapshotAppender)(nil) -func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { - return func(e *stream.Event) error { - e.Index = idx - csn := getPayloadCheckServiceNode(e.Payload) - csn.Node.CreateIndex = create - csn.Node.ModifyIndex = modify - csn.Service.CreateIndex = create - csn.Service.ModifyIndex = modify - return nil - } -} - type serviceHealthTestCase struct { Name string Setup func(s *Store, tx *txn) error @@ -1594,15 +1617,6 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { csn.Service.Kind = structs.ServiceKindTerminatingGateway csn.Service.Port = 22000 - // Convert the check to point to the right ID now. This isn't totally - // realistic - sidecars should have alias checks etc but this is good enough - // to test this code path. - //if len(csn.Checks) >= 2 { - // csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_terminating_gateway") - // csn.Checks[1].ServiceID = svc + "_terminating_gateway" - // csn.Checks[1].ServiceName = svc + "_terminating_gateway" - //} - if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) payload.overrideKey = name From 23421e190c4cff1986e92d02a7f02bff7c228ad4 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 9 Mar 2021 14:00:29 -0500 Subject: [PATCH 12/12] state: adjust compare for catalog events Document that this comparison should roughly match MatchesKey Only sort by overrideKey or service name, but not both Add namespace to the sort. The client side also builds a map of these based on the namespace/node/service key, so the only order that really matters is the ordering of register/dereigster events. --- agent/consul/state/catalog_events_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index d012f81e1c..9aa82062e8 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1651,14 +1651,26 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { } // cmpPartialOrderEvents returns a compare option which sorts events so that -// all events for a particular node/service are grouped together. The sort is -// stable so events with the same node/service retain their relative order. +// all events for a particular topic are grouped together. The sort is +// stable so events with the same key retain their relative order. +// +// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey +// to avoid masking bugs. var cmpPartialOrderEvents = cmp.Options{ cmpopts.SortSlices(func(i, j stream.Event) bool { key := func(e stream.Event) string { - csn := getPayloadCheckServiceNode(e.Payload) - // TODO: double check this sort key is correct. - return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).overrideKey) + payload := e.Payload.(EventPayloadCheckServiceNode) + csn := payload.Value + + name := csn.Service.Service + if payload.overrideKey != "" { + name = payload.overrideKey + } + ns := csn.Service.EnterpriseMeta.GetNamespace() + if payload.overrideNamespace != "" { + ns = payload.overrideNamespace + } + return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, ns, name) } return key(i) < key(j) }),