Browse Source

Add new index for PeeredServiceName and ServiceVirtualIP (#13582)

For TProxy we will be leveraging the VirtualIP table, which needs to become peer-aware
pull/13600/head
Chris S. Kim 2 years ago committed by GitHub
parent
commit
2e4cb6f77d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      agent/consul/catalog_endpoint.go
  2. 12
      agent/consul/fsm/snapshot_oss_test.go
  3. 13
      agent/consul/leader_test.go
  4. 50
      agent/consul/state/catalog.go
  5. 12
      agent/consul/state/catalog_oss.go
  6. 42
      agent/consul/state/catalog_oss_test.go
  7. 14
      agent/consul/state/catalog_schema.go
  8. 57
      agent/consul/state/catalog_test.go

5
agent/consul/catalog_endpoint.go

@ -8,7 +8,7 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
@ -1036,6 +1036,7 @@ func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, repl
}
state := c.srv.fsm.State()
*reply, err = state.VirtualIPForService(structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta))
psn := structs.PeeredServiceName{Peer: args.PeerName, ServiceName: structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta)}
*reply, err = state.VirtualIPForService(psn)
return err
}

12
agent/consul/fsm/snapshot_oss_test.go

@ -451,7 +451,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Port: 8000,
Connect: connectConf,
})
vip, err := fsm.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
vip, err := fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
@ -462,7 +463,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Port: 9000,
Connect: connectConf,
})
vip, err = fsm.state.VirtualIPForService(structs.NewServiceName("backend", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)}
vip, err = fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
@ -592,10 +594,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, uint64(25), checks[0].ModifyIndex)
// Verify virtual IPs are consistent.
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("backend", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")

13
agent/consul/leader_test.go

@ -2258,7 +2258,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
})
require.NoError(t, err)
vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil))
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)}
vip, err := state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "", vip)
@ -2287,7 +2288,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
// Make sure the service referenced in the terminating gateway config doesn't have
// a virtual IP yet.
vip, err = state.VirtualIPForService(structs.NewServiceName("bar", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("bar", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "", vip)
@ -2316,8 +2318,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
},
})
require.NoError(t, err)
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "240.0.0.1", vip)
@ -2345,7 +2347,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
// Make sure the baz service (only referenced in the config entry so far)
// has a virtual IP.
vip, err = state.VirtualIPForService(structs.NewServiceName("baz", nil))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("baz", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "240.0.0.2", vip)
}

50
agent/consul/state/catalog.go

