Add ability for notifications when one of the agent tokens is updated (#8301)

Co-authored-by: Chris Piraino <cpiraino@hashicorp.com>
pull/8310/head
Matt Keeler 2020-07-14 09:53:55 -04:00 committed by GitHub
parent 4b746dc880
commit 8beca1cb8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 201 additions and 0 deletions

View File

@ -13,6 +13,28 @@ const (
TokenSourceAPI TokenSource = true TokenSourceAPI TokenSource = true
) )
type TokenKind int
const (
TokenKindAgent TokenKind = iota
TokenKindAgentMaster
TokenKindUser
TokenKindReplication
)
type watcher struct {
kind TokenKind
ch chan<- struct{}
}
// Notifier holds the channel used to notify a watcher
// of token updates as well as some internal tracking
// information to allow for deregistering the notifier.
type Notifier struct {
id int
Ch <-chan struct{}
}
// Store is used to hold the special ACL tokens used by Consul agents. It is // Store is used to hold the special ACL tokens used by Consul agents. It is
// designed to update the tokens on the fly, so the token store itself should be // designed to update the tokens on the fly, so the token store itself should be
// plumbed around and used to get tokens at runtime, don't save the resulting // plumbed around and used to get tokens at runtime, don't save the resulting
@ -52,10 +74,86 @@ type Store struct {
// replicationTokenSource indicates where this token originated from // replicationTokenSource indicates where this token originated from
replicationTokenSource TokenSource replicationTokenSource TokenSource
watchers map[int]watcher
watcherIndex int
// enterpriseTokens contains tokens only used in consul-enterprise // enterpriseTokens contains tokens only used in consul-enterprise
enterpriseTokens enterpriseTokens
} }
// Notify will set up a watch for when tokens of the desired kind is changed
func (t *Store) Notify(kind TokenKind) Notifier {
// buffering ensures that notifications aren't missed if the watcher
// isn't already in a select and that our notifications don't
// block returning from the Update* methods.
ch := make(chan struct{}, 1)
w := watcher{
kind: kind,
ch: ch,
}
t.l.Lock()
defer t.l.Unlock()
if t.watchers == nil {
t.watchers = make(map[int]watcher)
}
// we specifically want to avoid the zero-value to prevent accidental stop-notification requests
t.watcherIndex += 1
t.watchers[t.watcherIndex] = w
return Notifier{id: t.watcherIndex, Ch: ch}
}
// StopNotify stops the token store from sending notifications to the specified notifiers chan
func (t *Store) StopNotify(n Notifier) {
t.l.Lock()
defer t.l.Unlock()
delete(t.watchers, n.id)
}
// anyKindAllowed returns true if any of the kinds in the `check` list are
// set to be allowed in the `allowed` map.
//
// Note: this is mostly just a convenience to simplify the code in
// sendNotificationLocked and prevent more nested looping with breaks/continues
// and other state tracking.
func anyKindAllowed(allowed TokenKind, check []TokenKind) bool {
for _, kind := range check {
if allowed == kind {
return true
}
}
return false
}
// sendNotificationLocked will iterate through all watchers and notify them that a
// token they are watching has been updated.
//
// NOTE: this function explicitly does not attempt to send the kind or new token value
// along through the channel. With that approach watchers could potentially miss updates
// if the buffered chan fills up. Instead with this approach we just notify that any
// token they care about has been udpated and its up to the caller to retrieve the
// new value (after receiving from the chan). With this approach its entirely possible
// for the watcher to be notified twice before actually retrieving the token after the first
// read from the chan. This is better behavior than missing events. It can cause some
// churn temporarily but in common cases its not expected that these tokens would be updated
// frequently enough to cause this to happen.
func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
for _, watcher := range t.watchers {
if !anyKindAllowed(watcher.kind, kinds) {
// ignore this watcher as it doesn't want events for these kinds of token
continue
}
select {
case watcher.ch <- struct{}{}:
default:
// its already pending a notification
}
}
}
// UpdateUserToken replaces the current user token in the store. // UpdateUserToken replaces the current user token in the store.
// Returns true if it was changed. // Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool { func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
@ -63,6 +161,9 @@ func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
changed := (t.userToken != token || t.userTokenSource != source) changed := (t.userToken != token || t.userTokenSource != source)
t.userToken = token t.userToken = token
t.userTokenSource = source t.userTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindUser)
}
t.l.Unlock() t.l.Unlock()
return changed return changed
} }
@ -74,6 +175,9 @@ func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
changed := (t.agentToken != token || t.agentTokenSource != source) changed := (t.agentToken != token || t.agentTokenSource != source)
t.agentToken = token t.agentToken = token
t.agentTokenSource = source t.agentTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgent)
}
t.l.Unlock() t.l.Unlock()
return changed return changed
} }
@ -85,6 +189,9 @@ func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source) changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
t.agentMasterToken = token t.agentMasterToken = token
t.agentMasterTokenSource = source t.agentMasterTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgentMaster)
}
t.l.Unlock() t.l.Unlock()
return changed return changed
} }
@ -96,6 +203,9 @@ func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
changed := (t.replicationToken != token || t.replicationTokenSource != source) changed := (t.replicationToken != token || t.replicationTokenSource != source)
t.replicationToken = token t.replicationToken = token
t.replicationTokenSource = source t.replicationTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindReplication)
}
t.l.Unlock() t.l.Unlock()
return changed return changed
} }

View File

@ -150,3 +150,94 @@ func TestStore_AgentMasterToken(t *testing.T) {
s.UpdateAgentMasterToken("", TokenSourceConfig) s.UpdateAgentMasterToken("", TokenSourceConfig)
verify(false, "", "nope", "master", "another") verify(false, "", "nope", "master", "another")
} }
func TestStore_Notify(t *testing.T) {
t.Parallel()
s := new(Store)
newNotification := func(t *testing.T, s *Store, kind TokenKind) Notifier {
n := s.Notify(kind)
require.NotNil(t, n.Ch)
return n
}
requireNotNotified := func(t *testing.T, ch <-chan struct{}) {
require.Empty(t, ch)
}
requireNotifiedOnce := func(t *testing.T, ch <-chan struct{}) {
require.Len(t, ch, 1)
// drain the channel
<-ch
// just to be safe
require.Empty(t, ch)
}
agentNotifier := newNotification(t, s, TokenKindAgent)
userNotifier := newNotification(t, s, TokenKindUser)
agentMasterNotifier := newNotification(t, s, TokenKindAgentMaster)
replicationNotifier := newNotification(t, s, TokenKindReplication)
replicationNotifier2 := newNotification(t, s, TokenKindReplication)
// perform an update of the user token
require.True(t, s.UpdateUserToken("edcae2a2-3b51-4864-b412-c7a568f49cb1", TokenSourceConfig))
// do it again to ensure it doesn't block even though nothing has read from the 1 buffered chan yet
require.True(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))
// ensure notifications were sent to the user and all notifiers
requireNotNotified(t, agentNotifier.Ch)
requireNotifiedOnce(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
// now update the agent token which should send notificaitons to the agent and all notifier
require.True(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))
requireNotifiedOnce(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
// now update the agent master token which should send notificaitons to the agent master and all notifier
require.True(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotifiedOnce(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
// now update the replication token which should send notificaitons to the replication and all notifier
require.True(t, s.UpdateReplicationToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier2.Ch)
s.StopNotify(replicationNotifier2)
// now update the replication token which should send notificaitons to the replication and all notifier
require.True(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)
// request updates but that are not changes
require.False(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))
require.False(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
require.False(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))
require.False(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))
// ensure that notifications were not sent
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
}