From cfe0651208816dc337e76ecae222ac2a0c18f1f5 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Sun, 10 Apr 2016 21:20:39 -0700 Subject: [PATCH] Syncs a check's output with the catalog when output rate limiting isn't in effect. --- command/agent/local.go | 25 +++++++- command/agent/local_test.go | 113 +++++++++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 6 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index e456ebb2e5..d0f6c0a471 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -434,11 +434,28 @@ func (l *localState) setSyncState() error { if l.config.CheckUpdateInterval == 0 { equal = existing.IsSame(check) } else { + // Copy the existing check before potentially modifying + // it before the compare operation. eCopy := new(structs.HealthCheck) *eCopy = *existing - eCopy.Output = "" - check.Output = "" - equal = eCopy.IsSame(check) + + // Copy the server's check before modifying, otherwise + // in-memory RPC-based unit tests will have side effects. + cCopy := new(structs.HealthCheck) + *cCopy = *check + + // If there's a defer timer active then we've got a + // potentially spammy check so we don't sync the output + // during this sweep since the timer will mark the check + // out of sync for us. Otherwise, it is safe to sync the + // output now. This is especially important for checks + // that don't change state after they are created, in + // which case we'd never see their output synced back ever. + if _, ok := l.deferCheck[id]; ok { + eCopy.Output = "" + cCopy.Output = "" + } + equal = eCopy.IsSame(cCopy) } // Update the status @@ -499,6 +516,8 @@ func (l *localState) syncChanges() error { if err := l.syncNodeInfo(); err != nil { return err } + } else { + l.logger.Printf("[DEBUG] agent: Node info in sync") } return nil diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 62c418f819..d8abfe58de 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -654,7 +654,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { conf := nextConfig() - conf.CheckUpdateInterval = 100 * time.Millisecond + conf.CheckUpdateInterval = 500 * time.Millisecond dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) defer agent.Shutdown() @@ -693,8 +693,8 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { // Update the check output! Should be deferred agent.state.UpdateCheck("web", structs.HealthPassing, "output") - // Should not update for 100 milliseconds - time.Sleep(50 * time.Millisecond) + // Should not update for 500 milliseconds + time.Sleep(250 * time.Millisecond) if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { t.Fatalf("err: %v", err) } @@ -729,6 +729,113 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { }, func(err error) { t.Fatalf("err: %s", err) }) + + // Change the output in the catalog to force it out of sync. + eCopy := new(structs.HealthCheck) + *eCopy = *check + eCopy.Output = "changed" + reg := structs.RegisterRequest{ + Datacenter: agent.config.Datacenter, + Node: agent.config.NodeName, + Address: agent.config.AdvertiseAddr, + TaggedAddresses: agent.config.TaggedAddresses, + Check: eCopy, + WriteRequest: structs.WriteRequest{}, + } + var out struct{} + if err := agent.RPC("Catalog.Register", ®, &out); err != nil { + t.Fatalf("err: %s", err) + } + + // Verify that the output is out of sync. + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "changed" { + t.Fatalf("unexpected update: %v", chk) + } + } + } + + // Trigger anti-entropy run and wait. + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that the output was synced back to the agent's value. + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "output" { + t.Fatalf("missed update: %v", chk) + } + } + } + + // Reset the catalog again. + if err := agent.RPC("Catalog.Register", ®, &out); err != nil { + t.Fatalf("err: %s", err) + } + + // Verify that the output is out of sync. + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "changed" { + t.Fatalf("unexpected update: %v", chk) + } + } + } + + // Now make an update that should be deferred. + agent.state.UpdateCheck("web", structs.HealthPassing, "deferred") + + // Trigger anti-entropy run and wait. + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that the output is still out of sync since there's a deferred + // update pending. + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "changed" { + t.Fatalf("unexpected update: %v", chk) + } + } + } + + // Wait for the deferred update. + testutil.WaitForResult(func() (bool, error) { + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + return false, err + } + + // Verify updated + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "deferred" { + return false, fmt.Errorf("no update: %v", chk) + } + } + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) } func TestAgentAntiEntropy_NodeInfo(t *testing.T) {