package api
import (
"encoding/json"
"fmt"
"path"
"sync"
"time"
)
const (
// DefaultSemaphoreSessionName is the Session Name we assign if none is provided
DefaultSemaphoreSessionName = "Consul API Semaphore"
// DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided
// when creating a new Semaphore. This is used because we do not have another
// other check to depend upon.
DefaultSemaphoreSessionTTL = "15s"
// DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore
// acquisition is possible. This affects the minimum time it takes to cancel
// a Semaphore acquisition.
DefaultSemaphoreWaitTime = 15 * time . Second
// DefaultSemaphoreKey is the key used within the prefix to
// use for coordination between all the contenders.
DefaultSemaphoreKey = ".lock"
// SemaphoreFlagValue is a magic flag we set to indicate a key
// is being used for a semaphore. It is used to detect a potential
// conflict with a lock.
SemaphoreFlagValue = 0xe0f69a2baa414de0
)
var (
// ErrSemaphoreHeld is returned if we attempt to double lock
ErrSemaphoreHeld = fmt . Errorf ( "Semaphore already held" )
// ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore
// that we do not hold.
ErrSemaphoreNotHeld = fmt . Errorf ( "Semaphore not held" )
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
// that is in use.
ErrSemaphoreInUse = fmt . Errorf ( "Semaphore in use" )
// ErrSemaphoreConflict is returned if the flags on a key
// used for a semaphore do not match expectation
ErrSemaphoreConflict = fmt . Errorf ( "Existing key does not match semaphore use" )
)
// Semaphore is used to implement a distributed semaphore
// using the Consul KV primitives.
type Semaphore struct {
c * Client
opts * SemaphoreOptions
isHeld bool
sessionRenew chan struct { }
lockSession string
l sync . Mutex
}
// SemaphoreOptions is used to parameterize the Semaphore
type SemaphoreOptions struct {
Prefix string // Must be set and have write permissions
Limit int // Must be set, and be positive
Value [ ] byte // Optional, value to associate with the contender entry
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
MonitorRetries int // Optional, defaults to 0 which means no retries
MonitorRetryTime time . Duration // Optional, defaults to DefaultMonitorRetryTime
SemaphoreWaitTime time . Duration // Optional, defaults to DefaultSemaphoreWaitTime
SemaphoreTryOnce bool // Optional, defaults to false which means try forever
Namespace string // Optional, defaults to API client config, namespace of ACL token, or "default" namespace
}
// semaphoreLock is written under the DefaultSemaphoreKey and
// is used to coordinate between all the contenders.
type semaphoreLock struct {
// Limit is the integer limit of holders. This is used to
// verify that all the holders agree on the value.
Limit int
// Holders is a list of all the semaphore holders.
// It maps the session ID to true. It is used as a set effectively.
Holders map [ string ] bool
}
// SemaphorePrefix is used to created a Semaphore which will operate
// at the given KV prefix and uses the given limit for the semaphore.
// The prefix must have write privileges, and the limit must be agreed
// upon by all contenders.
func ( c * Client ) SemaphorePrefix ( prefix string , limit int ) ( * Semaphore , error ) {
opts := & SemaphoreOptions {
Prefix : prefix ,
Limit : limit ,
}
return c . SemaphoreOpts ( opts )
}
// SemaphoreOpts is used to create a Semaphore with the given options.
// The prefix must have write privileges, and the limit must be agreed
// upon by all contenders. If a Session is not provided, one will be created.
func ( c * Client ) SemaphoreOpts ( opts * SemaphoreOptions ) ( * Semaphore , error ) {
if opts . Prefix == "" {
return nil , fmt . Errorf ( "missing prefix" )
}
if opts . Limit <= 0 {
return nil , fmt . Errorf ( "semaphore limit must be positive" )
}
if opts . SessionName == "" {
opts . SessionName = DefaultSemaphoreSessionName
}
if opts . SessionTTL == "" {
opts . SessionTTL = DefaultSemaphoreSessionTTL
} else {
if _ , err := time . ParseDuration ( opts . SessionTTL ) ; err != nil {
return nil , fmt . Errorf ( "invalid SessionTTL: %v" , err )
}
}
if opts . MonitorRetryTime == 0 {
opts . MonitorRetryTime = DefaultMonitorRetryTime
}
if opts . SemaphoreWaitTime == 0 {
opts . SemaphoreWaitTime = DefaultSemaphoreWaitTime
}
s := & Semaphore {
c : c ,
opts : opts ,
}
return s , nil
}
// Acquire attempts to reserve a slot in the semaphore, blocking until
// success, interrupted via the stopCh or an error is encountered.
// Providing a non-nil stopCh can be used to abort the attempt.
// On success, a channel is returned that represents our slot.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the slot is held until Release() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the session being lost.
func ( s * Semaphore ) Acquire ( stopCh <- chan struct { } ) ( <- chan struct { } , error ) {
// Hold the lock as we try to acquire
s . l . Lock ( )
defer s . l . Unlock ( )
// Check if we already hold the semaphore
if s . isHeld {
return nil , ErrSemaphoreHeld
}
// Check if we need to create a session first
s . lockSession = s . opts . Session
if s . lockSession == "" {
sess , err := s . createSession ( )
if err != nil {
return nil , fmt . Errorf ( "failed to create session: %v" , err )
}
s . sessionRenew = make ( chan struct { } )
s . lockSession = sess
session := s . c . Session ( )
go session . RenewPeriodic ( s . opts . SessionTTL , sess , nil , s . sessionRenew )
// If we fail to acquire the lock, cleanup the session
defer func ( ) {
if ! s . isHeld {
close ( s . sessionRenew )
s . sessionRenew = nil
}
} ( )
}
// Create the contender entry
kv := s . c . KV ( )
wOpts := WriteOptions { Namespace : s . opts . Namespace }
made , _ , err := kv . Acquire ( s . contenderEntry ( s . lockSession ) , & wOpts )
if err != nil || ! made {
return nil , fmt . Errorf ( "failed to make contender entry: %v" , err )
}
// Setup the query options
qOpts := QueryOptions {
WaitTime : s . opts . SemaphoreWaitTime ,
Namespace : s . opts . Namespace ,
}
start := time . Now ( )
attempts := 0
WAIT :
// Check if we should quit
select {
case <- stopCh :
return nil , nil
default :
}
// Handle the one-shot mode.
if s . opts . SemaphoreTryOnce && attempts > 0 {
elapsed := time . Since ( start )
if elapsed > s . opts . SemaphoreWaitTime {
return nil , nil
}
// Query wait time should not exceed the semaphore wait time
qOpts . WaitTime = s . opts . SemaphoreWaitTime - elapsed
}
attempts ++
// Read the prefix
pairs , meta , err := kv . List ( s . opts . Prefix , & qOpts )
if err != nil {
return nil , fmt . Errorf ( "failed to read prefix: %v" , err )
}
// Decode the lock
lockPair := s . findLock ( pairs )
if lockPair . Flags != SemaphoreFlagValue {
return nil , ErrSemaphoreConflict
}
lock , err := s . decodeLock ( lockPair )
if err != nil {
return nil , err
}
// Verify we agree with the limit
if lock . Limit != s . opts . Limit {
return nil , fmt . Errorf ( "semaphore limit conflict (lock: %d, local: %d)" ,
lock . Limit , s . opts . Limit )
}
// Prune the dead holders
s . pruneDeadHolders ( lock , pairs )
// Check if the lock is held
if len ( lock . Holders ) >= lock . Limit {
qOpts . WaitIndex = meta . LastIndex
goto WAIT
}
// Create a new lock with us as a holder
lock . Holders [ s . lockSession ] = true
newLock , err := s . encodeLock ( lock , lockPair . ModifyIndex )
if err != nil {
return nil , err
}
// Attempt the acquisition
didSet , _ , err := kv . CAS ( newLock , & wOpts )
if err != nil {
return nil , fmt . Errorf ( "failed to update lock: %v" , err )
}
if ! didSet {
// Update failed, could have been a race with another contender,
// retry the operation
goto WAIT
}
// Watch to ensure we maintain ownership of the slot
lockCh := make ( chan struct { } )
go s . monitorLock ( s . lockSession , lockCh )
// Set that we own the lock
s . isHeld = true
// Acquired! All done
return lockCh , nil
}
// Release is used to voluntarily give up our semaphore slot. It is
// an error to call this if the semaphore has not been acquired.
func ( s * Semaphore ) Release ( ) error {
// Hold the lock as we try to release
s . l . Lock ( )
defer s . l . Unlock ( )
// Ensure the lock is actually held
if ! s . isHeld {
return ErrSemaphoreNotHeld
}
// Set that we no longer own the lock
s . isHeld = false
// Stop the session renew
if s . sessionRenew != nil {
defer func ( ) {
close ( s . sessionRenew )
s . sessionRenew = nil
} ( )
}
// Get and clear the lock session
lockSession := s . lockSession
s . lockSession = ""
// Remove ourselves as a lock holder
kv := s . c . KV ( )
key := path . Join ( s . opts . Prefix , DefaultSemaphoreKey )
wOpts := WriteOptions { Namespace : s . opts . Namespace }
qOpts := QueryOptions { Namespace : s . opts . Namespace }
READ :
pair , _ , err := kv . Get ( key , & qOpts )
if err != nil {
return err
}
if pair == nil {
pair = & KVPair { }
}
lock , err := s . decodeLock ( pair )
if err != nil {
return err
}
// Create a new lock without us as a holder
if _ , ok := lock . Holders [ lockSession ] ; ok {
delete ( lock . Holders , lockSession )
newLock , err := s . encodeLock ( lock , pair . ModifyIndex )
if err != nil {
return err
}
// Swap the locks
didSet , _ , err := kv . CAS ( newLock , & wOpts )
if err != nil {
return fmt . Errorf ( "failed to update lock: %v" , err )
}
if ! didSet {
goto READ
}
}
// Destroy the contender entry
contenderKey := path . Join ( s . opts . Prefix , lockSession )
if _ , err := kv . Delete ( contenderKey , & wOpts ) ; err != nil {
return err
}
return nil
}
// Destroy is used to cleanup the semaphore entry. It is not necessary
// to invoke. It will fail if the semaphore is in use.
func ( s * Semaphore ) Destroy ( ) error {
// Hold the lock as we try to acquire
s . l . Lock ( )
defer s . l . Unlock ( )
// Check if we already hold the semaphore
if s . isHeld {
return ErrSemaphoreHeld
}
// List for the semaphore
kv := s . c . KV ( )
q := QueryOptions { Namespace : s . opts . Namespace }
pairs , _ , err := kv . List ( s . opts . Prefix , & q )
if err != nil {
return fmt . Errorf ( "failed to read prefix: %v" , err )
}
// Find the lock pair, bail if it doesn't exist
lockPair := s . findLock ( pairs )
if lockPair . ModifyIndex == 0 {
return nil
}
if lockPair . Flags != SemaphoreFlagValue {
return ErrSemaphoreConflict
}
// Decode the lock
lock , err := s . decodeLock ( lockPair )
if err != nil {
return err
}
// Prune the dead holders
s . pruneDeadHolders ( lock , pairs )
// Check if there are any holders
if len ( lock . Holders ) > 0 {
return ErrSemaphoreInUse
}
// Attempt the delete
w := WriteOptions { Namespace : s . opts . Namespace }
didRemove , _ , err := kv . DeleteCAS ( lockPair , & w )
if err != nil {
return fmt . Errorf ( "failed to remove semaphore: %v" , err )
}
if ! didRemove {
return ErrSemaphoreInUse
}
return nil
}
// createSession is used to create a new managed session
func ( s * Semaphore ) createSession ( ) ( string , error ) {
session := s . c . Session ( )
se := & SessionEntry {
Name : s . opts . SessionName ,
TTL : s . opts . SessionTTL ,
Behavior : SessionBehaviorDelete ,
}
w := WriteOptions { Namespace : s . opts . Namespace }
id , _ , err := session . Create ( se , & w )
if err != nil {
return "" , err
}
return id , nil
}
// contenderEntry returns a formatted KVPair for the contender
func ( s * Semaphore ) contenderEntry ( session string ) * KVPair {
return & KVPair {
Key : path . Join ( s . opts . Prefix , session ) ,
Value : s . opts . Value ,
Session : session ,
Flags : SemaphoreFlagValue ,
}
}
// findLock is used to find the KV Pair which is used for coordination
func ( s * Semaphore ) findLock ( pairs KVPairs ) * KVPair {
key := path . Join ( s . opts . Prefix , DefaultSemaphoreKey )
for _ , pair := range pairs {
if pair . Key == key {
return pair
}
}
return & KVPair { Flags : SemaphoreFlagValue }
}
// decodeLock is used to decode a semaphoreLock from an
// entry in Consul
func ( s * Semaphore ) decodeLock ( pair * KVPair ) ( * semaphoreLock , error ) {
// Handle if there is no lock
if pair == nil || pair . Value == nil {
return & semaphoreLock {
Limit : s . opts . Limit ,
Holders : make ( map [ string ] bool ) ,
} , nil
}
l := & semaphoreLock { }
if err := json . Unmarshal ( pair . Value , l ) ; err != nil {
return nil , fmt . Errorf ( "lock decoding failed: %v" , err )
}
return l , nil
}
// encodeLock is used to encode a semaphoreLock into a KVPair
// that can be PUT
func ( s * Semaphore ) encodeLock ( l * semaphoreLock , oldIndex uint64 ) ( * KVPair , error ) {
enc , err := json . Marshal ( l )
if err != nil {
return nil , fmt . Errorf ( "lock encoding failed: %v" , err )
}
pair := & KVPair {
Key : path . Join ( s . opts . Prefix , DefaultSemaphoreKey ) ,
Value : enc ,
Flags : SemaphoreFlagValue ,
ModifyIndex : oldIndex ,
}
return pair , nil
}
// pruneDeadHolders is used to remove all the dead lock holders
func ( s * Semaphore ) pruneDeadHolders ( lock * semaphoreLock , pairs KVPairs ) {
// Gather all the live holders
alive := make ( map [ string ] struct { } , len ( pairs ) )
for _ , pair := range pairs {
if pair . Session != "" {
alive [ pair . Session ] = struct { } { }
}
}
// Remove any holders that are dead
for holder := range lock . Holders {
if _ , ok := alive [ holder ] ; ! ok {
delete ( lock . Holders , holder )
}
}
}
// monitorLock is a long running routine to monitor a semaphore ownership
// It closes the stopCh if we lose our slot.
func ( s * Semaphore ) monitorLock ( session string , stopCh chan struct { } ) {
defer close ( stopCh )
kv := s . c . KV ( )
opts := QueryOptions {
RequireConsistent : true ,
Namespace : s . opts . Namespace ,
}
WAIT :
retries := s . opts . MonitorRetries
RETRY :
pairs , meta , err := kv . List ( s . opts . Prefix , & opts )
if err != nil {
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry
// counter if service is restored.
if retries > 0 && IsRetryableError ( err ) {
time . Sleep ( s . opts . MonitorRetryTime )
retries --
opts . WaitIndex = 0
goto RETRY
}
return
}
lockPair := s . findLock ( pairs )
lock , err := s . decodeLock ( lockPair )
if err != nil {
return
}
s . pruneDeadHolders ( lock , pairs )
if _ , ok := lock . Holders [ session ] ; ok {
opts . WaitIndex = meta . LastIndex
goto WAIT
}
}