Merge pull request #68 from hashicorp/f-consistency

Adding support for "stale" and "consistent" read modes
pull/76/head
Armon Dadgar 2014-04-21 15:55:31 -07:00
commit 57a45ead6b
16 changed files with 745 additions and 198 deletions

View File

@ -57,39 +57,41 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
return out, nil return out, nil
} }
func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Setup the request // Setup the request
args := structs.DCSpecificRequest{} args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
var out structs.IndexedNodes var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.Nodes, nil return out.Nodes, nil
} }
func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.DCSpecificRequest{} args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
var out structs.IndexedServices var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.Services, nil return out.Services, nil
} }
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Check for a tag // Check for a tag
@ -104,22 +106,23 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
if args.ServiceName == "" { if args.ServiceName == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing service name")) resp.Write([]byte("Missing service name"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedServiceNodes var out structs.IndexedServiceNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.ServiceNodes, nil return out.ServiceNodes, nil
} }
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default Datacenter // Set default Datacenter
args := structs.NodeSpecificRequest{} args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Pull out the node name // Pull out the node name
@ -127,13 +130,14 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
if args.Node == "" { if args.Node == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing node name")) resp.Write([]byte("Missing node name"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedNodeServices var out structs.IndexedNodeServices
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.NodeServices, nil return out.NodeServices, nil
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"net/http" "net/http"
"net/http/httptest"
"os" "os"
"testing" "testing"
"time" "time"
@ -115,14 +116,14 @@ func TestCatalogNodes(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.CatalogNodes(nil, req) resp := httptest.NewRecorder()
obj, err := srv.CatalogNodes(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx == 0 { // Verify an index is set
t.Fatalf("bad: %v", idx) assertIndex(t, resp)
}
nodes := obj.(structs.Nodes) nodes := obj.(structs.Nodes)
if len(nodes) != 2 { if len(nodes) != 2 {
@ -170,7 +171,8 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.CatalogNodes(nil, req) resp := httptest.NewRecorder()
obj, err := srv.CatalogNodes(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -180,7 +182,7 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t.Fatalf("too fast") t.Fatalf("too fast")
} }
if idx <= out.Index { if idx := getIndex(t, resp); idx <= out.Index {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
@ -218,14 +220,13 @@ func TestCatalogServices(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.CatalogServices(nil, req) resp := httptest.NewRecorder()
obj, err := srv.CatalogServices(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx == 0 { assertIndex(t, resp)
t.Fatalf("bad: %v", idx)
}
services := obj.(structs.Services) services := obj.(structs.Services)
if len(services) != 2 { if len(services) != 2 {
@ -262,14 +263,13 @@ func TestCatalogServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.CatalogServiceNodes(nil, req) resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx == 0 { assertIndex(t, resp)
t.Fatalf("bad: %v", idx)
}
nodes := obj.(structs.ServiceNodes) nodes := obj.(structs.ServiceNodes)
if len(nodes) != 1 { if len(nodes) != 1 {
@ -306,14 +306,12 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.CatalogNodeServices(nil, req) resp := httptest.NewRecorder()
obj, err := srv.CatalogNodeServices(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
services := obj.(*structs.NodeServices) services := obj.(*structs.NodeServices)
if len(services.Services) != 1 { if len(services.Services) != 1 {

View File

@ -6,11 +6,11 @@ import (
"strings" "strings"
) )
func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.ChecksInStateRequest{} args := structs.ChecksInStateRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Pull out the service name // Pull out the service name
@ -18,22 +18,23 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
if args.State == "" { if args.State == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing check state")) resp.Write([]byte("Missing check state"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedHealthChecks var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.HealthChecks, nil return out.HealthChecks, nil
} }
func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.NodeSpecificRequest{} args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Pull out the service name // Pull out the service name
@ -41,22 +42,23 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
if args.Node == "" { if args.Node == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing node name")) resp.Write([]byte("Missing node name"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedHealthChecks var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.HealthChecks, nil return out.HealthChecks, nil
} }
func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Pull out the service name // Pull out the service name
@ -64,22 +66,23 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
if args.ServiceName == "" { if args.ServiceName == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing service name")) resp.Write([]byte("Missing service name"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedHealthChecks var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.HealthChecks, nil return out.HealthChecks, nil
} }
func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return 0, nil, nil return nil, nil
} }
// Check for a tag // Check for a tag
@ -94,13 +97,14 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
if args.ServiceName == "" { if args.ServiceName == "" {
resp.WriteHeader(400) resp.WriteHeader(400)
resp.Write([]byte("Missing service name")) resp.Write([]byte("Missing service name"))
return 0, nil, nil return nil, nil
} }
// Make the RPC request // Make the RPC request
var out structs.IndexedCheckServiceNodes var out structs.IndexedCheckServiceNodes
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return 0, nil, err return nil, err
} }
return out.Index, out.Nodes, nil return out.Nodes, nil
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"net/http" "net/http"
"net/http/httptest"
"os" "os"
"testing" "testing"
"time" "time"
@ -23,14 +24,12 @@ func TestHealthChecksInState(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.HealthChecksInState(nil, req) resp := httptest.NewRecorder()
obj, err := srv.HealthChecksInState(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for the server // Should be 1 health check for the server
nodes := obj.(structs.HealthChecks) nodes := obj.(structs.HealthChecks)
@ -54,14 +53,12 @@ func TestHealthNodeChecks(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.HealthNodeChecks(nil, req) resp := httptest.NewRecorder()
obj, err := srv.HealthNodeChecks(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for the server // Should be 1 health check for the server
nodes := obj.(structs.HealthChecks) nodes := obj.(structs.HealthChecks)
@ -100,14 +97,12 @@ func TestHealthServiceChecks(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.HealthServiceChecks(nil, req) resp := httptest.NewRecorder()
obj, err := srv.HealthServiceChecks(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for consul // Should be 1 health check for consul
nodes := obj.(structs.HealthChecks) nodes := obj.(structs.HealthChecks)
@ -130,14 +125,12 @@ func TestHealthServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
idx, obj, err := srv.HealthServiceNodes(nil, req) resp := httptest.NewRecorder()
obj, err := srv.HealthServiceNodes(resp, req)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for consul // Should be 1 health check for consul
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)

View File

@ -64,15 +64,15 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister)) s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.mux.HandleFunc("/v1/catalog/nodes", s.wrapQuery(s.CatalogNodes)) s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.mux.HandleFunc("/v1/catalog/services", s.wrapQuery(s.CatalogServices)) s.mux.HandleFunc("/v1/catalog/services", s.wrap(s.CatalogServices))
s.mux.HandleFunc("/v1/catalog/service/", s.wrapQuery(s.CatalogServiceNodes)) s.mux.HandleFunc("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.mux.HandleFunc("/v1/catalog/node/", s.wrapQuery(s.CatalogNodeServices)) s.mux.HandleFunc("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
s.mux.HandleFunc("/v1/health/node/", s.wrapQuery(s.HealthNodeChecks)) s.mux.HandleFunc("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.mux.HandleFunc("/v1/health/checks/", s.wrapQuery(s.HealthServiceChecks)) s.mux.HandleFunc("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.mux.HandleFunc("/v1/health/state/", s.wrapQuery(s.HealthChecksInState)) s.mux.HandleFunc("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.mux.HandleFunc("/v1/health/service/", s.wrapQuery(s.HealthServiceNodes)) s.mux.HandleFunc("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices)) s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices))
s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks)) s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks))
@ -132,16 +132,6 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return f return f
} }
// wrapQuery is used to wrap query functions to make them more convenient
func (s *HTTPServer) wrapQuery(handler func(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
idx, obj, err := handler(resp, req)
setIndex(resp, idx)
return obj, err
}
return s.wrap(f)
}
// Renders a simple index page // Renders a simple index page
func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) { func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/" { if req.URL.Path == "/" {
@ -173,9 +163,31 @@ func setIndex(resp http.ResponseWriter, index uint64) {
resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10)) resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10))
} }
// setKnownLeader is used to set the known leader header
func setKnownLeader(resp http.ResponseWriter, known bool) {
s := "true"
if !known {
s = "false"
}
resp.Header().Add("X-Consul-KnownLeader", s)
}
// setLastContact is used to set the last contact header
func setLastContact(resp http.ResponseWriter, last time.Duration) {
lastMsec := uint64(last / time.Millisecond)
resp.Header().Add("X-Consul-LastContact", strconv.FormatUint(lastMsec, 10))
}
// setMeta is used to set the query response meta data
func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
setIndex(resp, m.Index)
setLastContact(resp, m.LastContact)
setKnownLeader(resp, m.KnownLeader)
}
// parseWait is used to parse the ?wait and ?index query params // parseWait is used to parse the ?wait and ?index query params
// Returns true on error // Returns true on error
func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQuery) bool { func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
query := req.URL.Query() query := req.URL.Query()
if wait := query.Get("wait"); wait != "" { if wait := query.Get("wait"); wait != "" {
dur, err := time.ParseDuration(wait) dur, err := time.ParseDuration(wait)
@ -198,6 +210,24 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQ
return false return false
} }
// parseConsistency is used to parse the ?stale and ?consistent query params.
// Returns true on error
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
query := req.URL.Query()
if _, ok := query["stale"]; ok {
b.AllowStale = true
}
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
}
if b.AllowStale && b.RequireConsistent {
resp.WriteHeader(400)
resp.Write([]byte("Cannot specify ?stale with ?consistent, conflicting semantics."))
return true
}
return false
}
// parseDC is used to parse the ?dc query param // parseDC is used to parse the ?dc query param
func (s *HTTPServer) parseDC(req *http.Request, dc *string) { func (s *HTTPServer) parseDC(req *http.Request, dc *string) {
if other := req.URL.Query().Get("dc"); other != "" { if other := req.URL.Query().Get("dc"); other != "" {
@ -209,7 +239,10 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) {
// parse is a convenience method for endpoints that need // parse is a convenience method for endpoints that need
// to use both parseWait and parseDC. // to use both parseWait and parseDC.
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.BlockingQuery) bool { func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
s.parseDC(req, dc) s.parseDC(req, dc)
if parseConsistency(resp, req, b) {
return true
}
return parseWait(resp, req, b) return parseWait(resp, req, b)
} }

View File

@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"strconv"
"testing" "testing"
"time" "time"
) )
@ -40,6 +41,52 @@ func TestSetIndex(t *testing.T) {
} }
} }
func TestSetKnownLeader(t *testing.T) {
resp := httptest.NewRecorder()
setKnownLeader(resp, true)
header := resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("Bad: %v", header)
}
resp = httptest.NewRecorder()
setKnownLeader(resp, false)
header = resp.Header().Get("X-Consul-KnownLeader")
if header != "false" {
t.Fatalf("Bad: %v", header)
}
}
func TestSetLastContact(t *testing.T) {
resp := httptest.NewRecorder()
setLastContact(resp, 123456*time.Microsecond)
header := resp.Header().Get("X-Consul-LastContact")
if header != "123" {
t.Fatalf("Bad: %v", header)
}
}
func TestSetMeta(t *testing.T) {
meta := structs.QueryMeta{
Index: 1000,
KnownLeader: true,
LastContact: 123456 * time.Microsecond,
}
resp := httptest.NewRecorder()
setMeta(resp, &meta)
header := resp.Header().Get("X-Consul-Index")
if header != "1000" {
t.Fatalf("Bad: %v", header)
}
header = resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("Bad: %v", header)
}
header = resp.Header().Get("X-Consul-LastContact")
if header != "123" {
t.Fatalf("Bad: %v", header)
}
}
func TestContentTypeIsJSON(t *testing.T) { func TestContentTypeIsJSON(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
@ -69,7 +116,7 @@ func TestContentTypeIsJSON(t *testing.T) {
func TestParseWait(t *testing.T) { func TestParseWait(t *testing.T) {
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
var b structs.BlockingQuery var b structs.QueryOptions
req, err := http.NewRequest("GET", req, err := http.NewRequest("GET",
"/v1/catalog/nodes?wait=60s&index=1000", nil) "/v1/catalog/nodes?wait=60s&index=1000", nil)
@ -91,7 +138,7 @@ func TestParseWait(t *testing.T) {
func TestParseWait_InvalidTime(t *testing.T) { func TestParseWait_InvalidTime(t *testing.T) {
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
var b structs.BlockingQuery var b structs.QueryOptions
req, err := http.NewRequest("GET", req, err := http.NewRequest("GET",
"/v1/catalog/nodes?wait=60foo&index=1000", nil) "/v1/catalog/nodes?wait=60foo&index=1000", nil)
@ -110,7 +157,7 @@ func TestParseWait_InvalidTime(t *testing.T) {
func TestParseWait_InvalidIndex(t *testing.T) { func TestParseWait_InvalidIndex(t *testing.T) {
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
var b structs.BlockingQuery var b structs.QueryOptions
req, err := http.NewRequest("GET", req, err := http.NewRequest("GET",
"/v1/catalog/nodes?wait=60s&index=foo", nil) "/v1/catalog/nodes?wait=60s&index=foo", nil)
@ -126,3 +173,83 @@ func TestParseWait_InvalidIndex(t *testing.T) {
t.Fatalf("bad code: %v", resp.Code) t.Fatalf("bad code: %v", resp.Code)
} }
} }
func TestParseConsistency(t *testing.T) {
resp := httptest.NewRecorder()
var b structs.QueryOptions
req, err := http.NewRequest("GET",
"/v1/catalog/nodes?stale", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d := parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
if !b.AllowStale {
t.Fatalf("Bad: %v", b)
}
if b.RequireConsistent {
t.Fatalf("Bad: %v", b)
}
b = structs.QueryOptions{}
req, err = http.NewRequest("GET",
"/v1/catalog/nodes?consistent", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d := parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
if b.AllowStale {
t.Fatalf("Bad: %v", b)
}
if !b.RequireConsistent {
t.Fatalf("Bad: %v", b)
}
}
func TestParseConsistency_Invalid(t *testing.T) {
resp := httptest.NewRecorder()
var b structs.QueryOptions
req, err := http.NewRequest("GET",
"/v1/catalog/nodes?stale&consistent", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d := parseConsistency(resp, req, &b); !d {
t.Fatalf("expected done")
}
if resp.Code != 400 {
t.Fatalf("bad code: %v", resp.Code)
}
}
// assertIndex tests that X-Consul-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Consul-Index")
if header == "" || header == "0" {
t.Fatalf("Bad: %v", header)
}
}
// getIndex parses X-Consul-Index
func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
val, err := strconv.Atoi(header)
if err != nil {
t.Fatalf("Bad: %v", header)
}
return uint64(val)
}

View File

@ -12,7 +12,7 @@ import (
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC // Set default DC
args := structs.KeyRequest{} args := structs.KeyRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }
@ -47,10 +47,10 @@ func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *s
// Make the RPC // Make the RPC
var out structs.IndexedDirEntries var out structs.IndexedDirEntries
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC(method, &args, &out); err != nil { if err := s.agent.RPC(method, &args, &out); err != nil {
return nil, err return nil, err
} }
setIndex(resp, out.Index)
// Check if we get a not found // Check if we get a not found
if len(out.Entries) == 0 { if len(out.Entries) == 0 {

View File

@ -57,11 +57,7 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
res, ok := obj.(structs.DirEntries) res, ok := obj.(structs.DirEntries)
if !ok { if !ok {
@ -138,11 +134,7 @@ func TestKVSEndpoint_Recurse(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
assertIndex(t, resp)
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
res, ok := obj.(structs.DirEntries) res, ok := obj.(structs.DirEntries)
if !ok { if !ok {

View File

@ -14,7 +14,7 @@ type Catalog struct {
// Register is used register that a node is providing a given service. // Register is used register that a node is providing a given service.
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.Register", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now())
@ -55,7 +55,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
// Deregister is used to remove a service registration for a given node. // Deregister is used to remove a service registration for a given node.
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now())
@ -91,39 +91,41 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
// ListNodes is used to query the nodes in a DC // ListNodes is used to query the nodes in a DC
func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error {
if done, err := c.srv.forward("Catalog.ListNodes", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done {
return err return err
} }
// Get the local state // Get the local state
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Nodes"), state.QueryTables("Nodes"),
func() (uint64, error) { func() error {
reply.Index, reply.Nodes = state.Nodes() reply.Index, reply.Nodes = state.Nodes()
return reply.Index, nil return nil
}) })
} }
// ListServices is used to query the services in a DC // ListServices is used to query the services in a DC
func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error {
if done, err := c.srv.forward("Catalog.ListServices", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done {
return err return err
} }
// Get the current nodes // Get the current nodes
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Services"), state.QueryTables("Services"),
func() (uint64, error) { func() error {
reply.Index, reply.Services = state.Services() reply.Index, reply.Services = state.Services()
return reply.Index, nil return nil
}) })
} }
// ServiceNodes returns all the nodes registered as part of a service // ServiceNodes returns all the nodes registered as part of a service
func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error {
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done {
return err return err
} }
@ -134,15 +136,16 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Get the nodes // Get the nodes
state := c.srv.fsm.State() state := c.srv.fsm.State()
err := c.srv.blockingRPC(&args.BlockingQuery, err := c.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"), state.QueryTables("ServiceNodes"),
func() (uint64, error) { func() error {
if args.TagFilter { if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
} }
return reply.Index, nil return nil
}) })
// Provide some metrics // Provide some metrics
@ -160,7 +163,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// NodeServices returns all the services registered as part of a node // NodeServices returns all the services registered as part of a node
func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error {
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done {
return err return err
} }
@ -171,10 +174,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services // Get the node services
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeServices"), state.QueryTables("NodeServices"),
func() (uint64, error) { func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node) reply.Index, reply.NodeServices = state.NodeServices(args.Node)
return reply.Index, nil return nil
}) })
} }

View File

@ -6,6 +6,7 @@ import (
"net/rpc" "net/rpc"
"os" "os"
"sort" "sort"
"strings"
"testing" "testing"
"time" "time"
) )
@ -232,6 +233,168 @@ func TestCatalogListNodes(t *testing.T) {
} }
} }
func TestCatalogListNodes_StaleRaad(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for a leader
time.Sleep(100 * time.Millisecond)
// Use the follower as the client
var client *rpc.Client
if !s1.IsLeader() {
client = client1
// Inject fake data on the follower!
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
} else {
client = client2
// Inject fake data on the follower!
s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{AllowStale: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
found := false
for _, n := range out.Nodes {
if n.Node == "foo" {
found = true
}
}
if !found {
t.Fatalf("failed to find foo")
}
if out.QueryMeta.LastContact == 0 {
t.Fatalf("should have a last contact time")
}
if !out.QueryMeta.KnownLeader {
t.Fatalf("should have known leader")
}
}
func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for a leader
time.Sleep(100 * time.Millisecond)
// Use the leader as the client, kill the follower
var client *rpc.Client
if s1.IsLeader() {
client = client1
s2.Shutdown()
} else {
client = client2
s1.Shutdown()
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") {
t.Fatalf("err: %v", err)
}
if out.QueryMeta.LastContact != 0 {
t.Fatalf("should not have a last contact time")
}
if out.QueryMeta.KnownLeader {
t.Fatalf("should have no known leader")
}
}
func TestCatalogListNodes_ConsistentRead(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for a leader
time.Sleep(100 * time.Millisecond)
// Use the leader as the client, kill the follower
var client *rpc.Client
if s1.IsLeader() {
client = client1
} else {
client = client2
}
args := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out.QueryMeta.LastContact != 0 {
t.Fatalf("should not have a last contact time")
}
if !out.QueryMeta.KnownLeader {
t.Fatalf("should have known leader")
}
}
func BenchmarkCatalogListNodes(t *testing.B) { func BenchmarkCatalogListNodes(t *testing.B) {
dir1, s1 := testServer(nil) dir1, s1 := testServer(nil)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -394,6 +557,39 @@ func TestCatalogListServices_Timeout(t *testing.T) {
} }
} }
func TestCatalogListServices_Stale(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.AllowStale = true
var out structs.IndexedServices
// Inject a fake service
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000})
// Run the query, do not wait for leader!
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Should find the service
if len(out.Services) != 1 {
t.Fatalf("bad: %v", out)
}
// Should not have a leader! Stale read
if out.KnownLeader {
t.Fatalf("bad: %v", out)
}
}
func TestCatalogListServiceNodes(t *testing.T) { func TestCatalogListServiceNodes(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -14,34 +14,36 @@ type Health struct {
// ChecksInState is used to get all the checks in a given state // ChecksInState is used to get all the checks in a given state
func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
reply *structs.IndexedHealthChecks) error { reply *structs.IndexedHealthChecks) error {
if done, err := h.srv.forward("Health.ChecksInState", args.Datacenter, args, reply); done { if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done {
return err return err
} }
// Get the state specific checks // Get the state specific checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ChecksInState"), state.QueryTables("ChecksInState"),
func() (uint64, error) { func() error {
reply.Index, reply.HealthChecks = state.ChecksInState(args.State) reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
return reply.Index, nil return nil
}) })
} }
// NodeChecks is used to get all the checks for a node // NodeChecks is used to get all the checks for a node
func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
reply *structs.IndexedHealthChecks) error { reply *structs.IndexedHealthChecks) error {
if done, err := h.srv.forward("Health.NodeChecks", args.Datacenter, args, reply); done { if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done {
return err return err
} }
// Get the node checks // Get the node checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeChecks"), state.QueryTables("NodeChecks"),
func() (uint64, error) { func() error {
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
return reply.Index, nil return nil
}) })
} }
@ -54,23 +56,24 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
} }
// Potentially forward // Potentially forward
if done, err := h.srv.forward("Health.ServiceChecks", args.Datacenter, args, reply); done { if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done {
return err return err
} }
// Get the service checks // Get the service checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceChecks"), state.QueryTables("ServiceChecks"),
func() (uint64, error) { func() error {
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
return reply.Index, nil return nil
}) })
} }
// ServiceNodes returns all the nodes registered as part of a service including health info // ServiceNodes returns all the nodes registered as part of a service including health info
func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error {
if done, err := h.srv.forward("Health.ServiceNodes", args.Datacenter, args, reply); done { if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done {
return err return err
} }
@ -81,15 +84,16 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
// Get the nodes // Get the nodes
state := h.srv.fsm.State() state := h.srv.fsm.State()
err := h.srv.blockingRPC(&args.BlockingQuery, err := h.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("CheckServiceNodes"), state.QueryTables("CheckServiceNodes"),
func() (uint64, error) { func() error {
if args.TagFilter { if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {
reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName)
} }
return reply.Index, nil return nil
}) })
// Provide some metrics // Provide some metrics

