mirror of https://github.com/hashicorp/consul
acl: reduce complexity of token resolution process with alternative singleflighting (#5480)
acl: reduce complexity of token resolution process with alternative singleflighting Switches acl resolution to use golang.org/x/sync/singleflight. For the identity/legacy lookups this is a drop-in replacement with the same overall approach to request coalescing. For policies this is technically a change in behavior, but when considered holistically is approximately performance neutral (with the benefit of less code). There are two goals with this blob of code (speaking specifically of policy resolution here): 1) Minimize cross-DC requests. 2) Minimize client-to-server LAN requests. The previous iteration of this code was optimizing for the case of many possibly different tokens being resolved concurrently that have a significant overlap in linked policies such that deduplication would be worth the complexity. While this is laudable there are some things to consider that can help to adjust expectations: 1) For v1.4+ policies are always replicated, and once a single policy shows up in a secondary DC the replicated data is considered authoritative for requests made in that DC. This means that our earlier concerns about minimizing cross-DC requests are irrelevant because there will be no cross-DC policy reads that occur. 2) For Server nodes the in-memory ACL policy cache is capped at zero, meaning it has no caching. Only Client nodes run with a cache. This means that instead of having an entire DC's worth of tokens (what a Server might see) that can have policy resolutions coalesced these nodes will only ever be seeing node-local token resolutions. In a reasonable worst-case scenario where a scheduler like Kubernetes has "filled" a node with Connect services, even that will only schedule ~100 connect services per node. If every service has a unique token there will only be 100 tokens to coalesce and even then those requests have to occur concurrently AND be hitting an empty consul cache. Instead of seeing a great coalescing opportunity for cutting down on redundant Policy resolutions, in practice it's far more likely given node densities that you'd see requests for the same token concurrently than you would for two tokens sharing a policy concurrently (to a degree that would warrant the overhead of the current variation of singleflighting. Given that, this patch switches the Policy resolution process to only singleflight by requesting token (but keeps the cache as by-policy).pull/5457/merge
parent
aec25fde59
commit
cd96af4fc0
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sentinel"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
|
@ -81,28 +82,12 @@ func IsACLRemoteError(err error) bool {
|
|||
type ACLResolverDelegate interface {
|
||||
ACLsEnabled() bool
|
||||
ACLDatacenter(legacy bool) string
|
||||
// UseLegacyACLs
|
||||
UseLegacyACLs() bool
|
||||
ResolveIdentityFromToken(token string) (bool, structs.ACLIdentity, error)
|
||||
ResolvePolicyFromID(policyID string) (bool, *structs.ACLPolicy, error)
|
||||
RPC(method string, args interface{}, reply interface{}) error
|
||||
}
|
||||
|
||||
type remoteACLLegacyResult struct {
|
||||
authorizer acl.Authorizer
|
||||
err error
|
||||
}
|
||||
|
||||
type remoteACLIdentityResult struct {
|
||||
identity structs.ACLIdentity
|
||||
err error
|
||||
}
|
||||
|
||||
type remoteACLPolicyResult struct {
|
||||
policy *structs.ACLPolicy
|
||||
err error
|
||||
}
|
||||
|
||||
type policyTokenError struct {
|
||||
Err error
|
||||
token string
|
||||
|
@ -146,7 +131,7 @@ type ACLResolverConfig struct {
|
|||
//
|
||||
// When the down policy is set to async-cache and we have already cached values
|
||||
// then go routines will be spawned to perform the RPCs in the background
|
||||
// and then will udpate the cache with either the positive or negative result.
|
||||
// and then will update the cache with either the positive or negative result.
|
||||
//
|
||||
// When the down policy is set to extend-cache or the token/policy is not already
|
||||
// cached then the same go routines are spawned to do the RPCs in the background.
|
||||
|
@ -161,13 +146,10 @@ type ACLResolver struct {
|
|||
delegate ACLResolverDelegate
|
||||
sentinel sentinel.Evaluator
|
||||
|
||||
cache *structs.ACLCaches
|
||||
asyncIdentityResults map[string][]chan (*remoteACLIdentityResult)
|
||||
asyncIdentityResultsMutex sync.RWMutex
|
||||
asyncPolicyResults map[string][]chan (*remoteACLPolicyResult)
|
||||
asyncPolicyResultsMutex sync.RWMutex
|
||||
asyncLegacyResults map[string][]chan (*remoteACLLegacyResult)
|
||||
asyncLegacyMutex sync.RWMutex
|
||||
cache *structs.ACLCaches
|
||||
identityGroup singleflight.Group
|
||||
policyGroup singleflight.Group
|
||||
legacyGroup singleflight.Group
|
||||
|
||||
down acl.Authorizer
|
||||
|
||||
|
@ -211,40 +193,17 @@ func NewACLResolver(config *ACLResolverConfig) (*ACLResolver, error) {
|
|||
}
|
||||
|
||||
return &ACLResolver{
|
||||
config: config.Config,
|
||||
logger: config.Logger,
|
||||
delegate: config.Delegate,
|
||||
sentinel: config.Sentinel,
|
||||
cache: cache,
|
||||
asyncIdentityResults: make(map[string][]chan (*remoteACLIdentityResult)),
|
||||
asyncPolicyResults: make(map[string][]chan (*remoteACLPolicyResult)),
|
||||
asyncLegacyResults: make(map[string][]chan (*remoteACLLegacyResult)),
|
||||
autoDisable: config.AutoDisable,
|
||||
down: down,
|
||||
config: config.Config,
|
||||
logger: config.Logger,
|
||||
delegate: config.Delegate,
|
||||
sentinel: config.Sentinel,
|
||||
cache: cache,
|
||||
autoDisable: config.AutoDisable,
|
||||
down: down,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// fireAsyncLegacyResult is used to notify any watchers that legacy resolution of a token is complete
|
||||
func (r *ACLResolver) fireAsyncLegacyResult(token string, authorizer acl.Authorizer, ttl time.Duration, err error) {
|
||||
// cache the result: positive or negative
|
||||
r.cache.PutAuthorizerWithTTL(token, authorizer, ttl)
|
||||
|
||||
// get the list of channels to send the result to
|
||||
r.asyncLegacyMutex.Lock()
|
||||
channels := r.asyncLegacyResults[token]
|
||||
delete(r.asyncLegacyResults, token)
|
||||
r.asyncLegacyMutex.Unlock()
|
||||
|
||||
// notify all watchers of the RPC results
|
||||
result := &remoteACLLegacyResult{authorizer, err}
|
||||
for _, cx := range channels {
|
||||
// only chans that are being blocked on will be in the list of channels so this cannot block
|
||||
cx <- result
|
||||
close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.AuthorizerCacheEntry) {
|
||||
func (r *ACLResolver) fetchAndCacheTokenLegacy(token string, cached *structs.AuthorizerCacheEntry) (acl.Authorizer, error) {
|
||||
req := structs.ACLPolicyResolveLegacyRequest{
|
||||
Datacenter: r.delegate.ACLDatacenter(true),
|
||||
ACL: token,
|
||||
|
@ -260,8 +219,12 @@ func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.Auth
|
|||
if err == nil {
|
||||
parent := acl.RootAuthorizer(reply.Parent)
|
||||
if parent == nil {
|
||||
r.fireAsyncLegacyResult(token, cached.Authorizer, cacheTTL, acl.ErrInvalidParent)
|
||||
return
|
||||
var authorizer acl.Authorizer
|
||||
if cached != nil {
|
||||
authorizer = cached.Authorizer
|
||||
}
|
||||
r.cache.PutAuthorizerWithTTL(token, authorizer, cacheTTL)
|
||||
return authorizer, acl.ErrInvalidParent
|
||||
}
|
||||
|
||||
var policies []*acl.Policy
|
||||
|
@ -271,30 +234,32 @@ func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.Auth
|
|||
}
|
||||
|
||||
authorizer, err := acl.NewPolicyAuthorizer(parent, policies, r.sentinel)
|
||||
r.fireAsyncLegacyResult(token, authorizer, reply.TTL, err)
|
||||
return
|
||||
|
||||
r.cache.PutAuthorizerWithTTL(token, authorizer, reply.TTL)
|
||||
return authorizer, err
|
||||
}
|
||||
|
||||
if acl.IsErrNotFound(err) {
|
||||
// Make sure to remove from the cache if it was deleted
|
||||
r.fireAsyncLegacyResult(token, nil, cacheTTL, acl.ErrNotFound)
|
||||
return
|
||||
r.cache.PutAuthorizerWithTTL(token, nil, cacheTTL)
|
||||
return nil, acl.ErrNotFound
|
||||
|
||||
}
|
||||
|
||||
// some other RPC error
|
||||
switch r.config.ACLDownPolicy {
|
||||
case "allow":
|
||||
r.fireAsyncLegacyResult(token, acl.AllowAll(), cacheTTL, nil)
|
||||
return
|
||||
r.cache.PutAuthorizerWithTTL(token, acl.AllowAll(), cacheTTL)
|
||||
return acl.AllowAll(), nil
|
||||
case "async-cache", "extend-cache":
|
||||
if cached != nil {
|
||||
r.fireAsyncLegacyResult(token, cached.Authorizer, cacheTTL, nil)
|
||||
return
|
||||
r.cache.PutAuthorizerWithTTL(token, cached.Authorizer, cacheTTL)
|
||||
return cached.Authorizer, nil
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
r.fireAsyncLegacyResult(token, acl.DenyAll(), cacheTTL, nil)
|
||||
return
|
||||
r.cache.PutAuthorizerWithTTL(token, acl.DenyAll(), cacheTTL)
|
||||
return acl.DenyAll(), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -331,32 +296,12 @@ func (r *ACLResolver) resolveTokenLegacy(token string) (acl.Authorizer, error) {
|
|||
metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1)
|
||||
|
||||
// Resolve the token in the background and wait on the result if we must
|
||||
var waitChan chan *remoteACLLegacyResult
|
||||
waitChan := r.legacyGroup.DoChan(token, func() (interface{}, error) {
|
||||
authorizer, err := r.fetchAndCacheTokenLegacy(token, entry)
|
||||
return authorizer, err
|
||||
})
|
||||
|
||||
waitForResult := entry == nil || r.config.ACLDownPolicy != "async-cache"
|
||||
|
||||
r.asyncLegacyMutex.Lock()
|
||||
|
||||
// check if resolution for this token is already happening
|
||||
waiters, ok := r.asyncLegacyResults[token]
|
||||
if !ok || waiters == nil {
|
||||
// initialize the slice of waiters if not already done
|
||||
waiters = make([]chan *remoteACLLegacyResult, 0)
|
||||
}
|
||||
if waitForResult {
|
||||
// create the waitChan only if we are going to block waiting
|
||||
// for the response and then append it to the list of waiters
|
||||
// Because we will block (not select or discard this chan) we
|
||||
// do not need to create it as buffered
|
||||
waitChan = make(chan *remoteACLLegacyResult)
|
||||
r.asyncLegacyResults[token] = append(waiters, waitChan)
|
||||
}
|
||||
r.asyncLegacyMutex.Unlock()
|
||||
|
||||
if !ok {
|
||||
// start the async RPC if it wasn't already ongoing
|
||||
go r.resolveTokenLegacyAsync(token, entry)
|
||||
}
|
||||
|
||||
if !waitForResult {
|
||||
// waitForResult being false requires the cacheEntry to not be nil
|
||||
if entry.Authorizer != nil {
|
||||
|
@ -367,30 +312,16 @@ func (r *ACLResolver) resolveTokenLegacy(token string) (acl.Authorizer, error) {
|
|||
|
||||
// block waiting for the async RPC to finish.
|
||||
res := <-waitChan
|
||||
return res.authorizer, res.err
|
||||
}
|
||||
|
||||
// fireAsyncTokenResult is used to notify all waiters that the results of a token resolution is complete
|
||||
func (r *ACLResolver) fireAsyncTokenResult(token string, identity structs.ACLIdentity, err error) {
|
||||
// cache the result: positive or negative
|
||||
r.cache.PutIdentity(token, identity)
|
||||
|
||||
// get the list of channels to send the result to
|
||||
r.asyncIdentityResultsMutex.Lock()
|
||||
channels := r.asyncIdentityResults[token]
|
||||
delete(r.asyncIdentityResults, token)
|
||||
r.asyncIdentityResultsMutex.Unlock()
|
||||
|
||||
// notify all watchers of the RPC results
|
||||
result := &remoteACLIdentityResult{identity, err}
|
||||
for _, cx := range channels {
|
||||
// cannot block because all wait chans will have another goroutine blocked on the read
|
||||
cx <- result
|
||||
close(cx)
|
||||
var authorizer acl.Authorizer
|
||||
if res.Val != nil { // avoid a nil-not-nil bug
|
||||
authorizer = res.Val.(acl.Authorizer)
|
||||
}
|
||||
|
||||
return authorizer, res.Err
|
||||
}
|
||||
|
||||
func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *structs.IdentityCacheEntry) {
|
||||
func (r *ACLResolver) fetchAndCacheIdentityFromToken(token string, cached *structs.IdentityCacheEntry) (structs.ACLIdentity, error) {
|
||||
req := structs.ACLTokenGetRequest{
|
||||
Datacenter: r.delegate.ACLDatacenter(false),
|
||||
TokenID: token,
|
||||
|
@ -405,28 +336,30 @@ func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *struct
|
|||
err := r.delegate.RPC("ACL.TokenRead", &req, &resp)
|
||||
if err == nil {
|
||||
if resp.Token == nil {
|
||||
r.fireAsyncTokenResult(token, nil, acl.ErrNotFound)
|
||||
r.cache.PutIdentity(token, nil)
|
||||
return nil, acl.ErrNotFound
|
||||
} else {
|
||||
r.fireAsyncTokenResult(token, resp.Token, nil)
|
||||
r.cache.PutIdentity(token, resp.Token)
|
||||
return resp.Token, nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if acl.IsErrNotFound(err) {
|
||||
// Make sure to remove from the cache if it was deleted
|
||||
r.fireAsyncTokenResult(token, nil, acl.ErrNotFound)
|
||||
return
|
||||
r.cache.PutIdentity(token, nil)
|
||||
return nil, acl.ErrNotFound
|
||||
|
||||
}
|
||||
|
||||
// some other RPC error
|
||||
if cached != nil && (r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache") {
|
||||
// extend the cache
|
||||
r.fireAsyncTokenResult(token, cached.Identity, nil)
|
||||
return
|
||||
r.cache.PutIdentity(token, cached.Identity)
|
||||
return cached.Identity, nil
|
||||
}
|
||||
|
||||
r.fireAsyncTokenResult(token, nil, err)
|
||||
return
|
||||
r.cache.PutIdentity(token, nil)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentity, error) {
|
||||
|
@ -445,32 +378,12 @@ func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentit
|
|||
metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1)
|
||||
|
||||
// Background a RPC request and wait on it if we must
|
||||
var waitChan chan *remoteACLIdentityResult
|
||||
waitChan := r.identityGroup.DoChan(token, func() (interface{}, error) {
|
||||
identity, err := r.fetchAndCacheIdentityFromToken(token, cacheEntry)
|
||||
return identity, err
|
||||
})
|
||||
|
||||
waitForResult := cacheEntry == nil || r.config.ACLDownPolicy != "async-cache"
|
||||
|
||||
r.asyncIdentityResultsMutex.Lock()
|
||||
// check if resolution of this token is already ongoing
|
||||
waiters, ok := r.asyncIdentityResults[token]
|
||||
if !ok || waiters == nil {
|
||||
// only initialize the slice of waiters if need be (when this token resolution isn't ongoing)
|
||||
waiters = make([]chan *remoteACLIdentityResult, 0)
|
||||
}
|
||||
if waitForResult {
|
||||
// create the waitChan only if we are going to block waiting
|
||||
// for the response and then append it to the list of waiters
|
||||
// Because we will block (not select or discard this chan) we
|
||||
// do not need to create it as buffered
|
||||
waitChan = make(chan *remoteACLIdentityResult)
|
||||
r.asyncIdentityResults[token] = append(waiters, waitChan)
|
||||
}
|
||||
r.asyncIdentityResultsMutex.Unlock()
|
||||
|
||||
if !ok {
|
||||
// only start the RPC if one isn't in flight
|
||||
go r.resolveIdentityFromTokenAsync(token, cacheEntry)
|
||||
}
|
||||
|
||||
if !waitForResult {
|
||||
// waitForResult being false requires the cacheEntry to not be nil
|
||||
return cacheEntry.Identity, nil
|
||||
|
@ -479,34 +392,18 @@ func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentit
|
|||
// block on the read here, this is why we don't need chan buffering
|
||||
res := <-waitChan
|
||||
|
||||
if res.err != nil && !acl.IsErrNotFound(res.err) {
|
||||
return res.identity, ACLRemoteError{Err: res.err}
|
||||
var identity structs.ACLIdentity
|
||||
if res.Val != nil { // avoid a nil-not-nil bug
|
||||
identity = res.Val.(structs.ACLIdentity)
|
||||
}
|
||||
return res.identity, res.err
|
||||
|
||||
if res.Err != nil && !acl.IsErrNotFound(res.Err) {
|
||||
return identity, ACLRemoteError{Err: res.Err}
|
||||
}
|
||||
return identity, res.Err
|
||||
}
|
||||
|
||||
// fireAsyncPolicyResult is used to notify all waiters that policy resolution is complete.
|
||||
func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACLPolicy, err error, updateCache bool) {
|
||||
if updateCache {
|
||||
// cache the result: positive or negative
|
||||
r.cache.PutPolicy(policyID, policy)
|
||||
}
|
||||
|
||||
// get the list of channels to send the result to
|
||||
r.asyncPolicyResultsMutex.Lock()
|
||||
channels := r.asyncPolicyResults[policyID]
|
||||
delete(r.asyncPolicyResults, policyID)
|
||||
r.asyncPolicyResultsMutex.Unlock()
|
||||
|
||||
// notify all watchers of the RPC results
|
||||
result := &remoteACLPolicyResult{policy, err}
|
||||
for _, cx := range channels {
|
||||
// not closing the channel as there could be more events to be fired.
|
||||
cx <- result
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) {
|
||||
func (r *ACLResolver) fetchAndCachePoliciesForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) (map[string]*structs.ACLPolicy, error) {
|
||||
req := structs.ACLPolicyBatchGetRequest{
|
||||
Datacenter: r.delegate.ACLDatacenter(false),
|
||||
PolicyIDs: policyIDs,
|
||||
|
@ -516,68 +413,66 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti
|
|||
},
|
||||
}
|
||||
|
||||
found := make(map[string]struct{})
|
||||
var resp structs.ACLPolicyBatchResponse
|
||||
err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp)
|
||||
if err == nil {
|
||||
out := make(map[string]*structs.ACLPolicy)
|
||||
for _, policy := range resp.Policies {
|
||||
r.fireAsyncPolicyResult(policy.ID, policy, nil, true)
|
||||
found[policy.ID] = struct{}{}
|
||||
out[policy.ID] = policy
|
||||
}
|
||||
|
||||
for _, policyID := range policyIDs {
|
||||
if _, ok := found[policyID]; !ok {
|
||||
r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound, true)
|
||||
if policy, ok := out[policyID]; ok {
|
||||
r.cache.PutPolicy(policyID, policy)
|
||||
} else {
|
||||
r.cache.PutPolicy(policyID, nil)
|
||||
}
|
||||
}
|
||||
return
|
||||
return out, nil
|
||||
}
|
||||
|
||||
if acl.IsErrNotFound(err) {
|
||||
// make sure to indicate that this identity is no longer valid within
|
||||
// the cache
|
||||
//
|
||||
// Note - This must be done before firing the results or else it would
|
||||
// be possible for waiters to get woken up an get the cached identity
|
||||
// again
|
||||
r.cache.PutIdentity(identity.SecretToken(), nil)
|
||||
for _, policyID := range policyIDs {
|
||||
// Do not touch the cache. Getting a top level ACL not found error
|
||||
// only indicates that the secret token used in the request
|
||||
// no longer exists
|
||||
r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrNotFound, identity.SecretToken()}, false)
|
||||
}
|
||||
return
|
||||
|
||||
// Do not touch the policy cache. Getting a top level ACL not found error
|
||||
// only indicates that the secret token used in the request
|
||||
// no longer exists
|
||||
return nil, &policyTokenError{acl.ErrNotFound, identity.SecretToken()}
|
||||
}
|
||||
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// invalidate our ID cache so that identity resolution will take place
|
||||
// again in the future
|
||||
//
|
||||
// Note - This must be done before firing the results or else it would
|
||||
// be possible for waiters to get woken up and get the cached identity
|
||||
// again
|
||||
r.cache.RemoveIdentity(identity.SecretToken())
|
||||
|
||||
for _, policyID := range policyIDs {
|
||||
// Do not remove from the cache for permission denied
|
||||
// what this does indicate is that our view of the token is out of date
|
||||
r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrPermissionDenied, identity.SecretToken()}, false)
|
||||
}
|
||||
return
|
||||
// Do not remove from the policy cache for permission denied
|
||||
// what this does indicate is that our view of the token is out of date
|
||||
return nil, &policyTokenError{acl.ErrPermissionDenied, identity.SecretToken()}
|
||||
}
|
||||
|
||||
// other RPC error - use cache if available
|
||||
|
||||
extendCache := r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache"
|
||||
|
||||
out := make(map[string]*structs.ACLPolicy)
|
||||
insufficientCache := false
|
||||
for _, policyID := range policyIDs {
|
||||
if entry, ok := cached[policyID]; extendCache && ok {
|
||||
r.fireAsyncPolicyResult(policyID, entry.Policy, nil, true)
|
||||
r.cache.PutPolicy(policyID, entry.Policy)
|
||||
if entry.Policy != nil {
|
||||
out[policyID] = entry.Policy
|
||||
}
|
||||
} else {
|
||||
r.fireAsyncPolicyResult(policyID, nil, ACLRemoteError{Err: err}, true)
|
||||
r.cache.PutPolicy(policyID, nil)
|
||||
insufficientCache = true
|
||||
}
|
||||
}
|
||||
return
|
||||
if insufficientCache {
|
||||
return nil, ACLRemoteError{Err: err}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *ACLResolver) filterPoliciesByScope(policies structs.ACLPolicies) structs.ACLPolicies {
|
||||
|
@ -660,70 +555,37 @@ func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) (
|
|||
return r.filterPoliciesByScope(policies), nil
|
||||
}
|
||||
|
||||
hasMissing := len(missing) > 0
|
||||
|
||||
fetchIDs := missing
|
||||
for _, policy := range expired {
|
||||
fetchIDs = append(fetchIDs, policy.ID)
|
||||
}
|
||||
|
||||
// Background a RPC request and wait on it if we must
|
||||
var waitChan chan *remoteACLPolicyResult
|
||||
waitForResult := len(missing) > 0 || r.config.ACLDownPolicy != "async-cache"
|
||||
if waitForResult {
|
||||
// buffered because there are going to be multiple go routines that send data to this chan
|
||||
waitChan = make(chan *remoteACLPolicyResult, len(fetchIDs))
|
||||
}
|
||||
|
||||
var newAsyncFetchIds []string
|
||||
r.asyncPolicyResultsMutex.Lock()
|
||||
for _, policyID := range fetchIDs {
|
||||
clients, ok := r.asyncPolicyResults[policyID]
|
||||
if !ok || clients == nil {
|
||||
clients = make([]chan *remoteACLPolicyResult, 0)
|
||||
}
|
||||
if waitForResult {
|
||||
r.asyncPolicyResults[policyID] = append(clients, waitChan)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
newAsyncFetchIds = append(newAsyncFetchIds, policyID)
|
||||
}
|
||||
}
|
||||
r.asyncPolicyResultsMutex.Unlock()
|
||||
|
||||
if len(newAsyncFetchIds) > 0 {
|
||||
// only start the RPC if one isn't in flight
|
||||
go r.resolvePoliciesAsyncForIdentity(identity, newAsyncFetchIds, expCacheMap)
|
||||
}
|
||||
waitChan := r.policyGroup.DoChan(identity.SecretToken(), func() (interface{}, error) {
|
||||
policies, err := r.fetchAndCachePoliciesForIdentity(identity, fetchIDs, expCacheMap)
|
||||
return policies, err
|
||||
})
|
||||
|
||||
waitForResult := hasMissing || r.config.ACLDownPolicy != "async-cache"
|
||||
if !waitForResult {
|
||||
// waitForResult being false requires that all the policies were cached already
|
||||
policies = append(policies, expired...)
|
||||
return r.filterPoliciesByScope(policies), nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(fetchIDs); i++ {
|
||||
res := <-waitChan
|
||||
res := <-waitChan
|
||||
|
||||
if res.err != nil {
|
||||
if _, ok := res.err.(*policyTokenError); ok {
|
||||
// always return token errors
|
||||
return nil, res.err
|
||||
} else if !acl.IsErrNotFound(res.err) {
|
||||
// ignore regular not found errors for policies
|
||||
return nil, res.err
|
||||
}
|
||||
}
|
||||
if res.Err != nil {
|
||||
return nil, res.Err
|
||||
}
|
||||
|
||||
// we probably could handle a special case where we
|
||||
// get a permission denied error due to another requests
|
||||
// issues and spawn the go routine to resolve it ourselves.
|
||||
// however this should be exceedingly rare and in this case
|
||||
// we can just kick the can down the road and retry the whole
|
||||
// token/policy resolution. All the remaining good bits that
|
||||
// we need will already be cached anyways.
|
||||
if res.Val != nil {
|
||||
foundPolicies := res.Val.(map[string]*structs.ACLPolicy)
|
||||
|
||||
if res.policy != nil {
|
||||
policies = append(policies, res.policy)
|
||||
for _, policy := range foundPolicies {
|
||||
policies = append(policies, policy)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
|
|||
},
|
||||
},
|
||||
}, nil
|
||||
case "concurrent-resolve-1":
|
||||
case "concurrent-resolve":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "5f57c1f6-6a89-4186-9445-531b316e01df",
|
||||
SecretID: "a1a54629-5050-4d17-8a4e-560d2423f835",
|
||||
|
@ -142,19 +142,6 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
|
|||
},
|
||||
},
|
||||
}, nil
|
||||
case "concurrent-resolve-2":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "296bbe10-01aa-437e-ac3b-3ecdc00ea65c",
|
||||
SecretID: "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "node-wr",
|
||||
},
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "acl-wr",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
case anonymousToken:
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "00000000-0000-0000-0000-000000000002",
|
||||
|
@ -737,6 +724,135 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
|
||||
requireIdentityCached(t, r, "foo", false, "no longer cached")
|
||||
})
|
||||
|
||||
t.Run("PolicyResolve-TokenNotFound", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
_, rawToken, _ := testIdentityForToken("found")
|
||||
foundToken := rawToken.(*structs.ACLToken)
|
||||
secretID := foundToken.SecretID
|
||||
|
||||
tokenResolved := false
|
||||
policyResolved := false
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
if !tokenResolved {
|
||||
reply.Token = foundToken
|
||||
tokenResolved = true
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("Not Supposed to be Invoked again")
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyResolved {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
policyResolved = true
|
||||
return nil
|
||||
}
|
||||
return acl.ErrNotFound // test condition
|
||||
},
|
||||
}
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
config.Config.ACLTokenTTL = 0
|
||||
config.Config.ACLPolicyTTL = 0
|
||||
})
|
||||
|
||||
// Prime the standard caches.
|
||||
authz, err := r.ResolveToken(secretID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, authz)
|
||||
require.True(t, authz.NodeWrite("foo", nil))
|
||||
|
||||
// Verify that the caches are setup properly.
|
||||
requireIdentityCached(t, r, secretID, true, "cached")
|
||||
requirePolicyCached(t, r, "node-wr", true, "cached") // from "found" token
|
||||
requirePolicyCached(t, r, "dc2-key-wr", true, "cached") // from "found" token
|
||||
|
||||
// Nuke 1 policy from the cache so that we force a policy resolve
|
||||
// during token resolve.
|
||||
r.cache.RemovePolicy("dc2-key-wr")
|
||||
|
||||
_, err = r.ResolveToken(secretID)
|
||||
require.True(t, acl.IsErrNotFound(err))
|
||||
|
||||
requireIdentityCached(t, r, secretID, false, "identity not found cached")
|
||||
requirePolicyCached(t, r, "node-wr", true, "still cached")
|
||||
require.Nil(t, r.cache.GetPolicy("dc2-key-wr"), "not stored at all")
|
||||
})
|
||||
|
||||
t.Run("PolicyResolve-PermissionDenied", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
_, rawToken, _ := testIdentityForToken("found")
|
||||
foundToken := rawToken.(*structs.ACLToken)
|
||||
secretID := foundToken.SecretID
|
||||
|
||||
policyResolved := false
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
// no limit
|
||||
reply.Token = foundToken
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyResolved {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
policyResolved = true
|
||||
return nil
|
||||
}
|
||||
return acl.ErrPermissionDenied // test condition
|
||||
},
|
||||
}
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
config.Config.ACLTokenTTL = 0
|
||||
config.Config.ACLPolicyTTL = 0
|
||||
})
|
||||
|
||||
// Prime the standard caches.
|
||||
authz, err := r.ResolveToken(secretID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, authz)
|
||||
require.True(t, authz.NodeWrite("foo", nil))
|
||||
|
||||
// Verify that the caches are setup properly.
|
||||
requireIdentityCached(t, r, secretID, true, "cached")
|
||||
requirePolicyCached(t, r, "node-wr", true, "cached") // from "found" token
|
||||
requirePolicyCached(t, r, "dc2-key-wr", true, "cached") // from "found" token
|
||||
|
||||
// Nuke 1 policy from the cache so that we force a policy resolve
|
||||
// during token resolve.
|
||||
r.cache.RemovePolicy("dc2-key-wr")
|
||||
|
||||
_, err = r.ResolveToken(secretID)
|
||||
require.True(t, acl.IsErrPermissionDenied(err))
|
||||
|
||||
require.Nil(t, r.cache.GetIdentity(secretID), "identity not stored at all")
|
||||
requirePolicyCached(t, r, "node-wr", true, "still cached")
|
||||
require.Nil(t, r.cache.GetPolicy("dc2-key-wr"), "not stored at all")
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLResolver_DatacenterScoping(t *testing.T) {
|
||||
|
@ -893,7 +1009,7 @@ func TestACLResolver_Client(t *testing.T) {
|
|||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
|
@ -907,6 +1023,7 @@ func TestACLResolver_Client(t *testing.T) {
|
|||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
|
@ -939,255 +1056,6 @@ func TestACLResolver_Client(t *testing.T) {
|
|||
require.Equal(t, int32(1), tokenReads)
|
||||
require.Equal(t, int32(1), policyResolves)
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
// effectively disables the cache - therefore the only way we end up
|
||||
// with 1 policy resolution is if they get single flighted
|
||||
config.Config.ACLPolicyTTL = 0 * time.Millisecond
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
require.NoError(t, res1.err)
|
||||
require.NoError(t, res2.err)
|
||||
require.Equal(t, res1.authz, res2.authz)
|
||||
require.Equal(t, int32(2), tokenReads)
|
||||
require.Equal(t, int32(1), policyResolves)
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve-Permission-Denied", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var waitReady int32 = 1
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
|
||||
if atomic.CompareAndSwapInt32(&waitReady, 1, 0) {
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
config.Config.ACLPolicyTTL = 600 * time.Second
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
require.NoError(t, res1.err)
|
||||
require.NoError(t, res2.err)
|
||||
require.Equal(t, res1.authz, res2.authz)
|
||||
// 2 reads for 1 token (cache gets invalidated and only 1 for the other)
|
||||
require.Equal(t, int32(3), tokenReads)
|
||||
require.Equal(t, int32(2), policyResolves)
|
||||
require.True(t, res1.authz.ACLRead())
|
||||
require.True(t, res1.authz.NodeWrite("foo", nil))
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve-Not-Found", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var waitReady int32 = 1
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
var tokenNotAllowed string
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
|
||||
if atomic.CompareAndSwapInt32(&waitReady, 1, 0) {
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
tokenNotAllowed = args.Token
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
config.Config.ACLPolicyTTL = 600 * time.Second
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
var errResult *asyncResolutionResult
|
||||
var goodResult *asyncResolutionResult
|
||||
|
||||
// can't be sure which token resolution is going to be the one that does the first policy resolution
|
||||
// so we record it and then determine here how the results should be validated
|
||||
if tokenNotAllowed == "a1a54629-5050-4d17-8a4e-560d2423f835" {
|
||||
errResult = res1
|
||||
goodResult = res2
|
||||
} else {
|
||||
errResult = res2
|
||||
goodResult = res1
|
||||
}
|
||||
|
||||
require.Error(t, errResult.err)
|
||||
require.Nil(t, errResult.authz)
|
||||
require.EqualError(t, errResult.err, acl.ErrNotFound.Error())
|
||||
require.NoError(t, goodResult.err)
|
||||
require.Equal(t, int32(2), tokenReads)
|
||||
require.Equal(t, int32(2), policyResolves)
|
||||
require.NotNil(t, goodResult.authz)
|
||||
require.True(t, goodResult.authz.ACLRead())
|
||||
require.True(t, goodResult.authz.NodeWrite("foo", nil))
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLResolver_LocalTokensAndPolicies(t *testing.T) {
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package singleflight provides a duplicate function call suppression
|
||||
// mechanism.
|
||||
package singleflight // import "golang.org/x/sync/singleflight"
|
||||
|
||||
import "sync"
|
||||
|
||||
// call is an in-flight or completed singleflight.Do call
|
||||
type call struct {
|
||||
wg sync.WaitGroup
|
||||
|
||||
// These fields are written once before the WaitGroup is done
|
||||
// and are only read after the WaitGroup is done.
|
||||
val interface{}
|
||||
err error
|
||||
|
||||
// These fields are read and written with the singleflight
|
||||
// mutex held before the WaitGroup is done, and are read but
|
||||
// not written after the WaitGroup is done.
|
||||
dups int
|
||||
chans []chan<- Result
|
||||
}
|
||||
|
||||
// Group represents a class of work and forms a namespace in
|
||||
// which units of work can be executed with duplicate suppression.
|
||||
type Group struct {
|
||||
mu sync.Mutex // protects m
|
||||
m map[string]*call // lazily initialized
|
||||
}
|
||||
|
||||
// Result holds the results of Do, so they can be passed
|
||||
// on a channel.
|
||||
type Result struct {
|
||||
Val interface{}
|
||||
Err error
|
||||
Shared bool
|
||||
}
|
||||
|
||||
// Do executes and returns the results of the given function, making
|
||||
// sure that only one execution is in-flight for a given key at a
|
||||
// time. If a duplicate comes in, the duplicate caller waits for the
|
||||
// original to complete and receives the same results.
|
||||
// The return value shared indicates whether v was given to multiple callers.
|
||||
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
g.mu.Unlock()
|
||||
c.wg.Wait()
|
||||
return c.val, c.err, true
|
||||
}
|
||||
c := new(call)
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
g.doCall(c, key, fn)
|
||||
return c.val, c.err, c.dups > 0
|
||||
}
|
||||
|
||||
// DoChan is like Do but returns a channel that will receive the
|
||||
// results when they are ready.
|
||||
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
||||
ch := make(chan Result, 1)
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[string]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
c.dups++
|
||||
c.chans = append(c.chans, ch)
|
||||
g.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
c := &call{chans: []chan<- Result{ch}}
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
go g.doCall(c, key, fn)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// doCall handles the single call for a key.
|
||||
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
||||
c.val, c.err = fn()
|
||||
c.wg.Done()
|
||||
|
||||
g.mu.Lock()
|
||||
delete(g.m, key)
|
||||
for _, ch := range c.chans {
|
||||
ch <- Result{c.val, c.err, c.dups > 0}
|
||||
}
|
||||
g.mu.Unlock()
|
||||
}
|
||||
|
||||
// Forget tells the singleflight to forget about a key. Future calls
|
||||
// to Do for this key will call the function rather than waiting for
|
||||
// an earlier call to complete.
|
||||
func (g *Group) Forget(key string) {
|
||||
g.mu.Lock()
|
||||
delete(g.m, key)
|
||||
g.mu.Unlock()
|
||||
}
|
|
@ -320,6 +320,7 @@
|
|||
{"path":"golang.org/x/net/trace","checksumSHA1":"u/r66lwYfgg682u5hZG7/E7+VCY=","revision":"d866cfc389cec985d6fda2859936a575a55a3ab6","revisionTime":"2017-12-11T20:45:21Z"},
|
||||
{"path":"golang.org/x/oauth2","revision":""},
|
||||
{"path":"golang.org/x/oauth2/google","revision":""},
|
||||
{"path":"golang.org/x/sync/singleflight","checksumSHA1":"VhUZFUuhLFSBFUfskMC4am5RIdc=","revision":"e225da77a7e68af35c70ccbf71af2b83e6acac3c","revisionTime":"2019-02-15T22:36:53Z"},
|
||||
{"path":"golang.org/x/sys/cpu","checksumSHA1":"REkmyB368pIiip76LiqMLspgCRk=","revision":"ad87a3a340fa7f3bed189293fbfa7a9b7e021ae1","revisionTime":"2018-06-18T16:37:21Z"},
|
||||
{"path":"golang.org/x/sys/unix","checksumSHA1":"su2QDjUzrUO0JnOH9m0cNg0QqsM=","revision":"ac767d655b305d4e9612f5f6e33120b9176c4ad4","revisionTime":"2018-07-08T03:57:06Z"},
|
||||
{"path":"golang.org/x/sys/windows","checksumSHA1":"P8Y8GFwxybTfltfh8Q0lHqlEYIM=","revision":"ac767d655b305d4e9612f5f6e33120b9176c4ad4","revisionTime":"2018-07-08T03:57:06Z"},
|
||||
|
|
Loading…
Reference in New Issue