consul: Health endpoints support blocking queries

pull/19/head
Armon Dadgar 2014-02-05 13:30:18 -08:00
parent bd1a140476
commit 4cee14f58a
4 changed files with 59 additions and 36 deletions

View File

@ -12,35 +12,41 @@ type Health struct {
// ChecksInState is used to get all the checks in a given state
func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
reply *structs.HealthChecks) error {
reply *structs.IndexedHealthChecks) error {
if done, err := h.srv.forward("Health.ChecksInState", args.Datacenter, args, reply); done {
return err
}
// Get the state specific checks
state := h.srv.fsm.State()
_, checks := state.ChecksInState(args.State)
*reply = checks
return nil
return h.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("ChecksInState"),
func() (uint64, error) {
reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
return reply.Index, nil
})
}
// NodeChecks is used to get all the checks for a node
func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
reply *structs.HealthChecks) error {
reply *structs.IndexedHealthChecks) error {
if done, err := h.srv.forward("Health.NodeChecks", args.Datacenter, args, reply); done {
return err
}
// Get the node checks
state := h.srv.fsm.State()
_, checks := state.NodeChecks(args.Node)
*reply = checks
return nil
return h.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("NodeChecks"),
func() (uint64, error) {
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
return reply.Index, nil
})
}
// ServiceChecks is used to get all the checks for a service
func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
reply *structs.HealthChecks) error {
reply *structs.IndexedHealthChecks) error {
// Reject if tag filtering is on
if args.TagFilter {
return fmt.Errorf("Tag filtering is not supported")
@ -53,13 +59,16 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
// Get the service checks
state := h.srv.fsm.State()
_, checks := state.ServiceChecks(args.ServiceName)
*reply = checks
return nil
return h.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("ServiceChecks"),
func() (uint64, error) {
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
return reply.Index, nil
})
}
// ServiceNodes returns all the nodes registered as part of a service including health info
func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.CheckServiceNodes) error {
func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error {
if done, err := h.srv.forward("Health.ServiceNodes", args.Datacenter, args, reply); done {
return err
}
@ -71,13 +80,14 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
// Get the nodes
state := h.srv.fsm.State()
var nodes structs.CheckServiceNodes
if args.TagFilter {
_, nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
_, nodes = state.CheckServiceNodes(args.ServiceName)
}
*reply = nodes
return nil
return h.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("CheckServiceNodes"),
func() (uint64, error) {
if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName)
}
return reply.Index, nil
})
}

View File

@ -31,15 +31,16 @@ func TestHealth_ChecksInState(t *testing.T) {
t.Fatalf("err: %v", err)
}
var checks structs.HealthChecks
var out2 structs.IndexedHealthChecks
inState := structs.ChecksInStateRequest{
Datacenter: "dc1",
State: structs.HealthPassing,
}
if err := client.Call("Health.ChecksInState", &inState, &checks); err != nil {
if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil {
t.Fatalf("err: %v", err)
}
checks := out2.HealthChecks
if len(checks) != 2 {
t.Fatalf("Bad: %v", checks)
}
@ -77,15 +78,16 @@ func TestHealth_NodeChecks(t *testing.T) {
t.Fatalf("err: %v", err)
}
var checks structs.HealthChecks
var out2 structs.IndexedHealthChecks
node := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "foo",
}
if err := client.Call("Health.NodeChecks", &node, &checks); err != nil {
if err := client.Call("Health.NodeChecks", &node, &out2); err != nil {
t.Fatalf("err: %v", err)
}
checks := out2.HealthChecks
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
@ -123,15 +125,16 @@ func TestHealth_ServiceChecks(t *testing.T) {
t.Fatalf("err: %v", err)
}
var checks structs.HealthChecks
var out2 structs.IndexedHealthChecks
node := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
}
if err := client.Call("Health.ServiceChecks", &node, &checks); err != nil {
if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil {
t.Fatalf("err: %v", err)
}
checks := out2.HealthChecks
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
@ -189,17 +192,18 @@ func TestHealth_ServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}
var nodes structs.CheckServiceNodes
var out2 structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "master",
TagFilter: false,
}
if err := client.Call("Health.ServiceNodes", &req, &nodes); err != nil {
if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}
nodes := out2.Nodes
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}

View File

@ -195,10 +195,14 @@ func (s *StateStore) initialize() error {
// Setup the query tables
// TODO: Other queries...
s.queryTables = map[string]MDBTables{
"Nodes": MDBTables{s.nodeTable},
"Services": MDBTables{s.serviceTable},
"ServiceNodes": MDBTables{s.nodeTable, s.serviceTable},
"NodeServices": MDBTables{s.nodeTable, s.serviceTable},
"Nodes": MDBTables{s.nodeTable},
"Services": MDBTables{s.serviceTable},
"ServiceNodes": MDBTables{s.nodeTable, s.serviceTable},
"NodeServices": MDBTables{s.nodeTable, s.serviceTable},
"ChecksInState": MDBTables{s.checkTable},
"NodeChecks": MDBTables{s.checkTable},
"ServiceChecks": MDBTables{s.checkTable},
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
}
return nil
}
@ -598,7 +602,7 @@ func parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, struct
// CheckServiceNodes returns the nodes associated with a given service, along
// with any associated check
func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) {
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tables := s.queryTables["CheckServiceNodes"]
tx, err := tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
@ -617,7 +621,7 @@ func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckSer
// CheckServiceNodes returns the nodes associated with a given service, along
// with any associated checks
func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) {
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tables := s.queryTables["CheckServiceNodes"]
tx, err := tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))

View File

@ -167,6 +167,11 @@ type IndexedHealthChecks struct {
HealthChecks HealthChecks
}
type IndexedCheckServiceNodes struct {
Index uint64
Nodes CheckServiceNodes
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle