diff --git a/agent/local/state.go b/agent/local/state.go index 1caca1f3d2..6f90353565 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -170,8 +170,9 @@ type State struct { // Services tracks the local services services map[string]*ServiceState - // Checks tracks the local checks - checks map[types.CheckID]*CheckState + // Checks tracks the local checks. checkAliases are aliased checks. + checks map[types.CheckID]*CheckState + checkAliases map[string]map[types.CheckID]chan<- struct{} // metadata tracks the node metadata fields metadata map[string]string @@ -205,6 +206,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { logger: lg, services: make(map[string]*ServiceState), checks: make(map[types.CheckID]*CheckState), + checkAliases: make(map[string]map[types.CheckID]chan<- struct{}), metadata: make(map[string]string), tokens: tokens, managedProxies: make(map[string]*ManagedProxy), @@ -406,6 +408,40 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { return nil } +// AddAliasCheck creates an alias check. When any check for the srcServiceID +// is changed, checkID will reflect that using the same semantics as +// checks.CheckAlias. +// +// This is a local optimization so that the Alias check doesn't need to +// use blocking queries against the remote server for check updates for +// local services. +func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error { + l.Lock() + defer l.Unlock() + + m, ok := l.checkAliases[srcServiceID] + if !ok { + m = make(map[types.CheckID]chan<- struct{}) + l.checkAliases[srcServiceID] = m + } + m[checkID] = notifyCh + + return nil +} + +// RemoveAliasCheck removes the mapping for the alias check. +func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) { + l.Lock() + defer l.Unlock() + + if m, ok := l.checkAliases[srcServiceID]; ok { + delete(m, checkID) + if len(m) == 0 { + delete(l.checkAliases, srcServiceID) + } + } +} + // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered // todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well. @@ -486,6 +522,20 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { return } + // If this is a check for an aliased service, then notify the waiters. + if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 { + for _, notifyCh := range aliases { + // Do not block. All notify channels should be buffered to at + // least 1 in which case not-blocking does not result in loss + // of data because a failed send means a notification is + // already queued. + select { + case notifyCh <- struct{}{}: + default: + } + } + } + // Update status and mark out of sync c.Check.Status = status c.Check.Output = output diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 5c66a77d04..4a0e8f21fb 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -11,8 +11,6 @@ import ( "github.com/hashicorp/go-memdb" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/local" @@ -23,6 +21,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAgentAntiEntropy_Services(t *testing.T) { @@ -1606,6 +1605,59 @@ func TestAgent_AddCheckFailure(t *testing.T) { } } +func TestAgent_AliasCheck(t *testing.T) { + t.Parallel() + + require := require.New(t) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l.TriggerSyncChanges = func() {} + + // Add checks + require.NoError(l.AddService(&structs.NodeService{Service: "s1"}, "")) + require.NoError(l.AddService(&structs.NodeService{Service: "s2"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, "")) + require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, "")) + + // Add an alias + notifyCh := make(chan struct{}, 1) + require.NoError(l.AddAliasCheck(types.CheckID("a1"), "s1", notifyCh)) + + // Update and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + case <-time.After(100 * time.Millisecond): + t.Fatal("notify not received") + } + + // Update again and verify we do not get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + + case <-time.After(50 * time.Millisecond): + } + + // Update other check and verify we do not get notified + l.UpdateCheck(types.CheckID("c2"), api.HealthCritical, "") + select { + case <-notifyCh: + t.Fatal("notify received") + + case <-time.After(50 * time.Millisecond): + } + + // Update change and verify we get notified + l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "") + select { + case <-notifyCh: + case <-time.After(100 * time.Millisecond): + t.Fatal("notify not received") + } +} + func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), `