View File

@ -15,7 +15,7 @@ type KVS struct {
// Apply is used to apply a KVS request to the data store. This should // Apply is used to apply a KVS request to the data store. This should
// only be used for operations that modify the data // only be used for operations that modify the data
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
@ -44,18 +44,19 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// Get is used to lookup a single key // Get is used to lookup a single key
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done { if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
return err return err
} }
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery, return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSGet"), state.QueryTables("KVSGet"),
func() (uint64, error) { func() error {
index, ent, err := state.KVSGet(args.Key) index, ent, err := state.KVSGet(args.Key)
if err != nil { if err != nil {
return 0, err return err
} }
if ent == nil { if ent == nil {
// Must provide non-zero index to prevent blocking // Must provide non-zero index to prevent blocking
@ -70,24 +71,25 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Index = ent.ModifyIndex reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent} reply.Entries = structs.DirEntries{ent}
} }
return reply.Index, nil return nil
}) })
} }
// List is used to list all keys with a given prefix // List is used to list all keys with a given prefix
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done { if done, err := k.srv.forward("KVS.List", args, args, reply); done {
return err return err
} }
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery, return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSList"), state.QueryTables("KVSList"),
func() (uint64, error) { func() error {
index, ent, err := state.KVSList(args.Key) index, ent, err := state.KVSList(args.Key)
if err != nil { if err != nil {
return 0, err return err
} }
if len(ent) == 0 { if len(ent) == 0 {
// Must provide non-zero index to prevent blocking // Must provide non-zero index to prevent blocking
@ -110,6 +112,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Index = maxIndex reply.Index = maxIndex
reply.Entries = ent reply.Entries = ent
} }
return reply.Index, nil return nil
}) })
} }

