Browse Source

consul/state: return highest index for queries with compound results

pull/1291/head
Ryan Uber 9 years ago committed by James Phillips
parent
commit
a4a73c3904
  1. 55
      consul/state/state_store.go
  2. 60
      consul/state/state_store_test.go
  3. 2
      consul/structs/structs.go

55
consul/state/state_store.go

@ -131,23 +131,28 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
} }
// Nodes is used to return all of the known nodes. // Nodes is used to return all of the known nodes.
func (s *StateStore) Nodes() (structs.Nodes, error) { func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Retrieve all of the nodes // Retrieve all of the nodes
nodes, err := tx.Get("nodes", "id") nodes, err := tx.Get("nodes", "id")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed nodes lookup: %s", err) return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
} }
// Create and return the nodes list. // Create and return the nodes list, tracking the highest
// TODO: Optimize by returning an iterator. // index we see.
var lindex uint64
var results structs.Nodes var results structs.Nodes
for node := nodes.Next(); node != nil; node = nodes.Next() { for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
if n.ModifyIndex > lindex {
lindex = n.ModifyIndex
}
results = append(results, node.(*structs.Node)) results = append(results, node.(*structs.Node))
} }
return results, nil return lindex, results, nil
} }
// DeleteNode is used to delete a given node by its ID. // DeleteNode is used to delete a given node by its ID.
@ -263,24 +268,24 @@ func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.Node
} }
// NodeServices is used to query service registrations by node ID. // NodeServices is used to query service registrations by node ID.
func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error) { func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Query the node // Query the node
n, err := tx.First("nodes", "id", nodeID) n, err := tx.First("nodes", "id", nodeID)
if err != nil { if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err) return 0, nil, fmt.Errorf("node lookup failed: %s", err)
} }
if n == nil { if n == nil {
return nil, nil return 0, nil, nil
} }
node := n.(*structs.Node) node := n.(*structs.Node)
// Read all of the services // Read all of the services
services, err := tx.Get("services", "node", nodeID) services, err := tx.Get("services", "node", nodeID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err) return 0, nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
} }
// Initialize the node services struct // Initialize the node services struct
@ -288,19 +293,15 @@ func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error)
Node: node, Node: node,
Services: make(map[string]*structs.NodeService), Services: make(map[string]*structs.NodeService),
} }
ns.CreateIndex = node.CreateIndex
ns.CreateIndex = node.CreateIndex
// Add all of the services to the map // Add all of the services to the map, tracking the highest index
var lindex uint64
for service := services.Next(); service != nil; service = services.Next() { for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode) sn := service.(*structs.ServiceNode)
// Track the highest index // Track the highest index
if sn.CreateIndex > ns.CreateIndex { if sn.CreateIndex > lindex {
ns.CreateIndex = sn.CreateIndex lindex = sn.CreateIndex
}
if sn.ModifyIndex > ns.ModifyIndex {
ns.ModifyIndex = sn.ModifyIndex
} }
// Create the NodeService // Create the NodeService
@ -318,7 +319,7 @@ func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error)
ns.Services[svc.ID] = svc ns.Services[svc.ID] = svc
} }
return ns, nil return lindex, ns, nil
} }
// DeleteService is used to delete a given service associated with a node. // DeleteService is used to delete a given service associated with a node.
@ -450,7 +451,7 @@ func (s *StateStore) ensureCheckTxn(idx uint64, hc *structs.HealthCheck, tx *mem
// NodeChecks is used to retrieve checks associated with the // NodeChecks is used to retrieve checks associated with the
// given node from the state store. // given node from the state store.
func (s *StateStore) NodeChecks(nodeID string) (structs.HealthChecks, error) { func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
return s.parseChecks(tx.Get("checks", "node", nodeID)) return s.parseChecks(tx.Get("checks", "node", nodeID))
@ -458,17 +459,23 @@ func (s *StateStore) NodeChecks(nodeID string) (structs.HealthChecks, error) {
// parseChecks is a helper function used to deduplicate some // parseChecks is a helper function used to deduplicate some
// repetitive code for returning health checks. // repetitive code for returning health checks.
func (s *StateStore) parseChecks(iter memdb.ResultIterator, err error) (structs.HealthChecks, error) { func (s *StateStore) parseChecks(iter memdb.ResultIterator, err error) (uint64, structs.HealthChecks, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed health check lookup: %s", err) return 0, nil, fmt.Errorf("failed health check lookup: %s", err)
} }
// Gather the health checks and return them properly type casted // Gather the health checks and return them properly type casted.
// Track the highest index along the way.
var results structs.HealthChecks var results structs.HealthChecks
var lindex uint64
for hc := iter.Next(); hc != nil; hc = iter.Next() { for hc := iter.Next(); hc != nil; hc = iter.Next() {
results = append(results, hc.(*structs.HealthCheck)) check := hc.(*structs.HealthCheck)
if check.ModifyIndex > lindex {
lindex = check.ModifyIndex
}
results = append(results, check)
} }
return results, nil return lindex, results, nil
} }
// DeleteCheck is used to delete a health check registration. // DeleteCheck is used to delete a health check registration.

60
consul/state/state_store_test.go

@ -158,11 +158,16 @@ func TestStateStore_GetNodes(t *testing.T) {
testRegisterNode(t, s, 2, "node2") testRegisterNode(t, s, 2, "node2")
// Retrieve the nodes // Retrieve the nodes
nodes, err := s.Nodes() idx, nodes, err := s.Nodes()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Highest index was returned
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// All nodes were returned // All nodes were returned
if n := len(nodes); n != 3 { if n := len(nodes); n != 3 {
t.Fatalf("bad node count: %d", n) t.Fatalf("bad node count: %d", n)
@ -231,8 +236,8 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Fetching services for a node with none returns nil // Fetching services for a node with none returns nil
if res, err := s.NodeServices("node1"); err != nil || res != nil { if idx, res, err := s.NodeServices("node1"); err != nil || res != nil || idx != 0 {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err) t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
} }
// Register the nodes // Register the nodes
@ -270,11 +275,16 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
} }
// Retrieve the services // Retrieve the services
out, err := s.NodeServices("node1") idx, out, err := s.NodeServices("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Highest index for the result set was returned
if idx != 20 {
t.Fatalf("bad index: %d", idx)
}
// Only the services for the requested node are returned // Only the services for the requested node are returned
if out == nil || len(out.Services) != 2 { if out == nil || len(out.Services) != 2 {
t.Fatalf("bad services: %#v", out) t.Fatalf("bad services: %#v", out)
@ -293,11 +303,6 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
t.Fatalf("bad: %#v %#v", ns2, svc) t.Fatalf("bad: %#v %#v", ns2, svc)
} }
// Lastly, ensure that the highest index was preserved.
if out.CreateIndex != 20 || out.ModifyIndex != 20 {
t.Fatalf("bad index: %d, %d", out.CreateIndex, out.ModifyIndex)
}
// Index tables were updated // Index tables were updated
if idx := s.maxIndex("services"); idx != 30 { if idx := s.maxIndex("services"); idx != 30 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
@ -307,17 +312,9 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
func TestStateStore_DeleteService(t *testing.T) { func TestStateStore_DeleteService(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Register a node with one service // Register a node with one service and a check
testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 1, "node1")
testRegisterService(t, s, 2, "node1", "service1") testRegisterService(t, s, 2, "node1", "service1")
// The service exists
ns, err := s.NodeServices("node1")
if err != nil || ns == nil || len(ns.Services) != 1 {
t.Fatalf("bad: %#v (err: %#v)", ns, err)
}
// Register a check with the service
testRegisterCheck(t, s, 3, "node1", "service1", "check1") testRegisterCheck(t, s, 3, "node1", "service1", "check1")
// Delete the service // Delete the service
@ -325,14 +322,19 @@ func TestStateStore_DeleteService(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// The service and check don't exist // Service doesn't exist.
ns, err = s.NodeServices("node1") _, ns, err := s.NodeServices("node1")
if err != nil || ns == nil || len(ns.Services) != 0 { if err != nil || ns == nil || len(ns.Services) != 0 {
t.Fatalf("bad: %#v (err: %#v)", ns, err) t.Fatalf("bad: %#v (err: %#v)", ns, err)
} }
checks, err := s.NodeChecks("node1")
if err != nil || len(checks) != 0 { // Check doesn't exist. Check using the raw DB so we can test
t.Fatalf("bad: %#v (err: %s)", checks, err) // that it actually is removed in the state store.
tx := s.db.Txn(false)
defer tx.Abort()
check, err := tx.First("checks", "id", "node1", "check1")
if err != nil || check != nil {
t.Fatalf("bad: %#v (err: %s)", check, err)
} }
// Index tables were updated // Index tables were updated
@ -381,10 +383,13 @@ func TestStateStore_EnsureCheck(t *testing.T) {
} }
// Retrieve the check and make sure it matches // Retrieve the check and make sure it matches
checks, err := s.NodeChecks("node1") idx, checks, err := s.NodeChecks("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if len(checks) != 1 { if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks)) t.Fatalf("wrong number of checks: %d", len(checks))
} }
@ -399,10 +404,13 @@ func TestStateStore_EnsureCheck(t *testing.T) {
} }
// Check that we successfully updated // Check that we successfully updated
checks, err = s.NodeChecks("node1") idx, checks, err = s.NodeChecks("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if len(checks) != 1 { if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks)) t.Fatalf("wrong number of checks: %d", len(checks))
} }
@ -432,7 +440,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
} }
// Check is gone // Check is gone
checks, err := s.NodeChecks("node1") _, checks, err := s.NodeChecks("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

2
consul/structs/structs.go

@ -269,8 +269,6 @@ type NodeService struct {
type NodeServices struct { type NodeServices struct {
Node *Node Node *Node
Services map[string]*NodeService Services map[string]*NodeService
RaftIndex
} }
// HealthCheck represents a single check on a given node // HealthCheck represents a single check on a given node

Loading…
Cancel
Save