@ -7,7 +7,7 @@ import (
"reflect"
"strings"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/acl"
@ -872,9 +872,10 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
}
}
if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil {
return fmt.Errorf("failed to persist service name: %v", err)
// Only upsert KindServiceName if service is local
if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil {
return fmt.Errorf("failed to persist service name: %v", err)
}
}
// Update upstream/downstream mappings if it's a connect service
@ -896,7 +897,8 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
}
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
vip, err := assignServiceVirtualIP(tx, sn)
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
vip, err := assignServiceVirtualIP(tx, psn)
if err != nil {
return fmt.Errorf("failed updating virtual IP: %s", err)
}
@ -976,9 +978,8 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
// the global virtual IP counter if necessary.
func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) {
// TODO(peering): support VIPs
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, error) {
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
}
@ -1049,7 +1050,7 @@ func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error)
}
assignedVIP := ServiceVirtualIP{
Service: sn,
Service: psn,
IP: newEntry.IP,
}
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
@ -1877,10 +1878,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return nil
}
// TODO: accept a non-pointer value for EnterpriseMeta
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
// Delete any checks associated with the service. This will invalidate
// sessions as necessary.
nsq := NodeServiceQuery{
@ -1965,7 +1962,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
if err := freeServiceVirtualIP(tx, svc.ServiceName, nil, entMeta); err != nil {
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name}
if err := freeServiceVirtualIP(tx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
}
if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil {
@ -1981,7 +1979,11 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
// freeServiceVirtualIP is used to free a virtual IP for a service after the last instance
// is removed.
func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.ServiceName, entMeta *acl.EnterpriseMeta) error {
func freeServiceVirtualIP(
tx WriteTxn,
psn structs.PeeredServiceName,
excludeGateway *structs.ServiceName,
) error {
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
@ -1991,15 +1993,14 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.Servi
}
// Don't deregister the virtual IP if at least one terminating gateway still references this service.
sn := structs.NewServiceName(svc, entMeta)
termGatewaySupported, err := terminatingGatewayVirtualIPsSupported(tx, nil)
if err != nil {
return err
}
if termGatewaySupported {
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
svcGateways, err := tx.Get(tableGatewayServices, indexService, psn.ServiceName)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err)
return fmt.Errorf("failed gateway lookup for %q: %s", psn.ServiceName.Name, err)
}
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
@ -2012,7 +2013,7 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.Servi
}
}
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return fmt.Errorf("failed service virtual IP lookup: %s", err)
}
@ -2879,11 +2880,11 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.
return lib.MaxUint64(maxIdx, idx), results, nil
}
func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
vip, err := tx.First(tableServiceVirtualIPs, indexID, sn)
vip, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
}
@ -3336,7 +3337,9 @@ func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, ent
addrs := make(map[string]structs.ServiceAddress, len(services))
for _, s := range services {
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
vip, err := assignServiceVirtualIP(tx, sn)
// Terminating Gateways cannot route to services in peered clusters
psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword}
vip, err := assignServiceVirtualIP(tx, psn)
if err != nil {
return nil, err
}
@ -3413,7 +3416,8 @@ func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.T
return err
}
if len(nodes) == 0 {
if err := freeServiceVirtualIP(tx, sn.Name, &gatewayName, &sn.EnterpriseMeta); err != nil {
psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: sn}
if err := freeServiceVirtualIP(tx, psn, &gatewayName); err != nil {
return err
}
}

12
agent/consul/state/catalog_oss.go

@ -299,3 +299,15 @@ func updateKindServiceNamesIndex(tx WriteTxn, idx uint64, kind structs.ServiceKi
}
return nil
}
func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) {
peer := structs.LocalPeerKeyword
if psn.Peer != "" {
peer = psn.Peer
}
var b indexBuilder
b.String(strings.ToLower(peer))
b.String(strings.ToLower(psn.ServiceName.Name))
return b.Bytes(), nil
}

42
agent/consul/state/catalog_oss_test.go

@ -669,8 +669,19 @@ func testIndexerTableServices() map[string]indexerTestCase {
func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
obj := ServiceVirtualIP{
Service: structs.ServiceName{
Name: "foo",
Service: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "foo",
},
},
IP: net.ParseIP("127.0.0.1"),
}
peeredObj := ServiceVirtualIP{
Service: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "foo",
},
Peer: "Billing",
},
IP: net.ParseIP("127.0.0.1"),
}
@ -678,14 +689,33 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: structs.ServiceName{
Name: "foo",
source: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "foo",
},
},
expected: []byte("foo\x00"),
expected: []byte("internal\x00foo\x00"),
},
write: indexValue{
source: obj,
expected: []byte("foo\x00"),
expected: []byte("internal\x00foo\x00"),
},
extra: []indexerTestCase{
{
read: indexValue{
source: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "foo",
},
Peer: "Billing",
},
expected: []byte("billing\x00foo\x00"),
},
write: indexValue{
source: peeredObj,
expected: []byte("billing\x00foo\x00"),
},
},
},
},
}

14
agent/consul/state/catalog_schema.go

