From fbc526e8c1c50238cbcd30b2aa123fb27bb630b1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 16:46:51 -0700 Subject: [PATCH 01/22] consul: Adding additional query options and return meta data --- consul/structs/structs.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 5c11801a7f..4b0a8de9d6 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -38,6 +38,29 @@ type BlockingQuery struct { MaxQueryTime time.Duration } +// QueryOptions is used to specify various flags for read queries +type QueryOptions struct { + // If set, any follower can service the request. Results + // may be arbitrarily stale. + AllowStale bool + + // If set, the leader must verify leadership prior to + // servicing the request. Prevents a stale read. + RequireConsistent bool +} + +// QueryMeta allows a query response to include potentially +// useful metadata about a query +type QueryMeta struct { + // If AllowStale is used, this is time elapsed since + // last contact between the follower and leader. This + // can be used to gauge staleness. + LastContact time.Duration + + // Used to indicate if there is a known leader node + KnownLeader bool +} + // RegisterRequest is used for the Catalog.Register endpoint // to register a node as providing a service. If no service // is provided, the node is registered. 
@@ -63,6 +86,7 @@ type DeregisterRequest struct { type DCSpecificRequest struct { Datacenter string BlockingQuery + QueryOptions } // ServiceSpecificRequest is used to query about a specific node @@ -72,6 +96,7 @@ type ServiceSpecificRequest struct { ServiceTag string TagFilter bool // Controls tag filtering BlockingQuery + QueryOptions } // NodeSpecificRequest is used to request the information about a single node @@ -79,6 +104,7 @@ type NodeSpecificRequest struct { Datacenter string Node string BlockingQuery + QueryOptions } // ChecksInStateRequest is used to query for nodes in a state @@ -86,6 +112,7 @@ type ChecksInStateRequest struct { Datacenter string State string BlockingQuery + QueryOptions } // Used to return information about a node @@ -146,31 +173,37 @@ type CheckServiceNodes []CheckServiceNode type IndexedNodes struct { Index uint64 Nodes Nodes + QueryMeta } type IndexedServices struct { Index uint64 Services Services + QueryMeta } type IndexedServiceNodes struct { Index uint64 ServiceNodes ServiceNodes + QueryMeta } type IndexedNodeServices struct { Index uint64 NodeServices *NodeServices + QueryMeta } type IndexedHealthChecks struct { Index uint64 HealthChecks HealthChecks + QueryMeta } type IndexedCheckServiceNodes struct { Index uint64 Nodes CheckServiceNodes + QueryMeta } // DirEntry is used to represent a directory entry. 
This is @@ -205,11 +238,13 @@ type KeyRequest struct { Datacenter string Key string BlockingQuery + QueryOptions } type IndexedDirEntries struct { Index uint64 Entries DirEntries + QueryMeta } // Decode is used to decode a MsgPack encoded object From cea8a4f9f221f015f56a7690a5e7e8a9aed1ca1e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:14:00 -0700 Subject: [PATCH 02/22] consul: Adding RPCInfo to get common info --- consul/structs/structs.go | 62 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 4b0a8de9d6..11346ab13f 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -28,6 +28,13 @@ const ( HealthCritical = "critical" ) +// RPCInfo is used to describe common information about query +type RPCInfo interface { + RequestDatacenter() string + IsRead() bool + AllowStaleRead() bool +} + // BlockingQuery is used to block on a query and wait for a change. // Either both fields, or neither must be provided. 
type BlockingQuery struct { @@ -49,6 +56,26 @@ type QueryOptions struct { RequireConsistent bool } +// QueryOption only applies to reads, so always true +func (q QueryOptions) IsRead() bool { + return true +} + +func (q QueryOptions) AllowStaleRead() bool { + return q.AllowStale +} + +type WriteRequest struct{} + +// WriteRequest only applies to writes, always false +func (w WriteRequest) IsRead() bool { + return false +} + +func (w WriteRequest) AllowStaleRead() bool { + return false +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { @@ -70,6 +97,11 @@ type RegisterRequest struct { Address string Service *NodeService Check *HealthCheck + WriteRequest +} + +func (r *RegisterRequest) RequestDatacenter() string { + return r.Datacenter } // DeregisterRequest is used for the Catalog.Deregister endpoint @@ -80,6 +112,11 @@ type DeregisterRequest struct { Node string ServiceID string CheckID string + WriteRequest +} + +func (r *DeregisterRequest) RequestDatacenter() string { + return r.Datacenter } // DCSpecificRequest is used to query about a specific DC @@ -89,6 +126,10 @@ type DCSpecificRequest struct { QueryOptions } +func (r *DCSpecificRequest) RequestDatacenter() string { + return r.Datacenter +} + // ServiceSpecificRequest is used to query about a specific node type ServiceSpecificRequest struct { Datacenter string @@ -99,6 +140,10 @@ type ServiceSpecificRequest struct { QueryOptions } +func (r *ServiceSpecificRequest) RequestDatacenter() string { + return r.Datacenter +} + // NodeSpecificRequest is used to request the information about a single node type NodeSpecificRequest struct { Datacenter string @@ -107,6 +152,10 @@ type NodeSpecificRequest struct { QueryOptions } +func (r *NodeSpecificRequest) RequestDatacenter() string { + return r.Datacenter +} + // ChecksInStateRequest is used to query for nodes in a state type ChecksInStateRequest struct { Datacenter string @@ -115,6 +164,10 @@ type 
ChecksInStateRequest struct { QueryOptions } +func (r *ChecksInStateRequest) RequestDatacenter() string { + return r.Datacenter +} + // Used to return information about a node type Node struct { Node string @@ -231,6 +284,11 @@ type KVSRequest struct { Datacenter string Op KVSOp // Which operation are we performing DirEnt DirEntry // Which directory entry + WriteRequest +} + +func (r *KVSRequest) RequestDatacenter() string { + return r.Datacenter } // KeyRequest is used to request a key, or key prefix @@ -241,6 +299,10 @@ type KeyRequest struct { QueryOptions } +func (r *KeyRequest) RequestDatacenter() string { + return r.Datacenter +} + type IndexedDirEntries struct { Index uint64 Entries DirEntries From a9d4e2357ef4a6c5819d5ade0cc36ad6074d5dc3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:17:12 -0700 Subject: [PATCH 03/22] consul: Switch to RPCInfo --- consul/catalog_endpoint.go | 12 ++++++------ consul/health_endpoint.go | 8 ++++---- consul/kvs_endpoint.go | 6 +++--- consul/rpc.go | 3 ++- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index e288229937..52fbb46de3 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -14,7 +14,7 @@ type Catalog struct { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now()) @@ -55,7 +55,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. 
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now()) @@ -91,7 +91,7 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.forward("Catalog.ListNodes", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done { return err } @@ -107,7 +107,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.forward("Catalog.ListServices", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { return err } @@ -123,7 +123,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done { return err } @@ -160,7 +160,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err 
:= c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { + if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done { return err } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index f1428c43f7..421d4b8862 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -14,7 +14,7 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.ChecksInState", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done { return err } @@ -31,7 +31,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.NodeChecks", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done { return err } @@ -54,7 +54,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.forward("Health.ServiceChecks", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done { return err } @@ -70,7 +70,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.forward("Health.ServiceNodes", args.Datacenter, args, reply); done { + if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done { return err } diff --git a/consul/kvs_endpoint.go 
b/consul/kvs_endpoint.go index 0e884524f3..57f3e49956 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -15,7 +15,7 @@ type KVS struct { // Apply is used to apply a KVS request to the data store. This should // only be used for operations that modify the data func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) @@ -44,7 +44,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.Get", args, args, reply); done { return err } @@ -76,7 +76,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // List is used to list all keys with a given prefix func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done { + if done, err := k.srv.forward("KVS.List", args, args, reply); done { return err } diff --git a/consul/rpc.go b/consul/rpc.go index cb4185a3dc..831a5a9df9 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -134,8 +134,9 @@ func (s *Server) handleConsulConn(conn net.Conn) { // forward is used to forward to a remote DC or to forward to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) forward(method, dc string, args interface{}, reply interface{}) (bool, error) { +func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { // Handle DC forwarding + dc := info.RequestDatacenter() if dc != 
s.config.Datacenter { err := s.forwardDC(method, dc, args, reply) return true, err From fa90f1cd0d2c25f7bcb305645e61d2124f56326f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:26:59 -0700 Subject: [PATCH 04/22] consul: Support a stale read query --- consul/catalog_endpoint_test.go | 51 +++++++++++++++++++++++++++++++++ consul/rpc.go | 5 ++++ 2 files changed, 56 insertions(+) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 1184d6ddb5..53a23d6b3f 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -232,6 +232,57 @@ func TestCatalogListNodes(t *testing.T) { } } +func TestCatalogListNodes_StaleRaad(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the follower as the client + var client *rpc.Client + if !s1.IsLeader() { + client = client1 + + // Inject fake data on the follower! + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } else { + client = client2 + + // Inject fake data on the follower! 
+ s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{AllowStale: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.Nodes) != 3 { + t.Fatalf("bad: %v", out) + } +} + func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) diff --git a/consul/rpc.go b/consul/rpc.go index 831a5a9df9..c950f25d52 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -142,6 +142,11 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, return true, err } + // Check if we can allow a stale read + if info.IsRead() && info.AllowStaleRead() { + return false, nil + } + // Handle leader forwarding if !s.IsLeader() { err := s.forwardLeader(method, args, reply) From afa6a1ae4960adee7b7f3fabd4d36ac4edc8a899 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:37:19 -0700 Subject: [PATCH 05/22] consul: Adding support for QueryMeta on all queries --- consul/catalog_endpoint.go | 4 ++++ consul/health_endpoint.go | 4 ++++ consul/kvs_endpoint.go | 2 ++ consul/rpc.go | 11 +++++++++++ 4 files changed, 21 insertions(+) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 52fbb46de3..6a65f1534a 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -100,6 +100,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return c.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("Nodes"), func() (uint64, error) { + c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Nodes = state.Nodes() return reply.Index, nil }) @@ -116,6 +117,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return c.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("Services"), func() (uint64, error) { + 
c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Services = state.Services() return reply.Index, nil }) @@ -137,6 +139,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru err := c.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("ServiceNodes"), func() (uint64, error) { + c.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { @@ -174,6 +177,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return c.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("NodeServices"), func() (uint64, error) { + c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.NodeServices = state.NodeServices(args.Node) return reply.Index, nil }) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 421d4b8862..fee6ef9952 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -23,6 +23,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return h.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("ChecksInState"), func() (uint64, error) { + h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ChecksInState(args.State) return reply.Index, nil }) @@ -40,6 +41,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("NodeChecks"), func() (uint64, error) { + h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) return reply.Index, nil }) @@ -63,6 +65,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("ServiceChecks"), func() (uint64, error) { + h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) return reply.Index, nil }) @@ -84,6 +87,7 @@ func (h *Health) 
ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc err := h.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("CheckServiceNodes"), func() (uint64, error) { + h.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 57f3e49956..fa99e3c0e6 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -53,6 +53,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return k.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("KVSGet"), func() (uint64, error) { + k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSGet(args.Key) if err != nil { return 0, err @@ -85,6 +86,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return k.srv.blockingRPC(&args.BlockingQuery, state.QueryTables("KVSList"), func() (uint64, error) { + k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSList(args.Key) if err != nil { return 0, err diff --git a/consul/rpc.go b/consul/rpc.go index c950f25d52..46f01b4166 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -253,3 +253,14 @@ RUN_QUERY: } return err } + +// setQueryMeta is used to populate the QueryMeta data for an RPC call +func (s *Server) setQueryMeta(m *structs.QueryMeta) { + if s.IsLeader() { + m.LastContact = 0 + m.KnownLeader = true + } else { + m.LastContact = time.Now().Sub(s.raft.LastContact()) + m.KnownLeader = (s.raft.Leader() != nil) + } +} From 93abc19845f9ed480b3358a5fbcc1576d145b3e8 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:48:50 -0700 Subject: [PATCH 06/22] consul: Testing QueryMeta --- consul/catalog_endpoint_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 53a23d6b3f..37b418af1b 100644 --- 
a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -239,7 +239,7 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { client1 := rpcClient(t, s1) defer client1.Close() - dir2, s2 := testServer(t) + dir2, s2 := testServerDCBootstrap(t, "dc1", false) defer os.RemoveAll(dir2) defer s2.Shutdown() client2 := rpcClient(t, s2) @@ -278,8 +278,21 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { t.Fatalf("err: %v", err) } - if len(out.Nodes) != 3 { - t.Fatalf("bad: %v", out) + found := false + for _, n := range out.Nodes { + if n.Node == "foo" { + found = true + } + } + if !found { + t.Fatalf("failed to find foo") + } + + if out.QueryMeta.LastContact == 0 { + t.Fatalf("should have a last contact time") + } + if !out.QueryMeta.KnownLeader { + t.Fatalf("should have known leader") } } From a2acbe732e56897437e1f065e1ab13952b4bb093 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:49:01 -0700 Subject: [PATCH 07/22] consul: Adding a method to enforce consistent read --- consul/rpc.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/consul/rpc.go b/consul/rpc.go index 46f01b4166..a80840e22a 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -264,3 +264,11 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { m.KnownLeader = (s.raft.Leader() != nil) } } + +// consistentRead is used to ensure we do not perform a stale +// read. This is done by verifying leadership before the read. 
+func (s *Server) consistentRead() error { + defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) + future := s.raft.VerifyLeader() + return future.Error() +} From beeeb86a12faeb23fe2b3b1efd13f2ba9820511d Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:04:52 -0700 Subject: [PATCH 08/22] consul: Moving QueryMeta handling into blockingRPC --- consul/catalog_endpoint.go | 8 ++++---- consul/health_endpoint.go | 8 ++++---- consul/kvs_endpoint.go | 4 ++-- consul/rpc.go | 8 ++++++-- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 6a65f1534a..a1ec50863c 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -98,9 +98,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde // Get the local state state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("Nodes"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Nodes = state.Nodes() return reply.Index, nil }) @@ -115,9 +115,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I // Get the current nodes state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("Services"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Services = state.Services() return reply.Index, nil }) @@ -137,9 +137,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() err := c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ServiceNodes"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { @@ -175,9 +175,9 @@ func (c *Catalog) 
NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("NodeServices"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.NodeServices = state.NodeServices(args.Node) return reply.Index, nil }) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index fee6ef9952..c081a1afd2 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -21,9 +21,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // Get the state specific checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ChecksInState"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ChecksInState(args.State) return reply.Index, nil }) @@ -39,9 +39,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, // Get the node checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("NodeChecks"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) return reply.Index, nil }) @@ -63,9 +63,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ServiceChecks"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) return reply.Index, nil }) @@ -85,9 +85,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() err := h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), func() (uint64, 
error) { - h.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index fa99e3c0e6..0e1ed1ead2 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -51,9 +51,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() return k.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("KVSGet"), func() (uint64, error) { - k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSGet(args.Key) if err != nil { return 0, err @@ -84,9 +84,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() return k.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("KVSList"), func() (uint64, error) { - k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSList(args.Key) if err != nil { return 0, err diff --git a/consul/rpc.go b/consul/rpc.go index a80840e22a..afe0c5152c 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -203,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. 
-func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { +func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta, + tables MDBTables, run func() (uint64, error)) error { var timeout <-chan time.Time var notifyCh chan struct{} @@ -239,8 +240,11 @@ SETUP_NOTIFY: s.fsm.State().Watch(tables, notifyCh) } - // Run the query function RUN_QUERY: + // Update the query meta data + s.setQueryMeta(m) + + // Run the query function idx, err := run() // Check for minimum query time From d2b487edb618faf43985ca3c14cd6598871afacd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:13:36 -0700 Subject: [PATCH 09/22] consul: Move the Index into QueryMeta --- consul/structs/structs.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 11346ab13f..3ea3d4a5fe 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -79,6 +79,9 @@ func (w WriteRequest) AllowStaleRead() bool { // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { + // This is the index associated with the read + Index uint64 + // If AllowStale is used, this is time elapsed since // last contact between the follower and leader. This // can be used to gauge staleness. 
@@ -224,37 +227,31 @@ type CheckServiceNode struct { type CheckServiceNodes []CheckServiceNode type IndexedNodes struct { - Index uint64 Nodes Nodes QueryMeta } type IndexedServices struct { - Index uint64 Services Services QueryMeta } type IndexedServiceNodes struct { - Index uint64 ServiceNodes ServiceNodes QueryMeta } type IndexedNodeServices struct { - Index uint64 NodeServices *NodeServices QueryMeta } type IndexedHealthChecks struct { - Index uint64 HealthChecks HealthChecks QueryMeta } type IndexedCheckServiceNodes struct { - Index uint64 Nodes CheckServiceNodes QueryMeta } @@ -304,7 +301,6 @@ func (r *KeyRequest) RequestDatacenter() string { } type IndexedDirEntries struct { - Index uint64 Entries DirEntries QueryMeta } From 180cc330304d11f974dd6008430e3122d7f55a9b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:18:27 -0700 Subject: [PATCH 10/22] consul: Use QueryMeta to simplify blockingRPC interface --- consul/catalog_endpoint.go | 16 ++++++++-------- consul/health_endpoint.go | 16 ++++++++-------- consul/kvs_endpoint.go | 12 ++++++------ consul/rpc.go | 6 +++--- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index a1ec50863c..35a08a1c47 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -100,9 +100,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("Nodes"), - func() (uint64, error) { + func() error { reply.Index, reply.Nodes = state.Nodes() - return reply.Index, nil + return nil }) } @@ -117,9 +117,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("Services"), - func() (uint64, error) { + func() error { reply.Index, reply.Services = state.Services() - return reply.Index, nil + return nil }) } @@ -139,13 
+139,13 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru err := c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics @@ -177,8 +177,8 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("NodeServices"), - func() (uint64, error) { + func() error { reply.Index, reply.NodeServices = state.NodeServices(args.Node) - return reply.Index, nil + return nil }) } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index c081a1afd2..67da630d9b 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -23,9 +23,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ChecksInState"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ChecksInState(args.State) - return reply.Index, nil + return nil }) } @@ -41,9 +41,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("NodeChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) - return reply.Index, nil + return nil }) } @@ -65,9 +65,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ServiceChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) - return 
reply.Index, nil + return nil }) } @@ -87,13 +87,13 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc err := h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 0e1ed1ead2..780e52da5d 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -53,10 +53,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return k.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("KVSGet"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSGet(args.Key) if err != nil { - return 0, err + return err } if ent == nil { // Must provide non-zero index to prevent blocking @@ -71,7 +71,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er reply.Index = ent.ModifyIndex reply.Entries = structs.DirEntries{ent} } - return reply.Index, nil + return nil }) } @@ -86,10 +86,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return k.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("KVSList"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSList(args.Key) if err != nil { - return 0, err + return err } if len(ent) == 0 { // Must provide non-zero index to prevent blocking @@ -112,6 +112,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Index = maxIndex reply.Entries = ent } - return reply.Index, nil + return nil }) } diff --git a/consul/rpc.go b/consul/rpc.go index afe0c5152c..ffaa9664e8 100644 --- a/consul/rpc.go +++ 
b/consul/rpc.go @@ -204,7 +204,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta, - tables MDBTables, run func() (uint64, error)) error { + tables MDBTables, run func() error) error { var timeout <-chan time.Time var notifyCh chan struct{} @@ -245,10 +245,10 @@ RUN_QUERY: s.setQueryMeta(m) // Run the query function - idx, err := run() + err := run() // Check for minimum query time - if err == nil && idx <= b.MinQueryIndex { + if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { select { case <-notifyCh: goto SETUP_NOTIFY From e706c988b835ca75d48bc2581c521c89739e79b9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:31:15 -0700 Subject: [PATCH 11/22] consul: Merging BlockingQuery into QueryOptions --- consul/catalog_endpoint.go | 8 ++++---- consul/health_endpoint.go | 8 ++++---- consul/kvs_endpoint.go | 4 ++-- consul/rpc.go | 2 +- consul/structs/structs.go | 22 +++++++--------------- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 35a08a1c47..51dc11bd01 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -97,7 +97,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde // Get the local state state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("Nodes"), func() error { @@ -114,7 +114,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I // Get the current nodes state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("Services"), func() error { @@ 
-136,7 +136,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() - err := c.srv.blockingRPC(&args.BlockingQuery, + err := c.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("ServiceNodes"), func() error { @@ -174,7 +174,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() - return c.srv.blockingRPC(&args.BlockingQuery, + return c.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("NodeServices"), func() error { diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 67da630d9b..e6db8c99ad 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -20,7 +20,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // Get the state specific checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("ChecksInState"), func() error { @@ -38,7 +38,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, // Get the node checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("NodeChecks"), func() error { @@ -62,7 +62,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() - return h.srv.blockingRPC(&args.BlockingQuery, + return h.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("ServiceChecks"), func() error { @@ -84,7 +84,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() - err := h.srv.blockingRPC(&args.BlockingQuery, + err := h.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), func() 
error { diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 780e52da5d..b955f3428a 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -50,7 +50,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.BlockingQuery, + return k.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("KVSGet"), func() error { @@ -83,7 +83,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.BlockingQuery, + return k.srv.blockingRPC(&args.QueryOptions, &reply.QueryMeta, state.QueryTables("KVSList"), func() error { diff --git a/consul/rpc.go b/consul/rpc.go index ffaa9664e8..cebe8627b2 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -203,7 +203,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. -func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta, +func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, tables MDBTables, run func() error) error { var timeout <-chan time.Time var notifyCh chan struct{} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 3ea3d4a5fe..2aac5ee982 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -35,18 +35,15 @@ type RPCInfo interface { AllowStaleRead() bool } -// BlockingQuery is used to block on a query and wait for a change. -// Either both fields, or neither must be provided. 
-type BlockingQuery struct { - // If set, wait until query exceeds given index - MinQueryIndex uint64 - - // Provided with MinQueryIndex to wait for change - MaxQueryTime time.Duration -} - // QueryOptions is used to specify various flags for read queries type QueryOptions struct { + // If set, wait until query exceeds given index. Must be provided + // with MaxQueryTime. + MinQueryIndex uint64 + + // Provided with MinQueryIndex to wait for change. + MaxQueryTime time.Duration + // If set, any follower can service the request. Results // may be arbitrarily stale. AllowStale bool @@ -125,7 +122,6 @@ func (r *DeregisterRequest) RequestDatacenter() string { // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { Datacenter string - BlockingQuery QueryOptions } @@ -139,7 +135,6 @@ type ServiceSpecificRequest struct { ServiceName string ServiceTag string TagFilter bool // Controls tag filtering - BlockingQuery QueryOptions } @@ -151,7 +146,6 @@ func (r *ServiceSpecificRequest) RequestDatacenter() string { type NodeSpecificRequest struct { Datacenter string Node string - BlockingQuery QueryOptions } @@ -163,7 +157,6 @@ func (r *NodeSpecificRequest) RequestDatacenter() string { type ChecksInStateRequest struct { Datacenter string State string - BlockingQuery QueryOptions } @@ -292,7 +285,6 @@ func (r *KVSRequest) RequestDatacenter() string { type KeyRequest struct { Datacenter string Key string - BlockingQuery QueryOptions } From e5e97274d4680b2778c0b1d92a19c83d71f3ddd4 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:49:21 -0700 Subject: [PATCH 12/22] consul: Adding consistent read enforcement --- consul/rpc.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/consul/rpc.go b/consul/rpc.go index cebe8627b2..fa80a582d8 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -244,6 +244,13 @@ RUN_QUERY: // Update the query meta data s.setQueryMeta(m) + // Check if query must be consistent + if b.RequireConsistent { + 
if err := s.consistentRead(); err != nil { + return err + } + } + // Run the query function err := run() From 67263e2ca351670cb8da0b3ae71e12ab110d33d7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:57:39 -0700 Subject: [PATCH 13/22] consul: Testing a stale read --- consul/catalog_endpoint_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 37b418af1b..c9c87dcee2 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -458,6 +458,39 @@ func TestCatalogListServices_Timeout(t *testing.T) { } } +func TestCatalogListServices_Stale(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + args.AllowStale = true + var out structs.IndexedServices + + // Inject a fake service + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, 5000}) + + // Run the query, do not wait for leader! + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Should find the service + if len(out.Services) != 1 { + t.Fatalf("bad: %v", out) + } + + // Should not have a leader! 
Stale read + if out.KnownLeader { + t.Fatalf("bad: %v", out) + } +} + func TestCatalogListServiceNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) From 3d9e6795eb6ac4cbcf42d0020ea86986dc1f076c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 12:08:00 -0700 Subject: [PATCH 14/22] consul: Adding strongly consistent read tests --- consul/catalog_endpoint_test.go | 99 +++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index c9c87dcee2..3a174ce456 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -6,6 +6,7 @@ import ( "net/rpc" "os" "sort" + "strings" "testing" "time" ) @@ -296,6 +297,104 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) { } } +func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the leader as the client, kill the follower + var client *rpc.Client + if s1.IsLeader() { + client = client1 + s2.Shutdown() + } else { + client = client2 + s1.Shutdown() + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{RequireConsistent: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") { + t.Fatalf("err: %v", err) + } + + if out.QueryMeta.LastContact != 0 { + t.Fatalf("should not have a last 
contact time") + } + if out.QueryMeta.KnownLeader { + t.Fatalf("should have no known leader") + } +} + +func TestCatalogListNodes_ConsistentRead(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the leader as the client, kill the follower + var client *rpc.Client + if s1.IsLeader() { + client = client1 + } else { + client = client2 + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{RequireConsistent: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if out.QueryMeta.LastContact != 0 { + t.Fatalf("should not have a last contact time") + } + if !out.QueryMeta.KnownLeader { + t.Fatalf("should have known leader") + } +} + func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) From 09ddc01d27d93ab2140b5359bd18dc8fad958a4f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 12:25:36 -0700 Subject: [PATCH 15/22] agent: Updating to remove BlockingQuery --- command/agent/catalog_endpoint.go | 8 ++++---- command/agent/health_endpoint.go | 8 ++++---- command/agent/kvs_endpoint.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index ec5fc7821e..69b618b1f2 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -60,7 +60,7 @@ func (s 
*HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Setup the request args := structs.DCSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -74,7 +74,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -88,7 +88,7 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -118,7 +118,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default Datacenter args := structs.NodeSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index 3520cc2a06..7c76da90ff 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -9,7 +9,7 @@ import ( func 
(s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.ChecksInStateRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -32,7 +32,7 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.NodeSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -55,7 +55,7 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } @@ -78,7 +78,7 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return 0, nil, nil } diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index af97aa5c16..7dd4a1f65d 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -12,7 +12,7 @@ import ( func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) 
(interface{}, error) { // Set default DC args := structs.KeyRequest{} - if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } From 3fbbbc4eea8a86878804239b05032a72bdda4439 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 12:26:12 -0700 Subject: [PATCH 16/22] agent: Parse the consistency flags --- command/agent/http.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index 8e187f94fd..b886178d62 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -163,7 +163,7 @@ func setIndex(resp http.ResponseWriter, index uint64) { // parseWait is used to parse the ?wait and ?index query params // Returns true on error -func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQuery) bool { +func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { query := req.URL.Query() if wait := query.Get("wait"); wait != "" { dur, err := time.ParseDuration(wait) @@ -186,6 +186,24 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQ return false } +// parseConsistency is used to parse the ?stale and ?consistent query params. 
+// Returns true on error +func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { + query := req.URL.Query() + if _, ok := query["stale"]; ok { + b.AllowStale = true + } + if _, ok := query["consistent"]; ok { + b.RequireConsistent = true + } + if b.AllowStale && b.RequireConsistent { + resp.WriteHeader(400) + resp.Write([]byte("Cannot specify ?stale with ?consistent, conflicting semantics.")) + return true + } + return false +} + // parseDC is used to parse the ?dc query param func (s *HTTPServer) parseDC(req *http.Request, dc *string) { if other := req.URL.Query().Get("dc"); other != "" { @@ -197,7 +215,10 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) { // parse is a convenience method for endpoints that need // to use both parseWait and parseDC. -func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.BlockingQuery) bool { +func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { s.parseDC(req, dc) + if parseConsistency(resp, req, b) { + return true + } return parseWait(resp, req, b) } From 386d60f8b2d100790e355e1cd9e7153c3d9bc0d9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 12:40:11 -0700 Subject: [PATCH 17/22] agent: Remove wrapQuery, call setMeta directly --- command/agent/catalog_endpoint.go | 40 ++++++++++++++------------ command/agent/health_endpoint.go | 44 +++++++++++++++------------- command/agent/http.go | 48 +++++++++++++++++++------------ command/agent/kvs_endpoint.go | 2 +- 4 files changed, 77 insertions(+), 57 deletions(-) diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 69b618b1f2..97917465b7 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -57,39 +57,41 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ return out, nil } -func (s *HTTPServer) 
CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Setup the request args := structs.DCSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } var out structs.IndexedNodes + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Nodes, nil + return out.Nodes, nil } -func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } var out structs.IndexedServices + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Services, nil + return out.Services, nil } -func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Check for a tag @@ -104,22 +106,23 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedServiceNodes + defer setMeta(resp, 
&out.QueryMeta) if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.ServiceNodes, nil + return out.ServiceNodes, nil } -func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default Datacenter args := structs.NodeSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Pull out the node name @@ -127,13 +130,14 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedNodeServices + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.NodeServices, nil + return out.NodeServices, nil } diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index 7c76da90ff..e77444e395 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -6,11 +6,11 @@ import ( "strings" ) -func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ChecksInStateRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Pull out the service name @@ -18,22 +18,23 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req if args.State == "" { resp.WriteHeader(400) resp.Write([]byte("Missing check 
state")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.NodeSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Pull out the service name @@ -41,22 +42,23 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Pull out the service name @@ -64,22 +66,23 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedHealthChecks + 
defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.HealthChecks, nil + return out.HealthChecks, nil } -func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) { +func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return 0, nil, nil + return nil, nil } // Check for a tag @@ -94,13 +97,14 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ if args.ServiceName == "" { resp.WriteHeader(400) resp.Write([]byte("Missing service name")) - return 0, nil, nil + return nil, nil } // Make the RPC request var out structs.IndexedCheckServiceNodes + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return 0, nil, err + return nil, err } - return out.Index, out.Nodes, nil + return out.Nodes, nil } diff --git a/command/agent/http.go b/command/agent/http.go index b886178d62..1c97234230 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -63,15 +63,15 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister)) s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.mux.HandleFunc("/v1/catalog/nodes", s.wrapQuery(s.CatalogNodes)) - s.mux.HandleFunc("/v1/catalog/services", s.wrapQuery(s.CatalogServices)) - s.mux.HandleFunc("/v1/catalog/service/", s.wrapQuery(s.CatalogServiceNodes)) - s.mux.HandleFunc("/v1/catalog/node/", s.wrapQuery(s.CatalogNodeServices)) + s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) + 
s.mux.HandleFunc("/v1/catalog/services", s.wrap(s.CatalogServices)) + s.mux.HandleFunc("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) + s.mux.HandleFunc("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) - s.mux.HandleFunc("/v1/health/node/", s.wrapQuery(s.HealthNodeChecks)) - s.mux.HandleFunc("/v1/health/checks/", s.wrapQuery(s.HealthServiceChecks)) - s.mux.HandleFunc("/v1/health/state/", s.wrapQuery(s.HealthChecksInState)) - s.mux.HandleFunc("/v1/health/service/", s.wrapQuery(s.HealthServiceNodes)) + s.mux.HandleFunc("/v1/health/node/", s.wrap(s.HealthNodeChecks)) + s.mux.HandleFunc("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) + s.mux.HandleFunc("/v1/health/state/", s.wrap(s.HealthChecksInState)) + s.mux.HandleFunc("/v1/health/service/", s.wrap(s.HealthServiceNodes)) s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices)) s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks)) @@ -131,16 +131,6 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque return f } -// wrapQuery is used to wrap query functions to make them more convenient -func (s *HTTPServer) wrapQuery(handler func(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { - f := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - idx, obj, err := handler(resp, req) - setIndex(resp, idx) - return obj, err - } - return s.wrap(f) -} - // Renders a simple index page func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) { if req.URL.Path == "/" { @@ -161,6 +151,28 @@ func setIndex(resp http.ResponseWriter, index uint64) { resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10)) } +// setKnownLeader is used to set the known leader header +func setKnownLeader(resp http.ResponseWriter, known bool) { + s := "true" + if !known { + s = "false" + } + resp.Header().Add("X-Consul-KnownLeader", s) +} + +// setLastContact is used to set the 
last contact header +func setLastContact(resp http.ResponseWriter, last time.Duration) { + lastMsec := uint64(last / time.Millisecond) + resp.Header().Add("X-Consul-LastContact", strconv.FormatUint(lastMsec, 10)) +} + +// setMeta is used to set the query response meta data +func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { + setIndex(resp, m.Index) + setLastContact(resp, m.LastContact) + setKnownLeader(resp, m.KnownLeader) +} + // parseWait is used to parse the ?wait and ?index query params // Returns true on error func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index 7dd4a1f65d..a72b7286c4 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -47,10 +47,10 @@ func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *s // Make the RPC var out structs.IndexedDirEntries + defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC(method, &args, &out); err != nil { return nil, err } - setIndex(resp, out.Index) // Check if we get a not found if len(out.Entries) == 0 { From e2ea4804bd5c9c232dfc976940bcb530860f465c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 13:11:05 -0700 Subject: [PATCH 18/22] agent: Updating tests for new API --- command/agent/catalog_endpoint_test.go | 36 ++++++++++++-------------- command/agent/health_endpoint_test.go | 33 ++++++++++------------- command/agent/http_test.go | 28 +++++++++++++++++--- command/agent/kvs_endpoint_test.go | 12 ++------- 4 files changed, 57 insertions(+), 52 deletions(-) diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 573ddf8863..2d6dc39d4e 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/structs" "net/http" + "net/http/httptest" "os" "testing" "time" @@ 
-115,14 +116,14 @@ func TestCatalogNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + // Verify an index is set + assertIndex(t, resp) nodes := obj.(structs.Nodes) if len(nodes) != 2 { @@ -170,7 +171,8 @@ func TestCatalogNodes_Blocking(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } @@ -180,7 +182,7 @@ func TestCatalogNodes_Blocking(t *testing.T) { t.Fatalf("too fast") } - if idx <= out.Index { + if idx := getIndex(t, resp); idx <= out.Index { t.Fatalf("bad: %v", idx) } @@ -218,14 +220,13 @@ func TestCatalogServices(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogServices(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) services := obj.(structs.Services) if len(services) != 2 { @@ -262,14 +263,13 @@ func TestCatalogServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogServiceNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if len(nodes) != 1 { @@ -306,14 +306,12 @@ func TestCatalogNodeServices(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.CatalogNodeServices(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.CatalogNodeServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) services := 
obj.(*structs.NodeServices) if len(services.Services) != 1 { diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 1bc9e6a04b..b8bf1bef5e 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/structs" "net/http" + "net/http/httptest" "os" "testing" "time" @@ -23,14 +24,12 @@ func TestHealthChecksInState(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthChecksInState(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthChecksInState(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) @@ -54,14 +53,12 @@ func TestHealthNodeChecks(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthNodeChecks(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthNodeChecks(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for the server nodes := obj.(structs.HealthChecks) @@ -100,14 +97,12 @@ func TestHealthServiceChecks(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthServiceChecks(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceChecks(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) // Should be 1 health check for consul nodes := obj.(structs.HealthChecks) @@ -130,14 +125,12 @@ func TestHealthServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - idx, obj, err := srv.HealthServiceNodes(nil, req) + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } - - if idx == 0 { - t.Fatalf("bad: %v", idx) - } + assertIndex(t, resp) 
// Should be 1 health check for consul nodes := obj.(structs.CheckServiceNodes) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 4b97c597ae..3beffbf11c 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httptest" "os" + "strconv" "testing" "time" ) @@ -69,7 +70,7 @@ func TestContentTypeIsJSON(t *testing.T) { func TestParseWait(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60s&index=1000", nil) @@ -91,7 +92,7 @@ func TestParseWait(t *testing.T) { func TestParseWait_InvalidTime(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60foo&index=1000", nil) @@ -110,7 +111,7 @@ func TestParseWait_InvalidTime(t *testing.T) { func TestParseWait_InvalidIndex(t *testing.T) { resp := httptest.NewRecorder() - var b structs.BlockingQuery + var b structs.QueryOptions req, err := http.NewRequest("GET", "/v1/catalog/nodes?wait=60s&index=foo", nil) @@ -126,3 +127,24 @@ func TestParseWait_InvalidIndex(t *testing.T) { t.Fatalf("bad code: %v", resp.Code) } } + +// assertIndex tests that X-Consul-Index is set and non-zero +func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + t.Fatalf("Bad: %v", header) + } +} + +// getIndex parses X-Consul-Index +func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 { + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + val, err := strconv.Atoi(header) + if err != nil { + t.Fatalf("Bad: %v", header) + } + return uint64(val) +} diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index 6bb476e0a8..f28ef17cf2 100644 --- 
a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -57,11 +57,7 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - - header := resp.Header().Get("X-Consul-Index") - if header == "" { - t.Fatalf("Bad: %v", header) - } + assertIndex(t, resp) res, ok := obj.(structs.DirEntries) if !ok { @@ -138,11 +134,7 @@ func TestKVSEndpoint_Recurse(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - - header := resp.Header().Get("X-Consul-Index") - if header == "" { - t.Fatalf("Bad: %v", header) - } + assertIndex(t, resp) res, ok := obj.(structs.DirEntries) if !ok { From 91bed7b31333795f16778ba7e4dd0a807f9f80fe Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 13:19:18 -0700 Subject: [PATCH 19/22] agent: Adding HTTP tests for new features --- command/agent/http_test.go | 105 +++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 3beffbf11c..84d99b44fa 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -41,6 +41,52 @@ func TestSetIndex(t *testing.T) { } } +func TestSetKnownLeader(t *testing.T) { + resp := httptest.NewRecorder() + setKnownLeader(resp, true) + header := resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + resp = httptest.NewRecorder() + setKnownLeader(resp, false) + header = resp.Header().Get("X-Consul-KnownLeader") + if header != "false" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetLastContact(t *testing.T) { + resp := httptest.NewRecorder() + setLastContact(resp, 123456*time.Microsecond) + header := resp.Header().Get("X-Consul-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetMeta(t *testing.T) { + meta := structs.QueryMeta{ + Index: 1000, + KnownLeader: true, + LastContact: 123456 * time.Microsecond, + } + resp := httptest.NewRecorder() + 
setMeta(resp, &meta) + header := resp.Header().Get("X-Consul-Index") + if header != "1000" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Consul-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + func TestContentTypeIsJSON(t *testing.T) { dir, srv := makeHTTPServer(t) @@ -128,6 +174,65 @@ func TestParseWait_InvalidIndex(t *testing.T) { } } +func TestParseConsistency(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?stale", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + + if !b.AllowStale { + t.Fatalf("Bad: %v", b) + } + if b.RequireConsistent { + t.Fatalf("Bad: %v", b) + } + + b = structs.QueryOptions{} + req, err = http.NewRequest("GET", + "/v1/catalog/nodes?consistent", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); d { + t.Fatalf("unexpected done") + } + + if b.AllowStale { + t.Fatalf("Bad: %v", b) + } + if !b.RequireConsistent { + t.Fatalf("Bad: %v", b) + } +} + +func TestParseConsistency_Invalid(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?stale&consistent", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseConsistency(resp, req, &b); !d { + t.Fatalf("expected done") + } + + if resp.Code != 400 { + t.Fatalf("bad code: %v", resp.Code) + } +} + // assertIndex tests that X-Consul-Index is set and non-zero func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { header := resp.Header().Get("X-Consul-Index") From 03adb99a914d69d2c13ce99048ec6b2b22617180 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 13:40:16 -0700 Subject: [PATCH 20/22] 
website: Document HTTP changes --- website/source/docs/agent/http.html.markdown | 58 ++++++++++++++++---- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index ec616ea7a1..8d4079422a 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -42,6 +42,40 @@ note is that when the query returns there is **no guarantee** of a change. It is possible that the timeout was reached, or that there was an idempotent write that does not affect the result. +## Consistency Modes + +Most of the read query endpoints support multiple levels of consistency. +These are to provide a tuning knob that clients can use to find a happy +medium that best matches their needs. + +The three read modes are: + +* default - If not specified, this mode is used. It is strongly consistent + in almost all cases. However, there is a small window in which a new + leader may be elected, and the old leader may service stale values. The + trade off is fast reads, but potentially stale values. This condition is + hard to trigger, and most clients should not need to worry about the stale read. + This only applies to reads, and a split-brain is not possible on writes. + +* consistent - This mode is strongly consistent without caveats. It requires + that a leader verify with a quorum of peers that it is still leader. This + introduces an additional round-trip to all server nodes. The trade off is + always consistent reads, but increased latency due to an extra round trip. + Most clients should not use this unless they cannot tolerate a stale read. + +* stale - This mode allows any server to service the read, regardless of if + it is the leader. This means reads can be arbitrarily stale, but are generally + within 50 milliseconds of the leader. The trade off is very fast and scalable + reads but values will be stale.
This mode allows reads without a leader, meaning + a cluster that is unavailable will still be able to respond. + +To switch these modes, either the "?stale" or "?consistent" query parameters +are provided. It is an error to provide both. + +To support bounding how stale data is, there is an "X-Consul-LastContact" +which is the last time a server was contacted by the leader node in +milliseconds. The "X-Consul-KnownLeader" also indicates if there is a known +leader. These can be used to gauge if a stale read should be used. ## KV @@ -81,7 +115,8 @@ that modified this key. This index corresponds to the `X-Consul-Index` header value that is returned. A blocking query can be used to wait for a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds to the latest `ModifyIndex` and so a blocking query waits until any of the -listed keys are updated. +listed keys are updated. The multiple consistency modes can be used for +`GET` requests as well. The `Key` is simply the full path of the entry. `Flags` are an opaque unsigned integer that can be attached to each entry. The use of this is @@ -347,7 +382,8 @@ The following endpoints are supported: * /v1/catalog/service/\ : Lists the nodes in a given service * /v1/catalog/node/\ : Lists the services provided by a node -The last 4 endpoints of the catalog support blocking queries. +The last 4 endpoints of the catalog support blocking queries and +consistency modes. ### /v1/catalog/register @@ -473,7 +509,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/catalog/services @@ -492,7 +528,7 @@ It returns a JSON body like this: The main object keys are the service names, while the array provides all the known tags for a given service. -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes.
### /v1/catalog/service/\ @@ -517,7 +553,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/catalog/node/\ @@ -549,7 +585,7 @@ It returns a JSON body like this: } } -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ## Health @@ -564,7 +600,7 @@ The following endpoints are supported: * /v1/health/service/\: Returns the nodes and health info of a service * /v1/health/state/\: Returns the checks in a given state -All of the health endpoints supports blocking queries. +All of the health endpoints support blocking queries and all consistency modes. ### /v1/health/node/\ @@ -603,7 +639,7 @@ joins the Consul cluster, it is part of a distributed failure detection provided by Serf. If a node fails, it is detected and the status is automatically changed to "critical". -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/checks/\ @@ -627,7 +663,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/service/\ @@ -684,7 +720,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes. ### /v1/health/state/\ @@ -718,7 +754,7 @@ It returns a JSON body like this: } ] -This endpoint supports blocking queries. +This endpoint supports blocking queries and all consistency modes.
## Status From 832cc90c6ba772314de925514a45890ba65f75ef Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 13:46:57 -0700 Subject: [PATCH 21/22] website: Document the consistency modes --- .../docs/internals/consensus.html.markdown | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/website/source/docs/internals/consensus.html.markdown b/website/source/docs/internals/consensus.html.markdown index 98e52f778e..c4df7a61c5 100644 --- a/website/source/docs/internals/consensus.html.markdown +++ b/website/source/docs/internals/consensus.html.markdown @@ -131,6 +131,39 @@ only for data in their datacenter. When a request is received for a remote datac the request is forwarded to the correct leader. This design allows for lower latency transactions and higher availability without sacrificing consistency. +## Consistency Modes + +Although all writes to the replicated log go through Raft, reads are more +flexible. To support various tradeoffs that developers may want, Consul +supports 3 different consistency modes for reads. + +The three read modes are: + +* default - Raft makes use of leader leasing, providing a time window + in which the leader assumes its role is stable. However, if a leader + is partitioned from the remaining peers, they may elect a new leader + while the leader is still holding the lease. This means there are 2 leader + nodes. There is no risk of a split-brain since the old leader will be + unable to commit new logs. However, if the old leader services any reads + the values are potentially stale. The default consistency mode relies only + on leader leasing, exposing clients to potentially stale values. We make + this trade off because reads are fast, usually strongly consistent, and + only stale in a hard to trigger situation. The time window of stale reads + is also bounded, since the leader will step down due to the partition. + +* consistent - This mode is strongly consistent without caveats.
It requires + that a leader verify with a quorum of peers that it is still leader. This + introduces an additional round-trip to all server nodes. The trade off is + always consistent reads, but increased latency due to an extra round trip. + +* stale - This mode allows any server to service the read, regardless of if + it is the leader. This means reads can be arbitrarily stale, but are generally + within 50 milliseconds of the leader. The trade off is very fast and scalable + reads but values will be stale. This mode allows reads without a leader, meaning + a cluster that is unavailable will still be able to respond. + +For more documentation about using these various modes, see the [HTTP API](/docs/agent/http.html). + ## Deployment Table Below is a table that shows for the number of servers how large the From 23dd1a023cd78b74a52c0f885f8c497f60d74423 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 13:50:18 -0700 Subject: [PATCH 22/22] website: Cleanup verbage --- website/source/docs/internals/consensus.html.markdown | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/source/docs/internals/consensus.html.markdown b/website/source/docs/internals/consensus.html.markdown index c4df7a61c5..55ccad3158 100644 --- a/website/source/docs/internals/consensus.html.markdown +++ b/website/source/docs/internals/consensus.html.markdown @@ -141,8 +141,8 @@ The three read modes are: * default - Raft makes use of leader leasing, providing a time window in which the leader assumes its role is stable. However, if a leader - is partitioned from the remaining peers, they may elect a new leader - while the leader is still holding the lease. This means there are 2 leader + is partitioned from the remaining peers, a new leader may be elected + while the old leader is holding the lease. This means there are 2 leader nodes. There is no risk of a split-brain since the old leader will be unable to commit new logs.
However, if the old leader services any reads the values are potentially stale. The default consistency mode relies only