diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 76d678031e..bdff0f397b 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -89,6 +89,14 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } agent.state.AddService(srv5, "") + srv5_mod := new(structs.NodeService) + *srv5_mod = *srv5 + srv5_mod.Address = "127.0.0.1" + args.Service = srv5_mod + if err := agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + // Exists local, in sync, remote missing (create) srv6 := &structs.NodeService{ ID: "cache", @@ -99,139 +107,144 @@ func TestAgentAntiEntropy_Services(t *testing.T) { agent.state.AddService(srv6, "") agent.state.serviceStatus["cache"] = syncStatus{inSync: true} - srv5_mod := new(structs.NodeService) - *srv5_mod = *srv5 - srv5_mod.Address = "127.0.0.1" - args.Service = srv5_mod - if err := agent.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } - // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) - // Verify that we are in sync + var services structs.IndexedNodeServices req := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: agent.config.NodeName, } - var services structs.IndexedNodeServices - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } - // Make sure we sent along our tagged addresses when we synced. - addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { - t.Fatalf("bad: %v", addrs) - } + verifyServices := func() (bool, error) { + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + return false, fmt.Errorf("err: %v", err) + } - // We should have 6 services (consul included) - if len(services.NodeServices.Services) != 6 { - t.Fatalf("bad: %v", services.NodeServices.Services) - } + // Make sure we sent along our tagged addresses when we synced. + addrs := services.NodeServices.Node.TaggedAddresses + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + return false, fmt.Errorf("bad: %v", addrs) + } - // All the services should match - for id, serv := range services.NodeServices.Services { - serv.CreateIndex, serv.ModifyIndex = 0, 0 - switch id { - case "mysql": - if !reflect.DeepEqual(serv, srv1) { - t.Fatalf("bad: %v %v", serv, srv1) - } - case "redis": - if !reflect.DeepEqual(serv, srv2) { - t.Fatalf("bad: %#v %#v", serv, srv2) - } - case "web": - if !reflect.DeepEqual(serv, srv3) { - t.Fatalf("bad: %v %v", serv, srv3) - } - case "api": - if !reflect.DeepEqual(serv, srv5) { - t.Fatalf("bad: %v %v", serv, srv5) - } - case "cache": - if !reflect.DeepEqual(serv, srv6) { - t.Fatalf("bad: %v %v", serv, srv6) + // We should have 6 services (consul included) + if len(services.NodeServices.Services) != 6 { + return false, fmt.Errorf("bad: %v", services.NodeServices.Services) + } + + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "mysql": + if !reflect.DeepEqual(serv, srv1) { + return false, fmt.Errorf("bad: %v %v", serv, srv1) + } + case "redis": + if !reflect.DeepEqual(serv, srv2) { + return false, fmt.Errorf("bad: %#v %#v", serv, srv2) + } + case "web": + if !reflect.DeepEqual(serv, srv3) { + return false, fmt.Errorf("bad: %v %v", serv, srv3) + } + case "api": + if !reflect.DeepEqual(serv, srv5) { + return false, fmt.Errorf("bad: %v %v", serv, srv5) + } + case "cache": + if !reflect.DeepEqual(serv, srv6) { + return false, fmt.Errorf("bad: %v %v", serv, srv6) + } + case "consul": + // ignore + default: + return false, fmt.Errorf("unexpected service: %v", id) } - case "consul": - // ignore - default: - t.Fatalf("unexpected service: %v", id) } - } - // Check the local state - if len(agent.state.services) != 6 { - t.Fatalf("bad: %v", agent.state.services) - } - if len(agent.state.serviceStatus) != 6 { - t.Fatalf("bad: %v", agent.state.serviceStatus) - } - for name, status := range agent.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // Check the local state + if len(agent.state.services) != 6 { + return false, fmt.Errorf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 6 { + return false, fmt.Errorf("bad: %v", agent.state.serviceStatus) } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + return false, fmt.Errorf("should be in sync: %v %v", name, status) + } + } + + return true, nil } + testutil.WaitForResult(verifyServices, func(err error) { + t.Fatal(err) + }) + // Remove one of the services agent.state.RemoveService("api") // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) - // Verify that we are in sync - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } + verifyServicesAfterRemove := func() (bool, error) { + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + return false, fmt.Errorf("err: %v", err) + } - // We should have 5 services (consul included) - if len(services.NodeServices.Services) != 5 { - t.Fatalf("bad: %v", services.NodeServices.Services) - } + // We should have 5 services (consul included) + if len(services.NodeServices.Services) != 5 { + return false, fmt.Errorf("bad: %v", services.NodeServices.Services) + } - // All the services should match - for id, serv := range services.NodeServices.Services { - serv.CreateIndex, serv.ModifyIndex = 0, 0 - switch id { - case "mysql": - if !reflect.DeepEqual(serv, srv1) { - t.Fatalf("bad: %v %v", serv, srv1) - } - case "redis": - if !reflect.DeepEqual(serv, srv2) { - t.Fatalf("bad: %#v %#v", serv, srv2) - } - case "web": - if !reflect.DeepEqual(serv, srv3) { - t.Fatalf("bad: %v %v", serv, srv3) - } - case "cache": - if !reflect.DeepEqual(serv, srv6) { - t.Fatalf("bad: %v %v", serv, srv6) + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "mysql": + if !reflect.DeepEqual(serv, srv1) { + return false, fmt.Errorf("bad: %v %v", serv, srv1) + } + case "redis": + if !reflect.DeepEqual(serv, srv2) { + return false, fmt.Errorf("bad: %#v %#v", serv, srv2) + } + case "web": + if !reflect.DeepEqual(serv, srv3) { + return false, fmt.Errorf("bad: %v %v", serv, srv3) + } + case "cache": + if !reflect.DeepEqual(serv, srv6) { + return false, fmt.Errorf("bad: %v %v", serv, srv6) + } + case "consul": + // ignore + default: + return false, fmt.Errorf("unexpected service: %v", id) } - case "consul": - // ignore - default: - t.Fatalf("unexpected service: %v", id) } - } - // Check the local state - if len(agent.state.services) != 5 { - t.Fatalf("bad: %v", agent.state.services) - } - if len(agent.state.serviceStatus) != 5 { - t.Fatalf("bad: %v", agent.state.serviceStatus) - } - for name, status := range agent.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // Check the local state + if len(agent.state.services) != 5 { + return false, fmt.Errorf("bad: %v", agent.state.services) + } + if len(agent.state.serviceStatus) != 5 { + return false, fmt.Errorf("bad: %v", agent.state.serviceStatus) + } + for name, status := range agent.state.serviceStatus { + if !status.inSync { + return false, fmt.Errorf("should be in sync: %v %v", name, status) + } } + + return true, nil } + + testutil.WaitForResult(verifyServicesAfterRemove, func(err error) { + t.Fatal(err) + }) } func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { @@ -287,48 +300,55 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) - // Verify that we are in sync req := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: agent.config.NodeName, } var services structs.IndexedNodeServices - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } - // All the services should match - for id, serv := range services.NodeServices.Services { - serv.CreateIndex, serv.ModifyIndex = 0, 0 - switch id { - case "svc_id1": - if serv.ID != "svc_id1" || - serv.Service != "svc1" || - serv.Port != 6100 || - !reflect.DeepEqual(serv.Tags, []string{"tag1_mod"}) { - t.Fatalf("bad: %v %v", serv, srv1) - } - case "svc_id2": - if serv.ID != "svc_id2" || - serv.Service != "svc2" || - serv.Port != 6200 || - !reflect.DeepEqual(serv.Tags, []string{"tag2"}) { - t.Fatalf("bad: %v %v", serv, srv2) + verifyServices := func() (bool, error) { + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + return false, fmt.Errorf("err: %v", err) + } + + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "svc_id1": + if serv.ID != "svc_id1" || + serv.Service != "svc1" || + serv.Port != 6100 || + !reflect.DeepEqual(serv.Tags, []string{"tag1_mod"}) { + return false, fmt.Errorf("bad: %v %v", serv, srv1) + } + case "svc_id2": + if serv.ID != "svc_id2" || + serv.Service != "svc2" || + serv.Port != 6200 || + !reflect.DeepEqual(serv.Tags, []string{"tag2"}) { + return false, fmt.Errorf("bad: %v %v", serv, srv2) + } + case "consul": + // ignore + default: + return false, fmt.Errorf("unexpected service: %v", id) } - case "consul": - // ignore - default: - t.Fatalf("unexpected service: %v", id) } - } - for name, status := range agent.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + for name, status := range agent.state.serviceStatus { + if !status.inSync { + return false, fmt.Errorf("should be in sync: %v %v", name, status) + } } + + return true, nil } + + testutil.WaitForResult(verifyServices, func(err error) { + t.Fatal(err) + }) } func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { @@ -784,7 +804,6 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) // Verify that we are in sync req := structs.NodeSpecificRequest{ @@ -792,14 +811,21 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { Node: agent.config.NodeName, } var checks structs.IndexedHealthChecks - if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { - t.Fatalf("err: %v", err) - } - // Verify checks in place - if len(checks.HealthChecks) != 2 { - t.Fatalf("checks: %v", check) - } + testutil.WaitForResult(func() (bool, error) { + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + return false, fmt.Errorf("err: %v", err) + } + + // Verify checks in place + if len(checks.HealthChecks) != 2 { + return false, fmt.Errorf("checks: %v", check) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) // Update the check output! Should be deferred agent.state.UpdateCheck("web", structs.HealthPassing, "output") @@ -969,24 +995,30 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) - // Verify that we are in sync req := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: agent.config.NodeName, } var services structs.IndexedNodeServices - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } - // Make sure we synced our node info - this should have ridden on the - // "consul" service sync - addrs := services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { - t.Fatalf("bad: %v", addrs) - } + // Wait for the sync + testutil.WaitForResult(func() (bool, error) { + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + return false, fmt.Errorf("err: %v", err) + } + + // Make sure we synced our node info - this should have ridden on the + // "consul" service sync + addrs := services.NodeServices.Node.TaggedAddresses + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + return false, fmt.Errorf("bad: %v", addrs) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) // Blow away the catalog version of the node info if err := agent.RPC("Catalog.Register", args, &out); err != nil { @@ -995,17 +1027,22 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { // Trigger anti-entropy run and wait agent.StartSync() - time.Sleep(200 * time.Millisecond) - // Verify that we are in sync - this should have been a sync of just the + // Wait for the sync - this should have been a sync of just the // node info - if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } - addrs = services.NodeServices.Node.TaggedAddresses - if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { - t.Fatalf("bad: %v", addrs) - } + testutil.WaitForResult(func() (bool, error) { + if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil { + return false, fmt.Errorf("err: %v", err) + } + addrs := services.NodeServices.Node.TaggedAddresses + if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) { + return false, fmt.Errorf("bad: %v", addrs) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {