From 499211f9078e53640f1335fdc0c98374d8206ced Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 28 Jul 2022 12:51:01 -0700 Subject: [PATCH] Fix wildcard picking up services it shouldn't for ingress/terminating gateways --- agent/consul/state/catalog.go | 96 ++++++++++++++++++++++++-- agent/consul/state/catalog_test.go | 44 ++++++++---- agent/consul/state/config_entry.go | 4 +- agent/consul/state/state_store_test.go | 6 ++ 4 files changed, 128 insertions(+), 22 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 849d0820c3..a280ba9bf4 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -871,7 +871,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" { // Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta) - if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { + if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { @@ -1984,11 +1984,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil { return err } - if svc.PeerName == "" { - if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { - return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) - } - } psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) @@ -2001,6 +1996,12 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) } + if svc.PeerName == "" { + if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { + return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) + } + } + return nil } @@ -3652,6 +3653,18 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } + supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta) + if err != nil { + return err + } + + if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + continue + } + if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + continue + } + existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) @@ -3717,6 +3730,42 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer return nil } +// serviceConnectInstances returns whether the service has at least one connect instance, +// and at least one non-connect instance. +func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) { + hasConnectInstance := false + connectQuery := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } + svc, err := tx.First(tableServices, indexConnect, connectQuery) + if err != nil { + return false, false, fmt.Errorf("failed service lookup: %s", err) + } + if svc != nil { + hasConnectInstance = true + } + + hasNonConnectInstance := false + nonConnectQuery := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } + iter, err := tx.Get(tableServices, indexService, nonConnectQuery) + if err != nil { + return false, false, fmt.Errorf("failed service lookup: %s", err) + } + for service := iter.Next(); service != nil; service = iter.Next() { + sn := service.(*structs.ServiceNode) + if !sn.ServiceConnect.Native { + hasNonConnectInstance = true + break + } + } + + return hasConnectInstance, hasNonConnectInstance, nil +} + // updateGatewayService associates services with gateways after an eligible event // ie. Registering a service in a namespace targeted by a gateway func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { @@ -3754,14 +3803,31 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi // checkWildcardForGatewaysAndUpdate checks whether a service matches a // wildcard definition in gateway config entries and if so adds it the the // gateway-services table. -func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error { +func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ns *structs.NodeService, kind structs.GatewayServiceKind) error { sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta} svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) if err != nil { return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err) } + + supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta) + if err != nil { + return err + } + if ns != nil && ns.Connect.Native { + supportsIngress = true + } else { + supportsTerminating = true + } + for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil { + if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + continue + } + if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + continue + } // Copy the wildcard mapping and modify it gatewaySvc := wildcardSvc.Clone() @@ -3818,12 +3884,28 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) } } + // Check whether there are any connect or non-connect instances remaining for this service. + // If there are no connect instances left, ingress gateways with a wildcard entry can remove + // their association with it (same with terminating gateways if there are no non-connect + // instances left). + hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } + // Do the updates in a separate loop so we don't trash the iterator. for _, m := range mappings { // Only delete if association was created by a wildcard specifier. // Otherwise the service was specified in the config entry, and the association should be maintained // for when the service is re-registered if m.FromWildcard { + if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance { + continue + } + if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance { + continue + } + if err := tx.Delete(tableGatewayServices, m); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index fed3bd0ee3..357844dc01 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -4,13 +4,14 @@ import ( "context" crand "crypto/rand" "fmt" - "github.com/hashicorp/consul/acl" "reflect" "sort" "strings" "testing" "time" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/assert" @@ -5753,6 +5754,10 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + // Connect services (should be ignored by terminating gateway) + assert.Nil(t, s.EnsureService(15, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000})) + assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000})) + // Register two gateways assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443})) @@ -5895,6 +5900,16 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { }, } assert.Equal(t, expect, out) + + // Delete the non-connect instance of api + assert.Nil(t, s.DeleteService(21, "foo", "api", nil, "")) + + // Gateway with wildcard entry should have no services left, because the last + // non-connect instance of 'api' was deleted. + idx, out, err = s.GatewayServices(ws, "other-gateway", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(21)) + assert.Empty(t, out) } func TestStateStore_CheckIngressServiceNodes(t *testing.T) { @@ -5904,7 +5919,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { t.Run("check service1 ingress gateway", func(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) // Multiple instances of the ingress2 service require.Len(t, results, 4) @@ -5923,7 +5938,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { t.Run("check service2 ingress gateway", func(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 2) ids := make(map[string]struct{}) @@ -5941,7 +5956,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { ws := memdb.NewWatchSet() idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 1) require.Equal(t, "wildcardIngress", results[0].Service.ID) }) @@ -5952,17 +5967,17 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 3) idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 1) idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) // TODO(ingress): index goes backward when deleting last config entry // require.Equal(t,uint64(11), idx) require.Len(t, results, 0) @@ -6346,8 +6361,8 @@ func TestStateStore_GatewayServices_IngressProtocolFiltering(t *testing.T) { } testRegisterNode(t, s, 0, "node1") - testRegisterService(t, s, 1, "node1", "service1") - testRegisterService(t, s, 2, "node1", "service2") + testRegisterConnectService(t, s, 1, "node1", "service1") + testRegisterConnectService(t, s, 2, "node1", "service2") assert.NoError(t, s.EnsureConfigEntry(4, ingress1)) }) @@ -6510,15 +6525,17 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 1, "node2") - // Register a service against the nodes. + // Register some connect and non-connect services against the nodes. testRegisterIngressService(t, s, 3, "node1", "wildcardIngress") testRegisterIngressService(t, s, 4, "node1", "ingress1") testRegisterIngressService(t, s, 5, "node1", "ingress2") testRegisterIngressService(t, s, 6, "node2", "ingress2") testRegisterIngressService(t, s, 7, "node1", "nothingIngress") - testRegisterService(t, s, 8, "node1", "service1") - testRegisterService(t, s, 9, "node2", "service2") - testRegisterService(t, s, 10, "node2", "service3") + testRegisterConnectService(t, s, 8, "node1", "service1") + testRegisterConnectService(t, s, 9, "node2", "service2") + testRegisterConnectService(t, s, 10, "node2", "service3") + testRegisterService(t, s, 17, "node1", "service4") + testRegisterService(t, s, 18, "node2", "service5") // Default protocol to http proxyDefaults := &structs.ProxyConfigEntry{ @@ -7883,6 +7900,7 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) { Address: "127.0.0.3", Port: 443, EnterpriseMeta: *defaultMeta, + Connect: structs.ServiceConnect{Native: true}, } require.NoError(t, s.EnsureService(5, "foo", &svc)) assert.True(t, watchFired(ws)) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index af0fe21133..f5b15115c3 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -371,7 +371,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a gsKind = structs.GatewayServiceKindUnknown } serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta()) - if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, gsKind); err != nil { + if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil { @@ -434,7 +434,7 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry) if err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } - if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, gsKind); err != nil { + if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, nil, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil { diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 4946c50f21..c8460ca821 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -193,6 +193,12 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) } +func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { + testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, true, func(service *structs.NodeService) { + service.Connect = structs.ServiceConnect{Native: true} + }) +} + func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { svc := &structs.NodeService{ ID: serviceID,