From 1d1dd8f8d26a42053d273fa060b2f19253b4b178 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 5 Feb 2014 11:00:43 -0800 Subject: [PATCH] consul: Enable ListNodes and ListServices to be a blocking query --- consul/catalog_endpoint.go | 30 ++++--- consul/catalog_endpoint_test.go | 137 ++++++++++++++++++++++++++++---- consul/state_store.go | 3 +- 3 files changed, 140 insertions(+), 30 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 006ca07986..b885653593 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -86,31 +86,35 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { } // ListNodes is used to query the nodes in a DC -func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error { - if done, err := c.srv.forward("Catalog.ListNodes", dc, dc, reply); done { +func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { + if done, err := c.srv.forward("Catalog.ListNodes", args.Datacenter, args, reply); done { return err } - // Get the current nodes + // Get the local state state := c.srv.fsm.State() - _, nodes := state.Nodes() - - *reply = nodes - return nil + return c.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("Nodes"), + func() (uint64, error) { + reply.Index, reply.Nodes = state.Nodes() + return reply.Index, nil + }) } // ListServices is used to query the services in a DC -func (c *Catalog) ListServices(dc string, reply *structs.Services) error { - if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done { +func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { + if done, err := c.srv.forward("Catalog.ListServices", args.Datacenter, args, reply); done { return err } // Get the current nodes state := c.srv.fsm.State() - _, services := state.Services() - - *reply = services - return nil + return c.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("Services"), + func() (uint64, error) { + reply.Index, reply.Services = state.Services() + return reply.Index, nil + }) } // ServiceNodes returns all the nodes registered as part of a service diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 9c8c03f218..0141576c05 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -197,8 +197,11 @@ func TestCatalogListNodes(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - var out structs.Nodes - err := client.Call("Catalog.ListNodes", "dc1", &out) + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedNodes + err := client.Call("Catalog.ListNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } @@ -209,22 +212,22 @@ func TestCatalogListNodes(t *testing.T) { // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) - if err := client.Call("Catalog.ListNodes", "dc1", &out); err != nil { + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } - if len(out) != 2 { + if len(out.Nodes) != 2 { t.Fatalf("bad: %v", out) } // Server node is auto added from Serf - if out[0].Node != s1.config.NodeName { + if out.Nodes[0].Node != s1.config.NodeName { t.Fatalf("bad: %v", out) } - if out[1].Node != "foo" { + if out.Nodes[1].Node != "foo" { t.Fatalf("bad: %v", out) } - if out[1].Address != "127.0.0.1" { + if out.Nodes[1].Address != "127.0.0.1" { t.Fatalf("bad: %v", out) } } @@ -242,9 +245,12 @@ func BenchmarkCatalogListNodes(t *testing.B) { // Just add a node s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } for i := 0; i < t.N; i++ { - var out structs.Nodes - if err := client.Call("Catalog.ListNodes", "dc1", &out); err != nil { + var out structs.IndexedNodes + if err := client.Call("Catalog.ListNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -257,8 +263,11 @@ func TestCatalogListServices(t *testing.T) { client := rpcClient(t, s1) defer client.Close() - var out structs.Services - err := client.Call("Catalog.ListServices", "dc1", &out) + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedServices + err := client.Call("Catalog.ListServices", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } @@ -270,21 +279,117 @@ func TestCatalogListServices(t *testing.T) { s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000}) - if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil { + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { t.Fatalf("err: %v", err) } - if len(out) != 2 { + if len(out.Services) != 2 { t.Fatalf("bad: %v", out) } // Consul service should auto-register - if len(out["consul"]) != 1 { + if len(out.Services["consul"]) != 1 { t.Fatalf("bad: %v", out) } - if len(out["db"]) != 1 { + if len(out.Services["db"]) != 1 { t.Fatalf("bad: %v", out) } - if out["db"][0] != "primary" { + if out.Services["db"][0] != "primary" { + t.Fatalf("bad: %v", out) + } +} + +func TestCatalogListServices_Blocking(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedServices + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + // Run the query + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Setup a blocking query + args.MinQueryIndex = out.Index + args.MaxQueryTime = time.Second + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000}) + }() + + // Re-run the query + out = structs.IndexedServices{} + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Now().Sub(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + // Check the indexes + if out.Index != 2 { + t.Fatalf("bad: %v", out) + } + + // Should find the service + if len(out.Services) != 2 { + t.Fatalf("bad: %v", out) + } +} + +func TestCatalogListServices_Timeout(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedServices + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + // Run the query + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Setup a blocking query + args.MinQueryIndex = out.Index + args.MaxQueryTime = 100 * time.Millisecond + + // Re-run the query + start := time.Now() + out = structs.IndexedServices{} + if err := client.Call("Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Now().Sub(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + // Check the indexes, should not change + if out.Index != args.MinQueryIndex { t.Fatalf("bad: %v", out) } } diff --git a/consul/state_store.go b/consul/state_store.go index 52e051d88c..ad7ff3d06e 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -195,7 +195,8 @@ func (s *StateStore) initialize() error { // Setup the query tables // TODO: Other queries... s.queryTables = map[string]MDBTables{ - "Nodes": MDBTables{s.nodeTable}, + "Nodes": MDBTables{s.nodeTable}, + "Services": MDBTables{s.serviceTable}, } return nil }