Merge pull request #4320 from hashicorp/f-alias-check

Add "Alias" Check Type
pull/4429/head
Mitchell Hashimoto 2018-07-20 13:01:33 -05:00 committed by GitHub
commit 7fa6bb022f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 990 additions and 9 deletions

View File

@ -151,6 +151,9 @@ type Agent struct {
// checkDockers maps the check ID to an associated Docker Exec based check
checkDockers map[types.CheckID]*checks.CheckDocker
// checkAliases maps the check ID to an associated Alias checks
checkAliases map[types.CheckID]*checks.CheckAlias
// checkLock protects updates to the check* maps
checkLock sync.Mutex
@ -235,6 +238,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
checkTCPs: make(map[types.CheckID]*checks.CheckTCP),
checkGRPCs: make(map[types.CheckID]*checks.CheckGRPC),
checkDockers: make(map[types.CheckID]*checks.CheckDocker),
checkAliases: make(map[types.CheckID]*checks.CheckAlias),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
joinLANNotifier: &systemd.Notifier{},
@ -1314,6 +1318,9 @@ func (a *Agent) ShutdownAgent() error {
for _, chk := range a.checkDockers {
chk.Stop()
}
for _, chk := range a.checkAliases {
chk.Stop()
}
// Stop the proxy manager
if a.proxyManager != nil {
@ -2007,6 +2014,35 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
case chkType.IsAlias():
if existing, ok := a.checkAliases[check.CheckID]; ok {
existing.Stop()
delete(a.checkAliases, check.CheckID)
}
var rpcReq structs.NodeSpecificRequest
rpcReq.Datacenter = a.config.Datacenter
// The token to set is really important. The behavior below follows
// the same behavior as anti-entropy: we use the user-specified token
// if set (either on the service or check definition), otherwise
// we use the "UserToken" on the agent. This is tested.
rpcReq.Token = a.tokens.UserToken()
if token != "" {
rpcReq.Token = token
}
chkImpl := &checks.CheckAlias{
Notify: a.State,
RPC: a.delegate,
RPCReq: rpcReq,
CheckID: check.CheckID,
Node: chkType.AliasNode,
ServiceID: chkType.AliasService,
}
chkImpl.Start()
a.checkAliases[check.CheckID] = chkImpl
default:
return fmt.Errorf("Check type is not valid")
}

View File

@ -936,6 +936,128 @@ func TestAgent_AddCheck_GRPC(t *testing.T) {
}
}
func TestAgent_AddCheck_Alias(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "")
require.NoError(err)
// Ensure we have a check mapping
sChk, ok := a.State.Checks()["aliashealth"]
require.True(ok, "missing aliashealth check")
require.NotNil(sChk)
require.Equal(api.HealthCritical, sChk.Status)
chkImpl, ok := a.checkAliases["aliashealth"]
require.True(ok, "missing aliashealth check")
require.Equal("", chkImpl.RPCReq.Token)
cs := a.State.CheckState("aliashealth")
require.NotNil(cs)
require.Equal("", cs.Token)
}
func TestAgent_AddCheck_Alias_setToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "foo")
require.NoError(err)
cs := a.State.CheckState("aliashealth")
require.NotNil(cs)
require.Equal("foo", cs.Token)
chkImpl, ok := a.checkAliases["aliashealth"]
require.True(ok, "missing aliashealth check")
require.Equal("foo", chkImpl.RPCReq.Token)
}
func TestAgent_AddCheck_Alias_userToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t.Name(), `
acl_token = "hello"
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "")
require.NoError(err)
cs := a.State.CheckState("aliashealth")
require.NotNil(cs)
require.Equal("", cs.Token) // State token should still be empty
chkImpl, ok := a.checkAliases["aliashealth"]
require.True(ok, "missing aliashealth check")
require.Equal("hello", chkImpl.RPCReq.Token) // Check should use the token
}
func TestAgent_AddCheck_Alias_userAndSetToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t.Name(), `
acl_token = "hello"
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "goodbye")
require.NoError(err)
cs := a.State.CheckState("aliashealth")
require.NotNil(cs)
require.Equal("goodbye", cs.Token)
chkImpl, ok := a.checkAliases["aliashealth"]
require.True(ok, "missing aliashealth check")
require.Equal("goodbye", chkImpl.RPCReq.Token)
}
func TestAgent_RemoveCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `

202
agent/checks/alias.go Normal file
View File

@ -0,0 +1,202 @@
package checks
import (
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)
// Constants related to alias check backoff.
const (
checkAliasBackoffMin = 3 // 3 attempts before backing off
checkAliasBackoffMaxWait = 1 * time.Minute // maximum backoff wait time
)
// CheckAlias is a check type that aliases the health of another service
// instance or node. If the service aliased has any critical health checks, then
// this check is critical. If the service has no critical but warnings,
// then this check is warning, and if a service has only passing checks, then
// this check is passing.
type CheckAlias struct {
Node string // Node name of the service. If empty, assumed to be this node.
ServiceID string // ID (not name) of the service to alias
CheckID types.CheckID // ID of this check
RPC RPC // Used to query remote server if necessary
RPCReq structs.NodeSpecificRequest // Base request
Notify AliasNotifier // For updating the check state
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// AliasNotifier is a CheckNotifier specifically for the Alias check.
// This requires additional methods that are satisfied by the agent
// local state.
type AliasNotifier interface {
CheckNotifier
AddAliasCheck(types.CheckID, string, chan<- struct{}) error
RemoveAliasCheck(types.CheckID, string)
Checks() map[types.CheckID]*structs.HealthCheck
}
// Start is used to start the check, runs until Stop() func (c *CheckAlias) Start() {
func (c *CheckAlias) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
c.stop = false
c.stopCh = make(chan struct{})
go c.run(c.stopCh)
}
// Stop is used to stop the check.
func (c *CheckAlias) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
// run is invoked in a goroutine until Stop() is called.
func (c *CheckAlias) run(stopCh chan struct{}) {
// If we have a specific node set, then use a blocking query
if c.Node != "" {
c.runQuery(stopCh)
return
}
// Use the local state to match the service.
c.runLocal(stopCh)
}
func (c *CheckAlias) runLocal(stopCh chan struct{}) {
// Very important this is buffered as 1 so that we do not lose any
// queued updates. This only has to be exactly 1 since the existence
// of any update triggers us to load the full health check state.
notifyCh := make(chan struct{}, 1)
c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
for {
select {
case <-notifyCh:
checks := c.Notify.Checks()
checksList := make([]*structs.HealthCheck, 0, len(checks))
for _, chk := range checks {
checksList = append(checksList, chk)
}
c.processChecks(checksList)
case <-stopCh:
return
}
}
}
func (c *CheckAlias) runQuery(stopCh chan struct{}) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.MaxQueryTime = 1 * time.Minute
var attempt uint
for {
// Check if we're stopped. We fallthrough and block otherwise,
// which has a maximum time set above so we'll always check for
// stop within a reasonable amount of time.
select {
case <-stopCh:
return
default:
}
// Backoff if we have to
if attempt > checkAliasBackoffMin {
shift := attempt - checkAliasBackoffMin
if shift > 31 {
shift = 31 // so we don't overflow to 0
}
waitTime := (1 << shift) * time.Second
if waitTime > checkAliasBackoffMaxWait {
waitTime = checkAliasBackoffMaxWait
}
time.Sleep(waitTime)
}
// Get the current health checks for the specified node.
//
// NOTE(mitchellh): This currently returns ALL health checks for
// a node even though we also have the service ID. This can be
// optimized if we introduce a new RPC endpoint to filter both,
// but for blocking queries isn't that much more efficient since the checks
// index is global to the cluster.
var out structs.IndexedHealthChecks
if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil {
attempt++
if attempt > 1 {
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical,
fmt.Sprintf("Failure checking aliased node or service: %s", err))
}
continue
}
attempt = 0 // Reset the attempts so we don't backoff the next
// Set our index for the next request
args.MinQueryIndex = out.Index
// We want to ensure that we're always blocking on subsequent requests
// to avoid hot loops. Index 1 is always safe since the min raft index
// is at least 5. Note this shouldn't happen but protecting against this
// case is safer than a 100% CPU loop.
if args.MinQueryIndex < 1 {
args.MinQueryIndex = 1
}
c.processChecks(out.HealthChecks)
}
}
// processChecks is a common helper for taking a set of health checks and
// using them to update our alias. This is abstracted since the checks can
// come from both the remote server as well as local state.
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
health := api.HealthPassing
msg := "No checks found."
for _, chk := range checks {
if c.Node != "" && chk.Node != c.Node {
continue
}
// We allow ServiceID == "" so that we also check node checks
if chk.ServiceID != "" && chk.ServiceID != c.ServiceID {
continue
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
// Critical checks exit the for loop immediately since we
// know that this is the health state. Warnings do not since
// there may still be a critical check.
if chk.Status == api.HealthCritical {
break
}
}
msg = "All checks passing."
}
// Update our check value
c.Notify.UpdateCheck(c.CheckID, health, msg)
}

437
agent/checks/alias_test.go Normal file
View File

@ -0,0 +1,437 @@
package checks
import (
"fmt"
"reflect"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/agent/mock"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/types"
//"github.com/stretchr/testify/require"
)
// Test that we do a backoff on error.
func TestCheckAlias_remoteErrBackoff(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(fmt.Errorf("failure"))
chk.Start()
defer chk.Stop()
time.Sleep(100 * time.Millisecond)
if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want {
t.Fatalf("got %d updates want at most %d", got, want)
}
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// No remote health checks should result in passing on the check.
func TestCheckAlias_remoteNoChecks(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// If the node is critical then the check is critical
func TestCheckAlias_remoteNodeFailure(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Node failure
&structs.HealthCheck{
Node: "remote",
ServiceID: "",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthPassing,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// Only passing should result in passing
func TestCheckAlias_remotePassing(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore non-matching service
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthPassing,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// If any checks are critical, it should be critical
func TestCheckAlias_remoteCritical(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore non-matching service
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthPassing,
},
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthCritical,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// If no checks are critical and at least one is warning, then it should warn
func TestCheckAlias_remoteWarning(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore non-matching service
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthPassing,
},
&structs.HealthCheck{
Node: "remote",
ServiceID: "web",
Status: api.HealthWarning,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthWarning; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// Only passing should result in passing for node-only checks
func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore any services
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
Status: api.HealthPassing,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
// Only critical should result in passing for node-only checks
func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
Node: "remote",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
rpc.Reply.Store(structs.IndexedHealthChecks{
HealthChecks: []*structs.HealthCheck{
// Should ignore non-matching node
&structs.HealthCheck{
Node: "A",
ServiceID: "web",
Status: api.HealthCritical,
},
// Should ignore any services
&structs.HealthCheck{
Node: "remote",
ServiceID: "db",
Status: api.HealthCritical,
},
// Match
&structs.HealthCheck{
Node: "remote",
Status: api.HealthCritical,
},
},
})
chk.Start()
defer chk.Stop()
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
type mockAliasNotify struct {
*mock.Notify
}
func newMockAliasNotify() *mockAliasNotify {
return &mockAliasNotify{
Notify: mock.NewNotify(),
}
}
func (m *mockAliasNotify) AddAliasCheck(chkID types.CheckID, serviceID string, ch chan<- struct{}) error {
return nil
}
func (m *mockAliasNotify) RemoveAliasCheck(chkID types.CheckID, serviceID string) {
}
func (m *mockAliasNotify) Checks() map[types.CheckID]*structs.HealthCheck {
return nil
}
// mockRPC is an implementation of RPC that can be used for tests. The
// atomic.Value fields can be set concurrently and will reflect on the next
// RPC call.
type mockRPC struct {
Calls uint32 // Read-only, number of RPC calls
Args atomic.Value // Read-only, the last args sent
// Write-only, the reply to send. If of type "error" then an error will
// be returned from the RPC call.
Reply atomic.Value
}
func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error {
atomic.AddUint32(&m.Calls, 1)
m.Args.Store(args)
// We don't adhere to blocking queries, so this helps prevent
// too much CPU usage on the check loop.
time.Sleep(10 * time.Millisecond)
// This whole machinery below sets the value of the reply. This is
// basically what net/rpc does internally, though much condensed.
replyv := reflect.ValueOf(reply)
if replyv.Kind() != reflect.Ptr {
return fmt.Errorf("RPC reply must be pointer")
}
replyv = replyv.Elem() // Get pointer value
replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value
if v := m.Reply.Load(); v != nil {
// Return an error if the reply is an error type
if err, ok := v.(error); ok {
return err
}
replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil
}
return nil
}

View File

@ -38,6 +38,13 @@ const (
UserAgent = "Consul Health Check"
)
// RPC is an interface that an RPC client must implement. This is a helper
// interface that is implemented by the agent delegate for checks that need
// to make RPC calls.
type RPC interface {
RPC(method string, args interface{}, reply interface{}) error
}
// CheckNotifier interface is used by the CheckMonitor
// to notify when a check has a status update. The update
// should take care to be idempotent.

View File

@ -1046,6 +1046,8 @@ func (b *Builder) checkVal(v *CheckDefinition) *structs.CheckDefinition {
GRPC: b.stringVal(v.GRPC),
GRPCUseTLS: b.boolVal(v.GRPCUseTLS),
TLSSkipVerify: b.boolVal(v.TLSSkipVerify),
AliasNode: b.stringVal(v.AliasNode),
AliasService: b.stringVal(v.AliasService),
Timeout: b.durationVal(fmt.Sprintf("check[%s].timeout", id), v.Timeout),
TTL: b.durationVal(fmt.Sprintf("check[%s].ttl", id), v.TTL),
DeregisterCriticalServiceAfter: b.durationVal(fmt.Sprintf("check[%s].deregister_critical_service_after", id), v.DeregisterCriticalServiceAfter),

View File

@ -348,6 +348,8 @@ type CheckDefinition struct {
GRPC *string `json:"grpc,omitempty" hcl:"grpc" mapstructure:"grpc"`
GRPCUseTLS *bool `json:"grpc_use_tls,omitempty" hcl:"grpc_use_tls" mapstructure:"grpc_use_tls"`
TLSSkipVerify *bool `json:"tls_skip_verify,omitempty" hcl:"tls_skip_verify" mapstructure:"tls_skip_verify"`
AliasNode *string `json:"alias_node,omitempty" hcl:"alias_node" mapstructure:"alias_node"`
AliasService *string `json:"alias_service,omitempty" hcl:"alias_service" mapstructure:"alias_service"`
Timeout *string `json:"timeout,omitempty" hcl:"timeout" mapstructure:"timeout"`
TTL *string `json:"ttl,omitempty" hcl:"ttl" mapstructure:"ttl"`
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`

