Browse Source

consul/state: fetch node/check sets by service ID

pull/1291/head
Ryan Uber 9 years ago committed by James Phillips
parent
commit
26f717f215
  1. 76
      consul/state/state_store.go
  2. 80
      consul/state/state_store_test.go
  3. 4
      consul/structs/structs.go

76
consul/state/state_store.go

@ -250,7 +250,7 @@ func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.Node
// Populate the indexes
if existing != nil {
entry.CreateIndex = existing.(*structs.NodeService).CreateIndex
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
@ -537,3 +537,77 @@ func (s *StateStore) deleteCheckTxn(idx uint64, node, id string, tx *memdb.Txn)
// TODO: watch triggers
return nil
}
// CheckServiceNodes is used to query all nodes and checks for a given service
// ID. The results are compounded into a CheckServiceNodes, and the index
// returned is the maximum index observed over any node, check, or service
// in the result set.
func (s *StateStore) CheckServiceNodes(serviceID string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Query the state store for the service.
services, err := tx.Get("services", "service", serviceID)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
return s.parseCheckServiceNodes(tx, services, err)
}
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
func (s *StateStore) parseCheckServiceNodes(
tx *memdb.Txn, iter memdb.ResultIterator,
err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil {
return 0, nil, err
}
var results structs.CheckServiceNodes
var lindex uint64
for service := iter.Next(); service != nil; service = iter.Next() {
// Compute the index
svc := service.(*structs.ServiceNode)
if svc.ModifyIndex > lindex {
lindex = svc.ModifyIndex
}
// Retrieve the node
n, err := tx.First("nodes", "id", svc.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
if n == nil {
return 0, nil, ErrMissingNode
}
node := n.(*structs.Node)
if node.ModifyIndex > lindex {
lindex = node.ModifyIndex
}
// Get the checks
idx, checks, err := s.parseChecks(tx.Get("checks", "node_service", svc.Node, svc.ServiceID))
if err != nil {
return 0, nil, err
}
if idx > lindex {
lindex = idx
}
// Append to the results
results = append(results, structs.CheckServiceNode{
Node: node,
Service: &structs.NodeService{
ID: svc.ServiceID,
Service: svc.ServiceName,
Address: svc.ServiceAddress,
Port: svc.ServicePort,
Tags: svc.ServiceTags,
},
Checks: checks,
})
}
return lindex, results, nil
}

80
consul/state/state_store_test.go

@ -516,3 +516,83 @@ func TestStateStore_ChecksInState(t *testing.T) {
t.Fatalf("expected 3 checks, got: %d", n)
}
}
func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t)
// Querying with no matches gives an empty response
idx, results, err := s.CheckServiceNodes("service1")
if idx != 0 || results != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, results, err)
}
// Register some nodes
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
// Register node-level checks. These should not be returned
// in the final result.
testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node2", "", "check2", structs.HealthPassing)
// Register a service against the nodes
testRegisterService(t, s, 4, "node1", "service1")
testRegisterService(t, s, 5, "node2", "service2")
// Register checks against the services
testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing)
testRegisterCheck(t, s, 7, "node2", "service2", "check4", structs.HealthPassing)
// Query the state store for nodes and checks which
// have been registered with a specific service.
idx, results, err = s.CheckServiceNodes("service1")
if err != nil {
t.Fatalf("err: %s", err)
}
// Check the index returned matches the result set. The index
// should be the highest observed from the result, in this case
// this comes from the check registration.
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Make sure we get the expected result
if n := len(results); n != 1 {
t.Fatalf("expected 1 result, got: %d", n)
}
csn := results[0]
if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 1 {
t.Fatalf("bad output: %#v", csn)
}
// Node updates alter the returned index
testRegisterNode(t, s, 8, "node1")
idx, results, err = s.CheckServiceNodes("service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Service updates alter the returned index
testRegisterService(t, s, 9, "node1", "service1")
idx, results, err = s.CheckServiceNodes("service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 9 {
t.Fatalf("bad index: %d", idx)
}
// Check updates alter the returned index
testRegisterCheck(t, s, 10, "node1", "service1", "check1", structs.HealthCritical)
idx, results, err = s.CheckServiceNodes("service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 10 {
t.Fatalf("bad index: %d", idx)
}
}

4
consul/structs/structs.go

@ -289,8 +289,8 @@ type HealthChecks []*HealthCheck
// CheckServiceNode is used to provide the node, it's service
// definition, as well as a HealthCheck that is associated
type CheckServiceNode struct {
Node Node
Service NodeService
Node *Node
Service *NodeService
Checks HealthChecks
}
type CheckServiceNodes []CheckServiceNode

Loading…
Cancel
Save