mirror of https://github.com/hashicorp/consul
Merge pull request #9881 from hashicorp/dnephin/state-index-service-check-nodes
state: convert services.node and checks.node indexespull/9865/head
commit
9f03e23e44
|
@ -58,21 +58,13 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
|
|||
// Services is used to pull the full list of services for a given node for use
|
||||
// during snapshots.
|
||||
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) {
|
||||
iter, err := catalogServiceListByNode(s.tx, node, structs.WildcardEnterpriseMeta(), true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
return s.tx.Get(tableServices, indexNode, Query{Value: node})
|
||||
}
|
||||
|
||||
// Checks is used to pull the full list of checks for a given node for use
|
||||
// during snapshots.
|
||||
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
|
||||
iter, err := catalogListChecksByNode(s.tx, node, structs.WildcardEnterpriseMeta())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
return s.tx.Get(tableChecks, indexNode, Query{Value: node})
|
||||
}
|
||||
|
||||
// Registration is used to make sure a node, service, and check registration is
|
||||
|
@ -509,7 +501,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
|||
}
|
||||
|
||||
// Delete all services associated with the node and update the service index.
|
||||
services, err := tx.Get("services", "node", nodeName)
|
||||
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeName})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -535,7 +527,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
|||
|
||||
// Delete all checks associated with the node. This will invalidate
|
||||
// sessions as necessary.
|
||||
checks, err := tx.Get("checks", "node", nodeName)
|
||||
checks, err := tx.Get(tableChecks, indexNode, Query{Value: nodeName})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
|
@ -1414,7 +1406,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
|||
|
||||
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
|
||||
func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error {
|
||||
services, err := tx.Get("services", "node", nodeID)
|
||||
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeID})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
|
||||
}
|
||||
|
@ -1589,11 +1581,15 @@ func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string, entMeta *structs.
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
// Get the table index.
|
||||
idx := catalogChecksMaxIndex(tx, entMeta)
|
||||
|
||||
// Return the checks.
|
||||
iter, err := catalogListChecksByNode(tx, nodeName, entMeta)
|
||||
iter, err := catalogListChecksByNode(tx, Query{Value: nodeName, EnterpriseMeta: *entMeta})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
|
@ -2286,6 +2282,10 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind,
|
|||
func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
||||
iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) {
|
||||
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
// We don't want to track an unlimited number of services, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allServices, err := tx.Get("services", "id")
|
||||
|
@ -2326,7 +2326,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
|||
}
|
||||
|
||||
// Query the service level checks
|
||||
checks, err := catalogListChecksByNode(tx, node.Node, entMeta)
|
||||
checks, err := catalogListChecksByNode(tx, Query{Value: node.Node, EnterpriseMeta: *entMeta})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -491,7 +491,7 @@ func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNod
|
|||
// parseCheckServiceNodes but is more efficient since we know they are all on
|
||||
// the same node.
|
||||
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
|
||||
services, err := catalogServiceListByNode(tx, node, structs.WildcardEnterpriseMeta(), true)
|
||||
services, err := tx.Get(tableServices, indexNode, Query{Value: node})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -525,7 +525,7 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc
|
|||
}
|
||||
n := nodeRaw.(*structs.Node)
|
||||
|
||||
iter, err := catalogListChecksByNode(tx, node, structs.WildcardEnterpriseMeta())
|
||||
iter, err := tx.Get(tableChecks, indexNode, Query{Value: node})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -69,6 +69,24 @@ func indexFromNodeQuery(arg interface{}) ([]byte, error) {
|
|||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(interface {
|
||||
NodeIdentity() structs.Identity
|
||||
})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for index, type must provide NodeIdentity()", raw)
|
||||
}
|
||||
|
||||
id := n.NodeIdentity()
|
||||
if id.ID == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(id.ID))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
@ -159,7 +177,7 @@ func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.E
|
|||
}
|
||||
|
||||
func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", "node", node)
|
||||
return tx.Get(tableServices, indexNode, Query{Value: node})
|
||||
}
|
||||
|
||||
func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
|
@ -196,8 +214,8 @@ func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
|
|||
return maxIndexTxn(tx, "checks")
|
||||
}
|
||||
|
||||
func catalogListChecksByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get("checks", "node", node)
|
||||
func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) {
|
||||
return tx.Get(tableChecks, indexNode, q)
|
||||
}
|
||||
|
||||
func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
|
|
|
@ -22,6 +22,21 @@ func testIndexerTableChecks() map[string]indexerTestCase {
|
|||
expected: []byte("node\x00service\x00"),
|
||||
},
|
||||
},
|
||||
indexNode: {
|
||||
read: indexValue{
|
||||
source: Query{
|
||||
Value: "NoDe",
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.HealthCheck{
|
||||
Node: "NoDe",
|
||||
ServiceID: "SeRvIcE",
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,3 +54,23 @@ func testIndexerTableNodes() map[string]indexerTestCase {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
func testIndexerTableServices() map[string]indexerTestCase {
|
||||
return map[string]indexerTestCase{
|
||||
indexNode: {
|
||||
read: indexValue{
|
||||
source: Query{
|
||||
Value: "NoDe",
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.ServiceNode{
|
||||
Node: "NoDe",
|
||||
ServiceID: "SeRvIcE",
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ const (
|
|||
indexKind = "kind"
|
||||
indexStatus = "status"
|
||||
indexNodeService = "node_service"
|
||||
indexNode = "node"
|
||||
)
|
||||
|
||||
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
||||
|
@ -81,13 +82,13 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
},
|
||||
},
|
||||
},
|
||||
"node": {
|
||||
Name: "node",
|
||||
indexNode: {
|
||||
Name: indexNode,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: readIndex(indexFromNodeQuery),
|
||||
writeIndex: writeIndex(indexFromNodeIdentity),
|
||||
},
|
||||
},
|
||||
indexServiceName: {
|
||||
|
@ -157,13 +158,13 @@ func checksTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
"node": {
|
||||
Name: "node",
|
||||
indexNode: {
|
||||
Name: indexNode,
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: readIndex(indexFromNodeQuery),
|
||||
writeIndex: writeIndex(indexFromNodeIdentity),
|
||||
},
|
||||
},
|
||||
indexNodeService: {
|
||||
|
|
|
@ -128,8 +128,9 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
|||
require.NoError(t, schema.Validate())
|
||||
|
||||
var testcases = map[string]func() map[string]indexerTestCase{
|
||||
tableChecks: testIndexerTableChecks,
|
||||
tableNodes: testIndexerTableNodes,
|
||||
tableChecks: testIndexerTableChecks,
|
||||
tableServices: testIndexerTableServices,
|
||||
tableNodes: testIndexerTableNodes,
|
||||
}
|
||||
|
||||
for _, table := range schema.Tables {
|
||||
|
|
|
@ -50,7 +50,7 @@ table=checks
|
|||
index=id unique
|
||||
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false
|
||||
index=node allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
|
||||
index=node_service allow-missing
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck
|
||||
index=service allow-missing
|
||||
|
@ -154,7 +154,7 @@ table=services
|
|||
index=kind
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind
|
||||
index=node
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
|
||||
index=service allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true
|
||||
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
package structs
|
||||
|
||||
// Identity of some entity (ex: service, node, check).
|
||||
//
|
||||
// TODO: this type should replace ServiceID, ServiceName, and CheckID which all
|
||||
// have roughly identical implementations.
|
||||
type Identity struct {
|
||||
ID string
|
||||
EnterpriseMeta
|
||||
}
|
|
@ -833,6 +833,10 @@ type ServiceNode struct {
|
|||
RaftIndex `bexpr:"-"`
|
||||
}
|
||||
|
||||
func (s *ServiceNode) NodeIdentity() Identity {
|
||||
return Identity{ID: s.Node}
|
||||
}
|
||||
|
||||
// PartialClone() returns a clone of the given service node, minus the node-
|
||||
// related fields that get filled in later, Address and TaggedAddresses.
|
||||
func (s *ServiceNode) PartialClone() *ServiceNode {
|
||||
|
@ -1402,6 +1406,10 @@ type HealthCheck struct {
|
|||
RaftIndex `bexpr:"-"`
|
||||
}
|
||||
|
||||
func (hc *HealthCheck) NodeIdentity() Identity {
|
||||
return Identity{ID: hc.Node}
|
||||
}
|
||||
|
||||
func (hc *HealthCheck) CompoundServiceID() ServiceID {
|
||||
id := hc.ServiceID
|
||||
if id == "" {
|
||||
|
|
Loading…
Reference in New Issue