diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 2e86e362a7..ec5fc7821e 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -57,51 +57,43 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ return out, nil } -func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - // Set default DC - dc := s.agent.config.Datacenter - - // Check for other DC - if other := req.URL.Query().Get("dc"); other != "" { - dc = other +func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { + // Setup the request + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } - var out structs.Nodes - if err := s.agent.RPC("Catalog.ListNodes", dc, &out); err != nil { - return nil, err + var out structs.IndexedNodes + if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { + return 0, nil, err } - return out, nil + return out.Index, out.Nodes, nil } -func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - dc := s.agent.config.Datacenter - - // Check for other DC - if other := req.URL.Query().Get("dc"); other != "" { - dc = other + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } - var out structs.Services - if err := s.agent.RPC("Catalog.ListServices", dc, &out); err != nil { - return nil, err + var out structs.IndexedServices + if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { + return 0, nil, err } - return out, nil + return out.Index, out.Services, nil } -func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - args := structs.ServiceSpecificRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.ServiceSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Check for a tag + params := req.URL.Query() if _, ok := params["tag"]; ok { args.ServiceTag = params.Get("tag") args.TagFilter = true @@ -112,27 +104,22 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return nil, nil + return 0, nil, nil } // Make the RPC request - var out structs.ServiceNodes + var out structs.IndexedServiceNodes if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { - return nil, err + return 0, nil, err } - return out, nil + return out.Index, out.ServiceNodes, nil } -func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default Datacenter - args := structs.NodeSpecificRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.NodeSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Pull out the node name @@ -140,13 +127,13 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return nil, nil + return 0, nil, nil } // Make the RPC request - out := new(structs.NodeServices) - if err := s.agent.RPC("Catalog.NodeServices", &args, out); err != nil { - return nil, err + var out structs.IndexedNodeServices + if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { + return 0, nil, err } - return out, nil + return out.Index, out.NodeServices, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 37770cdcd3..8ee5ff828a 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -114,11 +114,15 @@ func TestCatalogNodes(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.CatalogNodes(nil, req) + idx, obj, err := srv.CatalogNodes(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + nodes := obj.(structs.Nodes) if len(nodes) != 2 { t.Fatalf("bad: %v", obj) @@ -153,11 +157,15 @@ func TestCatalogServices(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.CatalogServices(nil, req) + idx, obj, err := srv.CatalogServices(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + services := obj.(structs.Services) if len(services) != 2 { t.Fatalf("bad: %v", obj) @@ -193,11 +201,15 @@ func TestCatalogServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.CatalogServiceNodes(nil, req) + idx, obj, err := srv.CatalogServiceNodes(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + nodes := obj.(structs.ServiceNodes) if len(nodes) != 1 { t.Fatalf("bad: %v", obj) @@ -233,11 +245,15 @@ func TestCatalogNodeServices(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.CatalogNodeServices(nil, req) + idx, obj, err := srv.CatalogNodeServices(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + services := obj.(*structs.NodeServices) if len(services.Services) != 1 { t.Fatalf("bad: %v", obj) diff --git a/command/agent/dns.go b/command/agent/dns.go index ccf9706345..6c1299dec0 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -257,7 +257,7 @@ func (d *DNSServer) nodeLookup(datacenter, node string, req, resp *dns.Msg) { Datacenter: datacenter, Node: node, } - var out structs.NodeServices + var out structs.IndexedNodeServices if err := d.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { d.logger.Printf("[ERR] dns: rpc error: %v", err) resp.SetRcode(req, dns.RcodeServerFailure) @@ -265,15 +265,15 @@ func (d *DNSServer) nodeLookup(datacenter, node string, req, resp *dns.Msg) { } // If we have no address, return not found! - if out.Node.Address == "" { + if out.NodeServices.Node.Address == "" { resp.SetRcode(req, dns.RcodeNameError) return } // Parse the IP - ip := net.ParseIP(out.Node.Address) + ip := net.ParseIP(out.NodeServices.Node.Address) if ip == nil { - d.logger.Printf("[ERR] dns: failed to parse IP %v", out.Node) + d.logger.Printf("[ERR] dns: failed to parse IP %v", out.NodeServices.Node) resp.SetRcode(req, dns.RcodeServerFailure) return } @@ -302,7 +302,7 @@ func (d *DNSServer) serviceLookup(datacenter, service, tag string, req, resp *dn ServiceTag: tag, TagFilter: tag != "", } - var out structs.CheckServiceNodes + var out structs.IndexedCheckServiceNodes if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { d.logger.Printf("[ERR] dns: rpc error: %v", err) resp.SetRcode(req, dns.RcodeServerFailure) @@ -310,21 +310,21 @@ func (d *DNSServer) serviceLookup(datacenter, service, tag string, req, resp *dn } // If we have no nodes, return not found! - if len(out) == 0 { + if len(out.Nodes) == 0 { resp.SetRcode(req, dns.RcodeNameError) return } // Filter out any service nodes due to health checks - out = d.filterServiceNodes(out) + out.Nodes = d.filterServiceNodes(out.Nodes) // Add various responses depending on the request qType := req.Question[0].Qtype if qType == dns.TypeANY || qType == dns.TypeA { - d.serviceARecords(out, req, resp) + d.serviceARecords(out.Nodes, req, resp) } if qType == dns.TypeANY || qType == dns.TypeSRV { - d.serviceSRVRecords(datacenter, out, req, resp) + d.serviceSRVRecords(datacenter, out.Nodes, req, resp) } } diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index 7a1c8a3659..3520cc2a06 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -6,16 +6,11 @@ import ( "strings" ) -func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - args := structs.ChecksInStateRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.ChecksInStateRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Pull out the service name @@ -23,27 +18,22 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req if args.State == "" { resp.WriteHeader(400) resp.Write([]byte("Missing check state")) - return nil, nil + return 0, nil, nil } // Make the RPC request - var out structs.HealthChecks + var out structs.IndexedHealthChecks if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { - return nil, err + return 0, nil, err } - return out, nil + return out.Index, out.HealthChecks, nil } -func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - args := structs.NodeSpecificRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.NodeSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Pull out the service name @@ -51,27 +41,22 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return nil, nil + return 0, nil, nil } // Make the RPC request - var out structs.HealthChecks + var out structs.IndexedHealthChecks if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { - return nil, err + return 0, nil, err } - return out, nil + return out.Index, out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - args := structs.ServiceSpecificRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.ServiceSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Pull out the service name @@ -79,30 +64,26 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return nil, nil + return 0, nil, nil } // Make the RPC request - var out structs.HealthChecks + var out structs.IndexedHealthChecks if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { - return nil, err + return 0, nil, err } - return out, nil + return out.Index, out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC - args := structs.ServiceSpecificRequest{ - Datacenter: s.agent.config.Datacenter, - } - - // Check for other DC - params := req.URL.Query() - if other := params.Get("dc"); other != "" { - args.Datacenter = other + args := structs.ServiceSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return 0, nil, nil } // Check for a tag + params := req.URL.Query() if _, ok := params["tag"]; ok { args.ServiceTag = params.Get("tag") args.TagFilter = true @@ -113,13 +94,13 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return nil, nil + return 0, nil, nil } // Make the RPC request - var out structs.CheckServiceNodes + var out structs.IndexedCheckServiceNodes if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return nil, err + return 0, nil, err } - return out, nil + return out.Index, out.Nodes, nil } diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index a8dc0b611c..1bc9e6a04b 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -23,11 +23,15 @@ func TestHealthChecksInState(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.HealthChecksInState(nil, req) + idx, obj, err := srv.HealthChecksInState(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) if len(nodes) != 1 { @@ -50,11 +54,15 @@ func TestHealthNodeChecks(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.HealthNodeChecks(nil, req) + idx, obj, err := srv.HealthNodeChecks(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) if len(nodes) != 1 { @@ -92,11 +100,15 @@ func TestHealthServiceChecks(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.HealthServiceChecks(nil, req) + idx, obj, err := srv.HealthServiceChecks(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + // Should be 1 health check for consul nodes := obj.(structs.HealthChecks) if len(nodes) != 1 { @@ -118,11 +130,15 @@ func TestHealthServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - obj, err := srv.HealthServiceNodes(nil, req) + idx, obj, err := srv.HealthServiceNodes(nil, req) if err != nil { t.Fatalf("err: %v", err) } + if idx == 0 { + t.Fatalf("bad: %v", idx) + } + // Should be 1 health check for consul nodes := obj.(structs.CheckServiceNodes) if len(nodes) != 1 { diff --git a/command/agent/http.go b/command/agent/http.go index afc4829682..921050b536 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -3,10 +3,12 @@ package agent import ( "bytes" "encoding/json" + "github.com/hashicorp/consul/consul/structs" "io" "log" "net" "net/http" + "strconv" "time" ) @@ -60,15 +62,15 @@ func (s *HTTPServer) registerHandlers() { s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister)) s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) - s.mux.HandleFunc("/v1/catalog/services", s.wrap(s.CatalogServices)) - s.mux.HandleFunc("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) - s.mux.HandleFunc("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) + s.mux.HandleFunc("/v1/catalog/nodes", s.wrapQuery(s.CatalogNodes)) + s.mux.HandleFunc("/v1/catalog/services", s.wrapQuery(s.CatalogServices)) + s.mux.HandleFunc("/v1/catalog/service/", s.wrapQuery(s.CatalogServiceNodes)) + s.mux.HandleFunc("/v1/catalog/node/", s.wrapQuery(s.CatalogNodeServices)) - s.mux.HandleFunc("/v1/health/node/", s.wrap(s.HealthNodeChecks)) - s.mux.HandleFunc("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) - s.mux.HandleFunc("/v1/health/state/", s.wrap(s.HealthChecksInState)) - s.mux.HandleFunc("/v1/health/service/", s.wrap(s.HealthServiceNodes)) + s.mux.HandleFunc("/v1/health/node/", s.wrapQuery(s.HealthNodeChecks)) + s.mux.HandleFunc("/v1/health/checks/", s.wrapQuery(s.HealthServiceChecks)) + s.mux.HandleFunc("/v1/health/state/", s.wrapQuery(s.HealthChecksInState)) + s.mux.HandleFunc("/v1/health/service/", s.wrapQuery(s.HealthServiceNodes)) s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices)) s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks)) @@ -118,6 +120,16 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque return f } +// wrapQuery is used to wrap query functions to make them more convenient +func (s *HTTPServer) wrapQuery(handler func(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { + f := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + idx, obj, err := handler(resp, req) + setIndex(resp, idx) + return obj, err + } + return s.wrap(f) +} + // Renders a simple index page func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) { if req.URL.Path == "/" { @@ -132,3 +144,49 @@ func decodeBody(req *http.Request, out interface{}) error { dec := json.NewDecoder(req.Body) return dec.Decode(out) } + +// setIndex is used to set the index response header +func setIndex(resp http.ResponseWriter, index uint64) { + resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10)) +} + +// parseWait is used to parse the ?wait and ?index query params +// Returns true on error +func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQuery) bool { + query := req.URL.Query() + if wait := query.Get("wait"); wait != "" { + dur, err := time.ParseDuration(wait) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte("Invalid wait time")) + return true + } + b.MaxQueryTime = dur + } + if idx := query.Get("index"); idx != "" { + index, err := strconv.ParseUint(idx, 10, 64) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte("Invalid index")) + return true + } + b.MinQueryIndex = index + } + return false +} + +// parseDC is used to parse the ?dc query param +func (s *HTTPServer) parseDC(req *http.Request, dc *string) { + if other := req.URL.Query().Get("dc"); other != "" { + *dc = other + } else if *dc == "" { + *dc = s.agent.config.Datacenter + } +} + +// parse is a convenience method for endpoints that need +// to use both parseWait and parseDC. +func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.BlockingQuery) bool { + s.parseDC(req, dc) + return parseWait(resp, req, b) +} diff --git a/command/agent/local.go b/command/agent/local.go index 06c3917977..f4d3b9a96f 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -216,14 +216,16 @@ func (l *localState) setSyncState() error { Datacenter: l.config.Datacenter, Node: l.config.NodeName, } - var services structs.NodeServices - var checks structs.HealthChecks - if e := l.iface.RPC("Catalog.NodeServices", &req, &services); e != nil { + var out1 structs.IndexedNodeServices + var out2 structs.IndexedHealthChecks + if e := l.iface.RPC("Catalog.NodeServices", &req, &out1); e != nil { return e } - if err := l.iface.RPC("Health.NodeChecks", &req, &checks); err != nil { + if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil { return err } + services := out1.NodeServices + checks := out2.HealthChecks l.Lock() defer l.Unlock() diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 4004128086..be6bdbbebb 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -85,18 +85,18 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Datacenter: "dc1", Node: agent.config.NodeName, } - var services structs.NodeServices + var services structs.IndexedNodeServices if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { t.Fatalf("err: %v", err) } // We should have 4 services (consul included) - if len(services.Services) != 4 { - t.Fatalf("bad: %v", services.Services) + if len(services.NodeServices.Services) != 4 { + t.Fatalf("bad: %v", services.NodeServices.Services) } // All the services should match - for id, serv := range services.Services { + for id, serv := range services.NodeServices.Services { switch id { case "mysql": if !reflect.DeepEqual(serv, srv1) { @@ -208,18 +208,18 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Datacenter: "dc1", Node: agent.config.NodeName, } - var checks structs.HealthChecks + var checks structs.IndexedHealthChecks if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { t.Fatalf("err: %v", err) } // We should have 4 services (serf included) - if len(checks) != 4 { + if len(checks.HealthChecks) != 4 { t.Fatalf("bad: %v", checks) } // All the checks should match - for _, chk := range checks { + for _, chk := range checks.HealthChecks { switch chk.CheckID { case "mysql": if !reflect.DeepEqual(chk, chk1) {