From b8981317237c86aa3ae1c336c7a229f1608485e3 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 23 Aug 2018 18:06:39 +0200 Subject: [PATCH] [BUGFIX] Avoid returning empty data on startup of a non-leader server (#4554) Ensure that DB is properly initialized when performing stale queries Addresses: - https://github.com/hashicorp/consul-replicate/issues/82 - https://github.com/hashicorp/consul/issues/3975 - https://github.com/hashicorp/consul-template/issues/1131 --- agent/consul/catalog_endpoint_test.go | 68 ++++++++++++++++++++++----- agent/consul/rpc.go | 4 +- agent/consul/rpc_test.go | 42 +++++++++++++++++ testrpc/wait.go | 14 ++++++ 4 files changed, 113 insertions(+), 15 deletions(-) diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 7873fb7452..a44a9e4776 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1467,11 +1467,18 @@ func TestCatalog_ListServices_Timeout(t *testing.T) { func TestCatalog_ListServices_Stale(t *testing.T) { t.Parallel() - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + }) defer os.RemoveAll(dir1) defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" // Enable ACLs! + c.Bootstrap = false // Disable bootstrap + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() args := structs.DCSpecificRequest{ Datacenter: "dc1", @@ -1479,27 +1486,62 @@ func TestCatalog_ListServices_Stale(t *testing.T) { args.AllowStale = true var out structs.IndexedServices - // Inject a fake service - if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { - t.Fatalf("err: %v", err) - } - if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + // Inject a node + if err := s1.fsm.State().EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - // Run the query, do not wait for leader! + codec := rpcClient(t, s2) + defer codec.Close() + + // Run the query, do not wait for leader, never any contact with leader, should fail + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() { + t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out) + } + + // Try to join + joinLAN(t, s2, s1) + retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) }) + waitForLeader(s1, s2) + + testrpc.WaitForLeader(t, s2.RPC, "dc1") if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } - // Should find the service + // Should find the services if len(out.Services) != 1 { - t.Fatalf("bad: %v", out) + t.Fatalf("bad: %#v", out.Services) + } + + if !out.KnownLeader { + t.Fatalf("should have a leader: %v", out) + } + + s1.Leave() + s1.Shutdown() + + testrpc.WaitUntilNoLeader(t, s2.RPC, "dc1") + + args.AllowStale = false + // Since the leader is now down, non-stale query should fail now + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() { + t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out) + } + + // With stale, request should still work + args.AllowStale = true + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Should find old service + if len(out.Services) != 1 { + t.Fatalf("bad: %#v", out) } - // Should not have a leader! Stale read if out.KnownLeader { - t.Fatalf("bad: %v", out) + t.Fatalf("should not have a leader anymore: %#v", out) } } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 3bd26f4305..b584cf7c1b 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -204,8 +204,8 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, return true, err } - // Check if we can allow a stale read - if info.IsRead() && info.AllowStaleRead() { + // Check if we can allow a stale read, ensure our local DB is initialized + if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() { return false, nil } diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 62ae3ae246..cd4e3d0aa1 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -48,6 +48,48 @@ func TestRPC_NoLeader_Fail(t *testing.T) { } } +func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.RPCHoldTimeout = 1 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + } + var out struct{} + + // Make sure we eventually fail with a no leader error, which we should + // see given the short timeout. + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) + if err == nil || err.Error() != structs.ErrNoLeader.Error() { + t.Fatalf("bad: %v", err) + } + + // Until leader has never been known, stale should fail + getKeysReq := structs.KeyListRequest{ + Datacenter: "dc1", + Prefix: "", + Seperator: "/", + QueryOptions: structs.QueryOptions{AllowStale: true}, + } + var keyList structs.IndexedKeyList + if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err.Error() != structs.ErrNoLeader.Error() { + t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err) + } + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil { + t.Fatalf("Did not expect any error but got err: %v", err) + } +} + func TestRPC_NoLeader_Retry(t *testing.T) { t.Parallel() dir1, s1 := testServerWithConfig(t, func(c *Config) { diff --git a/testrpc/wait.go b/testrpc/wait.go index 90d7481bc7..b0e6e2ec0e 100644 --- a/testrpc/wait.go +++ b/testrpc/wait.go @@ -26,6 +26,20 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) { }) } +// WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership. +func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) { + var out structs.IndexedNodes + retry.Run(t, func(r *retry.R) { + args := &structs.DCSpecificRequest{Datacenter: dc} + if err := rpc("Catalog.ListNodes", args, &out); err == nil { + r.Fatalf("It still has a leader: %#q", out) + } + if out.QueryMeta.KnownLeader { + r.Fatalf("Has still a leader") + } + }) +} + // WaitForTestAgent ensures we have a node with serfHealth check registered func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string) { var nodes structs.IndexedNodes