View File

@ -134,13 +134,19 @@ func (s *Server) handleConsulConn(conn net.Conn) {
// forward is used to forward to a remote DC or to forward to the local leader // forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error // Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method, dc string, args interface{}, reply interface{}) (bool, error) { func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
// Handle DC forwarding // Handle DC forwarding
dc := info.RequestDatacenter()
if dc != s.config.Datacenter { if dc != s.config.Datacenter {
err := s.forwardDC(method, dc, args, reply) err := s.forwardDC(method, dc, args, reply)
return true, err return true, err
} }
// Check if we can allow a stale read
if info.IsRead() && info.AllowStaleRead() {
return false, nil
}
// Handle leader forwarding // Handle leader forwarding
if !s.IsLeader() { if !s.IsLeader() {
err := s.forwardLeader(method, args, reply) err := s.forwardLeader(method, args, reply)
@ -197,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// blockingRPC is used for queries that need to wait for a // blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes. // minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error {
var timeout <-chan time.Time var timeout <-chan time.Time
var notifyCh chan struct{} var notifyCh chan struct{}
@ -233,12 +240,22 @@ SETUP_NOTIFY:
s.fsm.State().Watch(tables, notifyCh) s.fsm.State().Watch(tables, notifyCh)
} }
// Run the query function
RUN_QUERY: RUN_QUERY:
idx, err := run() // Update the query meta data
s.setQueryMeta(m)
// Check if query must be consistent
if b.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query function
err := run()
// Check for minimum query time // Check for minimum query time
if err == nil && idx <= b.MinQueryIndex { if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex {
select { select {
case <-notifyCh: case <-notifyCh:
goto SETUP_NOTIFY goto SETUP_NOTIFY
@ -247,3 +264,22 @@ RUN_QUERY:
} }
return err return err
} }
// setQueryMeta is used to populate the QueryMeta data for an RPC call
func (s *Server) setQueryMeta(m *structs.QueryMeta) {
if s.IsLeader() {
m.LastContact = 0
m.KnownLeader = true
} else {
m.LastContact = time.Now().Sub(s.raft.LastContact())
m.KnownLeader = (s.raft.Leader() != nil)
}
}
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
return future.Error()
}

