mirror of https://github.com/hashicorp/consul
Merge pull request #11738 from hashicorp/ap/tproxy
commit
f24a206712
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
connect: **(Enterprise only)** add support for cross-partition transparent proxying.
|
||||
```
|
|
@ -570,7 +570,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, services, err := state.ServiceList(ws, nil, &args.EnterpriseMeta)
|
||||
index, services, err := state.ServiceList(ws, &args.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -464,6 +464,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.2")
|
||||
|
||||
_, serviceNames, err := fsm.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
|
||||
require.NoError(t, err)
|
||||
|
||||
expect := []string{"backend", "db", "frontend", "web"}
|
||||
for i, sn := range serviceNames {
|
||||
require.Equal(t, expect[i], sn.Service.Name)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -690,10 +698,10 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.Len(t, roots, 2)
|
||||
|
||||
// Verify provider state is restored.
|
||||
_, state, err := fsm2.state.CAProviderState("asdf")
|
||||
_, provider, err := fsm2.state.CAProviderState("asdf")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "foo", state.PrivateKey)
|
||||
require.Equal(t, "bar", state.RootCert)
|
||||
require.Equal(t, "foo", provider.PrivateKey)
|
||||
require.Equal(t, "bar", provider.RootCert)
|
||||
|
||||
// Verify CA configuration is restored.
|
||||
_, caConf, err := fsm2.state.CAConfig(nil)
|
||||
|
@ -751,6 +759,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, meshConfig, meshConfigEntry)
|
||||
|
||||
_, restoredServiceNames, err := fsm2.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
|
||||
require.NoError(t, err)
|
||||
|
||||
expect = []string{"backend", "db", "frontend", "web"}
|
||||
for i, sn := range restoredServiceNames {
|
||||
require.Equal(t, expect[i], sn.Service.Name)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -741,6 +741,9 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
|
||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||
}
|
||||
if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil {
|
||||
return fmt.Errorf("failed to persist service name: %v", err)
|
||||
}
|
||||
|
||||
// Update upstream/downstream mappings if it's a connect service
|
||||
if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native {
|
||||
|
@ -965,16 +968,14 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
|
|||
return idx, results, nil
|
||||
}
|
||||
|
||||
func (s *Store) ServiceList(ws memdb.WatchSet,
|
||||
include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
|
||||
func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return serviceListTxn(tx, ws, include, entMeta)
|
||||
return serviceListTxn(tx, ws, entMeta)
|
||||
}
|
||||
|
||||
func serviceListTxn(tx ReadTxn, ws memdb.WatchSet,
|
||||
include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
|
||||
func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
|
||||
idx := catalogServicesMaxIndex(tx, entMeta)
|
||||
|
||||
services, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
|
||||
|
@ -986,11 +987,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet,
|
|||
unique := make(map[structs.ServiceName]struct{})
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
svc := service.(*structs.ServiceNode)
|
||||
// TODO (freddy) This is a hack to exclude certain kinds.
|
||||
// Need a new index to query by kind and namespace, have to coordinate with consul foundations first
|
||||
if include == nil || include(svc) {
|
||||
unique[svc.CompoundServiceName()] = struct{}{}
|
||||
}
|
||||
unique[svc.CompoundServiceName()] = struct{}{}
|
||||
}
|
||||
|
||||
results := make(structs.ServiceList, 0, len(unique))
|
||||
|
@ -1691,6 +1688,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
|
||||
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
|
||||
}
|
||||
if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil {
|
||||
return fmt.Errorf("failed to persist service name: %v", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||
|
@ -2526,6 +2526,30 @@ func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
|
|||
return result.String(), nil
|
||||
}
|
||||
|
||||
func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return serviceNamesOfKindTxn(tx, ws, kind)
|
||||
}
|
||||
|
||||
func serviceNamesOfKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
|
||||
var names []*KindServiceName
|
||||
iter, err := tx.Get(tableKindServiceNames, indexKindOnly, kind)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
idx := kindServiceNamesMaxIndex(tx, ws, kind)
|
||||
for name := iter.Next(); name != nil; name = iter.Next() {
|
||||
ksn := name.(*KindServiceName)
|
||||
names = append(names, ksn)
|
||||
}
|
||||
|
||||
return idx, names, nil
|
||||
}
|
||||
|
||||
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||
// and query for an associated node and a set of checks. This is the inner
|
||||
// method used to return a rich set of results from a more simple query.
|
||||
|
@ -3862,3 +3886,44 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func upsertKindServiceName(tx WriteTxn, idx uint64, kind structs.ServiceKind, name structs.ServiceName) error {
|
||||
q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta}
|
||||
existing, err := tx.First(tableKindServiceNames, indexID, q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Service name is already known. Nothing to do.
|
||||
if existing != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ksn := KindServiceName{
|
||||
Kind: kind,
|
||||
Service: name,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: idx,
|
||||
ModifyIndex: idx,
|
||||
},
|
||||
}
|
||||
if err := tx.Insert(tableKindServiceNames, &ksn); err != nil {
|
||||
return fmt.Errorf("failed inserting %s/%s into %s: %s", kind, name.String(), tableKindServiceNames, err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupKindServiceName(tx WriteTxn, idx uint64, name structs.ServiceName, kind structs.ServiceKind) error {
|
||||
q := KindServiceNameQuery{Name: name.Name, Kind: kind, EnterpriseMeta: name.EnterpriseMeta}
|
||||
if _, err := tx.DeleteAll(tableKindServiceNames, indexID, q); err != nil {
|
||||
return fmt.Errorf("failed to delete %s from %s: %s", name, tableKindServiceNames, err)
|
||||
}
|
||||
|
||||
if err := indexUpdateMaxTxn(tx, idx, kindServiceNameIndexName(kind)); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", tableKindServiceNames, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ package state
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
|
@ -18,13 +19,7 @@ func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
|||
}
|
||||
|
||||
func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) string {
|
||||
switch kind {
|
||||
case structs.ServiceKindTypical:
|
||||
// needs a special case here
|
||||
return "service_kind.typical"
|
||||
default:
|
||||
return "service_kind." + string(kind)
|
||||
}
|
||||
return "service_kind." + kind.Normalized()
|
||||
}
|
||||
|
||||
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error {
|
||||
|
@ -192,3 +187,22 @@ func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) (
|
|||
func (s *Store) ValidateRegisterRequest(_ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func indexFromKindServiceName(arg interface{}) ([]byte, error) {
|
||||
var b indexBuilder
|
||||
|
||||
switch n := arg.(type) {
|
||||
case KindServiceNameQuery:
|
||||
b.String(strings.ToLower(string(n.Kind)))
|
||||
b.String(strings.ToLower(n.Name))
|
||||
return b.Bytes(), nil
|
||||
|
||||
case *KindServiceName:
|
||||
b.String(strings.ToLower(string(n.Kind)))
|
||||
b.String(strings.ToLower(n.Service.Name))
|
||||
return b.Bytes(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("type must be KindServiceNameQuery or *KindServiceName: %T", arg)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -412,3 +412,40 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
func testIndexerTableKindServiceNames() map[string]indexerTestCase {
|
||||
obj := &KindServiceName{
|
||||
Service: structs.ServiceName{
|
||||
Name: "web-sidecar-proxy",
|
||||
},
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
}
|
||||
|
||||
return map[string]indexerTestCase{
|
||||
indexID: {
|
||||
read: indexValue{
|
||||
source: &KindServiceName{
|
||||
Service: structs.ServiceName{
|
||||
Name: "web-sidecar-proxy",
|
||||
},
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
},
|
||||
expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: obj,
|
||||
expected: []byte("connect-proxy\x00web-sidecar-proxy\x00"),
|
||||
},
|
||||
},
|
||||
indexKind: {
|
||||
read: indexValue{
|
||||
source: structs.ServiceKindConnectProxy,
|
||||
expected: []byte("connect-proxy\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: obj,
|
||||
expected: []byte("connect-proxy\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ const (
|
|||
tableMeshTopology = "mesh-topology"
|
||||
tableServiceVirtualIPs = "service-virtual-ips"
|
||||
tableFreeVirtualIPs = "free-virtual-ips"
|
||||
tableKindServiceNames = "kind-service-names"
|
||||
|
||||
indexID = "id"
|
||||
indexService = "service"
|
||||
|
@ -661,3 +662,80 @@ func freeVirtualIPTableSchema() *memdb.TableSchema {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
type KindServiceName struct {
|
||||
Kind structs.ServiceKind
|
||||
Service structs.ServiceName
|
||||
|
||||
structs.RaftIndex
|
||||
}
|
||||
|
||||
func kindServiceNameTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: tableKindServiceNames,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
indexID: {
|
||||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: indexFromKindServiceName,
|
||||
writeIndex: indexFromKindServiceName,
|
||||
},
|
||||
},
|
||||
indexKindOnly: {
|
||||
Name: indexKindOnly,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: indexFromKindServiceNameKindOnly,
|
||||
writeIndex: indexFromKindServiceNameKindOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// KindServiceNameQuery is used to lookup service names by kind or enterprise meta.
|
||||
type KindServiceNameQuery struct {
|
||||
Kind structs.ServiceKind
|
||||
Name string
|
||||
structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer
|
||||
// receiver for this method. Remove once that is fixed.
|
||||
func (q KindServiceNameQuery) NamespaceOrDefault() string {
|
||||
return q.EnterpriseMeta.NamespaceOrDefault()
|
||||
}
|
||||
|
||||
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
|
||||
// receiver for this method. Remove once that is fixed.
|
||||
func (q KindServiceNameQuery) PartitionOrDefault() string {
|
||||
return q.EnterpriseMeta.PartitionOrDefault()
|
||||
}
|
||||
|
||||
func indexFromKindServiceNameKindOnly(raw interface{}) ([]byte, error) {
|
||||
switch x := raw.(type) {
|
||||
case *KindServiceName:
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(string(x.Kind)))
|
||||
return b.Bytes(), nil
|
||||
|
||||
case structs.ServiceKind:
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(string(x)))
|
||||
return b.Bytes(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("type must be *KindServiceName or structs.ServiceKind: %T", raw)
|
||||
}
|
||||
}
|
||||
|
||||
func kindServiceNamesMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) uint64 {
|
||||
return maxIndexWatchTxn(tx, ws, kindServiceNameIndexName(kind))
|
||||
}
|
||||
|
||||
func kindServiceNameIndexName(kind structs.ServiceKind) string {
|
||||
return "kind_service_names." + kind.Normalized()
|
||||
}
|
||||
|
|
|
@ -7656,6 +7656,143 @@ func TestProtocolForIngressGateway(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create the service registration.
|
||||
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
|
||||
services := []structs.NodeService{
|
||||
{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
ID: "ingress-gateway",
|
||||
Service: "ingress-gateway",
|
||||
Address: "2.2.2.2",
|
||||
Port: 2222,
|
||||
EnterpriseMeta: *entMeta,
|
||||
},
|
||||
{
|
||||
Kind: structs.ServiceKindMeshGateway,
|
||||
ID: "mesh-gateway",
|
||||
Service: "mesh-gateway",
|
||||
Address: "4.4.4.4",
|
||||
Port: 4444,
|
||||
EnterpriseMeta: *entMeta,
|
||||
},
|
||||
{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "connect-proxy",
|
||||
Service: "connect-proxy",
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "foo"},
|
||||
EnterpriseMeta: *entMeta,
|
||||
},
|
||||
{
|
||||
Kind: structs.ServiceKindTerminatingGateway,
|
||||
ID: "terminating-gateway",
|
||||
Service: "terminating-gateway",
|
||||
Address: "3.3.3.3",
|
||||
Port: 3333,
|
||||
EnterpriseMeta: *entMeta,
|
||||
},
|
||||
{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
ID: "web",
|
||||
Service: "web",
|
||||
Address: "5.5.5.5",
|
||||
Port: 5555,
|
||||
EnterpriseMeta: *entMeta,
|
||||
},
|
||||
}
|
||||
|
||||
var idx uint64
|
||||
testRegisterNode(t, s, idx, "node1")
|
||||
|
||||
for _, svc := range services {
|
||||
idx++
|
||||
require.NoError(t, s.EnsureService(idx, "node1", &svc))
|
||||
|
||||
// Ensure the service name was stored for all of them under the appropriate kind
|
||||
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, svc.Kind)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, gotIdx)
|
||||
require.Len(t, gotNames, 1)
|
||||
require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service)
|
||||
require.Equal(t, svc.Kind, gotNames[0].Kind)
|
||||
}
|
||||
|
||||
// Register another ingress gateway and there should be two names under the kind index
|
||||
newIngress := structs.NodeService{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
ID: "new-ingress-gateway",
|
||||
Service: "new-ingress-gateway",
|
||||
Address: "6.6.6.6",
|
||||
Port: 6666,
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
idx++
|
||||
require.NoError(t, s.EnsureService(idx, "node1", &newIngress))
|
||||
|
||||
gotIdx, got, err := s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, gotIdx)
|
||||
|
||||
expect := []*KindServiceName{
|
||||
{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
Service: structs.NewServiceName("ingress-gateway", nil),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
Service: structs.NewServiceName("new-ingress-gateway", nil),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: idx,
|
||||
ModifyIndex: idx,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expect, got)
|
||||
|
||||
// Deregister an ingress gateway and the index should not slide back
|
||||
idx++
|
||||
require.NoError(t, s.DeleteService(idx, "node1", "new-ingress-gateway", entMeta))
|
||||
|
||||
gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, gotIdx)
|
||||
require.Equal(t, expect[:1], got)
|
||||
|
||||
// Registering another instance of a known service should not bump the kind index
|
||||
newMGW := structs.NodeService{
|
||||
Kind: structs.ServiceKindMeshGateway,
|
||||
ID: "mesh-gateway-1",
|
||||
Service: "mesh-gateway",
|
||||
Address: "7.7.7.7",
|
||||
Port: 7777,
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
idx++
|
||||
require.NoError(t, s.EnsureService(idx, "node1", &newMGW))
|
||||
|
||||
gotIdx, _, err = s.ServiceNamesOfKind(nil, structs.ServiceKindMeshGateway)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(2), gotIdx)
|
||||
|
||||
// Deregister the single typical service and the service name should also be dropped
|
||||
idx++
|
||||
require.NoError(t, s.DeleteService(idx, "node1", "web", entMeta))
|
||||
|
||||
gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, gotIdx)
|
||||
require.Empty(t, got)
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
t.Helper()
|
||||
if !t.Run(name, fn) {
|
||||
|
|
|
@ -995,36 +995,29 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet,
|
|||
maxIdx = index
|
||||
}
|
||||
|
||||
// Check for a wildcard intention (* -> *) since it overrides the default decision from ACLs
|
||||
if len(intentions) > 0 {
|
||||
// Intentions with wildcard source and destination have the lowest precedence, so they are last in the list
|
||||
ixn := intentions[len(intentions)-1]
|
||||
|
||||
if ixn.HasWildcardSource() && ixn.HasWildcardDestination() {
|
||||
defaultDecision = acl.Allow
|
||||
if ixn.Action == structs.IntentionActionDeny {
|
||||
defaultDecision = acl.Deny
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
index, allServices, err := serviceListTxn(tx, ws, func(svc *structs.ServiceNode) bool {
|
||||
// Only include ingress gateways as downstreams, since they cannot receive service mesh traffic
|
||||
// TODO(freddy): One remaining issue is that this includes non-Connect services (typical services without a proxy)
|
||||
// Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy.
|
||||
// Maybe start tracking services represented by proxies? (both sidecar and ingress)
|
||||
if svc.ServiceKind == structs.ServiceKindTypical || (svc.ServiceKind == structs.ServiceKindIngressGateway && downstreams) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}, target.WithWildcardNamespace())
|
||||
// TODO(tproxy): One remaining improvement is that this includes non-Connect services (typical services without a proxy)
|
||||
// Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy.
|
||||
// Maybe narrow serviceNamesOfKindTxn to services represented by proxies? (ingress, sidecar-proxy, terminating)
|
||||
index, services, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical)
|
||||
if err != nil {
|
||||
return index, nil, fmt.Errorf("failed to fetch catalog service list: %v", err)
|
||||
return index, nil, fmt.Errorf("failed to list ingress service names: %v", err)
|
||||
}
|
||||
if index > maxIdx {
|
||||
maxIdx = index
|
||||
}
|
||||
|
||||
if downstreams {
|
||||
// Ingress gateways can only ever be downstreams, since mesh services don't dial them.
|
||||
index, ingress, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindIngressGateway)
|
||||
if err != nil {
|
||||
return index, nil, fmt.Errorf("failed to list ingress service names: %v", err)
|
||||
}
|
||||
if index > maxIdx {
|
||||
maxIdx = index
|
||||
}
|
||||
services = append(services, ingress...)
|
||||
}
|
||||
|
||||
// When checking authorization to upstreams, the match type for the decision is `destination` because we are deciding
|
||||
// if upstream candidates are covered by intentions that have the target service as a source.
|
||||
// The reverse is true for downstreams.
|
||||
|
@ -1032,11 +1025,13 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet,
|
|||
if downstreams {
|
||||
decisionMatchType = structs.IntentionMatchSource
|
||||
}
|
||||
result := make([]ServiceWithDecision, 0, len(allServices))
|
||||
for _, candidate := range allServices {
|
||||
result := make([]ServiceWithDecision, 0, len(services))
|
||||
for _, svc := range services {
|
||||
candidate := svc.Service
|
||||
if candidate.Name == structs.ConsulServiceName {
|
||||
continue
|
||||
}
|
||||
|
||||
opts := IntentionDecisionOpts{
|
||||
Target: candidate.Name,
|
||||
Namespace: candidate.NamespaceOrDefault(),
|
||||
|
|
|
@ -40,6 +40,7 @@ func newDBSchema() *memdb.DBSchema {
|
|||
tombstonesTableSchema,
|
||||
usageTableSchema,
|
||||
freeVirtualIPTableSchema,
|
||||
kindServiceNameTableSchema,
|
||||
)
|
||||
withEnterpriseSchema(db)
|
||||
return db
|
||||
|
|
|
@ -50,6 +50,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
|||
tableMeshTopology: testIndexerTableMeshTopology,
|
||||
tableGatewayServices: testIndexerTableGatewayServices,
|
||||
tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs,
|
||||
tableKindServiceNames: testIndexerTableKindServiceNames,
|
||||
// KV
|
||||
tableKVs: testIndexerTableKVs,
|
||||
tableTombstones: testIndexerTableTombstones,
|
||||
|
|
|
@ -72,6 +72,7 @@ const (
|
|||
SystemMetadataRequestType = 31
|
||||
ServiceVirtualIPRequestType = 32
|
||||
FreeVirtualIPRequestType = 33
|
||||
KindServiceNamesType = 34
|
||||
)
|
||||
|
||||
// if a new request type is added above it must be
|
||||
|
@ -114,6 +115,7 @@ var requestTypeStrings = map[MessageType]string{
|
|||
SystemMetadataRequestType: "SystemMetadata",
|
||||
ServiceVirtualIPRequestType: "ServiceVirtualIP",
|
||||
FreeVirtualIPRequestType: "FreeVirtualIP",
|
||||
KindServiceNamesType: "KindServiceName",
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -1029,6 +1031,13 @@ type ServiceNodes []*ServiceNode
|
|||
// ServiceKind is the kind of service being registered.
|
||||
type ServiceKind string
|
||||
|
||||
func (k ServiceKind) Normalized() string {
|
||||
if k == ServiceKindTypical {
|
||||
return "typical"
|
||||
}
|
||||
return string(k)
|
||||
}
|
||||
|
||||
const (
|
||||
// ServiceKindTypical is a typical, classic Consul service. This is
|
||||
// represented by the absence of a value. This was chosen for ease of
|
||||
|
|
|
@ -168,13 +168,22 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
// We do not match on all endpoints here since it would lead to load balancing across
|
||||
// all instances when any instance address is dialed.
|
||||
for _, e := range endpoints {
|
||||
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
|
||||
if vip := e.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; vip.Address != "" {
|
||||
uniqueAddrs[vip.Address] = struct{}{}
|
||||
}
|
||||
|
||||
// The virtualIPTag is used by consul-k8s to store the ClusterIP for a service.
|
||||
// We only match on this virtual IP if the upstream is in the proxy's partition.
|
||||
// This is because the IP is not guaranteed to be unique across k8s clusters.
|
||||
if structs.EqualPartitions(e.Node.PartitionOrDefault(), cfgSnap.ProxyID.PartitionOrDefault()) {
|
||||
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
|
||||
uniqueAddrs[vip.Address] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(uniqueAddrs) > 1 {
|
||||
s.Logger.Warn("detected multiple virtual IPs for an upstream, all will be used to match traffic",
|
||||
"upstream", id)
|
||||
if len(uniqueAddrs) > 2 {
|
||||
s.Logger.Debug("detected multiple virtual IPs for an upstream, all will be used to match traffic",
|
||||
"upstream", id, "ip_count", len(uniqueAddrs))
|
||||
}
|
||||
|
||||
// For every potential address we collected, create the appropriate address prefix to match on.
|
||||
|
|
|
@ -863,7 +863,8 @@ func TestListenersFromSnapshot(t *testing.T) {
|
|||
Address: "9.9.9.9",
|
||||
Port: 9090,
|
||||
TaggedAddresses: map[string]structs.ServiceAddress{
|
||||
"virtual": {Address: "10.0.0.1"},
|
||||
"virtual": {Address: "10.0.0.1"},
|
||||
structs.TaggedAddressVirtualIP: {Address: "240.0.0.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -42,6 +42,10 @@
|
|||
{
|
||||
"addressPrefix": "10.0.0.1",
|
||||
"prefixLen": 32
|
||||
},
|
||||
{
|
||||
"addressPrefix": "240.0.0.1",
|
||||
"prefixLen": 32
|
||||
}
|
||||
]
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue