mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
533 lines
14 KiB
533 lines
14 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: MPL-2.0 |
|
|
|
package api |
|
|
|
import ( |
|
"encoding/json" |
|
"fmt" |
|
"path" |
|
"sync" |
|
"time" |
|
) |
|
|
|
const ( |
|
// DefaultSemaphoreSessionName is the Session Name we assign if none is provided |
|
DefaultSemaphoreSessionName = "Consul API Semaphore" |
|
|
|
// DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided |
|
// when creating a new Semaphore. This is used because we do not have another |
|
// other check to depend upon. |
|
DefaultSemaphoreSessionTTL = "15s" |
|
|
|
// DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore |
|
// acquisition is possible. This affects the minimum time it takes to cancel |
|
// a Semaphore acquisition. |
|
DefaultSemaphoreWaitTime = 15 * time.Second |
|
|
|
// DefaultSemaphoreKey is the key used within the prefix to |
|
// use for coordination between all the contenders. |
|
DefaultSemaphoreKey = ".lock" |
|
|
|
// SemaphoreFlagValue is a magic flag we set to indicate a key |
|
// is being used for a semaphore. It is used to detect a potential |
|
// conflict with a lock. |
|
SemaphoreFlagValue = 0xe0f69a2baa414de0 |
|
) |
|
|
|
var ( |
|
// ErrSemaphoreHeld is returned if we attempt to double lock |
|
ErrSemaphoreHeld = fmt.Errorf("Semaphore already held") |
|
|
|
// ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore |
|
// that we do not hold. |
|
ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held") |
|
|
|
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore |
|
// that is in use. |
|
ErrSemaphoreInUse = fmt.Errorf("Semaphore in use") |
|
|
|
// ErrSemaphoreConflict is returned if the flags on a key |
|
// used for a semaphore do not match expectation |
|
ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use") |
|
) |
|
|
|
// Semaphore is used to implement a distributed semaphore |
|
// using the Consul KV primitives. |
|
type Semaphore struct { |
|
c *Client |
|
opts *SemaphoreOptions |
|
|
|
isHeld bool |
|
sessionRenew chan struct{} |
|
lockSession string |
|
l sync.Mutex |
|
} |
|
|
|
// SemaphoreOptions is used to parameterize the Semaphore |
|
type SemaphoreOptions struct { |
|
Prefix string // Must be set and have write permissions |
|
Limit int // Must be set, and be positive |
|
Value []byte // Optional, value to associate with the contender entry |
|
Session string // Optional, created if not specified |
|
SessionName string // Optional, defaults to DefaultLockSessionName |
|
SessionTTL string // Optional, defaults to DefaultLockSessionTTL |
|
MonitorRetries int // Optional, defaults to 0 which means no retries |
|
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime |
|
SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime |
|
SemaphoreTryOnce bool // Optional, defaults to false which means try forever |
|
Namespace string `json:",omitempty"` // Optional, defaults to API client config, namespace of ACL token, or "default" namespace |
|
} |
|
|
|
// semaphoreLock is written under the DefaultSemaphoreKey and |
|
// is used to coordinate between all the contenders. |
|
type semaphoreLock struct { |
|
// Limit is the integer limit of holders. This is used to |
|
// verify that all the holders agree on the value. |
|
Limit int |
|
|
|
// Holders is a list of all the semaphore holders. |
|
// It maps the session ID to true. It is used as a set effectively. |
|
Holders map[string]bool |
|
} |
|
|
|
// SemaphorePrefix is used to created a Semaphore which will operate |
|
// at the given KV prefix and uses the given limit for the semaphore. |
|
// The prefix must have write privileges, and the limit must be agreed |
|
// upon by all contenders. |
|
func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) { |
|
opts := &SemaphoreOptions{ |
|
Prefix: prefix, |
|
Limit: limit, |
|
} |
|
return c.SemaphoreOpts(opts) |
|
} |
|
|
|
// SemaphoreOpts is used to create a Semaphore with the given options. |
|
// The prefix must have write privileges, and the limit must be agreed |
|
// upon by all contenders. If a Session is not provided, one will be created. |
|
func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) { |
|
if opts.Prefix == "" { |
|
return nil, fmt.Errorf("missing prefix") |
|
} |
|
if opts.Limit <= 0 { |
|
return nil, fmt.Errorf("semaphore limit must be positive") |
|
} |
|
if opts.SessionName == "" { |
|
opts.SessionName = DefaultSemaphoreSessionName |
|
} |
|
if opts.SessionTTL == "" { |
|
opts.SessionTTL = DefaultSemaphoreSessionTTL |
|
} else { |
|
if _, err := time.ParseDuration(opts.SessionTTL); err != nil { |
|
return nil, fmt.Errorf("invalid SessionTTL: %v", err) |
|
} |
|
} |
|
if opts.MonitorRetryTime == 0 { |
|
opts.MonitorRetryTime = DefaultMonitorRetryTime |
|
} |
|
if opts.SemaphoreWaitTime == 0 { |
|
opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime |
|
} |
|
s := &Semaphore{ |
|
c: c, |
|
opts: opts, |
|
} |
|
return s, nil |
|
} |
|
|
|
// Acquire attempts to reserve a slot in the semaphore, blocking until |
|
// success, interrupted via the stopCh or an error is encountered. |
|
// Providing a non-nil stopCh can be used to abort the attempt. |
|
// On success, a channel is returned that represents our slot. |
|
// This channel could be closed at any time due to session invalidation, |
|
// communication errors, operator intervention, etc. It is NOT safe to |
|
// assume that the slot is held until Release() unless the Session is specifically |
|
// created without any associated health checks. By default Consul sessions |
|
// prefer liveness over safety and an application must be able to handle |
|
// the session being lost. |
|
func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) { |
|
// Hold the lock as we try to acquire |
|
s.l.Lock() |
|
defer s.l.Unlock() |
|
|
|
// Check if we already hold the semaphore |
|
if s.isHeld { |
|
return nil, ErrSemaphoreHeld |
|
} |
|
|
|
// Check if we need to create a session first |
|
s.lockSession = s.opts.Session |
|
if s.lockSession == "" { |
|
sess, err := s.createSession() |
|
if err != nil { |
|
return nil, fmt.Errorf("failed to create session: %v", err) |
|
} |
|
|
|
s.sessionRenew = make(chan struct{}) |
|
s.lockSession = sess |
|
session := s.c.Session() |
|
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew) |
|
|
|
// If we fail to acquire the lock, cleanup the session |
|
defer func() { |
|
if !s.isHeld { |
|
close(s.sessionRenew) |
|
s.sessionRenew = nil |
|
} |
|
}() |
|
} |
|
|
|
// Create the contender entry |
|
kv := s.c.KV() |
|
wOpts := WriteOptions{Namespace: s.opts.Namespace} |
|
|
|
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), &wOpts) |
|
if err != nil || !made { |
|
return nil, fmt.Errorf("failed to make contender entry: %v", err) |
|
} |
|
|
|
// Setup the query options |
|
qOpts := QueryOptions{ |
|
WaitTime: s.opts.SemaphoreWaitTime, |
|
Namespace: s.opts.Namespace, |
|
} |
|
|
|
start := time.Now() |
|
attempts := 0 |
|
WAIT: |
|
// Check if we should quit |
|
select { |
|
case <-stopCh: |
|
return nil, nil |
|
default: |
|
} |
|
|
|
// Handle the one-shot mode. |
|
if s.opts.SemaphoreTryOnce && attempts > 0 { |
|
elapsed := time.Since(start) |
|
if elapsed > s.opts.SemaphoreWaitTime { |
|
return nil, nil |
|
} |
|
|
|
// Query wait time should not exceed the semaphore wait time |
|
qOpts.WaitTime = s.opts.SemaphoreWaitTime - elapsed |
|
} |
|
attempts++ |
|
|
|
// Read the prefix |
|
pairs, meta, err := kv.List(s.opts.Prefix, &qOpts) |
|
if err != nil { |
|
return nil, fmt.Errorf("failed to read prefix: %v", err) |
|
} |
|
|
|
// Decode the lock |
|
lockPair := s.findLock(pairs) |
|
if lockPair.Flags != SemaphoreFlagValue { |
|
return nil, ErrSemaphoreConflict |
|
} |
|
lock, err := s.decodeLock(lockPair) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Verify we agree with the limit |
|
if lock.Limit != s.opts.Limit { |
|
return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)", |
|
lock.Limit, s.opts.Limit) |
|
} |
|
|
|
// Prune the dead holders |
|
s.pruneDeadHolders(lock, pairs) |
|
|
|
// Check if the lock is held |
|
if len(lock.Holders) >= lock.Limit { |
|
qOpts.WaitIndex = meta.LastIndex |
|
goto WAIT |
|
} |
|
|
|
// Create a new lock with us as a holder |
|
lock.Holders[s.lockSession] = true |
|
newLock, err := s.encodeLock(lock, lockPair.ModifyIndex) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Attempt the acquisition |
|
didSet, _, err := kv.CAS(newLock, &wOpts) |
|
if err != nil { |
|
return nil, fmt.Errorf("failed to update lock: %v", err) |
|
} |
|
if !didSet { |
|
// Update failed, could have been a race with another contender, |
|
// retry the operation |
|
goto WAIT |
|
} |
|
|
|
// Watch to ensure we maintain ownership of the slot |
|
lockCh := make(chan struct{}) |
|
go s.monitorLock(s.lockSession, lockCh) |
|
|
|
// Set that we own the lock |
|
s.isHeld = true |
|
|
|
// Acquired! All done |
|
return lockCh, nil |
|
} |
|
|
|
// Release is used to voluntarily give up our semaphore slot. It is |
|
// an error to call this if the semaphore has not been acquired. |
|
func (s *Semaphore) Release() error { |
|
// Hold the lock as we try to release |
|
s.l.Lock() |
|
defer s.l.Unlock() |
|
|
|
// Ensure the lock is actually held |
|
if !s.isHeld { |
|
return ErrSemaphoreNotHeld |
|
} |
|
|
|
// Set that we no longer own the lock |
|
s.isHeld = false |
|
|
|
// Stop the session renew |
|
if s.sessionRenew != nil { |
|
defer func() { |
|
close(s.sessionRenew) |
|
s.sessionRenew = nil |
|
}() |
|
} |
|
|
|
// Get and clear the lock session |
|
lockSession := s.lockSession |
|
s.lockSession = "" |
|
|
|
// Remove ourselves as a lock holder |
|
kv := s.c.KV() |
|
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) |
|
|
|
wOpts := WriteOptions{Namespace: s.opts.Namespace} |
|
qOpts := QueryOptions{Namespace: s.opts.Namespace} |
|
|
|
READ: |
|
pair, _, err := kv.Get(key, &qOpts) |
|
if err != nil { |
|
return err |
|
} |
|
if pair == nil { |
|
pair = &KVPair{} |
|
} |
|
lock, err := s.decodeLock(pair) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
// Create a new lock without us as a holder |
|
if _, ok := lock.Holders[lockSession]; ok { |
|
delete(lock.Holders, lockSession) |
|
newLock, err := s.encodeLock(lock, pair.ModifyIndex) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
// Swap the locks |
|
didSet, _, err := kv.CAS(newLock, &wOpts) |
|
if err != nil { |
|
return fmt.Errorf("failed to update lock: %v", err) |
|
} |
|
if !didSet { |
|
goto READ |
|
} |
|
} |
|
|
|
// Destroy the contender entry |
|
contenderKey := path.Join(s.opts.Prefix, lockSession) |
|
if _, err := kv.Delete(contenderKey, &wOpts); err != nil { |
|
return err |
|
} |
|
return nil |
|
} |
|
|
|
// Destroy is used to cleanup the semaphore entry. It is not necessary |
|
// to invoke. It will fail if the semaphore is in use. |
|
func (s *Semaphore) Destroy() error { |
|
// Hold the lock as we try to acquire |
|
s.l.Lock() |
|
defer s.l.Unlock() |
|
|
|
// Check if we already hold the semaphore |
|
if s.isHeld { |
|
return ErrSemaphoreHeld |
|
} |
|
|
|
// List for the semaphore |
|
kv := s.c.KV() |
|
|
|
q := QueryOptions{Namespace: s.opts.Namespace} |
|
pairs, _, err := kv.List(s.opts.Prefix, &q) |
|
if err != nil { |
|
return fmt.Errorf("failed to read prefix: %v", err) |
|
} |
|
|
|
// Find the lock pair, bail if it doesn't exist |
|
lockPair := s.findLock(pairs) |
|
if lockPair.ModifyIndex == 0 { |
|
return nil |
|
} |
|
if lockPair.Flags != SemaphoreFlagValue { |
|
return ErrSemaphoreConflict |
|
} |
|
|
|
// Decode the lock |
|
lock, err := s.decodeLock(lockPair) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
// Prune the dead holders |
|
s.pruneDeadHolders(lock, pairs) |
|
|
|
// Check if there are any holders |
|
if len(lock.Holders) > 0 { |
|
return ErrSemaphoreInUse |
|
} |
|
|
|
// Attempt the delete |
|
w := WriteOptions{Namespace: s.opts.Namespace} |
|
didRemove, _, err := kv.DeleteCAS(lockPair, &w) |
|
if err != nil { |
|
return fmt.Errorf("failed to remove semaphore: %v", err) |
|
} |
|
if !didRemove { |
|
return ErrSemaphoreInUse |
|
} |
|
return nil |
|
} |
|
|
|
// createSession is used to create a new managed session |
|
func (s *Semaphore) createSession() (string, error) { |
|
session := s.c.Session() |
|
se := &SessionEntry{ |
|
Name: s.opts.SessionName, |
|
TTL: s.opts.SessionTTL, |
|
Behavior: SessionBehaviorDelete, |
|
} |
|
|
|
w := WriteOptions{Namespace: s.opts.Namespace} |
|
id, _, err := session.Create(se, &w) |
|
if err != nil { |
|
return "", err |
|
} |
|
return id, nil |
|
} |
|
|
|
// contenderEntry returns a formatted KVPair for the contender |
|
func (s *Semaphore) contenderEntry(session string) *KVPair { |
|
return &KVPair{ |
|
Key: path.Join(s.opts.Prefix, session), |
|
Value: s.opts.Value, |
|
Session: session, |
|
Flags: SemaphoreFlagValue, |
|
} |
|
} |
|
|
|
// findLock is used to find the KV Pair which is used for coordination |
|
func (s *Semaphore) findLock(pairs KVPairs) *KVPair { |
|
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) |
|
for _, pair := range pairs { |
|
if pair.Key == key { |
|
return pair |
|
} |
|
} |
|
return &KVPair{Flags: SemaphoreFlagValue} |
|
} |
|
|
|
// decodeLock is used to decode a semaphoreLock from an |
|
// entry in Consul |
|
func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) { |
|
// Handle if there is no lock |
|
if pair == nil || pair.Value == nil { |
|
return &semaphoreLock{ |
|
Limit: s.opts.Limit, |
|
Holders: make(map[string]bool), |
|
}, nil |
|
} |
|
|
|
l := &semaphoreLock{} |
|
if err := json.Unmarshal(pair.Value, l); err != nil { |
|
return nil, fmt.Errorf("lock decoding failed: %v", err) |
|
} |
|
return l, nil |
|
} |
|
|
|
// encodeLock is used to encode a semaphoreLock into a KVPair |
|
// that can be PUT |
|
func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) { |
|
enc, err := json.Marshal(l) |
|
if err != nil { |
|
return nil, fmt.Errorf("lock encoding failed: %v", err) |
|
} |
|
pair := &KVPair{ |
|
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey), |
|
Value: enc, |
|
Flags: SemaphoreFlagValue, |
|
ModifyIndex: oldIndex, |
|
} |
|
return pair, nil |
|
} |
|
|
|
// pruneDeadHolders is used to remove all the dead lock holders |
|
func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) { |
|
// Gather all the live holders |
|
alive := make(map[string]struct{}, len(pairs)) |
|
for _, pair := range pairs { |
|
if pair.Session != "" { |
|
alive[pair.Session] = struct{}{} |
|
} |
|
} |
|
|
|
// Remove any holders that are dead |
|
for holder := range lock.Holders { |
|
if _, ok := alive[holder]; !ok { |
|
delete(lock.Holders, holder) |
|
} |
|
} |
|
} |
|
|
|
// monitorLock is a long running routine to monitor a semaphore ownership |
|
// It closes the stopCh if we lose our slot. |
|
func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) { |
|
defer close(stopCh) |
|
kv := s.c.KV() |
|
opts := QueryOptions{ |
|
RequireConsistent: true, |
|
Namespace: s.opts.Namespace, |
|
} |
|
WAIT: |
|
retries := s.opts.MonitorRetries |
|
RETRY: |
|
pairs, meta, err := kv.List(s.opts.Prefix, &opts) |
|
if err != nil { |
|
// If configured we can try to ride out a brief Consul unavailability |
|
// by doing retries. Note that we have to attempt the retry in a non- |
|
// blocking fashion so that we have a clean place to reset the retry |
|
// counter if service is restored. |
|
if retries > 0 && IsRetryableError(err) { |
|
time.Sleep(s.opts.MonitorRetryTime) |
|
retries-- |
|
opts.WaitIndex = 0 |
|
goto RETRY |
|
} |
|
return |
|
} |
|
lockPair := s.findLock(pairs) |
|
lock, err := s.decodeLock(lockPair) |
|
if err != nil { |
|
return |
|
} |
|
s.pruneDeadHolders(lock, pairs) |
|
if _, ok := lock.Holders[session]; ok { |
|
opts.WaitIndex = meta.LastIndex |
|
goto WAIT |
|
} |
|
}
|
|
|