diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 9fe11800b6..6508ba220b 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -8,7 +8,7 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -1036,6 +1036,7 @@ func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, repl } state := c.srv.fsm.State() - *reply, err = state.VirtualIPForService(structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta)) + psn := structs.PeeredServiceName{Peer: args.PeerName, ServiceName: structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta)} + *reply, err = state.VirtualIPForService(psn) return err } diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 29678d1d00..bb81d1627d 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -451,7 +451,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Port: 8000, Connect: connectConf, }) - vip, err := fsm.state.VirtualIPForService(structs.NewServiceName("frontend", nil)) + psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)} + vip, err := fsm.state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, vip, "240.0.0.1") @@ -462,7 +463,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { Port: 9000, Connect: connectConf, }) - vip, err = fsm.state.VirtualIPForService(structs.NewServiceName("backend", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)} + vip, err = fsm.state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, vip, "240.0.0.2") @@ -592,10 +594,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.Equal(t, uint64(25), checks[0].ModifyIndex) // Verify virtual IPs are consistent. - vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("frontend", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)} + vip, err = fsm2.state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, vip, "240.0.0.1") - vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("backend", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)} + vip, err = fsm2.state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, vip, "240.0.0.2") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 7b04518fc4..b041d2f925 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -2258,7 +2258,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { }) require.NoError(t, err) - vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil)) + psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)} + vip, err := state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, "", vip) @@ -2287,7 +2288,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { // Make sure the service referenced in the terminating gateway config doesn't have // a virtual IP yet. - vip, err = state.VirtualIPForService(structs.NewServiceName("bar", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("bar", nil)} + vip, err = state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, "", vip) @@ -2316,8 +2318,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { }, }) require.NoError(t, err) - - vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)} + vip, err = state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, "240.0.0.1", vip) @@ -2345,7 +2347,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { // Make sure the baz service (only referenced in the config entry so far) // has a virtual IP. - vip, err = state.VirtualIPForService(structs.NewServiceName("baz", nil)) + psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("baz", nil)} + vip, err = state.VirtualIPForService(psn) require.NoError(t, err) require.Equal(t, "240.0.0.2", vip) } diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 06990011ed..2777b2fd1f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -7,7 +7,7 @@ import ( "reflect" "strings" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/mitchellh/copystructure" "github.com/hashicorp/consul/acl" @@ -872,9 +872,10 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool return fmt.Errorf("failed updating gateway mapping: %s", err) } } - } - if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil { - return fmt.Errorf("failed to persist service name: %v", err) + // Only upsert KindServiceName if service is local + if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil { + return fmt.Errorf("failed to persist service name: %v", err) + } } // Update upstream/downstream mappings if it's a connect service @@ -896,7 +897,8 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool } sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta} - vip, err := assignServiceVirtualIP(tx, sn) + psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn} + vip, err := assignServiceVirtualIP(tx, psn) if err != nil { return fmt.Errorf("failed updating virtual IP: %s", err) } @@ -976,9 +978,8 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool // assignServiceVirtualIP assigns a virtual IP to the target service and updates // the global virtual IP counter if necessary. -func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) { - // TODO(peering): support VIPs - serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn) +func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, error) { + serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn) if err != nil { return "", fmt.Errorf("failed service virtual IP lookup: %s", err) } @@ -1049,7 +1050,7 @@ func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) } assignedVIP := ServiceVirtualIP{ - Service: sn, + Service: psn, IP: newEntry.IP, } if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil { @@ -1877,10 +1878,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return nil } - // TODO: accept a non-pointer value for EnterpriseMeta - if entMeta == nil { - entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() - } // Delete any checks associated with the service. This will invalidate // sessions as necessary. nsq := NodeServiceQuery{ @@ -1965,7 +1962,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) } } - if err := freeServiceVirtualIP(tx, svc.ServiceName, nil, entMeta); err != nil { + psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} + if err := freeServiceVirtualIP(tx, psn, nil); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) } if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { @@ -1981,7 +1979,11 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st // freeServiceVirtualIP is used to free a virtual IP for a service after the last instance // is removed. -func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.ServiceName, entMeta *acl.EnterpriseMeta) error { +func freeServiceVirtualIP( + tx WriteTxn, + psn structs.PeeredServiceName, + excludeGateway *structs.ServiceName, +) error { supported, err := virtualIPsSupported(tx, nil) if err != nil { return err @@ -1991,15 +1993,14 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.Servi } // Don't deregister the virtual IP if at least one terminating gateway still references this service. - sn := structs.NewServiceName(svc, entMeta) termGatewaySupported, err := terminatingGatewayVirtualIPsSupported(tx, nil) if err != nil { return err } if termGatewaySupported { - svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) + svcGateways, err := tx.Get(tableGatewayServices, indexService, psn.ServiceName) if err != nil { - return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err) + return fmt.Errorf("failed gateway lookup for %q: %s", psn.ServiceName.Name, err) } for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { @@ -2012,7 +2013,7 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.Servi } } - serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn) + serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn) if err != nil { return fmt.Errorf("failed service virtual IP lookup: %s", err) } @@ -2879,11 +2880,11 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl. return lib.MaxUint64(maxIdx, idx), results, nil } -func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) { +func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, error) { tx := s.db.Txn(false) defer tx.Abort() - vip, err := tx.First(tableServiceVirtualIPs, indexID, sn) + vip, err := tx.First(tableServiceVirtualIPs, indexID, psn) if err != nil { return "", fmt.Errorf("failed service virtual IP lookup: %s", err) } @@ -3336,7 +3337,9 @@ func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, ent addrs := make(map[string]structs.ServiceAddress, len(services)) for _, s := range services { sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta} - vip, err := assignServiceVirtualIP(tx, sn) + // Terminating Gateways cannot route to services in peered clusters + psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword} + vip, err := assignServiceVirtualIP(tx, psn) if err != nil { return nil, err } @@ -3413,7 +3416,8 @@ func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.T return err } if len(nodes) == 0 { - if err := freeServiceVirtualIP(tx, sn.Name, &gatewayName, &sn.EnterpriseMeta); err != nil { + psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: sn} + if err := freeServiceVirtualIP(tx, psn, &gatewayName); err != nil { return err } } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 706a265323..ee3a9e4871 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -299,3 +299,15 @@ func updateKindServiceNamesIndex(tx WriteTxn, idx uint64, kind structs.ServiceKi } return nil } + +func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) { + peer := structs.LocalPeerKeyword + if psn.Peer != "" { + peer = psn.Peer + } + + var b indexBuilder + b.String(strings.ToLower(peer)) + b.String(strings.ToLower(psn.ServiceName.Name)) + return b.Bytes(), nil +} diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 7ed7429fc9..36d15b954d 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -669,8 +669,19 @@ func testIndexerTableServices() map[string]indexerTestCase { func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase { obj := ServiceVirtualIP{ - Service: structs.ServiceName{ - Name: "foo", + Service: structs.PeeredServiceName{ + ServiceName: structs.ServiceName{ + Name: "foo", + }, + }, + IP: net.ParseIP("127.0.0.1"), + } + peeredObj := ServiceVirtualIP{ + Service: structs.PeeredServiceName{ + ServiceName: structs.ServiceName{ + Name: "foo", + }, + Peer: "Billing", }, IP: net.ParseIP("127.0.0.1"), } @@ -678,14 +689,33 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase { return map[string]indexerTestCase{ indexID: { read: indexValue{ - source: structs.ServiceName{ - Name: "foo", + source: structs.PeeredServiceName{ + ServiceName: structs.ServiceName{ + Name: "foo", + }, }, - expected: []byte("foo\x00"), + expected: []byte("internal\x00foo\x00"), }, write: indexValue{ source: obj, - expected: []byte("foo\x00"), + expected: []byte("internal\x00foo\x00"), + }, + extra: []indexerTestCase{ + { + read: indexValue{ + source: structs.PeeredServiceName{ + ServiceName: structs.ServiceName{ + Name: "foo", + }, + Peer: "Billing", + }, + expected: []byte("billing\x00foo\x00"), + }, + write: indexValue{ + source: peeredObj, + expected: []byte("billing\x00foo\x00"), + }, + }, }, }, } diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 552d9c0bdd..d77487c4a2 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -605,7 +605,7 @@ func (q NodeCheckQuery) PartitionOrDefault() string { // ServiceVirtualIP is used to store a virtual IP associated with a service. // It is also used to store assigned virtual IPs when a snapshot is created. type ServiceVirtualIP struct { - Service structs.ServiceName + Service structs.PeeredServiceName IP net.IP } @@ -631,14 +631,22 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: &ServiceNameIndex{ - Field: "Service", + Indexer: indexerSingle[structs.PeeredServiceName, ServiceVirtualIP]{ + readIndex: indexFromPeeredServiceName, + writeIndex: indexFromServiceVirtualIP, }, }, }, } } +func indexFromServiceVirtualIP(vip ServiceVirtualIP) ([]byte, error) { + if vip.Service.ServiceName.Name == "" { + return nil, errMissingValueForIndex + } + return indexFromPeeredServiceName(vip.Service) +} + func freeVirtualIPTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: tableFreeVirtualIPs, diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index fd88ac2e10..d2a970b075 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -11,7 +11,7 @@ import ( "time" "github.com/hashicorp/go-memdb" - uuid "github.com/hashicorp/go-uuid" + "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1799,7 +1799,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { require.NoError(t, s.EnsureService(10, "node1", ns1)) // Make sure there's a virtual IP for the foo service. - vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"}) + vip, err := s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.1", vip) @@ -1830,7 +1830,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { require.NoError(t, s.EnsureService(11, "node1", ns2)) // Make sure the virtual IP has been incremented for the redis service. - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.2", vip) @@ -1846,7 +1846,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { // Delete the first service and make sure it no longer has a virtual IP assigned. require.NoError(t, s.DeleteService(12, "node1", "foo", entMeta, "")) - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "connect-proxy"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "connect-proxy"}}) require.NoError(t, err) assert.Equal(t, "", vip) @@ -1867,7 +1867,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { require.NoError(t, s.EnsureService(13, "node1", ns3)) // Make sure the virtual IP is unchanged for the redis service. - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.2", vip) @@ -1895,7 +1895,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { require.NoError(t, s.EnsureService(14, "node1", ns4)) // Make sure the virtual IP has allocated from the previously freed service. - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "web"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "web"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.1", vip) @@ -1905,6 +1905,41 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP] assert.Equal(t, vip, taggedAddress.Address) assert.Equal(t, ns4.Port, taggedAddress.Port) + + // Register a node1 in another peer (technically this node would be imported + // and stored through the peering stream handlers). + testRegisterNodeOpts(t, s, 15, "node1", func(node *structs.Node) error { + node.PeerName = "billing" + return nil + }) + // Register an identical service but imported from a peer + ns5 := &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "4.4.4.4", + Port: 4444, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"}, + EnterpriseMeta: *entMeta, + PeerName: "billing", + } + require.NoError(t, s.EnsureService(15, "node1", ns5)) + + // Make sure the virtual IP is different from the identically named local service. + vip, err = s.VirtualIPForService(structs.PeeredServiceName{Peer: "billing", ServiceName: structs.ServiceName{Name: "web"}}) + require.NoError(t, err) + assert.Equal(t, "240.0.0.3", vip) + + // Retrieve and verify + _, out, err = s.NodeServices(nil, "node1", nil, "billing") + require.NoError(t, err) + taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP] + assert.Equal(t, vip, taggedAddress.Address) + assert.Equal(t, ns5.Port, taggedAddress.Port) } func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { @@ -1931,7 +1966,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { require.NoError(t, s.EnsureService(10, "node1", ns1)) // Make sure there's a virtual IP for the foo service. - vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"}) + vip, err := s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.1", vip) @@ -1961,7 +1996,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { require.NoError(t, s.EnsureService(11, "node1", ns2)) // Make sure the virtual IP has been incremented for the redis service. - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.2", vip) @@ -1976,7 +2011,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { // Delete the last service and make sure it no longer has a virtual IP assigned. require.NoError(t, s.DeleteService(12, "node1", "redis", entMeta, "")) - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}}) require.NoError(t, err) assert.Equal(t, "", vip) @@ -1996,7 +2031,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { } require.NoError(t, s.EnsureService(13, "node1", ns3)) - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "backend"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "backend"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.2", vip) @@ -2026,7 +2061,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { require.NoError(t, s.EnsureService(14, "node1", ns4)) // Make sure the virtual IP has been incremented for the frontend service. - vip, err = s.VirtualIPForService(structs.ServiceName{Name: "frontend"}) + vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "frontend"}}) require.NoError(t, err) assert.Equal(t, "240.0.0.3", vip)