View File

@ -28,14 +28,64 @@ const (
HealthCritical = "critical" HealthCritical = "critical"
) )
// BlockingQuery is used to block on a query and wait for a change. // RPCInfo is used to describe common information about query
// Either both fields, or neither must be provided. type RPCInfo interface {
type BlockingQuery struct { RequestDatacenter() string
// If set, wait until query exceeds given index IsRead() bool
AllowStaleRead() bool
}
// QueryOptions is used to specify various flags for read queries
type QueryOptions struct {
// If set, wait until query exceeds given index. Must be provided
// with MaxQueryTime.
MinQueryIndex uint64 MinQueryIndex uint64
// Provided with MinQueryIndex to wait for change // Provided with MinQueryIndex to wait for change.
MaxQueryTime time.Duration MaxQueryTime time.Duration
// If set, any follower can service the request. Results
// may be arbitrarily stale.
AllowStale bool
// If set, the leader must verify leadership prior to
// servicing the request. Prevents a stale read.
RequireConsistent bool
}
// QueryOption only applies to reads, so always true
func (q QueryOptions) IsRead() bool {
return true
}
func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}
type WriteRequest struct{}
// WriteRequest only applies to writes, always false
func (w WriteRequest) IsRead() bool {
return false
}
func (w WriteRequest) AllowStaleRead() bool {
return false
}
// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
// This is the index associated with the read
Index uint64
// If AllowStale is used, this is time elapsed since
// last contact between the follower and leader. This
// can be used to gauge staleness.
LastContact time.Duration
// Used to indicate if there is a known leader node
KnownLeader bool
} }
// RegisterRequest is used for the Catalog.Register endpoint // RegisterRequest is used for the Catalog.Register endpoint
@ -47,6 +97,11 @@ type RegisterRequest struct {
Address string Address string
Service *NodeService Service *NodeService
Check *HealthCheck Check *HealthCheck
WriteRequest
}
func (r *RegisterRequest) RequestDatacenter() string {
return r.Datacenter
} }
// DeregisterRequest is used for the Catalog.Deregister endpoint // DeregisterRequest is used for the Catalog.Deregister endpoint
@ -57,12 +112,21 @@ type DeregisterRequest struct {
Node string Node string
ServiceID string ServiceID string
CheckID string CheckID string
WriteRequest
}
func (r *DeregisterRequest) RequestDatacenter() string {
return r.Datacenter
} }
// DCSpecificRequest is used to query about a specific DC // DCSpecificRequest is used to query about a specific DC
type DCSpecificRequest struct { type DCSpecificRequest struct {
Datacenter string Datacenter string
BlockingQuery QueryOptions
}
func (r *DCSpecificRequest) RequestDatacenter() string {
return r.Datacenter
} }
// ServiceSpecificRequest is used to query about a specific node // ServiceSpecificRequest is used to query about a specific node
@ -71,21 +135,33 @@ type ServiceSpecificRequest struct {
ServiceName string ServiceName string
ServiceTag string ServiceTag string
TagFilter bool // Controls tag filtering TagFilter bool // Controls tag filtering
BlockingQuery QueryOptions
}
func (r *ServiceSpecificRequest) RequestDatacenter() string {
return r.Datacenter
} }
// NodeSpecificRequest is used to request the information about a single node // NodeSpecificRequest is used to request the information about a single node
type NodeSpecificRequest struct { type NodeSpecificRequest struct {
Datacenter string Datacenter string
Node string Node string
BlockingQuery QueryOptions
}
func (r *NodeSpecificRequest) RequestDatacenter() string {
return r.Datacenter
} }
// ChecksInStateRequest is used to query for nodes in a state // ChecksInStateRequest is used to query for nodes in a state
type ChecksInStateRequest struct { type ChecksInStateRequest struct {
Datacenter string Datacenter string
State string State string
BlockingQuery QueryOptions
}
func (r *ChecksInStateRequest) RequestDatacenter() string {
return r.Datacenter
} }
// Used to return information about a node // Used to return information about a node
@ -144,33 +220,33 @@ type CheckServiceNode struct {
type CheckServiceNodes []CheckServiceNode type CheckServiceNodes []CheckServiceNode
type IndexedNodes struct { type IndexedNodes struct {
Index uint64
Nodes Nodes Nodes Nodes
QueryMeta
} }
type IndexedServices struct { type IndexedServices struct {
Index uint64
Services Services Services Services
QueryMeta
} }
type IndexedServiceNodes struct { type IndexedServiceNodes struct {
Index uint64
ServiceNodes ServiceNodes ServiceNodes ServiceNodes
QueryMeta
} }
type IndexedNodeServices struct { type IndexedNodeServices struct {
Index uint64
NodeServices *NodeServices NodeServices *NodeServices
QueryMeta
} }
type IndexedHealthChecks struct { type IndexedHealthChecks struct {
Index uint64
HealthChecks HealthChecks HealthChecks HealthChecks
QueryMeta
} }
type IndexedCheckServiceNodes struct { type IndexedCheckServiceNodes struct {
Index uint64
Nodes CheckServiceNodes Nodes CheckServiceNodes
QueryMeta
} }
// DirEntry is used to represent a directory entry. This is // DirEntry is used to represent a directory entry. This is
@ -198,18 +274,27 @@ type KVSRequest struct {
Datacenter string Datacenter string
Op KVSOp // Which operation are we performing Op KVSOp // Which operation are we performing
DirEnt DirEntry // Which directory entry DirEnt DirEntry // Which directory entry
WriteRequest
}
func (r *KVSRequest) RequestDatacenter() string {
return r.Datacenter
} }
// KeyRequest is used to request a key, or key prefix // KeyRequest is used to request a key, or key prefix
type KeyRequest struct { type KeyRequest struct {
Datacenter string Datacenter string
Key string Key string
BlockingQuery QueryOptions
}
func (r *KeyRequest) RequestDatacenter() string {
return r.Datacenter
} }
type IndexedDirEntries struct { type IndexedDirEntries struct {
Index uint64
Entries DirEntries Entries DirEntries
QueryMeta
} }
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object

