diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 7e4e5fe53a..f2951844be 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -250,7 +250,7 @@ func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.Node // Populate the indexes if existing != nil { - entry.CreateIndex = existing.(*structs.NodeService).CreateIndex + entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex entry.ModifyIndex = idx } else { entry.CreateIndex = idx @@ -537,3 +537,77 @@ func (s *StateStore) deleteCheckTxn(idx uint64, node, id string, tx *memdb.Txn) // TODO: watch triggers return nil } + +// CheckServiceNodes is used to query all nodes and checks for a given service +// ID. The results are compounded into a CheckServiceNodes, and the index +// returned is the maximum index observed over any node, check, or service +// in the result set. +func (s *StateStore) CheckServiceNodes(serviceID string) (uint64, structs.CheckServiceNodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Query the state store for the service. + services, err := tx.Get("services", "service", serviceID) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + return s.parseCheckServiceNodes(tx, services, err) +} + +// parseCheckServiceNodes is used to parse through a given set of services, +// and query for an associated node and a set of checks. This is the inner +// method used to return a rich set of results from a more simple query. +func (s *StateStore) parseCheckServiceNodes( + tx *memdb.Txn, iter memdb.ResultIterator, + err error) (uint64, structs.CheckServiceNodes, error) { + if err != nil { + return 0, nil, err + } + + var results structs.CheckServiceNodes + var lindex uint64 + for service := iter.Next(); service != nil; service = iter.Next() { + // Compute the index + svc := service.(*structs.ServiceNode) + if svc.ModifyIndex > lindex { + lindex = svc.ModifyIndex + } + + // Retrieve the node + n, err := tx.First("nodes", "id", svc.Node) + if err != nil { + return 0, nil, fmt.Errorf("failed node lookup: %s", err) + } + if n == nil { + return 0, nil, ErrMissingNode + } + node := n.(*structs.Node) + if node.ModifyIndex > lindex { + lindex = node.ModifyIndex + } + + // Get the checks + idx, checks, err := s.parseChecks(tx.Get("checks", "node_service", svc.Node, svc.ServiceID)) + if err != nil { + return 0, nil, err + } + if idx > lindex { + lindex = idx + } + + // Append to the results + results = append(results, structs.CheckServiceNode{ + Node: node, + Service: &structs.NodeService{ + ID: svc.ServiceID, + Service: svc.ServiceName, + Address: svc.ServiceAddress, + Port: svc.ServicePort, + Tags: svc.ServiceTags, + }, + Checks: checks, + }) + } + + return lindex, results, nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 239cd31b28..b357ce3812 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -516,3 +516,83 @@ func TestStateStore_ChecksInState(t *testing.T) { t.Fatalf("expected 3 checks, got: %d", n) } } + +func TestStateStore_CheckServiceNodes(t *testing.T) { + s := testStateStore(t) + + // Querying with no matches gives an empty response + idx, results, err := s.CheckServiceNodes("service1") + if idx != 0 || results != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, results, err) + } + + // Register some nodes + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + // Register node-level checks. These should not be returned + // in the final result. + testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing) + testRegisterCheck(t, s, 3, "node2", "", "check2", structs.HealthPassing) + + // Register a service against the nodes + testRegisterService(t, s, 4, "node1", "service1") + testRegisterService(t, s, 5, "node2", "service2") + + // Register checks against the services + testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing) + testRegisterCheck(t, s, 7, "node2", "service2", "check4", structs.HealthPassing) + + // Query the state store for nodes and checks which + // have been registered with a specific service. + idx, results, err = s.CheckServiceNodes("service1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Check the index returned matches the result set. The index + // should be the highest observed from the result, in this case + // this comes from the check registration. + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure we get the expected result + if n := len(results); n != 1 { + t.Fatalf("expected 1 result, got: %d", n) + } + csn := results[0] + if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 1 { + t.Fatalf("bad output: %#v", csn) + } + + // Node updates alter the returned index + testRegisterNode(t, s, 8, "node1") + idx, results, err = s.CheckServiceNodes("service1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Service updates alter the returned index + testRegisterService(t, s, 9, "node1", "service1") + idx, results, err = s.CheckServiceNodes("service1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } + + // Check updates alter the returned index + testRegisterCheck(t, s, 10, "node1", "service1", "check1", structs.HealthCritical) + idx, results, err = s.CheckServiceNodes("service1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 10 { + t.Fatalf("bad index: %d", idx) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 139a936a7d..2681ed6c6d 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -289,8 +289,8 @@ type HealthChecks []*HealthCheck // CheckServiceNode is used to provide the node, it's service // definition, as well as a HealthCheck that is associated type CheckServiceNode struct { - Node Node - Service NodeService + Node *Node + Service *NodeService Checks HealthChecks } type CheckServiceNodes []CheckServiceNode