mirror of https://github.com/hashicorp/consul
Merge pull request #9797 from hashicorp/dnephin/state-index-node-id
state: convert nodes.ID to the new pattern of functional indexerspull/9867/head
commit
9d924a81a9
|
@ -29,6 +29,13 @@ const (
|
|||
minUUIDLookupLen = 2
|
||||
)
|
||||
|
||||
// Query is type used to query any single value index that may include an
|
||||
// enterprise identifier.
|
||||
type Query struct {
|
||||
Value string
|
||||
structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func resizeNodeLookupKey(s string) string {
|
||||
l := len(s)
|
||||
|
||||
|
@ -41,7 +48,7 @@ func resizeNodeLookupKey(s string) string {
|
|||
|
||||
// Nodes is used to pull the full list of nodes for use during snapshots.
|
||||
func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get("nodes", "id")
|
||||
iter, err := s.tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -128,7 +135,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||
// modify the node at all so we prevent watch churn and useless writes
|
||||
// and modify index bumps on the node.
|
||||
{
|
||||
existing, err := tx.First("nodes", "id", node.Node)
|
||||
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
|
@ -187,7 +194,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
|
|||
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
|
||||
func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
|
||||
// Retrieve all of the nodes
|
||||
enodes, err := tx.Get("nodes", "id")
|
||||
enodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Cannot lookup all nodes: %s", err)
|
||||
}
|
||||
|
@ -289,7 +296,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
|
|||
|
||||
// Check for an existing node by name to support nodes with no IDs.
|
||||
if n == nil {
|
||||
existing, err := tx.First("nodes", "id", node.Node)
|
||||
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("node name lookup failed: %s", err)
|
||||
}
|
||||
|
@ -354,7 +361,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
|
|||
}
|
||||
|
||||
func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
|
||||
node, err := tx.First("nodes", "id", nodeName)
|
||||
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
|
@ -403,7 +410,7 @@ func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
|
|||
idx := maxIndexTxn(tx, "nodes")
|
||||
|
||||
// Retrieve all of the nodes
|
||||
nodes, err := tx.Get("nodes", "id")
|
||||
nodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -493,7 +500,7 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string)
|
|||
// the store within a given transaction.
|
||||
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
||||
// Look up the node.
|
||||
node, err := tx.First("nodes", "id", nodeName)
|
||||
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
|
||||
if err != nil {
|
||||
return fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
|
@ -654,7 +661,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||
// That's always populated when we read from the state store.
|
||||
entry := svc.ToServiceNode(node)
|
||||
// Get the node
|
||||
n, err := tx.First("nodes", "id", node)
|
||||
n, err := tx.First(tableNodes, indexID, Query{Value: node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -1083,7 +1090,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
|
|||
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
||||
// We don't want to track an unlimited number of nodes, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allNodes, err := tx.Get("nodes", "id")
|
||||
allNodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -1098,7 +1105,7 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo
|
|||
s := sn.PartialClone()
|
||||
|
||||
// Grab the corresponding node record.
|
||||
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
|
||||
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -1159,7 +1166,7 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
|
|||
idx := catalogMaxIndex(tx, entMeta, false)
|
||||
|
||||
// Query the node by node name
|
||||
watchCh, n, err := tx.FirstWatch("nodes", "id", nodeNameOrID)
|
||||
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID})
|
||||
if err != nil {
|
||||
return true, 0, nil, nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
|
@ -1477,7 +1484,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
|
|||
}
|
||||
|
||||
// Get the node
|
||||
node, err := tx.First("nodes", "id", hc.Node)
|
||||
node, err := tx.First(tableNodes, indexID, Query{Value: hc.Node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -1703,7 +1710,7 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
|
|||
|
||||
// We don't want to track an unlimited number of nodes, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allNodes, err := tx.Get("nodes", "id")
|
||||
allNodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -1713,7 +1720,7 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
|
|||
var results structs.HealthChecks
|
||||
for check := iter.Next(); check != nil; check = iter.Next() {
|
||||
healthCheck := check.(*structs.HealthCheck)
|
||||
watchCh, node, err := tx.FirstWatch("nodes", "id", healthCheck.Node)
|
||||
watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{Value: healthCheck.Node})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -2122,7 +2129,7 @@ func parseCheckServiceNodes(
|
|||
|
||||
// We don't want to track an unlimited number of nodes, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allNodes, err := tx.Get("nodes", "id")
|
||||
allNodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
@ -2140,7 +2147,7 @@ func parseCheckServiceNodes(
|
|||
results := make(structs.CheckServiceNodes, 0, len(services))
|
||||
for _, sn := range services {
|
||||
// Retrieve the node.
|
||||
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
|
||||
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -2196,7 +2203,7 @@ func (s *Store) NodeInfo(ws memdb.WatchSet, node string, entMeta *structs.Enterp
|
|||
idx := catalogMaxIndex(tx, entMeta, true)
|
||||
|
||||
// Query the node by the passed node
|
||||
nodes, err := tx.Get("nodes", "id", node)
|
||||
nodes, err := tx.Get(tableNodes, indexID, Query{Value: node})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
@ -2215,7 +2222,7 @@ func (s *Store) NodeDump(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
|
|||
idx := catalogMaxIndex(tx, entMeta, true)
|
||||
|
||||
// Fetch all of the registered nodes
|
||||
nodes, err := tx.Get("nodes", "id")
|
||||
nodes, err := tx.Get(tableNodes, indexID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -516,7 +516,7 @@ func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]strea
|
|||
// the full list of checks for a specific service on that node.
|
||||
func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc, error) {
|
||||
// Fetch the node
|
||||
nodeRaw, err := tx.First("nodes", "id", node)
|
||||
nodeRaw, err := tx.First(tableNodes, indexID, Query{Value: node})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -41,6 +41,34 @@ func indexFromNodeServiceQuery(arg interface{}) ([]byte, error) {
|
|||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexFromNode(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(*structs.Node)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.Node index", raw)
|
||||
}
|
||||
|
||||
if n.Node == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(n.Node))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
// indexFromNodeQuery builds an index key where Query.Value is lowercase, and is
|
||||
// a required value.
|
||||
func indexFromNodeQuery(arg interface{}) ([]byte, error) {
|
||||
q, ok := arg.(Query)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for Query index", arg)
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(q.Value))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
|
|
@ -24,3 +24,18 @@ func testIndexerTableChecks() map[string]indexerTestCase {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
func testIndexerTableNodes() map[string]indexerTestCase {
|
||||
return map[string]indexerTestCase{
|
||||
indexID: {
|
||||
read: indexValue{
|
||||
source: Query{Value: "NoDeId"},
|
||||
expected: []byte("nodeid\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.Node{Node: "NoDeId"},
|
||||
expected: []byte("nodeid\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,8 +25,7 @@ const (
|
|||
indexNodeService = "node_service"
|
||||
)
|
||||
|
||||
// nodesTableSchema returns a new table schema used for storing node
|
||||
// information.
|
||||
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
||||
func nodesTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: tableNodes,
|
||||
|
@ -35,18 +34,16 @@ func nodesTableSchema() *memdb.TableSchema {
|
|||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: readIndex(indexFromNodeQuery),
|
||||
writeIndex: writeIndex(indexFromNode),
|
||||
},
|
||||
},
|
||||
"uuid": {
|
||||
Name: "uuid",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.UUIDFieldIndex{
|
||||
Field: "ID",
|
||||
},
|
||||
Indexer: &memdb.UUIDFieldIndex{Field: "ID"},
|
||||
},
|
||||
"meta": {
|
||||
Name: "meta",
|
||||
|
|
|
@ -146,7 +146,7 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
|
|||
// don't carefully sequence this, and since it will fix itself
|
||||
// on the next coordinate update from that node, we don't return
|
||||
// an error or log anything.
|
||||
node, err := tx.First("nodes", "id", update.Node)
|
||||
node, err := tx.First(tableNodes, indexID, Query{Value: update.Node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -129,6 +129,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
|||
|
||||
var testcases = map[string]func() map[string]indexerTestCase{
|
||||
tableChecks: testIndexerTableChecks,
|
||||
tableNodes: testIndexerTableNodes,
|
||||
}
|
||||
|
||||
for _, table := range schema.Tables {
|
||||
|
|
|
@ -195,7 +195,7 @@ func sessionCreateTxn(tx *txn, idx uint64, sess *structs.Session) error {
|
|||
sess.ModifyIndex = idx
|
||||
|
||||
// Check that the node exists
|
||||
node, err := tx.First("nodes", "id", sess.Node)
|
||||
node, err := tx.First(tableNodes, indexID, Query{Value: sess.Node})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string,
|
|||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
n, err := tx.First("nodes", "id", nodeID)
|
||||
n, err := tx.First(tableNodes, indexID, Query{Value: nodeID})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ table=mesh-topology
|
|||
|
||||
table=nodes
|
||||
index=id unique
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNode
|
||||
index=meta allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.StringMapFieldIndex Field=Meta Lowercase=false
|
||||
index=uuid unique allow-missing
|
||||
|
|
Loading…
Reference in New Issue