diff --git a/.changelog/11738.txt b/.changelog/11738.txt new file mode 100644 index 0000000000..6584863e41 --- /dev/null +++ b/.changelog/11738.txt @@ -0,0 +1,3 @@ +```release-note:improvement +connect: **(Enterprise only)** add support for cross-partition transparent proxying. +``` \ No newline at end of file diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index cfddeff18c..b853b4acab 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -570,7 +570,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, services, err := state.ServiceList(ws, nil, &args.EnterpriseMeta) + index, services, err := state.ServiceList(ws, &args.EnterpriseMeta) if err != nil { return err } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 10e1cd0610..6527068650 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -464,6 +464,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, vip, "240.0.0.2") + _, serviceNames, err := fsm.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical) + require.NoError(t, err) + + expect := []string{"backend", "db", "frontend", "web"} + for i, sn := range serviceNames { + require.Equal(t, expect[i], sn.Service.Name) + } + // Snapshot snap, err := fsm.Snapshot() require.NoError(t, err) @@ -690,10 +698,10 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.Len(t, roots, 2) // Verify provider state is restored. - _, state, err := fsm2.state.CAProviderState("asdf") + _, provider, err := fsm2.state.CAProviderState("asdf") require.NoError(t, err) - require.Equal(t, "foo", state.PrivateKey) - require.Equal(t, "bar", state.RootCert) + require.Equal(t, "foo", provider.PrivateKey) + require.Equal(t, "bar", provider.RootCert) // Verify CA configuration is restored. _, caConf, err := fsm2.state.CAConfig(nil) @@ -751,6 +759,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, meshConfig, meshConfigEntry) + _, restoredServiceNames, err := fsm2.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical) + require.NoError(t, err) + + expect = []string{"backend", "db", "frontend", "web"} + for i, sn := range restoredServiceNames { + require.Equal(t, expect[i], sn.Service.Name) + } + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index d0974c30e7..31bef38e3b 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -741,6 +741,9 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } + if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil { + return fmt.Errorf("failed to persist service name: %v", err) + } // Update upstream/downstream mappings if it's a connect service if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native { @@ -965,16 +968,14 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui return idx, results, nil } -func (s *Store) ServiceList(ws memdb.WatchSet, - include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { +func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { tx := s.db.Txn(false) defer tx.Abort() - return serviceListTxn(tx, ws, include, entMeta) + return serviceListTxn(tx, ws, entMeta) } -func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, - include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { +func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { idx := catalogServicesMaxIndex(tx, entMeta) services, err := tx.Get(tableServices, indexID+"_prefix", entMeta) @@ -986,11 +987,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, unique := make(map[structs.ServiceName]struct{}) for service := services.Next(); service != nil; service = services.Next() { svc := service.(*structs.ServiceNode) - // TODO (freddy) This is a hack to exclude certain kinds. - // Need a new index to query by kind and namespace, have to coordinate with consul foundations first - if include == nil || include(svc) { - unique[svc.CompoundServiceName()] = struct{}{} - } + unique[svc.CompoundServiceName()] = struct{}{} } results := make(structs.ServiceList, 0, len(unique)) @@ -1691,6 +1688,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) } + if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { + return fmt.Errorf("failed to persist service name: %v", err) + } } } else { return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) @@ -2526,6 +2526,30 @@ func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) { return result.String(), nil } +func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + return serviceNamesOfKindTxn(tx, ws, kind) +} + +func serviceNamesOfKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) { + var names []*KindServiceName + iter, err := tx.Get(tableKindServiceNames, indexKindOnly, kind) + if err != nil { + return 0, nil, err + } + ws.Add(iter.WatchCh()) + + idx := kindServiceNamesMaxIndex(tx, ws, kind) + for name := iter.Next(); name != nil; name = iter.Next() { + ksn := name.(*KindServiceName) + names = append(names, ksn) + } + + return idx, names, nil +} + // parseCheckServiceNodes is used to parse through a given set of services, // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. @@ -3862,3 +3886,44 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str return nil } + +func upsertKindServiceName(tx WriteTxn, idx uint64, kind structs.ServiceKind, name structs.ServiceName) error { + q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta} + existing, err := tx.First(tableKindServiceNames, indexID, q) + if err != nil { + return err + } + + // Service name is already known. Nothing to do. + if existing != nil { + return nil + } + + ksn := KindServiceName{ + Kind: kind, + Service: name, + RaftIndex: structs.RaftIndex{ + CreateIndex: idx, + ModifyIndex: idx, + }, + } + if err := tx.Insert(tableKindServiceNames, &ksn); err != nil { + return fmt.Errorf("failed inserting %s/%s into %s: %s", kind, name.String(), tableKindServiceNames, err) + } + if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err) + } + return nil +} + +func cleanupKindServiceName(tx WriteTxn, idx uint64, name structs.ServiceName, kind structs.ServiceKind) error { + q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta} + if _, err := tx.DeleteAll(tableKindServiceNames, indexID, q); err != nil { + return fmt.Errorf("failed to delete %s from %s: %s", name, tableKindServiceNames, err) + } + + if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err) + } + return nil +} diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index e71d13ae35..f2902ca719 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -5,6 +5,7 @@ package state import ( "fmt" + "strings" memdb "github.com/hashicorp/go-memdb" @@ -18,13 +19,7 @@ func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { } func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) string { - switch kind { - case structs.ServiceKindTypical: - // needs a special case here - return "service_kind.typical" - default: - return "service_kind." + string(kind) - } + return "service_kind." + kind.Normalized() } func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error { @@ -192,3 +187,22 @@ func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) ( func (s *Store) ValidateRegisterRequest(_ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) { return nil, nil } + +func indexFromKindServiceName(arg interface{}) ([]byte, error) { + var b indexBuilder + + switch n := arg.(type) { + case KindServiceNameQuery: + b.String(strings.ToLower(string(n.Kind))) + b.String(strings.ToLower(n.Name)) + return b.Bytes(), nil + + case *KindServiceName: + b.String(strings.ToLower(string(n.Kind))) + b.String(strings.ToLower(n.Service.Name)) + return b.Bytes(), nil + + default: + return nil, fmt.Errorf("type must be KindServiceNameQuery or *KindServiceName: %T", arg) + } +} diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 04162072bc..5811416b15 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -412,3 +412,40 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase { }, } } + +func testIndexerTableKindServiceNames() map[string]indexerTestCase { + obj := &KindServiceName{ + Service: structs.ServiceName{ + Name: "web-sidecar-proxy", + }, + Kind: structs.ServiceKindConnectProxy, + } + + return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: &KindServiceName{ + Service: structs.ServiceName{ + Name: "web-sidecar-proxy", + }, + Kind: structs.ServiceKindConnectProxy, + }, + expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"), + }, + }, + indexKind: { + read: indexValue{ + source: structs.ServiceKindConnectProxy, + expected: []byte("connect-proxy\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("connect-proxy\x00"), + }, + }, + } +} diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index b67bf50490..c03f649be8 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -19,6 +19,7 @@ const ( tableMeshTopology = "mesh-topology" tableServiceVirtualIPs = "service-virtual-ips" tableFreeVirtualIPs = "free-virtual-ips" + tableKindServiceNames = "kind-service-names" indexID = "id" indexService = "service" @@ -661,3 +662,80 @@ func freeVirtualIPTableSchema() *memdb.TableSchema { }, } } + +type KindServiceName struct { + Kind structs.ServiceKind + Service structs.ServiceName + + structs.RaftIndex +} + +func kindServiceNameTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: tableKindServiceNames, + Indexes: map[string]*memdb.IndexSchema{ + indexID: { + Name: indexID, + AllowMissing: false, + Unique: true, + Indexer: indexerSingle{ + readIndex: indexFromKindServiceName, + writeIndex: indexFromKindServiceName, + }, + }, + indexKindOnly: { + Name: indexKindOnly, + AllowMissing: false, + Unique: false, + Indexer: indexerSingle{ + readIndex: indexFromKindServiceNameKindOnly, + writeIndex: indexFromKindServiceNameKindOnly, + }, + }, + }, + } +} + +// KindServiceNameQuery is used to lookup service names by kind or enterprise meta. +type KindServiceNameQuery struct { + Kind structs.ServiceKind + Name string + structs.EnterpriseMeta +} + +// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q KindServiceNameQuery) NamespaceOrDefault() string { + return q.EnterpriseMeta.NamespaceOrDefault() +} + +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q KindServiceNameQuery) PartitionOrDefault() string { + return q.EnterpriseMeta.PartitionOrDefault() +} + +func indexFromKindServiceNameKindOnly(raw interface{}) ([]byte, error) { + switch x := raw.(type) { + case *KindServiceName: + var b indexBuilder + b.String(strings.ToLower(string(x.Kind))) + return b.Bytes(), nil + + case structs.ServiceKind: + var b indexBuilder + b.String(strings.ToLower(string(x))) + return b.Bytes(), nil + + default: + return nil, fmt.Errorf("type must be *KindServiceName or structs.ServiceKind: %T", raw) + } +} + +func kindServiceNamesMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) uint64 { + return maxIndexWatchTxn(tx, ws, kindServiceNameIndexName(kind)) +} + +func kindServiceNameIndexName(kind structs.ServiceKind) string { + return "kind_service_names." + kind.Normalized() +} diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index c95989b809..bfca9a2d9a 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -7656,6 +7656,143 @@ func TestProtocolForIngressGateway(t *testing.T) { } } +func TestStateStore_EnsureService_ServiceNames(t *testing.T) { + s := testStateStore(t) + + // Create the service registration. + entMeta := structs.DefaultEnterpriseMetaInDefaultPartition() + + services := []structs.NodeService{ + { + Kind: structs.ServiceKindIngressGateway, + ID: "ingress-gateway", + Service: "ingress-gateway", + Address: "2.2.2.2", + Port: 2222, + EnterpriseMeta: *entMeta, + }, + { + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway", + Service: "mesh-gateway", + Address: "4.4.4.4", + Port: 4444, + EnterpriseMeta: *entMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "connect-proxy", + Service: "connect-proxy", + Address: "1.1.1.1", + Port: 1111, + Proxy: structs.ConnectProxyConfig{DestinationServiceName: "foo"}, + EnterpriseMeta: *entMeta, + }, + { + Kind: structs.ServiceKindTerminatingGateway, + ID: "terminating-gateway", + Service: "terminating-gateway", + Address: "3.3.3.3", + Port: 3333, + EnterpriseMeta: *entMeta, + }, + { + Kind: structs.ServiceKindTypical, + ID: "web", + Service: "web", + Address: "5.5.5.5", + Port: 5555, + EnterpriseMeta: *entMeta, + }, + } + + var idx uint64 + testRegisterNode(t, s, idx, "node1") + + for _, svc := range services { + idx++ + require.NoError(t, s.EnsureService(idx, "node1", &svc)) + + // Ensure the service name was stored for all of them under the appropriate kind + gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, svc.Kind) + require.NoError(t, err) + require.Equal(t, idx, gotIdx) + require.Len(t, gotNames, 1) + require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service) + require.Equal(t, svc.Kind, gotNames[0].Kind) + } + + // Register another ingress gateway and there should be two names under the kind index + newIngress := structs.NodeService{ + Kind: structs.ServiceKindIngressGateway, + ID: "new-ingress-gateway", + Service: "new-ingress-gateway", + Address: "6.6.6.6", + Port: 6666, + EnterpriseMeta: *entMeta, + } + idx++ + require.NoError(t, s.EnsureService(idx, "node1", &newIngress)) + + gotIdx, got, err := s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway) + require.NoError(t, err) + require.Equal(t, idx, gotIdx) + + expect := []*KindServiceName{ + { + Kind: structs.ServiceKindIngressGateway, + Service: structs.NewServiceName("ingress-gateway", nil), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + { + Kind: structs.ServiceKindIngressGateway, + Service: structs.NewServiceName("new-ingress-gateway", nil), + RaftIndex: structs.RaftIndex{ + CreateIndex: idx, + ModifyIndex: idx, + }, + }, + } + require.Equal(t, expect, got) + + // Deregister an ingress gateway and the index should not slide back + idx++ + require.NoError(t, s.DeleteService(idx, "node1", "new-ingress-gateway", entMeta)) + + gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway) + require.NoError(t, err) + require.Equal(t, idx, gotIdx) + require.Equal(t, expect[:1], got) + + // Registering another instance of a known service should not bump the kind index + newMGW := structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Address: "7.7.7.7", + Port: 7777, + EnterpriseMeta: *entMeta, + } + idx++ + require.NoError(t, s.EnsureService(idx, "node1", &newMGW)) + + gotIdx, _, err = s.ServiceNamesOfKind(nil, structs.ServiceKindMeshGateway) + require.NoError(t, err) + require.Equal(t, uint64(2), gotIdx) + + // Deregister the single typical service and the service name should also be dropped + idx++ + require.NoError(t, s.DeleteService(idx, "node1", "web", entMeta)) + + gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindTypical) + require.NoError(t, err) + require.Equal(t, idx, gotIdx) + require.Empty(t, got) +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) { diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index 72850b29e9..f2f64500f3 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -995,36 +995,29 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet, maxIdx = index } - // Check for a wildcard intention (* -> *) since it overrides the default decision from ACLs - if len(intentions) > 0 { - // Intentions with wildcard source and destination have the lowest precedence, so they are last in the list - ixn := intentions[len(intentions)-1] - - if ixn.HasWildcardSource() && ixn.HasWildcardDestination() { - defaultDecision = acl.Allow - if ixn.Action == structs.IntentionActionDeny { - defaultDecision = acl.Deny - } - } - } - - index, allServices, err := serviceListTxn(tx, ws, func(svc *structs.ServiceNode) bool { - // Only include ingress gateways as downstreams, since they cannot receive service mesh traffic - // TODO(freddy): One remaining issue is that this includes non-Connect services (typical services without a proxy) - // Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy. - // Maybe start tracking services represented by proxies? (both sidecar and ingress) - if svc.ServiceKind == structs.ServiceKindTypical || (svc.ServiceKind == structs.ServiceKindIngressGateway && downstreams) { - return true - } - return false - }, target.WithWildcardNamespace()) + // TODO(tproxy): One remaining improvement is that this includes non-Connect services (typical services without a proxy) + // Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy. + // Maybe narrow serviceNamesOfKindTxn to services represented by proxies? (ingress, sidecar-proxy, terminating) + index, services, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical) if err != nil { - return index, nil, fmt.Errorf("failed to fetch catalog service list: %v", err) + return index, nil, fmt.Errorf("failed to list ingress service names: %v", err) } if index > maxIdx { maxIdx = index } + if downstreams { + // Ingress gateways can only ever be downstreams, since mesh services don't dial them. + index, ingress, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindIngressGateway) + if err != nil { + return index, nil, fmt.Errorf("failed to list ingress service names: %v", err) + } + if index > maxIdx { + maxIdx = index + } + services = append(services, ingress...) + } + // When checking authorization to upstreams, the match type for the decision is `destination` because we are deciding // if upstream candidates are covered by intentions that have the target service as a source. // The reverse is true for downstreams. @@ -1032,11 +1025,13 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet, if downstreams { decisionMatchType = structs.IntentionMatchSource } - result := make([]ServiceWithDecision, 0, len(allServices)) - for _, candidate := range allServices { + result := make([]ServiceWithDecision, 0, len(services)) + for _, svc := range services { + candidate := svc.Service if candidate.Name == structs.ConsulServiceName { continue } + opts := IntentionDecisionOpts{ Target: candidate.Name, Namespace: candidate.NamespaceOrDefault(), diff --git a/agent/consul/state/schema.go b/agent/consul/state/schema.go index 4005469fdd..75a2ffa747 100644 --- a/agent/consul/state/schema.go +++ b/agent/consul/state/schema.go @@ -40,6 +40,7 @@ func newDBSchema() *memdb.DBSchema { tombstonesTableSchema, usageTableSchema, freeVirtualIPTableSchema, + kindServiceNameTableSchema, ) withEnterpriseSchema(db) return db diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index b83491587d..7ef17c8fda 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -50,6 +50,7 @@ func TestNewDBSchema_Indexers(t *testing.T) { tableMeshTopology: testIndexerTableMeshTopology, tableGatewayServices: testIndexerTableGatewayServices, tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs, + tableKindServiceNames: testIndexerTableKindServiceNames, // KV tableKVs: testIndexerTableKVs, tableTombstones: testIndexerTableTombstones, diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 6edbc5545f..e79cf6ef9e 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -72,6 +72,7 @@ const ( SystemMetadataRequestType = 31 ServiceVirtualIPRequestType = 32 FreeVirtualIPRequestType = 33 + KindServiceNamesType = 34 ) // if a new request type is added above it must be @@ -114,6 +115,7 @@ var requestTypeStrings = map[MessageType]string{ SystemMetadataRequestType: "SystemMetadata", ServiceVirtualIPRequestType: "ServiceVirtualIP", FreeVirtualIPRequestType: "FreeVirtualIP", + KindServiceNamesType: "KindServiceName", } const ( @@ -1029,6 +1031,13 @@ type ServiceNodes []*ServiceNode // ServiceKind is the kind of service being registered. type ServiceKind string +func (k ServiceKind) Normalized() string { + if k == ServiceKindTypical { + return "typical" + } + return string(k) +} + const ( // ServiceKindTypical is a typical, classic Consul service. This is // represented by the absence of a value. This was chosen for ease of diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 1b1292d64b..4cb85aea04 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -168,13 +168,22 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // We do not match on all endpoints here since it would lead to load balancing across // all instances when any instance address is dialed. for _, e := range endpoints { - if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" { + if vip := e.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; vip.Address != "" { uniqueAddrs[vip.Address] = struct{}{} } + + // The virtualIPTag is used by consul-k8s to store the ClusterIP for a service. + // We only match on this virtual IP if the upstream is in the proxy's partition. + // This is because the IP is not guaranteed to be unique across k8s clusters. + if structs.EqualPartitions(e.Node.PartitionOrDefault(), cfgSnap.ProxyID.PartitionOrDefault()) { + if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" { + uniqueAddrs[vip.Address] = struct{}{} + } + } } - if len(uniqueAddrs) > 1 { - s.Logger.Warn("detected multiple virtual IPs for an upstream, all will be used to match traffic", - "upstream", id) + if len(uniqueAddrs) > 2 { + s.Logger.Debug("detected multiple virtual IPs for an upstream, all will be used to match traffic", + "upstream", id, "ip_count", len(uniqueAddrs)) } // For every potential address we collected, create the appropriate address prefix to match on. diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index c082d41ea6..acf197961b 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -863,7 +863,8 @@ func TestListenersFromSnapshot(t *testing.T) { Address: "9.9.9.9", Port: 9090, TaggedAddresses: map[string]structs.ServiceAddress{ - "virtual": {Address: "10.0.0.1"}, + "virtual": {Address: "10.0.0.1"}, + structs.TaggedAddressVirtualIP: {Address: "240.0.0.1"}, }, }, }, diff --git a/agent/xds/testdata/listeners/transparent-proxy.envoy-1-20-x.golden b/agent/xds/testdata/listeners/transparent-proxy.envoy-1-20-x.golden index 6c6691c618..d390e3d9f7 100644 --- a/agent/xds/testdata/listeners/transparent-proxy.envoy-1-20-x.golden +++ b/agent/xds/testdata/listeners/transparent-proxy.envoy-1-20-x.golden @@ -42,6 +42,10 @@ { "addressPrefix": "10.0.0.1", "prefixLen": 32 + }, + { + "addressPrefix": "240.0.0.1", + "prefixLen": 32 } ] },