From fa90f1cd0d2c25f7bcb305645e61d2124f56326f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 18 Apr 2014 17:26:59 -0700 Subject: [PATCH] consul: Support a stale read query --- consul/catalog_endpoint_test.go | 51 +++++++++++++++++++++++++++++++++ consul/rpc.go | 5 ++++ 2 files changed, 56 insertions(+) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 1184d6ddb5..53a23d6b3f 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -232,6 +232,57 @@ func TestCatalogListNodes(t *testing.T) { } } +func TestCatalogListNodes_StaleRaad(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client1 := rpcClient(t, s1) + defer client1.Close() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + client2 := rpcClient(t, s2) + defer client2.Close() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Use the follower as the client + var client *rpc.Client + if !s1.IsLeader() { + client = client1 + + // Inject fake data on the follower! + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } else { + client = client2 + + // Inject fake data on the follower! + s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + } + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{AllowStale: true}, + } + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.Nodes) != 3 { + t.Fatalf("bad: %v", out) + } +} + func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) diff --git a/consul/rpc.go b/consul/rpc.go index 831a5a9df9..c950f25d52 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -142,6 +142,11 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, return true, err } + // Check if we can allow a stale read + if info.IsRead() && info.AllowStaleRead() { + return false, nil + } + // Handle leader forwarding if !s.IsLeader() { err := s.forwardLeader(method, args, reply)