From 632e4a2c692512817fe9e282388d353f39149ad7 Mon Sep 17 00:00:00 2001
From: Mitchell Hashimoto
Date: Fri, 29 Jun 2018 18:15:48 -0700
Subject: [PATCH] agent/checks: add Alias check type

---
 agent/checks/alias.go      | 135 +++++++++++++++++++
 agent/checks/alias_test.go | 267 +++++++++++++++++++++++++++++++++++++
 agent/checks/check.go      |   7 +
 3 files changed, 409 insertions(+)
 create mode 100644 agent/checks/alias.go
 create mode 100644 agent/checks/alias_test.go

diff --git a/agent/checks/alias.go b/agent/checks/alias.go
new file mode 100644
index 0000000000..3ee7d62171
--- /dev/null
+++ b/agent/checks/alias.go
@@ -0,0 +1,135 @@
+package checks
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/types"
+)
+
+// Constants related to alias check backoff.
+const (
+	checkAliasBackoffMin     = 3               // 3 attempts before backing off
+	checkAliasBackoffMaxWait = 1 * time.Minute // maximum backoff wait time
+)
+
+// CheckAlias is a check type that aliases the health of another service
+// instance. If the aliased service has any critical health checks, then
+// this check is critical. If the service has no critical checks but has
+// warnings, then this check is warning, and if the service has only
+// passing checks, then this check is passing.
+type CheckAlias struct {
+	Node      string // Node name of the service. If empty, assumed to be this node.
+	ServiceID string // ID (not name) of the service to alias
+
+	CheckID types.CheckID // ID of this check
+	RPC     RPC           // Used to query remote server if necessary
+	Notify  CheckNotifier // For updating the check state
+
+	stop     bool
+	stopCh   chan struct{}
+	stopLock sync.Mutex
+}
+
+// Start is used to start the check, runs until Stop() is called.
+func (c *CheckAlias) Start() {
+	c.stopLock.Lock()
+	defer c.stopLock.Unlock()
+	c.stop = false
+	c.stopCh = make(chan struct{})
+	go c.run(c.stopCh)
+}
+
+// Stop is used to stop the check.
+func (c *CheckAlias) Stop() {
+	c.stopLock.Lock()
+	defer c.stopLock.Unlock()
+	if !c.stop {
+		c.stop = true
+		close(c.stopCh)
+	}
+}
+
+// run is invoked in a goroutine until Stop() is called.
+func (c *CheckAlias) run(stopCh chan struct{}) {
+	args := structs.NodeSpecificRequest{Node: c.Node}
+	args.AllowStale = true
+	args.MaxQueryTime = 1 * time.Minute
+
+	var attempt uint
+	for {
+		// Check if we're stopped. We fall through and block otherwise,
+		// which has a maximum time set above so we'll always check for
+		// stop within a reasonable amount of time.
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+
+		// Backoff if we have to
+		if attempt > checkAliasBackoffMin {
+			waitTime := (1 << (attempt - checkAliasBackoffMin)) * time.Second
+			if waitTime > checkAliasBackoffMaxWait {
+				waitTime = checkAliasBackoffMaxWait
+			}
+			time.Sleep(waitTime)
+		}
+
+		// Get the current health checks for the specified node.
+		//
+		// NOTE(mitchellh): This currently returns ALL health checks for
+		// a node even though we also have the service ID. This can be
+		// optimized if we introduce a new RPC endpoint to filter both,
+		// but for blocking queries that isn't more efficient since the
+		// checks index is global to the cluster.
+		var out structs.IndexedHealthChecks
+		if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil {
+			attempt++
+			continue
+		}
+
+		attempt = 0 // Reset the attempts so we don't backoff on the next request
+
+		// Set our index for the next request
+		args.MinQueryIndex = out.Index
+
+		// We want to ensure that we're always blocking on subsequent requests
+		// to avoid hot loops. Index 1 is always safe since the min raft index
+		// is at least 5. Note this shouldn't happen but protecting against this
+		// case is safer than a 100% CPU loop.
+		if args.MinQueryIndex < 1 {
+			args.MinQueryIndex = 1
+		}
+
+		health := api.HealthPassing
+		msg := "All checks passing."
+		if len(out.HealthChecks) == 0 {
+			// No health checks means we're healthy by default
+			msg = "No checks found."
+		}
+		for _, chk := range out.HealthChecks {
+			if chk.ServiceID != c.ServiceID || chk.Node != c.Node {
+				continue
+			}
+
+			if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
+				health = chk.Status
+				msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
+
+				// Critical checks exit the for loop immediately since we
+				// know that this is the health state. Warnings do not since
+				// there may still be a critical check.
+				if chk.Status == api.HealthCritical {
+					break
+				}
+			}
+		}
+
+		// Update our check value
+		c.Notify.UpdateCheck(c.CheckID, health, msg)
+	}
+}
diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go
new file mode 100644
index 0000000000..27f089bb57
--- /dev/null
+++ b/agent/checks/alias_test.go
@@ -0,0 +1,267 @@
+package checks
+
+import (
+	"fmt"
+	"reflect"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/agent/mock"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testutil/retry"
+	"github.com/hashicorp/consul/types"
+	//"github.com/stretchr/testify/require"
+)
+
+// Test that we do a backoff on error.
+func TestCheckAlias_remoteErrBackoff(t *testing.T) {
+	t.Parallel()
+
+	notify := mock.NewNotify()
+	chkID := types.CheckID("foo")
+	rpc := &mockRPC{}
+	chk := &CheckAlias{
+		Node:      "remote",
+		ServiceID: "web",
+		CheckID:   chkID,
+		Notify:    notify,
+		RPC:       rpc,
+	}
+
+	rpc.Reply.Store(fmt.Errorf("failure"))
+
+	chk.Start()
+	defer chk.Stop()
+
+	time.Sleep(100 * time.Millisecond)
+	if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want {
+		t.Fatalf("got %d updates want at most %d", got, want)
+	}
+}
+
+// No remote health checks should result in a passing check.
+func TestCheckAlias_remoteNoChecks(t *testing.T) {
+	t.Parallel()
+
+	notify := mock.NewNotify()
+	chkID := types.CheckID("foo")
+	rpc := &mockRPC{}
+	chk := &CheckAlias{
+		Node:      "remote",
+		ServiceID: "web",
+		CheckID:   chkID,
+		Notify:    notify,
+		RPC:       rpc,
+	}
+
+	rpc.Reply.Store(structs.IndexedHealthChecks{})
+
+	chk.Start()
+	defer chk.Stop()
+	retry.Run(t, func(r *retry.R) {
+		if got, want := notify.State(chkID), api.HealthPassing; got != want {
+			r.Fatalf("got state %q want %q", got, want)
+		}
+	})
+}
+
+// Only passing checks should result in passing.
+func TestCheckAlias_remotePassing(t *testing.T) {
+	t.Parallel()
+
+	notify := mock.NewNotify()
+	chkID := types.CheckID("foo")
+	rpc := &mockRPC{}
+	chk := &CheckAlias{
+		Node:      "remote",
+		ServiceID: "web",
+		CheckID:   chkID,
+		Notify:    notify,
+		RPC:       rpc,
+	}
+
+	rpc.Reply.Store(structs.IndexedHealthChecks{
+		HealthChecks: []*structs.HealthCheck{
+			// Should ignore non-matching node
+			&structs.HealthCheck{
+				Node:      "A",
+				ServiceID: "web",
+				Status:    api.HealthCritical,
+			},
+
+			// Should ignore non-matching service
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "db",
+				Status:    api.HealthCritical,
+			},
+
+			// Match
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "web",
+				Status:    api.HealthPassing,
+			},
+		},
+	})
+
+	chk.Start()
+	defer chk.Stop()
+	retry.Run(t, func(r *retry.R) {
+		if got, want := notify.State(chkID), api.HealthPassing; got != want {
+			r.Fatalf("got state %q want %q", got, want)
+		}
+	})
+}
+
+// If any checks are critical, it should be critical.
+func TestCheckAlias_remoteCritical(t *testing.T) {
+	t.Parallel()
+
+	notify := mock.NewNotify()
+	chkID := types.CheckID("foo")
+	rpc := &mockRPC{}
+	chk := &CheckAlias{
+		Node:      "remote",
+		ServiceID: "web",
+		CheckID:   chkID,
+		Notify:    notify,
+		RPC:       rpc,
+	}
+
+	rpc.Reply.Store(structs.IndexedHealthChecks{
+		HealthChecks: []*structs.HealthCheck{
+			// Should ignore non-matching node
+			&structs.HealthCheck{
+				Node:      "A",
+				ServiceID: "web",
+				Status:    api.HealthCritical,
+			},
+
+			// Should ignore non-matching service
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "db",
+				Status:    api.HealthCritical,
+			},
+
+			// Match
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "web",
+				Status:    api.HealthPassing,
+			},
+
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "web",
+				Status:    api.HealthCritical,
+			},
+		},
+	})
+
+	chk.Start()
+	defer chk.Stop()
+	retry.Run(t, func(r *retry.R) {
+		if got, want := notify.State(chkID), api.HealthCritical; got != want {
+			r.Fatalf("got state %q want %q", got, want)
+		}
+	})
+}
+
+// If no checks are critical and at least one is warning, then it should warn.
+func TestCheckAlias_remoteWarning(t *testing.T) {
+	t.Parallel()
+
+	notify := mock.NewNotify()
+	chkID := types.CheckID("foo")
+	rpc := &mockRPC{}
+	chk := &CheckAlias{
+		Node:      "remote",
+		ServiceID: "web",
+		CheckID:   chkID,
+		Notify:    notify,
+		RPC:       rpc,
+	}
+
+	rpc.Reply.Store(structs.IndexedHealthChecks{
+		HealthChecks: []*structs.HealthCheck{
+			// Should ignore non-matching node
+			&structs.HealthCheck{
+				Node:      "A",
+				ServiceID: "web",
+				Status:    api.HealthCritical,
+			},
+
+			// Should ignore non-matching service
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "db",
+				Status:    api.HealthCritical,
+			},
+
+			// Match
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "web",
+				Status:    api.HealthPassing,
+			},
+
+			&structs.HealthCheck{
+				Node:      "remote",
+				ServiceID: "web",
+				Status:    api.HealthWarning,
+			},
+		},
+	})
+
+	chk.Start()
+	defer chk.Stop()
+	retry.Run(t, func(r *retry.R) {
+		if got, want := notify.State(chkID), api.HealthWarning; got != want {
+			r.Fatalf("got state %q want %q", got, want)
+		}
+	})
+}
+
+// mockRPC is an implementation of RPC that can be used for tests. The
+// atomic.Value fields can be set concurrently and will be reflected in
+// the next RPC call.
+type mockRPC struct {
+	Calls uint32       // Read-only, number of RPC calls
+	Args  atomic.Value // Read-only, the last args sent
+
+	// Write-only, the reply to send. If of type "error" then an error will
+	// be returned from the RPC call.
+	Reply atomic.Value
+}
+
+func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error {
+	atomic.AddUint32(&m.Calls, 1)
+	m.Args.Store(args)
+
+	// We don't adhere to blocking queries, so this helps prevent
+	// too much CPU usage on the check loop.
+	time.Sleep(10 * time.Millisecond)
+
+	// This whole machinery below sets the value of the reply. This is
+	// basically what net/rpc does internally, though much condensed.
+	replyv := reflect.ValueOf(reply)
+	if replyv.Kind() != reflect.Ptr {
+		return fmt.Errorf("RPC reply must be a pointer")
+	}
+	replyv = replyv.Elem()                  // Get pointer value
+	replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value
+	if v := m.Reply.Load(); v != nil {
+		// Return an error if the reply is an error type
+		if err, ok := v.(error); ok {
+			return err
+		}
+
+		replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil
+	}
+
+	return nil
+}
diff --git a/agent/checks/check.go b/agent/checks/check.go
index bc41206f08..b09bdc8be1 100644
--- a/agent/checks/check.go
+++ b/agent/checks/check.go
@@ -38,6 +38,13 @@ const (
 	UserAgent = "Consul Health Check"
 )
 
+// RPC is an interface that an RPC client must implement. This is a helper
+// interface that is implemented by the agent delegate for checks that need
+// to make RPC calls.
+type RPC interface {
+	RPC(method string, args interface{}, reply interface{}) error
+}
+
 // CheckNotifier interface is used by the CheckMonitor
 // to notify when a check has a status update. The update
 // should take care to be idempotent.
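For orientation, here is a minimal usage sketch of the new check type. It is illustrative only, not part of the patch: stubRPC is a hypothetical in-process stand-in for the agent delegate that normally satisfies the checks.RPC interface, while mock.NewNotify and the CheckAlias fields come straight from the code above.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/agent/checks"
	"github.com/hashicorp/consul/agent/mock"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/types"
)

// stubRPC is a hypothetical stand-in for the agent delegate. It answers
// Health.NodeChecks with an empty check set, which the alias check treats
// as passing ("No checks found.").
type stubRPC struct{}

func (s *stubRPC) RPC(method string, args interface{}, reply interface{}) error {
	if out, ok := reply.(*structs.IndexedHealthChecks); ok {
		*out = structs.IndexedHealthChecks{}
	}
	// Like mockRPC above, sleep briefly since we don't honor blocking queries.
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	notify := mock.NewNotify() // test notifier from agent/mock
	chkID := types.CheckID("alias-web")

	chk := &checks.CheckAlias{
		Node:      "remote", // node hosting the aliased service
		ServiceID: "web",    // ID (not name) of the service to alias
		CheckID:   chkID,
		Notify:    notify,
		RPC:       &stubRPC{},
	}
	chk.Start()
	defer chk.Stop()

	time.Sleep(100 * time.Millisecond)
	fmt.Println(notify.State(chkID)) // expected: "passing"
}

Because CheckAlias depends only on the small RPC and CheckNotifier interfaces, it can be driven entirely in-process like this; the tests above exploit the same seam with mockRPC.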