From d90845f26da27d968c1df13e58721739266da1f2 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 12 Feb 2021 13:39:38 -0500 Subject: [PATCH 1/4] state: move services.ID to new pattern --- agent/consul/state/catalog.go | 26 +++++++++++++------ agent/consul/state/catalog_events.go | 3 ++- agent/consul/state/catalog_oss.go | 18 ++++++++++++- agent/consul/state/catalog_schema.go | 14 +++------- agent/consul/state/catalog_test.go | 2 +- agent/consul/state/state_store_test.go | 4 +-- .../testdata/TestStateStoreSchema.golden | 2 +- 7 files changed, 44 insertions(+), 25 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index c91070ea0d..16b43cb284 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b // node info above to make sure we actually need to update the service // definition in order to prevent useless churn if nothing has changed. if req.Service != nil { - _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &req.Service.EnterpriseMeta, req.Node, req.Service.ID) + _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed") // Returns an error if the write didn't happen and nil if write was successful. func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error { // Retrieve the existing service. - _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) + _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node // existing memdb transaction. func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { // Check for existing service - _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) + _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1142,8 +1142,13 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs. } func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { + // TODO: pass non-pointer type for ent meta + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Query the service - _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) + _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) if err != nil { return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } @@ -1311,8 +1316,13 @@ func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, ser // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error { + // TODO: pass non-pointer type for ent meta + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Look up the service. - _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) + _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1493,7 +1503,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc // If the check is associated with a service, check that we have // a registration for the service. if hc.ServiceID != "" { - _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", &hc.EnterpriseMeta, hc.Node, hc.ServiceID) + _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1806,7 +1816,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ return err } - _, svcRaw, err := firstWatchCompoundWithTxn(tx, "services", "id", &existing.EnterpriseMeta, existing.Node, existing.ServiceID) + _, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID}) if err != nil { return fmt.Errorf("failed retrieving service from state store: %v", err) } @@ -2293,7 +2303,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, // We don't want to track an unlimited number of services, so we pull a // top-level watch to use as a fallback. - allServices, err := tx.Get("services", "id") + allServices, err := tx.Get(tableServices, indexID) if err != nil { return 0, nil, fmt.Errorf("failed services lookup: %s", err) } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 9c123c1f50..4e407baeb3 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -95,6 +95,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { } } +// TODO: this could use NodeServiceQuery type nodeServiceTuple struct { Node string ServiceID string @@ -569,7 +570,7 @@ func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTu return stream.Event{}, err } - svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID) + svc, err := tx.Get(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: tuple.EntMeta, Node: tuple.Node, Service: tuple.ServiceID}) if err != nil { return stream.Event{}, err } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index cf225056c6..391f8529c4 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -87,6 +87,22 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexFromServiceNode(raw interface{}) ([]byte, error) { + n, ok := raw.(*structs.ServiceNode) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw) + } + + if n.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(n.Node)) + b.String(strings.ToLower(n.ServiceID)) + return b.Bytes(), nil +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } @@ -169,7 +185,7 @@ func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.Serv } func catalogServiceList(tx ReadTxn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { - return tx.Get("services", "id") + return tx.Get(tableServices, indexID) } func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 35bd5405a4..85ca8151ba 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -72,17 +72,9 @@ func servicesTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "ServiceID", - Lowercase: true, - }, - }, + Indexer: indexerSingle{ + readIndex: readIndex(indexFromNodeServiceQuery), + writeIndex: writeIndex(indexFromServiceNode), }, }, indexNode: { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 63586c43ae..1f22948d96 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1298,7 +1298,7 @@ func TestStateStore_DeleteNode(t *testing.T) { // the DB to make sure it is actually gone. tx := s.db.Txn(false) defer tx.Abort() - services, err := getCompoundWithTxn(tx, "services", "id", nil, "node1", "service1") + services, err := tx.Get(tableServices, indexID, NodeServiceQuery{Node: "node1", Service: "service1"}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 90454d907c..bc87eda27b 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s tx := s.db.Txn(false) defer tx.Abort() - _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID) + _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) if err != nil { t.Fatalf("err: %s", err) } @@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv tx := s.db.Txn(false) defer tx.Abort() - _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID) + _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 618ec8d435..e8ae438df2 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -150,7 +150,7 @@ table=services index=connect allow-missing indexer=github.com/hashicorp/consul/agent/consul/state.IndexConnectService index=id unique - indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromServiceNode index=kind indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind index=node From 627c469f08fc2c8d490a6b07fd673a706d337a12 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 12 Feb 2021 16:52:09 -0500 Subject: [PATCH 2/4] state: fix prefix index with the new pattern Prefix queries are generally being used to match part of a partial index. We can support these indexes by using a function that accept different types for each subset of the index. What I found interesting is that in the generic StringFieldIndexer the implementation for PrefixFromArgs would remove the trailing null, but at least in these 2 cases we actually want a null terminated string. We simply want fewer components in the string. --- agent/consul/state/catalog.go | 2 +- agent/consul/state/catalog_oss.go | 11 +++++++++++ agent/consul/state/catalog_schema.go | 7 ++++--- agent/consul/state/indexer.go | 4 ++-- .../consul/state/testdata/TestStateStoreSchema.golden | 2 +- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 16b43cb284..56e7c1c607 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -28,7 +28,7 @@ const ( minUUIDLookupLen = 2 ) -// Query is type used to query any single value index that may include an +// Query is a type used to query any single value index that may include an // enterprise identifier. type Query struct { Value string diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 391f8529c4..e1bcbdd0ea 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -103,6 +103,17 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func prefixIndexFromQuery(arg interface{}) ([]byte, error) { + var b indexBuilder + switch v := arg.(type) { + case Query: + b.String(strings.ToLower(v.Value)) + return b.Bytes(), nil + } + + return nil, fmt.Errorf("unexpected type %T for NodeServiceQuery prefix index", arg) +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 85ca8151ba..506da6f5aa 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -72,9 +72,10 @@ func servicesTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: indexerSingle{ - readIndex: readIndex(indexFromNodeServiceQuery), - writeIndex: writeIndex(indexFromServiceNode), + Indexer: indexerSingleWithPrefix{ + readIndex: readIndex(indexFromNodeServiceQuery), + writeIndex: writeIndex(indexFromServiceNode), + prefixIndex: prefixIndex(prefixIndexFromQuery), }, }, indexNode: { diff --git a/agent/consul/state/indexer.go b/agent/consul/state/indexer.go index 22474fc82f..66f1c5e8f0 100644 --- a/agent/consul/state/indexer.go +++ b/agent/consul/state/indexer.go @@ -53,7 +53,7 @@ func (f readIndex) FromArgs(args ...interface{}) ([]byte, error) { var errMissingValueForIndex = fmt.Errorf("object is missing a value for this index") -// writeIndex implements memdb.SingleIndexer. It is used so that a function +// writeIndex implements memdb.SingleIndexer. It exists so that a function // can be used to provide this interface. // // Instead of a bool return value, writeIndex expects errMissingValueForIndex to @@ -69,7 +69,7 @@ func (f writeIndex) FromObject(raw interface{}) (bool, []byte, error) { return err == nil, v, err } -// writeIndexMulti implements memdb.MultiIndexer. It is used so that a function +// writeIndexMulti implements memdb.MultiIndexer. It exists so that a function // can be used to provide this interface. // // Instead of a bool return value, writeIndexMulti expects errMissingValueForIndex to diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index e8ae438df2..7f74557800 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -150,7 +150,7 @@ table=services index=connect allow-missing indexer=github.com/hashicorp/consul/agent/consul/state.IndexConnectService index=id unique - indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromServiceNode + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromServiceNode prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery index=kind indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind index=node From dbd3cef1ed90cfd2216462ace1685c7910621999 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Mar 2021 18:26:44 -0500 Subject: [PATCH 3/4] state: handle wildcard for services.ID index When listing services, use the id_prefix directly if wildcards are allowed. Error if a wildcard is used for a query that does not index the wildcard --- agent/consul/state/catalog.go | 10 +++++----- agent/consul/state/catalog_oss.go | 6 +++++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 56e7c1c607..0171d6b247 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -691,7 +691,7 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui idx := catalogServicesMaxIndex(tx, entMeta) // List all the services. - services, err := catalogServiceList(tx, entMeta, false) + services, err := catalogServiceListNoWildcard(tx, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed querying services: %s", err) } @@ -735,7 +735,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { idx := catalogServicesMaxIndex(tx, entMeta) - services, err := catalogServiceList(tx, entMeta, true) + services, err := tx.Get(tableServices, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed querying services: %s", err) } @@ -784,7 +784,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, // We don't want to track an unlimited number of services, so we pull a // top-level watch to use as a fallback. - allServices, err := catalogServiceList(tx, entMeta, false) + allServices, err := catalogServiceListNoWildcard(tx, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed services lookup: %s", err) } @@ -1052,7 +1052,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta * defer tx.Abort() // List all the services. - services, err := catalogServiceList(tx, entMeta, true) + services, err := tx.Get(tableServices, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -2256,7 +2256,7 @@ func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.Enterpris // Get the table index idx := catalogMaxIndexWatch(tx, ws, entMeta, true) - services, err := catalogServiceList(tx, entMeta, true) + services, err := tx.Get(tableServices, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index e1bcbdd0ea..547ea89b11 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -106,6 +106,10 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) { func prefixIndexFromQuery(arg interface{}) ([]byte, error) { var b indexBuilder switch v := arg.(type) { + case *structs.EnterpriseMeta: + return nil, nil + case structs.EnterpriseMeta: + return nil, nil case Query: b.String(strings.ToLower(v.Value)) return b.Bytes(), nil @@ -195,7 +199,7 @@ func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.Serv return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil)) } -func catalogServiceList(tx ReadTxn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { +func catalogServiceListNoWildcard(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get(tableServices, indexID) } From 0c14f3818ded3ccb8754886b6f01f6760479e187 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 10 Mar 2021 18:34:19 -0500 Subject: [PATCH 4/4] state: add indexer test for services.ID index --- agent/consul/state/catalog_oss_test.go | 30 ++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index cbaa1bf1f2..39d61ac10f 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -145,6 +145,36 @@ func testIndexerTableNodes() map[string]indexerTestCase { func testIndexerTableServices() map[string]indexerTestCase { return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: NodeServiceQuery{ + Node: "NoDeId", + Service: "SeRvIcE", + }, + expected: []byte("nodeid\x00service\x00"), + }, + write: indexValue{ + source: &structs.ServiceNode{ + Node: "NoDeId", + ServiceID: "SeRviCe", + }, + expected: []byte("nodeid\x00service\x00"), + }, + prefix: []indexValue{ + { + source: (*structs.EnterpriseMeta)(nil), + expected: nil, + }, + { + source: structs.EnterpriseMeta{}, + expected: nil, + }, + { + source: Query{Value: "NoDeId"}, + expected: []byte("nodeid\x00"), + }, + }, + }, indexNode: { read: indexValue{ source: Query{