View File

@ -1941,6 +1941,24 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
rt.DataDir = dataDir
},
},
{
desc: "alias check with no node",
args: []string{
`-data-dir=` + dataDir,
},
json: []string{
`{ "check": { "name": "a", "alias_service": "foo" } }`,
},
hcl: []string{
`check = { name = "a", alias_service = "foo" }`,
},
patch: func(rt *RuntimeConfig) {
rt.Checks = []*structs.CheckDefinition{
&structs.CheckDefinition{Name: "a", AliasService: "foo"},
}
rt.DataDir = dataDir
},
},
{
desc: "multiple service files",
args: []string{
@ -4271,6 +4289,8 @@ func TestSanitize(t *testing.T) {
"CheckUpdateInterval": "0s",
"Checks": [
{
"AliasNode": "",
"AliasService": "",
"DeregisterCriticalServiceAfter": "0s",
"DockerContainerID": "",
"GRPC": "",
@ -4417,6 +4437,8 @@ func TestSanitize(t *testing.T) {
{
"Address": "",
"Check": {
"AliasNode": "",
"AliasService": "",
"CheckID": "",
"DeregisterCriticalServiceAfter": "0s",
"DockerContainerID": "",

View File

@ -170,8 +170,9 @@ type State struct {
// Services tracks the local services
services map[string]*ServiceState
// Checks tracks the local checks
// Checks tracks the local checks. checkAliases are aliased checks.
checks map[types.CheckID]*CheckState
checkAliases map[string]map[types.CheckID]chan<- struct{}
// metadata tracks the node metadata fields
metadata map[string]string
@ -205,6 +206,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
checkAliases: make(map[string]map[types.CheckID]chan<- struct{}),
metadata: make(map[string]string),
tokens: tokens,
managedProxies: make(map[string]*ManagedProxy),
@ -406,6 +408,40 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
return nil
}
// AddAliasCheck creates an alias check. When any check for the srcServiceID
// is changed, checkID will reflect that using the same semantics as
// checks.CheckAlias.
//
// This is a local optimization so that the Alias check doesn't need to
// use blocking queries against the remote server for check updates for
// local services.
func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error {
l.Lock()
defer l.Unlock()
m, ok := l.checkAliases[srcServiceID]
if !ok {
m = make(map[types.CheckID]chan<- struct{})
l.checkAliases[srcServiceID] = m
}
m[checkID] = notifyCh
return nil
}
// RemoveAliasCheck removes the mapping for the alias check.
func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) {
l.Lock()
defer l.Unlock()
if m, ok := l.checkAliases[srcServiceID]; ok {
delete(m, checkID)
if len(m) == 0 {
delete(l.checkAliases, srcServiceID)
}
}
}
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
@ -486,6 +522,20 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
return
}
// If this is a check for an aliased service, then notify the waiters.
if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 {
for _, notifyCh := range aliases {
// Do not block. All notify channels should be buffered to at
// least 1 in which case not-blocking does not result in loss
// of data because a failed send means a notification is
// already queued. This must be called with the lock held.
select {
case notifyCh <- struct{}{}:
default:
}
}
}
// Update status and mark out of sync
c.Check.Status = status
c.Check.Output = output

View File

@ -11,8 +11,6 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/local"
@ -23,6 +21,7 @@ import (
"github.com/hashicorp/consul/types"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAgentAntiEntropy_Services(t *testing.T) {
@ -1606,6 +1605,57 @@ func TestAgent_AddCheckFailure(t *testing.T) {
}
}
func TestAgent_AliasCheck(t *testing.T) {
t.Parallel()
require := require.New(t)
cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store))
l.TriggerSyncChanges = func() {}
// Add checks
require.NoError(l.AddService(&structs.NodeService{Service: "s1"}, ""))
require.NoError(l.AddService(&structs.NodeService{Service: "s2"}, ""))
require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, ""))
require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, ""))
// Add an alias
notifyCh := make(chan struct{}, 1)
require.NoError(l.AddAliasCheck(types.CheckID("a1"), "s1", notifyCh))
// Update and verify we get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "")
select {
case <-notifyCh:
default:
t.Fatal("notify not received")
}
// Update again and verify we do not get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "")
select {
case <-notifyCh:
t.Fatal("notify received")
default:
}
// Update other check and verify we do not get notified
l.UpdateCheck(types.CheckID("c2"), api.HealthCritical, "")
select {
case <-notifyCh:
t.Fatal("notify received")
default:
}
// Update change and verify we get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "")
select {
case <-notifyCh:
default:
t.Fatal("notify not received")
}
}
func TestAgent_sendCoordinate(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), `

View File

@ -32,6 +32,8 @@ type CheckDefinition struct {
GRPC string
GRPCUseTLS bool
TLSSkipVerify bool
AliasNode string
AliasService string
Timeout time.Duration
TTL time.Duration
DeregisterCriticalServiceAfter time.Duration
@ -63,6 +65,8 @@ func (c *CheckDefinition) CheckType() *CheckType {
Notes: c.Notes,
ScriptArgs: c.ScriptArgs,
AliasNode: c.AliasNode,
AliasService: c.AliasService,
HTTP: c.HTTP,
GRPC: c.GRPC,
GRPCUseTLS: c.GRPCUseTLS,

View File

@ -9,10 +9,10 @@ import (
)
// CheckType is used to create either the CheckMonitor or the CheckTTL.
// Six types are supported: Script, HTTP, TCP, Docker, TTL and GRPC. Script,
// The following types are supported: Script, HTTP, TCP, Docker, TTL, GRPC, Alias. Script,
// HTTP, Docker, TCP and GRPC all require Interval. Only one of the types may
// to be provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or
// Docker/Interval or GRPC/Interval.
// Docker/Interval or GRPC/Interval or AliasService.
type CheckType struct {
// fields already embedded in CheckDefinition
// Note: CheckType.CheckID == CheckDefinition.ID
@ -31,6 +31,8 @@ type CheckType struct {
Method string
TCP string
Interval time.Duration
AliasNode string
AliasService string
DockerContainerID string
Shell string
GRPC string
@ -56,7 +58,13 @@ func (c *CheckType) Validate() error {
if intervalCheck && c.Interval <= 0 {
return fmt.Errorf("Interval must be > 0 for Script, HTTP, or TCP checks")
}
if !intervalCheck && c.TTL <= 0 {
if intervalCheck && c.IsAlias() {
return fmt.Errorf("Interval cannot be set for Alias checks")
}
if c.IsAlias() && c.TTL > 0 {
return fmt.Errorf("TTL must be not be set for Alias checks")
}
if !intervalCheck && !c.IsAlias() && c.TTL <= 0 {
return fmt.Errorf("TTL must be > 0 for TTL checks")
}
return nil
@ -67,6 +75,11 @@ func (c *CheckType) Empty() bool {
return reflect.DeepEqual(c, &CheckType{})
}
// IsAlias checks if this is an alias check.
func (c *CheckType) IsAlias() bool {
return c.AliasNode != "" || c.AliasService != ""
}
// IsScript checks if this is a check that execs some kind of script.
func (c *CheckType) IsScript() bool {
return len(c.ScriptArgs) > 0

View File

@ -116,6 +116,16 @@ The table below shows this endpoint's support for
continue to be accepted in future versions of Consul), and `Args` in Consul
1.0.1 and later.
- `AliasNode` `(string: "")` - Specifies the ID of the node for an alias check.
If no service is specified, the check will alias the health of the node.
If a service is specified, the check will alias the specified service on
this particular node.
- `AliasService` `(string: "")` - Specifies the ID of a service for an
alias check. If the service is not registered with the same agent,
`AliasNode` must also be specified. Note this is the service _ID_ and
not the service _name_ (though they are very often the same).
- `DockerContainerID` `(string: "")` - Specifies that the check is a Docker
check, and Consul will evaluate the script every `Interval` in the given
container using the specified `Shell`. Note that `Shell` is currently only

View File

@ -101,6 +101,17 @@ There are several different kinds of checks:
TLS certificate is expected. Certificate verification can be turned off by setting the
`tls_skip_verify` field to `true` in the check definition.
* <a name="alias"></a>Alias - These checks alias the health state of another registered
node or service. The state of the check will be updated asynchronously,
but is nearly instant. For aliased services on the same agent, the local
state is monitored and no additional network resources are consumed. For
other services and nodes, the check maintains a blocking query over the
agent's connection with a current server and allows stale requests. If there
are any errors in watching the aliased node or service, the check state will be
critical. For the blocking query, the check will use the ACL token set
on the service or check definition or otherwise will fall back to the default ACL
token set with the agent (`acl_token`).
## Check Definition
A script check:
@ -165,7 +176,7 @@ A Docker check:
```javascript
{
"check": {
"check": {
"id": "mem-util",
"name": "Memory utilization",
"docker_container_id": "f972c95ebf0e",
@ -180,7 +191,7 @@ A gRPC check:
```javascript
{
"check": {
"check": {
"id": "mem-util",
"name": "Service health status",
"grpc": "127.0.0.1:12345",
@ -190,6 +201,17 @@ A gRPC check:
}
```
An alias check for a local service:
```javascript
{
"check": {
"id": "web-alias",
"alias_service": "web"
}
}
```
Each type of definition must include a `name` and may optionally provide an
`id` and `notes` field. The `id` must be unique per _agent_ otherwise only the
last defined check with that `id` will be registered. If the `id` is not set
@ -205,6 +227,8 @@ a TTL check via the HTTP interface can set the `notes` value.
Checks may also contain a `token` field to provide an ACL token. This token is
used for any interaction with the catalog for the check, including
[anti-entropy syncs](/docs/internals/anti-entropy.html) and deregistration.
For Alias checks, this token is used if a remote blocking query is necessary
to watch the state of the aliased node or service.
Script, TCP, HTTP, Docker, and gRPC checks must include an `interval` field. This
field is parsed by Go's `time` package, and has the following