View File

@ -42,6 +42,40 @@ note is that when the query returns there is **no guarantee** of a change. It is
possible that the timeout was reached, or that there was an idempotent write that possible that the timeout was reached, or that there was an idempotent write that
does not affect the result. does not affect the result.
## Consistency Modes
Most of the read query endpoints support multiple levels of consistency.
These are to provide a tuning knob that clients can be used to find a happy
medium that best matches their needs.
The three read modes are:
* default - If not specified, this mode is used. It is strongly consistent
in almost all cases. However, there is a small window in which an new
leader may be elected, and the old leader may service stale values. The
trade off is fast reads, but potentially stale values. This condition is
hard to trigger, and most clients should not need to worry about the stale read.
This only applies to reads, and a split-brain is not possible on writes.
* consistent - This mode is strongly consistent without caveats. It requires
that a leader verify with a quorum of peers that it is still leader. This
introduces an additional round-trip to all server nodes. The trade off is
always consistent reads, but increased latency due to an extra round trip.
Most clients should not use this unless they cannot tolerate a stale read.
* stale - This mode allows any server to service the read, regardless of if
it is the leader. This means reads can be arbitrarily stale, but are generally
within 50 milliseconds of the leader. The trade off is very fast and scalable
reads but values will be stale. This mode allows reads without a leader, meaning
a cluster that is unavailable will still be able to respond.
To switch these modes, either the "?stale" or "?consistent" query parameters
are provided. It is an error to provide both.
To support bounding how stale data is, there is an "X-Consul-LastContact"
which is the last time a server was contacted by the leader node in
milliseconds. The "X-Consul-KnownLeader" also indicates if there is a known
leader. These can be used to gauage if a stale read should be used.
## KV ## KV
@ -81,7 +115,8 @@ that modified this key. This index corresponds to the `X-Consul-Index`
header value that is returned. A blocking query can be used to wait for header value that is returned. A blocking query can be used to wait for
a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds
to the latest `ModifyIndex` and so a blocking query waits until any of the to the latest `ModifyIndex` and so a blocking query waits until any of the
listed keys are updated. listed keys are updated. The multiple consistency modes can be used for
`GET` requests as well.
The `Key` is simply the full path of the entry. `Flags` are an opaque The `Key` is simply the full path of the entry. `Flags` are an opaque
unsigned integer that can be attached to each entry. The use of this is unsigned integer that can be attached to each entry. The use of this is
@ -347,7 +382,8 @@ The following endpoints are supported:
* /v1/catalog/service/\<service\> : Lists the nodes in a given service * /v1/catalog/service/\<service\> : Lists the nodes in a given service
* /v1/catalog/node/\<node\> : Lists the services provided by a node * /v1/catalog/node/\<node\> : Lists the services provided by a node
The last 4 endpoints of the catalog support blocking queries. The last 4 endpoints of the catalog support blocking queries and
consistency modes.
### /v1/catalog/register ### /v1/catalog/register
@ -473,7 +509,7 @@ It returns a JSON body like this:
} }
] ]
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/catalog/services ### /v1/catalog/services
@ -492,7 +528,7 @@ It returns a JSON body like this:
The main object keys are the service names, while the array The main object keys are the service names, while the array
provides all the known tags for a given service. provides all the known tags for a given service.
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/catalog/service/\<service\> ### /v1/catalog/service/\<service\>
@ -517,7 +553,7 @@ It returns a JSON body like this:
} }
] ]
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/catalog/node/\<node\> ### /v1/catalog/node/\<node\>
@ -549,7 +585,7 @@ It returns a JSON body like this:
} }
} }
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
## Health ## Health
@ -564,7 +600,7 @@ The following endpoints are supported:
* /v1/health/service/\<service\>: Returns the nodes and health info of a service * /v1/health/service/\<service\>: Returns the nodes and health info of a service
* /v1/health/state/\<state\>: Returns the checks in a given state * /v1/health/state/\<state\>: Returns the checks in a given state
All of the health endpoints supports blocking queries. All of the health endpoints supports blocking queries and all consistency modes.
### /v1/health/node/\<node\> ### /v1/health/node/\<node\>
@ -603,7 +639,7 @@ joins the Consul cluster, it is part of a distributed failure detection
provided by Serf. If a node fails, it is detected and the status is automatically provided by Serf. If a node fails, it is detected and the status is automatically
changed to "critical". changed to "critical".
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/health/checks/\<service\> ### /v1/health/checks/\<service\>
@ -627,7 +663,7 @@ It returns a JSON body like this:
} }
] ]
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/health/service/\<service\> ### /v1/health/service/\<service\>
@ -684,7 +720,7 @@ It returns a JSON body like this:
} }
] ]
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
### /v1/health/state/\<state\> ### /v1/health/state/\<state\>
@ -718,7 +754,7 @@ It returns a JSON body like this:
} }
] ]
This endpoint supports blocking queries. This endpoint supports blocking queries and all consistency modes.
## Status ## Status

