diff --git a/agent/agent_test.go b/agent/agent_test.go
index fecd1c208d..a4f227150e 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -2,6 +2,7 @@ package agent
 
 import (
 	"bytes"
+	"context"
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
@@ -12,6 +13,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -23,6 +25,7 @@ import (
 	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/types"
@@ -1125,6 +1128,186 @@ func TestAgent_AddCheck_GRPC(t *testing.T) {
 	}
 }
 
+func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
+	// t.Parallel() don't even think about making this parallel
+
+	// This test is very contrived and tests for the absence of race conditions
+	// related to the implementation of alias checks. As such it is slow,
+	// serial, full of sleeps and retries, and not generally a great test to
+	// run all of the time.
+	//
+	// That said it made it incredibly easy to root out various race conditions
+	// quite successfully.
+	//
+	// The original set of races was between:
+	//
+	// - agent startup reloading Services and Checks from disk
+	// - API requests to also re-register those same Services and Checks
+	// - the as-yet-to-be-stopped CheckAlias goroutines
+
+	if os.Getenv("SLOWTEST") != "1" {
+		t.Skip("skipping slow test; set SLOWTEST=1 to run")
+	}
+
+	// We do this so that the agent logs and the informational messages from
+	// the test itself are interwoven properly.
+	logf := func(t *testing.T, a *TestAgent, format string, args ...interface{}) {
+		a.logger.Printf("[INFO] testharness: "+format, args...)
+	}
+
+	dataDir := testutil.TempDir(t, "agent") // we manage the data dir
+	cfg := `
+	server = false
+	bootstrap = false
+	enable_central_service_config = false
+	data_dir = "` + dataDir + `"
+	`
+	a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
+	a.LogOutput = testutil.TestWriter(t)
+	a.Start(t)
+	defer os.RemoveAll(dataDir)
+	defer a.Shutdown()
+
+	testCtx, testCancel := context.WithCancel(context.Background())
+	defer testCancel()
+
+	testHTTPServer := launchHTTPCheckServer(t, testCtx)
+	defer testHTTPServer.Close()
+
+	registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
+		// add one persistent service with a simple check
+		require.NoError(t, a.AddService(
+			&structs.NodeService{
+				ID:      "ping",
+				Service: "ping",
+				Port:    8000,
+			},
+			[]*structs.CheckType{
+				{
+					HTTP:     testHTTPServer.URL,
+					Method:   "GET",
+					Interval: 5 * time.Second,
+					Timeout:  1 * time.Second,
+				},
+			},
+			true, "", ConfigSourceLocal,
+		))
+
+		// add one persistent sidecar service with an alias check in the manner
+		// of how sidecar_service would add it
+		require.NoError(t, a.AddService(
+			&structs.NodeService{
+				ID:      "ping-sidecar-proxy",
+				Service: "ping-sidecar-proxy",
+				Port:    9000,
+			},
+			[]*structs.CheckType{
+				{
+					Name:         "Connect Sidecar Aliasing ping",
+					AliasService: "ping",
+				},
+			},
+			true, "", ConfigSourceLocal,
+		))
+	}
+
+	retryUntilCheckState := func(t *testing.T, a *TestAgent, checkID string, expectedStatus string) {
+		t.Helper()
+		retry.Run(t, func(r *retry.R) {
+			chk := a.State.CheckState(types.CheckID(checkID))
+			if chk == nil {
+				r.Fatalf("check=%q is completely missing", checkID)
+			}
+			if chk.Check.Status != expectedStatus {
+				logf(t, a, "check=%q expected status %q but got %q", checkID, expectedStatus, chk.Check.Status)
+				r.Fatalf("check=%q expected status %q but got %q", checkID, expectedStatus, chk.Check.Status)
+			}
+			logf(t, a, "check %q has reached desired status %q", checkID, expectedStatus)
+		})
+	}
+
+	registerServicesAndChecks(t, a)
+
+	time.Sleep(1 * time.Second)
+
+	retryUntilCheckState(t, a, "service:ping", api.HealthPassing)
+	retryUntilCheckState(t, a, "service:ping-sidecar-proxy", api.HealthPassing)
+
+	logf(t, a, "==== POWERING DOWN ORIGINAL ====")
+
+	require.NoError(t, a.Shutdown())
+
+	time.Sleep(1 * time.Second)
+
+	futureHCL := cfg + `
+node_id = "` + string(a.Config.NodeID) + `"
+node_name = "` + a.Config.NodeName + `"
+	`
+
+	restartOnce := func(idx int, t *testing.T) {
+		t.Helper()
+
+		// Reload and retain former NodeID and data directory.
+		a2 := &TestAgent{Name: t.Name(), HCL: futureHCL, DataDir: dataDir}
+		a2.LogOutput = testutil.TestWriter(t)
+		a2.Start(t)
+		defer a2.Shutdown()
+		a = nil
+
+		// reregister during standup; we use an adjustable timing to try and force a race
+		sleepDur := time.Duration(idx+1) * 500 * time.Millisecond
+		time.Sleep(sleepDur)
+		logf(t, a2, "re-registering checks and services after a delay of %v", sleepDur)
+		for i := 0; i < 20; i++ { // RACE RACE RACE!
+			registerServicesAndChecks(t, a2)
+			time.Sleep(50 * time.Millisecond)
+		}
+
+		time.Sleep(1 * time.Second)
+
+		retryUntilCheckState(t, a2, "service:ping", api.HealthPassing)
+
+		logf(t, a2, "giving the alias check a chance to notice...")
+		time.Sleep(5 * time.Second)
+
+		retryUntilCheckState(t, a2, "service:ping-sidecar-proxy", api.HealthPassing)
+	}
+
+	for i := 0; i < 20; i++ {
+		name := "restart-" + strconv.Itoa(i)
+		ok := t.Run(name, func(t *testing.T) {
+			restartOnce(i, t)
+		})
+		require.True(t, ok, name+" failed")
+	}
+}
+
+// launchHTTPCheckServer starts an HTTP server on a 127.0.0.1 port obtained
+// from freeport.GetT that answers every request with 200 and the body "OK".
+// The returned server is already started; the caller must Close it.
+func launchHTTPCheckServer(t *testing.T, ctx context.Context) *httptest.Server {
+	ports := freeport.GetT(t, 1)
+	port := ports[0]
+
+	addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
+
+	var lc net.ListenConfig
+	listener, err := lc.Listen(ctx, "tcp", addr)
+	require.NoError(t, err)
+
+	handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte("OK\n"))
+	})
+
+	srv := &httptest.Server{
+		Listener: listener,
+		Config:   &http.Server{Handler: handler},
+	}
+	srv.Start()
+	return srv
+}
+
 func TestAgent_AddCheck_Alias(t *testing.T) {
 	t.Parallel()
 
diff --git a/agent/checks/alias.go b/agent/checks/alias.go
index 1b5bbe843e..ffd96c1a48 100644
--- a/agent/checks/alias.go
+++ b/agent/checks/alias.go
@@ -33,6 +33,8 @@ type CheckAlias struct {
 	stop     bool
 	stopCh   chan struct{}
 	stopLock sync.Mutex
+
+	stopWg sync.WaitGroup
 }
 
 // AliasNotifier is a CheckNotifier specifically for the Alias check.
@@ -58,15 +60,29 @@ func (c *CheckAlias) Start() {
 // Stop is used to stop the check.
 func (c *CheckAlias) Stop() {
 	c.stopLock.Lock()
-	defer c.stopLock.Unlock()
 	if !c.stop {
 		c.stop = true
 		close(c.stopCh)
 	}
+	c.stopLock.Unlock()
+
+	// Wait until the associated goroutine is definitely complete before
+	// returning to the caller. This is to prevent the new and old checks from
+	// both updating the state of the alias check using possibly stale
+	// information.
+	c.stopWg.Wait()
 }
 
 // run is invoked in a goroutine until Stop() is called.
 func (c *CheckAlias) run(stopCh chan struct{}) {
+	// NOTE(review): Add(1) executes inside the goroutine, so a Stop() that
+	// reaches Wait() before this line sees a zero counter and returns without
+	// waiting. sync.WaitGroup expects Add to happen before Wait; consider
+	// moving Add(1) to wherever this goroutine is launched (presumably
+	// Start(), which is not shown in this hunk).
+	c.stopWg.Add(1)
+	defer c.stopWg.Done()
+
 	// If we have a specific node set, then use a blocking query
 	if c.Node != "" {
 		c.runQuery(stopCh)
@@ -85,13 +101,38 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
 	c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
 	defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
 
+	// maxDurationBetweenUpdates is the maximum time we go between explicit
+	// notifications before we re-query the aliased service checks anyway. This
+	// helps in the case we miss an edge triggered event and the alias does not
+	// accurately reflect the underlying service health status.
+	const maxDurationBetweenUpdates = 1 * time.Minute
+
+	// One timer is reused for every refresh. Creating a fresh time.After
+	// channel per update would allocate a new one-minute timer for every
+	// notification.
+	refreshTimer := time.NewTimer(maxDurationBetweenUpdates)
+	defer refreshTimer.Stop()
+	extendRefreshTimer := func() {
+		// Stop and drain before Reset so an already-fired tick cannot trigger
+		// an immediate, redundant refresh.
+		if !refreshTimer.Stop() {
+			select {
+			case <-refreshTimer.C:
+			default:
+			}
+		}
+		refreshTimer.Reset(maxDurationBetweenUpdates)
+	}
+
 	updateStatus := func() {
 		checks := c.Notify.Checks()
 		checksList := make([]*structs.HealthCheck, 0, len(checks))
 		for _, chk := range checks {
 			checksList = append(checksList, chk)
 		}
+
 		c.processChecks(checksList)
+		extendRefreshTimer()
 	}
 
 	// Immediately run to get the current state of the target service
@@ -99,6 +140,8 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
 
 	for {
 		select {
+		case <-refreshTimer.C:
+			updateStatus()
 		case <-notifyCh:
 			updateStatus()
 		case <-stopCh:
@@ -203,6 +246,8 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
 		msg = "All checks passing."
 	}
 
+	// TODO(rb): if no matching checks found should this default to critical?
+
 	// Update our check value
 	c.Notify.UpdateCheck(c.CheckID, health, msg)
 }
diff --git a/agent/local/state.go b/agent/local/state.go
index f18bad5be3..faceaf222f 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -70,6 +70,9 @@ func (s *ServiceState) Clone() *ServiceState {
 // CheckState describes the state of a health check record.
 type CheckState struct {
 	// Check is the local copy of the health check record.
+	//
+	// Must Clone() the overall CheckState before mutating this. After mutation
+	// reinstall into the checks map.
 	Check *structs.HealthCheck
 
 	// Token is the ACL record to update or delete the health check
@@ -95,12 +98,15 @@ type CheckState struct {
 	Deleted bool
 }
 
-// Clone returns a shallow copy of the object. The check record and the
-// defer timer still point to the original values and must not be
-// modified.
+// Clone returns a copy of the object with its own clone of the check record.
+//
+// The defer timer still points to the original value and must not be modified.
 func (c *CheckState) Clone() *CheckState {
 	c2 := new(CheckState)
 	*c2 = *c
+	if c.Check != nil {
+		c2.Check = c.Check.Clone()
+	}
 	return c2
 }
 
@@ -590,6 +596,18 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 		return
 	}
 
+	// Ensure we only mutate a copy of the check state and put the finalized
+	// version into the checks map when complete.
+	//
+	// Note that we are relying upon the earlier deferred mutex unlock to
+	// happen AFTER this defer. As per the Go spec this is true, but leaving
+	// this note here for the future in case of any refactorings which may not
+	// notice this relationship.
+	c = c.Clone()
+	defer func(c *CheckState) {
+		l.checks[id] = c
+	}(c)
+
 	// Defer a sync if the output has changed. This is an optimization around
 	// frequent updates of output. Instead, we update the output internally,
 	// and periodically do a write-back to the servers. If there is a status
@@ -651,9 +669,9 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
 	return m
 }
 
-// CheckState returns a shallow copy of the current health check state
-// record. The health check record and the deferred check still point to
-// the original values and must not be modified.
+// CheckState returns a shallow copy of the current health check state record.
+//
+// The defer timer still points to the original value and must not be modified.
 func (l *State) CheckState(id types.CheckID) *CheckState {
 	l.RLock()
 	defer l.RUnlock()
@@ -685,8 +703,9 @@ func (l *State) setCheckStateLocked(c *CheckState) {
 }
 
 // CheckStates returns a shallow copy of all health check state records.
-// The health check records and the deferred checks still point to
-// the original values and must not be modified.
+// The map contains a shallow copy of the current check states.
+//
+// The defer timers still point to the original values and must not be modified.
 func (l *State) CheckStates() map[types.CheckID]*CheckState {
 	l.RLock()
 	defer l.RUnlock()
@@ -703,9 +722,9 @@ func (l *State) CheckStates() map[types.CheckID]*CheckState {
 
 // CriticalCheckStates returns the locally registered checks that the
 // agent is aware of and are being kept in sync with the server.
-// The map contains a shallow copy of the current check states but
-// references to the actual check definition which must not be
-// modified.
+// The map contains a shallow copy of the current check states.
+//
+// The defer timers still point to the original values and must not be modified.
 func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
 	l.RLock()
 	defer l.RUnlock()
diff --git a/agent/structs/structs.go b/agent/structs/structs.go
index dd2cea2147..720e614ace 100644
--- a/agent/structs/structs.go
+++ b/agent/structs/structs.go
@@ -1041,7 +1041,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
 	return true
 }
 
-// Clone returns a distinct clone of the HealthCheck.
+// Clone returns a distinct clone of the HealthCheck. Note that the
+// "ServiceTags" and "Definition.Header" fields are not deep copied.
 func (c *HealthCheck) Clone() *HealthCheck {
 	clone := new(HealthCheck)
 	*clone = *c
diff --git a/agent/testagent.go b/agent/testagent.go
index b217e6c0d4..3c55215350 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -126,14 +126,15 @@ func (a *TestAgent) Start(t *testing.T) *TestAgent {
 		require.NoError(err, fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
 		hclDataDir = `data_dir = "` + d + `"`
 	}
-	id := NodeID()
 
+	var id string
 	for i := 10; i >= 0; i-- {
 		a.Config = TestConfig(
 			randomPortsSource(a.UseTLS),
 			config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
 			config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
 		)
+		id = string(a.Config.NodeID)
 
 		// write the keyring
 		if a.Key != "" {