mirror of https://github.com/hashicorp/consul
Merge pull request #9866 from hashicorp/dnephin/state-index-service-id
state: convert services.ID to the new pattern of functional indexerspull/9885/head
commit
1b1e91a2b7
|
@ -28,7 +28,7 @@ const (
|
|||
minUUIDLookupLen = 2
|
||||
)
|
||||
|
||||
// Query is type used to query any single value index that may include an
|
||||
// Query is a type used to query any single value index that may include an
|
||||
// enterprise identifier.
|
||||
type Query struct {
|
||||
Value string
|
||||
|
@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||
// node info above to make sure we actually need to update the service
|
||||
// definition in order to prevent useless churn if nothing has changed.
|
||||
if req.Service != nil {
|
||||
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &req.Service.EnterpriseMeta, req.Node, req.Service.ID)
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
|
|||
// Returns an error if the write didn't happen and nil if write was successful.
|
||||
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
|
||||
// Retrieve the existing service.
|
||||
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
|
|||
// existing memdb transaction.
|
||||
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
|
||||
// Check for existing service
|
||||
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
|
||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -691,7 +691,7 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
|
|||
idx := catalogServicesMaxIndex(tx, entMeta)
|
||||
|
||||
// List all the services.
|
||||
services, err := catalogServiceList(tx, entMeta, false)
|
||||
services, err := catalogServiceListNoWildcard(tx, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed querying services: %s", err)
|
||||
}
|
||||
|
@ -735,7 +735,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet,
|
|||
include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
|
||||
idx := catalogServicesMaxIndex(tx, entMeta)
|
||||
|
||||
services, err := catalogServiceList(tx, entMeta, true)
|
||||
services, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed querying services: %s", err)
|
||||
}
|
||||
|
@ -784,7 +784,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
|
|||
|
||||
// We don't want to track an unlimited number of services, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allServices, err := catalogServiceList(tx, entMeta, false)
|
||||
allServices, err := catalogServiceListNoWildcard(tx, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
@ -1052,7 +1052,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
|
|||
defer tx.Abort()
|
||||
|
||||
// List all the services.
|
||||
services, err := catalogServiceList(tx, entMeta, true)
|
||||
services, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1142,8 +1142,13 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
|
|||
}
|
||||
|
||||
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
||||
// TODO: pass non-pointer type for ent meta
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
// Query the service
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
|
@ -1311,8 +1316,13 @@ func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, ser
|
|||
// deleteServiceTxn is the inner method called to remove a service
|
||||
// registration within an existing transaction.
|
||||
func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
|
||||
// TODO: pass non-pointer type for ent meta
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
// Look up the service.
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1493,7 +1503,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
|
|||
// If the check is associated with a service, check that we have
|
||||
// a registration for the service.
|
||||
if hc.ServiceID != "" {
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", &hc.EnterpriseMeta, hc.Node, hc.ServiceID)
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1806,7 +1816,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
|
|||
return err
|
||||
}
|
||||
|
||||
_, svcRaw, err := firstWatchCompoundWithTxn(tx, "services", "id", &existing.EnterpriseMeta, existing.Node, existing.ServiceID)
|
||||
_, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed retrieving service from state store: %v", err)
|
||||
}
|
||||
|
@ -2246,7 +2256,7 @@ func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.Enterpris
|
|||
// Get the table index
|
||||
idx := catalogMaxIndexWatch(tx, ws, entMeta, true)
|
||||
|
||||
services, err := catalogServiceList(tx, entMeta, true)
|
||||
services, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -2293,7 +2303,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
|||
|
||||
// We don't want to track an unlimited number of services, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allServices, err := tx.Get("services", "id")
|
||||
allServices, err := tx.Get(tableServices, indexID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -95,6 +95,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: this could use NodeServiceQuery
|
||||
type nodeServiceTuple struct {
|
||||
Node string
|
||||
ServiceID string
|
||||
|
@ -569,7 +570,7 @@ func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTu
|
|||
return stream.Event{}, err
|
||||
}
|
||||
|
||||
svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID)
|
||||
svc, err := tx.Get(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: tuple.EntMeta, Node: tuple.Node, Service: tuple.ServiceID})
|
||||
if err != nil {
|
||||
return stream.Event{}, err
|
||||
}
|
||||
|
|
|
@ -87,6 +87,37 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
|
|||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexFromServiceNode(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||
}
|
||||
|
||||
if n.Node == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(n.Node))
|
||||
b.String(strings.ToLower(n.ServiceID))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func prefixIndexFromQuery(arg interface{}) ([]byte, error) {
|
||||
var b indexBuilder
|
||||
switch v := arg.(type) {
|
||||
case *structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case Query:
|
||||
b.String(strings.ToLower(v.Value))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unexpected type %T for NodeServiceQuery prefix index", arg)
|
||||
}
|
||||
|
||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
@ -168,8 +199,8 @@ func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.Serv
|
|||
return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil))
|
||||
}
|
||||
|
||||
func catalogServiceList(tx ReadTxn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", "id")
|
||||
func catalogServiceListNoWildcard(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get(tableServices, indexID)
|
||||
}
|
||||
|
||||
func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
|
|
|
@ -145,6 +145,36 @@ func testIndexerTableNodes() map[string]indexerTestCase {
|
|||
|
||||
func testIndexerTableServices() map[string]indexerTestCase {
|
||||
return map[string]indexerTestCase{
|
||||
indexID: {
|
||||
read: indexValue{
|
||||
source: NodeServiceQuery{
|
||||
Node: "NoDeId",
|
||||
Service: "SeRvIcE",
|
||||
},
|
||||
expected: []byte("nodeid\x00service\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.ServiceNode{
|
||||
Node: "NoDeId",
|
||||
ServiceID: "SeRviCe",
|
||||
},
|
||||
expected: []byte("nodeid\x00service\x00"),
|
||||
},
|
||||
prefix: []indexValue{
|
||||
{
|
||||
source: (*structs.EnterpriseMeta)(nil),
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
source: structs.EnterpriseMeta{},
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
source: Query{Value: "NoDeId"},
|
||||
expected: []byte("nodeid\x00"),
|
||||
},
|
||||
},
|
||||
},
|
||||
indexNode: {
|
||||
read: indexValue{
|
||||
source: Query{
|
||||
|
|
|
@ -72,17 +72,10 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "ServiceID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
Indexer: indexerSingleWithPrefix{
|
||||
readIndex: readIndex(indexFromNodeServiceQuery),
|
||||
writeIndex: writeIndex(indexFromServiceNode),
|
||||
prefixIndex: prefixIndex(prefixIndexFromQuery),
|
||||
},
|
||||
},
|
||||
indexNode: {
|
||||
|
|
|
@ -1298,7 +1298,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
|
|||
// the DB to make sure it is actually gone.
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
services, err := getCompoundWithTxn(tx, "services", "id", nil, "node1", "service1")
|
||||
services, err := tx.Get(tableServices, indexID, NodeServiceQuery{Node: "node1", Service: "service1"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ func (f readIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
|||
|
||||
var errMissingValueForIndex = fmt.Errorf("object is missing a value for this index")
|
||||
|
||||
// writeIndex implements memdb.SingleIndexer. It is used so that a function
|
||||
// writeIndex implements memdb.SingleIndexer. It exists so that a function
|
||||
// can be used to provide this interface.
|
||||
//
|
||||
// Instead of a bool return value, writeIndex expects errMissingValueForIndex to
|
||||
|
@ -69,7 +69,7 @@ func (f writeIndex) FromObject(raw interface{}) (bool, []byte, error) {
|
|||
return err == nil, v, err
|
||||
}
|
||||
|
||||
// writeIndexMulti implements memdb.MultiIndexer. It is used so that a function
|
||||
// writeIndexMulti implements memdb.MultiIndexer. It exists so that a function
|
||||
// can be used to provide this interface.
|
||||
//
|
||||
// Instead of a bool return value, writeIndexMulti expects errMissingValueForIndex to
|
||||
|
|
|
@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
|
||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ table=services
|
|||
index=connect allow-missing
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.IndexConnectService
|
||||
index=id unique
|
||||
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromServiceNode prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery
|
||||
index=kind
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind
|
||||
index=node
|
||||
|
|
Loading…
Reference in New Issue