Browse Source

Fix wildcard picking up services it shouldn't for ingress/terminating gateways

pull/13958/head
Kyle Havlovitz 2 years ago
parent
commit
499211f907
  1. 96
      agent/consul/state/catalog.go
  2. 44
      agent/consul/state/catalog_test.go
  3. 4
      agent/consul/state/config_entry.go
  4. 6
      agent/consul/state/state_store_test.go

96
agent/consul/state/catalog.go

@ -871,7 +871,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" {
// Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence
sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta)
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil {
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil {
@ -1984,11 +1984,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil {
return err
}
if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name}
if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
@ -2001,6 +1996,12 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
}
if svc.PeerName == "" {
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
return nil
}
@ -3652,6 +3653,18 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
continue
}
supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta)
if err != nil {
return err
}
if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}
existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
@ -3717,6 +3730,42 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
return nil
}
// serviceConnectInstances returns whether the service has at least one connect instance,
// and at least one non-connect instance.
func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) {
hasConnectInstance := false
connectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
svc, err := tx.First(tableServices, indexConnect, connectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
if svc != nil {
hasConnectInstance = true
}
hasNonConnectInstance := false
nonConnectQuery := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
iter, err := tx.Get(tableServices, indexService, nonConnectQuery)
if err != nil {
return false, false, fmt.Errorf("failed service lookup: %s", err)
}
for service := iter.Next(); service != nil; service = iter.Next() {
sn := service.(*structs.ServiceNode)
if !sn.ServiceConnect.Native {
hasNonConnectInstance = true
break
}
}
return hasConnectInstance, hasNonConnectInstance, nil
}
// updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
@ -3754,14 +3803,31 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
// checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the
// gateway-services table.
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error {
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ns *structs.NodeService, kind structs.GatewayServiceKind) error {
sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err)
}
supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return err
}
if ns != nil && ns.Connect.Native {
supportsIngress = true
} else {
supportsTerminating = true
}
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil {
if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
continue
}
if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
continue
}
// Copy the wildcard mapping and modify it
gatewaySvc := wildcardSvc.Clone()
@ -3818,12 +3884,28 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
}
}
// Check whether there are any connect or non-connect instances remaining for this service.
// If there are no connect instances left, ingress gateways with a wildcard entry can remove
// their association with it (same with terminating gateways if there are no non-connect
// instances left).
hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return err
}
// Do the updates in a separate loop so we don't trash the iterator.
for _, m := range mappings {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if m.FromWildcard {
if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance {
continue
}
if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance {
continue
}
if err := tx.Delete(tableGatewayServices, m); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}

44
agent/consul/state/catalog_test.go

@ -4,13 +4,14 @@ import (
"context"
crand "crypto/rand"
"fmt"
"github.com/hashicorp/consul/acl"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert"
@ -5753,6 +5754,10 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
// Connect services (should be ignored by terminating gateway)
assert.Nil(t, s.EnsureService(15, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))
assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000}))
// Register two gateways
assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443}))
@ -5895,6 +5900,16 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
},
}
assert.Equal(t, expect, out)
// Delete the non-connect instance of api
assert.Nil(t, s.DeleteService(21, "foo", "api", nil, ""))
// Gateway with wildcard entry should have no services left, because the last
// non-connect instance of 'api' was deleted.
idx, out, err = s.GatewayServices(ws, "other-gateway", nil)
assert.Nil(t, err)
assert.Equal(t, idx, uint64(21))
assert.Empty(t, out)
}
func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
@ -5904,7 +5919,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service1 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
// Multiple instances of the ingress2 service
require.Len(t, results, 4)
@ -5923,7 +5938,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
t.Run("check service2 ingress gateway", func(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 2)
ids := make(map[string]struct{})
@ -5941,7 +5956,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
ws := memdb.NewWatchSet()
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 1)
require.Equal(t, "wildcardIngress", results[0].Service.ID)
})
@ -5952,17 +5967,17 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 3)
idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
require.Len(t, results, 1)
idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil)
require.NoError(t, err)
require.Equal(t, uint64(15), idx)
require.Equal(t, uint64(18), idx)
// TODO(ingress): index goes backward when deleting last config entry
// require.Equal(t,uint64(11), idx)
require.Len(t, results, 0)
@ -6346,8 +6361,8 @@ func TestStateStore_GatewayServices_IngressProtocolFiltering(t *testing.T) {
}
testRegisterNode(t, s, 0, "node1")
testRegisterService(t, s, 1, "node1", "service1")
testRegisterService(t, s, 2, "node1", "service2")
testRegisterConnectService(t, s, 1, "node1", "service1")
testRegisterConnectService(t, s, 2, "node1", "service2")
assert.NoError(t, s.EnsureConfigEntry(4, ingress1))
})
@ -6510,15 +6525,17 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
// Register a service against the nodes.
// Register some connect and non-connect services against the nodes.
testRegisterIngressService(t, s, 3, "node1", "wildcardIngress")
testRegisterIngressService(t, s, 4, "node1", "ingress1")
testRegisterIngressService(t, s, 5, "node1", "ingress2")
testRegisterIngressService(t, s, 6, "node2", "ingress2")
testRegisterIngressService(t, s, 7, "node1", "nothingIngress")
testRegisterService(t, s, 8, "node1", "service1")
testRegisterService(t, s, 9, "node2", "service2")
testRegisterService(t, s, 10, "node2", "service3")
testRegisterConnectService(t, s, 8, "node1", "service1")
testRegisterConnectService(t, s, 9, "node2", "service2")
testRegisterConnectService(t, s, 10, "node2", "service3")
testRegisterService(t, s, 17, "node1", "service4")
testRegisterService(t, s, 18, "node2", "service5")
// Default protocol to http
proxyDefaults := &structs.ProxyConfigEntry{
@ -7883,6 +7900,7 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
Address: "127.0.0.3",
Port: 443,
EnterpriseMeta: *defaultMeta,
Connect: structs.ServiceConnect{Native: true},
}
require.NoError(t, s.EnsureService(5, "foo", &svc))
assert.True(t, watchFired(ws))

4
agent/consul/state/config_entry.go

@ -371,7 +371,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a
gsKind = structs.GatewayServiceKindUnknown
}
serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta())
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
@ -434,7 +434,7 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry)
if err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, gsKind); err != nil {
if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, nil, gsKind); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil {

6
agent/consul/state/state_store_test.go

@ -193,6 +193,12 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
}
func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, true, func(service *structs.NodeService) {
service.Connect = structs.ServiceConnect{Native: true}
})
}
func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,

Loading…
Cancel
Save