@ -605,7 +605,7 @@ func (q NodeCheckQuery) PartitionOrDefault() string {
// ServiceVirtualIP is used to store a virtual IP associated with a service.
// It is also used to store assigned virtual IPs when a snapshot is created.
type ServiceVirtualIP struct {
Service structs.ServiceName
Service structs.PeeredServiceName
IP net.IP
}
@ -631,14 +631,22 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &ServiceNameIndex{
Field: "Service",
Indexer: indexerSingle[structs.PeeredServiceName, ServiceVirtualIP]{
readIndex: indexFromPeeredServiceName,
writeIndex: indexFromServiceVirtualIP,
},
},
},
}
}
func indexFromServiceVirtualIP(vip ServiceVirtualIP) ([]byte, error) {
if vip.Service.ServiceName.Name == "" {
return nil, errMissingValueForIndex
}
return indexFromPeeredServiceName(vip.Service)
}
func freeVirtualIPTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: tableFreeVirtualIPs,

57
agent/consul/state/catalog_test.go

@ -11,7 +11,7 @@ import (
"time"
"github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -1799,7 +1799,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
require.NoError(t, s.EnsureService(10, "node1", ns1))
// Make sure there's a virtual IP for the foo service.
vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"})
vip, err := s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.1", vip)
@ -1830,7 +1830,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
require.NoError(t, s.EnsureService(11, "node1", ns2))
// Make sure the virtual IP has been incremented for the redis service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.2", vip)
@ -1846,7 +1846,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
// Delete the first service and make sure it no longer has a virtual IP assigned.
require.NoError(t, s.DeleteService(12, "node1", "foo", entMeta, ""))
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "connect-proxy"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "connect-proxy"}})
require.NoError(t, err)
assert.Equal(t, "", vip)
@ -1867,7 +1867,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
require.NoError(t, s.EnsureService(13, "node1", ns3))
// Make sure the virtual IP is unchanged for the redis service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.2", vip)
@ -1895,7 +1895,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
require.NoError(t, s.EnsureService(14, "node1", ns4))
// Make sure the virtual IP has allocated from the previously freed service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "web"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "web"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.1", vip)
@ -1905,6 +1905,41 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(t, vip, taggedAddress.Address)
assert.Equal(t, ns4.Port, taggedAddress.Port)
// Register a node1 in another peer (technically this node would be imported
// and stored through the peering stream handlers).
testRegisterNodeOpts(t, s, 15, "node1", func(node *structs.Node) error {
node.PeerName = "billing"
return nil
})
// Register an identical service but imported from a peer
ns5 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "4.4.4.4",
Port: 4444,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"},
EnterpriseMeta: *entMeta,
PeerName: "billing",
}
require.NoError(t, s.EnsureService(15, "node1", ns5))
// Make sure the virtual IP is different from the identically named local service.
vip, err = s.VirtualIPForService(structs.PeeredServiceName{Peer: "billing", ServiceName: structs.ServiceName{Name: "web"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.3", vip)
// Retrieve and verify
_, out, err = s.NodeServices(nil, "node1", nil, "billing")
require.NoError(t, err)
taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
assert.Equal(t, vip, taggedAddress.Address)
assert.Equal(t, ns5.Port, taggedAddress.Port)
}
func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
@ -1931,7 +1966,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
require.NoError(t, s.EnsureService(10, "node1", ns1))
// Make sure there's a virtual IP for the foo service.
vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"})
vip, err := s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.1", vip)
@ -1961,7 +1996,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
require.NoError(t, s.EnsureService(11, "node1", ns2))
// Make sure the virtual IP has been incremented for the redis service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.2", vip)
@ -1976,7 +2011,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
// Delete the last service and make sure it no longer has a virtual IP assigned.
require.NoError(t, s.DeleteService(12, "node1", "redis", entMeta, ""))
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "redis"}})
require.NoError(t, err)
assert.Equal(t, "", vip)
@ -1996,7 +2031,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
}
require.NoError(t, s.EnsureService(13, "node1", ns3))
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "backend"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "backend"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.2", vip)
@ -2026,7 +2061,7 @@ func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
require.NoError(t, s.EnsureService(14, "node1", ns4))
// Make sure the virtual IP has been incremented for the frontend service.
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "frontend"})
vip, err = s.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "frontend"}})
require.NoError(t, err)
assert.Equal(t, "240.0.0.3", vip)

Loading…
Cancel
Save