View File

@ -131,6 +131,39 @@ only for data in their datacenter. When a request is received for a remote datac
the request is forwarded to the correct leader. This design allows for lower latency the request is forwarded to the correct leader. This design allows for lower latency
transactions and higher availability without sacrificing consistency. transactions and higher availability without sacrificing consistency.
## Consistency Modes
Although all writes to the replicated log go through Raft, reads are more
flexible. To support various tradeoffs that developers may want, Consul
supports 3 different consistency modes for reads.
The three read modes are:
* default - Raft makes use of leader leasing, providing a time window
in which the leader assumes it's role is stable. However, if a leader
is partitioned from the remaining peers, a new leader may be elected
while the old leader is holding the lease. This means there are 2 leader
nodes. There is no risk of a split-brain since the old leader will be
unable to commit new logs. However, if the old leader services any reads
the values are potentially stale. The default consistency mode relies only
on leader leasing, exposing clients to potentially stale values. We make
this trade off because reads are fast, usually strongly consistent, and
only stale in a hard to trigger situation. The time window of stale reads
is also bounded, since the leader will step down due to the partition.
* consistent - This mode is strongly consistent without caveats. It requires
that a leader verify with a quorum of peers that it is still leader. This
introduces an additional round-trip to all server nodes. The trade off is
always consistent reads, but increased latency due to an extra round trip.
* stale - This mode allows any server to service the read, regardless of if
it is the leader. This means reads can be arbitrarily stale, but are generally
within 50 milliseconds of the leader. The trade off is very fast and scalable
reads but values will be stale. This mode allows reads without a leader, meaning
a cluster that is unavailable will still be able to respond.
For more documentation about using these various modes, see the [HTTP API](/docs/agent/http.html).
## Deployment Table ## Deployment Table
Below is a table that shows for the number of servers how large the Below is a table that shows for the number of servers how large the