diff --git a/agent/agent.go b/agent/agent.go index c4eea97038..603ac23c7f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -151,6 +151,9 @@ type Agent struct { // checkDockers maps the check ID to an associated Docker Exec based check checkDockers map[types.CheckID]*checks.CheckDocker + // checkAliases maps the check ID to an associated Alias checks + checkAliases map[types.CheckID]*checks.CheckAlias + // checkLock protects updates to the check* maps checkLock sync.Mutex @@ -235,6 +238,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) { checkTCPs: make(map[types.CheckID]*checks.CheckTCP), checkGRPCs: make(map[types.CheckID]*checks.CheckGRPC), checkDockers: make(map[types.CheckID]*checks.CheckDocker), + checkAliases: make(map[types.CheckID]*checks.CheckAlias), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), joinLANNotifier: &systemd.Notifier{}, @@ -1314,6 +1318,9 @@ func (a *Agent) ShutdownAgent() error { for _, chk := range a.checkDockers { chk.Stop() } + for _, chk := range a.checkAliases { + chk.Stop() + } // Stop the proxy manager if a.proxyManager != nil { @@ -2007,6 +2014,35 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, monitor.Start() a.checkMonitors[check.CheckID] = monitor + case chkType.IsAlias(): + if existing, ok := a.checkAliases[check.CheckID]; ok { + existing.Stop() + delete(a.checkAliases, check.CheckID) + } + + var rpcReq structs.NodeSpecificRequest + rpcReq.Datacenter = a.config.Datacenter + + // The token to set is really important. The behavior below follows + // the same behavior as anti-entropy: we use the user-specified token + // if set (either on the service or check definition), otherwise + // we use the "UserToken" on the agent. This is tested. + rpcReq.Token = a.tokens.UserToken() + if token != "" { + rpcReq.Token = token + } + + chkImpl := &checks.CheckAlias{ + Notify: a.State, + RPC: a.delegate, + RPCReq: rpcReq, + CheckID: check.CheckID, + Node: chkType.AliasNode, + ServiceID: chkType.AliasService, + } + chkImpl.Start() + a.checkAliases[check.CheckID] = chkImpl + default: return fmt.Errorf("Check type is not valid") } diff --git a/agent/agent_test.go b/agent/agent_test.go index 8f46b5abc4..1905beb882 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -936,6 +936,128 @@ func TestAgent_AddCheck_GRPC(t *testing.T) { } } +func TestAgent_AddCheck_Alias(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "") + require.NoError(err) + + // Ensure we have a check mapping + sChk, ok := a.State.Checks()["aliashealth"] + require.True(ok, "missing aliashealth check") + require.NotNil(sChk) + require.Equal(api.HealthCritical, sChk.Status) + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("", chkImpl.RPCReq.Token) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("", cs.Token) +} + +func TestAgent_AddCheck_Alias_setToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "foo") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("foo", cs.Token) + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("foo", chkImpl.RPCReq.Token) +} + +func TestAgent_AddCheck_Alias_userToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), ` +acl_token = "hello" + `) + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("", cs.Token) // State token should still be empty + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("hello", chkImpl.RPCReq.Token) // Check should use the token +} + +func TestAgent_AddCheck_Alias_userAndSetToken(t *testing.T) { + t.Parallel() + + require := require.New(t) + a := NewTestAgent(t.Name(), ` +acl_token = "hello" + `) + defer a.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "aliashealth", + Name: "Alias health check", + Status: api.HealthCritical, + } + chk := &structs.CheckType{ + AliasService: "foo", + } + err := a.AddCheck(health, chk, false, "goodbye") + require.NoError(err) + + cs := a.State.CheckState("aliashealth") + require.NotNil(cs) + require.Equal("goodbye", cs.Token) + + chkImpl, ok := a.checkAliases["aliashealth"] + require.True(ok, "missing aliashealth check") + require.Equal("goodbye", chkImpl.RPCReq.Token) +} + func TestAgent_RemoveCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), ` diff --git a/agent/checks/alias.go b/agent/checks/alias.go new file mode 100644 index 0000000000..e6c7082e57 --- /dev/null +++ b/agent/checks/alias.go @@ -0,0 +1,202 @@ +package checks + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" +) + +// Constants related to alias check backoff. +const ( + checkAliasBackoffMin = 3 // 3 attempts before backing off + checkAliasBackoffMaxWait = 1 * time.Minute // maximum backoff wait time +) + +// CheckAlias is a check type that aliases the health of another service +// instance or node. If the service aliased has any critical health checks, then +// this check is critical. If the service has no critical but warnings, +// then this check is warning, and if a service has only passing checks, then +// this check is passing. +type CheckAlias struct { + Node string // Node name of the service. If empty, assumed to be this node. + ServiceID string // ID (not name) of the service to alias + + CheckID types.CheckID // ID of this check + RPC RPC // Used to query remote server if necessary + RPCReq structs.NodeSpecificRequest // Base request + Notify AliasNotifier // For updating the check state + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// AliasNotifier is a CheckNotifier specifically for the Alias check. +// This requires additional methods that are satisfied by the agent +// local state. +type AliasNotifier interface { + CheckNotifier + + AddAliasCheck(types.CheckID, string, chan<- struct{}) error + RemoveAliasCheck(types.CheckID, string) + Checks() map[types.CheckID]*structs.HealthCheck +} + +// Start is used to start the check, runs until Stop() func (c *CheckAlias) Start() { +func (c *CheckAlias) Start() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + c.stop = false + c.stopCh = make(chan struct{}) + go c.run(c.stopCh) +} + +// Stop is used to stop the check. +func (c *CheckAlias) Stop() { + c.stopLock.Lock() + defer c.stopLock.Unlock() + if !c.stop { + c.stop = true + close(c.stopCh) + } +} + +// run is invoked in a goroutine until Stop() is called. +func (c *CheckAlias) run(stopCh chan struct{}) { + // If we have a specific node set, then use a blocking query + if c.Node != "" { + c.runQuery(stopCh) + return + } + + // Use the local state to match the service. + c.runLocal(stopCh) +} + +func (c *CheckAlias) runLocal(stopCh chan struct{}) { + // Very important this is buffered as 1 so that we do not lose any + // queued updates. This only has to be exactly 1 since the existence + // of any update triggers us to load the full health check state. + notifyCh := make(chan struct{}, 1) + c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh) + defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID) + + for { + select { + case <-notifyCh: + checks := c.Notify.Checks() + checksList := make([]*structs.HealthCheck, 0, len(checks)) + for _, chk := range checks { + checksList = append(checksList, chk) + } + c.processChecks(checksList) + + case <-stopCh: + return + } + } +} + +func (c *CheckAlias) runQuery(stopCh chan struct{}) { + args := c.RPCReq + args.Node = c.Node + args.AllowStale = true + args.MaxQueryTime = 1 * time.Minute + + var attempt uint + for { + // Check if we're stopped. We fallthrough and block otherwise, + // which has a maximum time set above so we'll always check for + // stop within a reasonable amount of time. + select { + case <-stopCh: + return + default: + } + + // Backoff if we have to + if attempt > checkAliasBackoffMin { + shift := attempt - checkAliasBackoffMin + if shift > 31 { + shift = 31 // so we don't overflow to 0 + } + waitTime := (1 << shift) * time.Second + if waitTime > checkAliasBackoffMaxWait { + waitTime = checkAliasBackoffMaxWait + } + time.Sleep(waitTime) + } + + // Get the current health checks for the specified node. + // + // NOTE(mitchellh): This currently returns ALL health checks for + // a node even though we also have the service ID. This can be + // optimized if we introduce a new RPC endpoint to filter both, + // but for blocking queries isn't that much more efficient since the checks + // index is global to the cluster. + var out structs.IndexedHealthChecks + if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { + attempt++ + if attempt > 1 { + c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, + fmt.Sprintf("Failure checking aliased node or service: %s", err)) + } + + continue + } + + attempt = 0 // Reset the attempts so we don't backoff the next + + // Set our index for the next request + args.MinQueryIndex = out.Index + + // We want to ensure that we're always blocking on subsequent requests + // to avoid hot loops. Index 1 is always safe since the min raft index + // is at least 5. Note this shouldn't happen but protecting against this + // case is safer than a 100% CPU loop. + if args.MinQueryIndex < 1 { + args.MinQueryIndex = 1 + } + + c.processChecks(out.HealthChecks) + } +} + +// processChecks is a common helper for taking a set of health checks and +// using them to update our alias. This is abstracted since the checks can +// come from both the remote server as well as local state. +func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { + health := api.HealthPassing + msg := "No checks found." + for _, chk := range checks { + if c.Node != "" && chk.Node != c.Node { + continue + } + + // We allow ServiceID == "" so that we also check node checks + if chk.ServiceID != "" && chk.ServiceID != c.ServiceID { + continue + } + + if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { + health = chk.Status + msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) + + // Critical checks exit the for loop immediately since we + // know that this is the health state. Warnings do not since + // there may still be a critical check. + if chk.Status == api.HealthCritical { + break + } + } + + msg = "All checks passing." + } + + // Update our check value + c.Notify.UpdateCheck(c.CheckID, health, msg) +} diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go new file mode 100644 index 0000000000..ea6603345f --- /dev/null +++ b/agent/checks/alias_test.go @@ -0,0 +1,437 @@ +package checks + +import ( + "fmt" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/consul/agent/mock" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/consul/types" + //"github.com/stretchr/testify/require" +) + +// Test that we do a backoff on error. +func TestCheckAlias_remoteErrBackoff(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(fmt.Errorf("failure")) + + chk.Start() + defer chk.Stop() + + time.Sleep(100 * time.Millisecond) + if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want { + t.Fatalf("got %d updates want at most %d", got, want) + } + + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// No remote health checks should result in passing on the check. +func TestCheckAlias_remoteNoChecks(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{}) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// If the node is critical then the check is critical +func TestCheckAlias_remoteNodeFailure(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Node failure + &structs.HealthCheck{ + Node: "remote", + ServiceID: "", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Only passing should result in passing +func TestCheckAlias_remotePassing(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// If any checks are critical, it should be critical +func TestCheckAlias_remoteCritical(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthCritical, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// If no checks are critical and at least one is warning, then it should warn +func TestCheckAlias_remoteWarning(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthPassing, + }, + + &structs.HealthCheck{ + Node: "remote", + ServiceID: "web", + Status: api.HealthWarning, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthWarning; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Only passing should result in passing for node-only checks +func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore any services + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + Status: api.HealthPassing, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Only critical should result in passing for node-only checks +func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.Reply.Store(structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore any services + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + + // Match + &structs.HealthCheck{ + Node: "remote", + Status: api.HealthCritical, + }, + }, + }) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +type mockAliasNotify struct { + *mock.Notify +} + +func newMockAliasNotify() *mockAliasNotify { + return &mockAliasNotify{ + Notify: mock.NewNotify(), + } +} + +func (m *mockAliasNotify) AddAliasCheck(chkID types.CheckID, serviceID string, ch chan<- struct{}) error { + return nil +} + +func (m *mockAliasNotify) RemoveAliasCheck(chkID types.CheckID, serviceID string) { +} + +func (m *mockAliasNotify) Checks() map[types.CheckID]*structs.HealthCheck { + return nil +} + +// mockRPC is an implementation of RPC that can be used for tests. The +// atomic.Value fields can be set concurrently and will reflect on the next +// RPC call. +type mockRPC struct { + Calls uint32 // Read-only, number of RPC calls + Args atomic.Value // Read-only, the last args sent + + // Write-only, the reply to send. If of type "error" then an error will + // be returned from the RPC call. + Reply atomic.Value +} + +func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error { + atomic.AddUint32(&m.Calls, 1) + m.Args.Store(args) + + // We don't adhere to blocking queries, so this helps prevent + // too much CPU usage on the check loop. + time.Sleep(10 * time.Millisecond) + + // This whole machinery below sets the value of the reply. This is + // basically what net/rpc does internally, though much condensed. + replyv := reflect.ValueOf(reply) + if replyv.Kind() != reflect.Ptr { + return fmt.Errorf("RPC reply must be pointer") + } + replyv = replyv.Elem() // Get pointer value + replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value + if v := m.Reply.Load(); v != nil { + // Return an error if the reply is an error type + if err, ok := v.(error); ok { + return err + } + + replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil + } + + return nil +} diff --git a/agent/checks/check.go b/agent/checks/check.go index bc41206f08..b09bdc8be1 100644 --- a/agent/checks/check.go +++ b/agent/checks/check.go @@ -38,6 +38,13 @@ const ( UserAgent = "Consul Health Check" ) +// RPC is an interface that an RPC client must implement. This is a helper +// interface that is implemented by the agent delegate for checks that need +// to make RPC calls. +type RPC interface { + RPC(method string, args interface{}, reply interface{}) error +} + // CheckNotifier interface is used by the CheckMonitor // to notify when a check has a status update. The update // should take care to be idempotent. diff --git a/agent/config/builder.go b/agent/config/builder.go index e7958056e1..940e6f2074 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1046,6 +1046,8 @@ func (b *Builder) checkVal(v *CheckDefinition) *structs.CheckDefinition { GRPC: b.stringVal(v.GRPC), GRPCUseTLS: b.boolVal(v.GRPCUseTLS), TLSSkipVerify: b.boolVal(v.TLSSkipVerify), + AliasNode: b.stringVal(v.AliasNode), + AliasService: b.stringVal(v.AliasService), Timeout: b.durationVal(fmt.Sprintf("check[%s].timeout", id), v.Timeout), TTL: b.durationVal(fmt.Sprintf("check[%s].ttl", id), v.TTL), DeregisterCriticalServiceAfter: b.durationVal(fmt.Sprintf("check[%s].deregister_critical_service_after", id), v.DeregisterCriticalServiceAfter), diff --git a/agent/config/config.go b/agent/config/config.go index ab81c2f718..c54090b3f7 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -348,6 +348,8 @@ type CheckDefinition struct { GRPC *string `json:"grpc,omitempty" hcl:"grpc" mapstructure:"grpc"` GRPCUseTLS *bool `json:"grpc_use_tls,omitempty" hcl:"grpc_use_tls" mapstructure:"grpc_use_tls"` TLSSkipVerify *bool `json:"tls_skip_verify,omitempty" hcl:"tls_skip_verify" mapstructure:"tls_skip_verify"` + AliasNode *string `json:"alias_node,omitempty" hcl:"alias_node" mapstructure:"alias_node"` + AliasService *string `json:"alias_service,omitempty" hcl:"alias_service" mapstructure:"alias_service"` Timeout *string `json:"timeout,omitempty" hcl:"timeout" mapstructure:"timeout"` TTL *string `json:"ttl,omitempty" hcl:"ttl" mapstructure:"ttl"` DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"` diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 9326e8ff6b..7eab8d2d85 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -1941,6 +1941,24 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { rt.DataDir = dataDir }, }, + { + desc: "alias check with no node", + args: []string{ + `-data-dir=` + dataDir, + }, + json: []string{ + `{ "check": { "name": "a", "alias_service": "foo" } }`, + }, + hcl: []string{ + `check = { name = "a", alias_service = "foo" }`, + }, + patch: func(rt *RuntimeConfig) { + rt.Checks = []*structs.CheckDefinition{ + &structs.CheckDefinition{Name: "a", AliasService: "foo"}, + } + rt.DataDir = dataDir + }, + }, { desc: "multiple service files", args: []string{ @@ -4271,6 +4289,8 @@ func TestSanitize(t *testing.T) { "CheckUpdateInterval": "0s", "Checks": [ { + "AliasNode": "", + "AliasService": "", "DeregisterCriticalServiceAfter": "0s", "DockerContainerID": "", "GRPC": "", @@ -4417,6 +4437,8 @@ func TestSanitize(t *testing.T) { { "Address": "", "Check": { + "AliasNode": "", + "AliasService": "", "CheckID": "", "DeregisterCriticalServiceAfter": "0s", "DockerContainerID": "", diff --git a/agent/local/state.go b/agent/local/state.go index 1caca1f3d2..1f15f231a9 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -170,8 +170,9 @@ type State struct { // Services tracks the local services services map[string]*ServiceState - // Checks tracks the local checks - checks map[types.CheckID]*CheckState + // Checks tracks the local checks. checkAliases are aliased checks. + checks map[types.CheckID]*CheckState + checkAliases map[string]map[types.CheckID]chan<- struct{} // metadata tracks the node metadata fields metadata map[string]string @@ -205,6 +206,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { logger: lg, services: make(map[string]*ServiceState), checks: make(map[types.CheckID]*CheckState), + checkAliases: make(map[string]map[types.CheckID]chan<- struct{}), metadata: make(map[string]string), tokens: tokens, managedProxies: make(map[string]*ManagedProxy), @@ -406,6 +408,40 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { return nil } +// AddAliasCheck creates an alias check. When any check for the srcServiceID +// is changed, checkID will reflect that using the same semantics as +// checks.CheckAlias. +// +// This is a local optimization so that the Alias check doesn't need to +// use blocking queries against the remote server for check updates for +// local services. +func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error { + l.Lock() + defer l.Unlock() + + m, ok := l.checkAliases[srcServiceID] + if !ok { + m = make(map[types.CheckID]chan<- struct{}) + l.checkAliases[srcServiceID] = m + } + m[checkID] = notifyCh + + return nil +} + +// RemoveAliasCheck removes the mapping for the alias check. +func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) { + l.Lock() + defer l.Unlock() + + if m, ok := l.checkAliases[srcServiceID]; ok { + delete(m, checkID) + if len(m) == 0 { + delete(l.checkAliases, srcServiceID) + } + } +} + // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered // todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well. @@ -486,6 +522,20 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { return } + // If this is a check for an aliased service, then notify the waiters. + if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 { + for _, notifyCh := range aliases { + // Do not block. All notify channels should be buffered to at + // least 1 in which case not-blocking does not result in loss + // of data because a failed send means a notification is + // already queued. This must be called with the lock held. + select { + case notifyCh <- struct{}{}: + default: + } + } + } + // Update status and mark out of sync c.Check.Status = status c.Check.Output = output diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 5c66a77d04..3cdf2a28a1 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -11,8 +11,6 @@ import ( "github.com/hashicorp/go-memdb" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/local" @@ -23,6 +21,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAgentAntiEntropy_Services(t *testing.T) { @@ -1606,6 +1605,57 @@ func TestAgent_AddCheckFailure(t *testing.T) { } } +func TestAgent_AliasCheck(t *testing.T) { + t.Parallel() + + require := require.New(t) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l.TriggerSyncChanges = func() {} + + // Add checks + require.NoError(l.AddService(&structs.NodeService{Service: "s1"}, "")) + require.NoError(l.AddService(&structs.NodeService{Service: "s2"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, "")) + + // Add an alias + notifyCh := make(chan struct{}, 1) + require.NoError(l.AddAliasCheck(types.CheckID("a1"), "s1", notifyCh)) + + // Update and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + default: + t.Fatal("notify not received") + } + + // Update again and verify we do not get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + default: + } + + // Update other check and verify we do not get notified + l.UpdateCheck(types.CheckID("c2"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + default: + } + + // Update change and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "") + select { + case <-notifyCh: + default: + t.Fatal("notify not received") + } +} + func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), ` diff --git a/agent/structs/check_definition.go b/agent/structs/check_definition.go index 0ee6c20495..42e9692fce 100644 --- a/agent/structs/check_definition.go +++ b/agent/structs/check_definition.go @@ -32,6 +32,8 @@ type CheckDefinition struct { GRPC string GRPCUseTLS bool TLSSkipVerify bool + AliasNode string + AliasService string Timeout time.Duration TTL time.Duration DeregisterCriticalServiceAfter time.Duration @@ -63,6 +65,8 @@ func (c *CheckDefinition) CheckType() *CheckType { Notes: c.Notes, ScriptArgs: c.ScriptArgs, + AliasNode: c.AliasNode, + AliasService: c.AliasService, HTTP: c.HTTP, GRPC: c.GRPC, GRPCUseTLS: c.GRPCUseTLS, diff --git a/agent/structs/check_type.go b/agent/structs/check_type.go index 23a6830777..43b76057c6 100644 --- a/agent/structs/check_type.go +++ b/agent/structs/check_type.go @@ -9,10 +9,10 @@ import ( ) // CheckType is used to create either the CheckMonitor or the CheckTTL. -// Six types are supported: Script, HTTP, TCP, Docker, TTL and GRPC. Script, +// The following types are supported: Script, HTTP, TCP, Docker, TTL, GRPC, Alias. Script, // HTTP, Docker, TCP and GRPC all require Interval. Only one of the types may // to be provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or -// Docker/Interval or GRPC/Interval. +// Docker/Interval or GRPC/Interval or AliasService. type CheckType struct { // fields already embedded in CheckDefinition // Note: CheckType.CheckID == CheckDefinition.ID @@ -31,6 +31,8 @@ type CheckType struct { Method string TCP string Interval time.Duration + AliasNode string + AliasService string DockerContainerID string Shell string GRPC string @@ -56,7 +58,13 @@ func (c *CheckType) Validate() error { if intervalCheck && c.Interval <= 0 { return fmt.Errorf("Interval must be > 0 for Script, HTTP, or TCP checks") } - if !intervalCheck && c.TTL <= 0 { + if intervalCheck && c.IsAlias() { + return fmt.Errorf("Interval cannot be set for Alias checks") + } + if c.IsAlias() && c.TTL > 0 { + return fmt.Errorf("TTL must be not be set for Alias checks") + } + if !intervalCheck && !c.IsAlias() && c.TTL <= 0 { return fmt.Errorf("TTL must be > 0 for TTL checks") } return nil @@ -67,6 +75,11 @@ func (c *CheckType) Empty() bool { return reflect.DeepEqual(c, &CheckType{}) } +// IsAlias checks if this is an alias check. +func (c *CheckType) IsAlias() bool { + return c.AliasNode != "" || c.AliasService != "" +} + // IsScript checks if this is a check that execs some kind of script. func (c *CheckType) IsScript() bool { return len(c.ScriptArgs) > 0 diff --git a/website/source/api/agent/check.html.md b/website/source/api/agent/check.html.md index 052693335a..326811eea9 100644 --- a/website/source/api/agent/check.html.md +++ b/website/source/api/agent/check.html.md @@ -116,6 +116,16 @@ The table below shows this endpoint's support for continue to be accepted in future versions of Consul), and `Args` in Consul 1.0.1 and later. +- `AliasNode` `(string: "")` - Specifies the ID of the node for an alias check. + If no service is specified, the check will alias the health of the node. + If a service is specified, the check will alias the specified service on + this particular node. + +- `AliasService` `(string: "")` - Specifies the ID of a service for an + alias check. If the service is not registered with the same agent, + `AliasNode` must also be specified. Note this is the service _ID_ and + not the service _name_ (though they are very often the same). + - `DockerContainerID` `(string: "")` - Specifies that the check is a Docker check, and Consul will evaluate the script every `Interval` in the given container using the specified `Shell`. Note that `Shell` is currently only diff --git a/website/source/docs/agent/checks.html.md b/website/source/docs/agent/checks.html.md index a563de5778..25520f2167 100644 --- a/website/source/docs/agent/checks.html.md +++ b/website/source/docs/agent/checks.html.md @@ -101,6 +101,17 @@ There are several different kinds of checks: TLS certificate is expected. Certificate verification can be turned off by setting the `tls_skip_verify` field to `true` in the check definition. +* Alias - These checks alias the health state of another registered + node or service. The state of the check will be updated asynchronously, + but is nearly instant. For aliased services on the same agent, the local + state is monitored and no additional network resources are consumed. For + other services and nodes, the check maintains a blocking query over the + agent's connection with a current server and allows stale requests. If there + are any errors in watching the aliased node or service, the check state will be + critical. For the blocking query, the check will use the ACL token set + on the service or check definition or otherwise will fall back to the default ACL + token set with the agent (`acl_token`). + ## Check Definition A script check: @@ -165,7 +176,7 @@ A Docker check: ```javascript { -"check": { + "check": { "id": "mem-util", "name": "Memory utilization", "docker_container_id": "f972c95ebf0e", @@ -180,7 +191,7 @@ A gRPC check: ```javascript { -"check": { + "check": { "id": "mem-util", "name": "Service health status", "grpc": "127.0.0.1:12345", @@ -190,6 +201,17 @@ A gRPC check: } ``` +An alias check for a local service: + +```javascript +{ + "check": { + "id": "web-alias", + "alias_service": "web" + } +} +``` + Each type of definition must include a `name` and may optionally provide an `id` and `notes` field. The `id` must be unique per _agent_ otherwise only the last defined check with that `id` will be registered. If the `id` is not set @@ -205,6 +227,8 @@ a TTL check via the HTTP interface can set the `notes` value. Checks may also contain a `token` field to provide an ACL token. This token is used for any interaction with the catalog for the check, including [anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration. +For Alias checks, this token is used if a remote blocking query is necessary +to watch the state of the aliased node or service. Script, TCP, HTTP, Docker, and gRPC checks must include an `interval` field. This field is parsed by Go's `time` package, and has the following