package api
import (
"fmt"
"sync"
"time"
)
const (
	// DefaultLockSessionName is the session name applied by LockOpts when
	// the caller leaves LockOptions.SessionName empty.
	DefaultLockSessionName = "Consul API Lock"

	// DefaultLockSessionTTL is the session TTL applied by LockOpts when the
	// caller leaves LockOptions.SessionTTL empty. A TTL is used because a
	// session created by the Lock has no other check to depend upon.
	DefaultLockSessionTTL = "15s"

	// DefaultLockWaitTime is the maximum time a single blocking read of the
	// lock key may take while waiting for the lock to become free. The stop
	// channel is only consulted between reads, so this bounds the minimum
	// time it takes to cancel a Lock acquisition.
	DefaultLockWaitTime = 15 * time.Second

	// DefaultLockRetryTime is how long we pause after a failed acquisition
	// before trying again. Once a lock-delay is in effect this keeps us from
	// hot looping on the acquire call.
	DefaultLockRetryTime = 5 * time.Second
)
var (
	// ErrLockHeld is returned by Lock when this handle already holds the
	// lock, i.e. on an attempted double lock.
	ErrLockHeld = fmt.Errorf("Lock already held")

	// ErrLockNotHeld is returned by Unlock when this handle does not
	// currently hold the lock.
	ErrLockNotHeld = fmt.Errorf("Lock not held")
)
// Lock is used to implement client-side leader election. It follows the
// algorithm described here: https://consul.io/docs/guides/leader-election.html.
type Lock struct {
	// c is the client used to reach the KV and Session endpoints.
	c *Client
	// opts holds the options this handle was created with.
	opts *LockOptions

	// isHeld is set once Lock succeeds and cleared again by Unlock.
	isHeld bool
	// sessionRenew is non-nil only while a session created by Lock itself is
	// being renewed; closing it stops the renewal goroutine, which then
	// attempts to destroy that session.
	sessionRenew chan struct{}
	// lockSession is the ID of the session used for the acquisition: either
	// opts.Session or the session created by Lock.
	lockSession string
	// l is held for the full duration of Lock and Unlock.
	l sync.Mutex
}
// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
	// Key is the KV key backing the lock. Must be set and have write
	// permissions.
	Key string
	// Value is optional data to associate with the lock.
	Value []byte
	// Session is an optional existing session ID; a session is created if
	// it is not specified.
	Session string
	// SessionName is optional and defaults to DefaultLockSessionName.
	SessionName string
	// SessionTTL is optional and defaults to DefaultLockSessionTTL. When
	// set it must be parseable by time.ParseDuration.
	SessionTTL string
}
// LockKey is a convenience wrapper around LockOpts for the common case
// where only the key needs to be specified; every other option takes its
// default. The key used must have write permissions.
func (c *Client) LockKey(key string) (*Lock, error) {
	return c.LockOpts(&LockOptions{Key: key})
}
// LockOpts builds a Lock handle from the given options; the handle can then
// be used to acquire and release the mutex. The key used must have write
// permissions.
//
// Empty SessionName and SessionTTL fields are filled in with their defaults
// directly on the passed-in opts. An error is returned if the key is empty
// or if a supplied SessionTTL is not a valid duration string.
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
	if opts.Key == "" {
		return nil, fmt.Errorf("missing key")
	}

	if opts.SessionName == "" {
		opts.SessionName = DefaultLockSessionName
	}

	// Only a caller-supplied TTL needs validating.
	if opts.SessionTTL == "" {
		opts.SessionTTL = DefaultLockSessionTTL
	} else if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
		return nil, fmt.Errorf("invalid SessionTTL: %v", err)
	}

	return &Lock{c: c, opts: opts}, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Providing a non-nil stopCh can be used to abort the lock attempt.
// Returns a channel that is closed if our lock is lost or an error.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the lock is held until Unlock() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the lock being lost.
//
// Return values:
//   - (ch, nil) once the lock has been acquired.
//   - (nil, nil) if stopCh fired before the lock was acquired.
//   - (nil, ErrLockHeld) if this handle already holds the lock.
//   - (nil, err) if creating the session, reading the key or acquiring the
//     key failed.
func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) {
	// Hold the mutex for the whole acquisition attempt.
	l.l.Lock()
	defer l.l.Unlock()

	// Check if we already hold the lock
	if l.isHeld {
		return nil, ErrLockHeld
	}

	// Check if we need to create a session first
	l.lockSession = l.opts.Session
	if l.lockSession == "" {
		s, err := l.createSession()
		if err != nil {
			return nil, fmt.Errorf("failed to create session: %v", err)
		}
		l.sessionRenew = make(chan struct{})
		l.lockSession = s
		go l.renewSession(s, l.sessionRenew)

		// If we fail to acquire the lock, stop the renewal so the session we
		// just created gets cleaned up. This runs before the mutex is
		// released because deferred calls execute in reverse order.
		defer func() {
			if !l.isHeld {
				close(l.sessionRenew)
				l.sessionRenew = nil
			}
		}()
	}

	// Setup the query options
	kv := l.c.KV()
	qOpts := &QueryOptions{
		WaitTime: DefaultLockWaitTime,
	}

	for {
		// Check if we should quit
		select {
		case <-stopCh:
			return nil, nil
		default:
		}

		// Look for an existing lock, blocking until not taken
		pair, meta, err := kv.Get(l.opts.Key, qOpts)
		if err != nil {
			return nil, fmt.Errorf("failed to read lock: %v", err)
		}

		// Only wait when a different session holds the key. If the holder is
		// our own session (e.g. a caller-supplied Session whose earlier
		// Release failed after Unlock had already cleared isHeld), nobody
		// else is going to release it, so waiting here would never end.
		// Fall through to Acquire instead; Consul documents acquire as
		// succeeding for the session that already holds the lock.
		if pair != nil && pair.Session != "" && pair.Session != l.lockSession {
			qOpts.WaitIndex = meta.LastIndex
			continue
		}

		// Try to acquire the lock
		locked, _, err := kv.Acquire(l.lockEntry(l.lockSession), nil)
		if err != nil {
			return nil, fmt.Errorf("failed to acquire lock: %v", err)
		}
		if locked {
			break
		}

		// We did not get the lock: back off before retrying so that we do
		// not hot loop on the acquire, while still honoring stopCh.
		select {
		case <-time.After(DefaultLockRetryTime):
		case <-stopCh:
			return nil, nil
		}
	}

	// Watch to ensure we maintain leadership
	leaderCh := make(chan struct{})
	go l.monitorLock(l.lockSession, leaderCh)

	// Set that we own the lock
	l.isHeld = true

	// Locked! All done
	return leaderCh, nil
}
// Unlock releases the lock. It is an error to call this
// if the lock is not currently held.
//
// The handle is marked as not held before the release is attempted, so it
// stays not held even when the release itself fails and an error is
// returned. If Lock created the session, its renewal is stopped once the
// release attempt has completed.
func (l *Lock) Unlock() error {
	// Hold the mutex for the whole release.
	l.l.Lock()
	defer l.l.Unlock()

	if !l.isHeld {
		return ErrLockNotHeld
	}

	// Capture the entry to release before wiping our local state.
	entry := l.lockEntry(l.lockSession)
	l.isHeld = false
	l.lockSession = ""

	// Shut down the session renewal on the way out, after the explicit
	// release below has been attempted.
	if l.sessionRenew != nil {
		defer func() {
			close(l.sessionRenew)
			l.sessionRenew = nil
		}()
	}

	// Release the lock explicitly
	if _, _, err := l.c.KV().Release(entry, nil); err != nil {
		return fmt.Errorf("failed to release lock: %v", err)
	}
	return nil
}
// createSession creates a new session using the name and TTL from the lock
// options and returns its ID. On failure the ID is empty and the error from
// the session endpoint is returned as-is.
func (l *Lock) createSession() (string, error) {
	entry := &SessionEntry{
		Name: l.opts.SessionName,
		TTL:  l.opts.SessionTTL,
	}
	id, _, err := l.c.Session().Create(entry, nil)
	if err != nil {
		return "", err
	}
	return id, nil
}
// lockEntry builds the KVPair used to acquire or release the lock: the
// configured key and value, tagged with the given session ID.
func (l *Lock) lockEntry(session string) *KVPair {
	entry := new(KVPair)
	entry.Key = l.opts.Key
	entry.Value = l.opts.Value
	entry.Session = session
	return entry
}
// renewSession is a long running routine that maintains a session
// by doing a periodic Session renewal.
//
// The session identified by id is renewed every half TTL, starting from the
// configured SessionTTL and following any TTL the server reports back. The
// routine exits when a renewal fails or returns no entry, or when doneCh is
// closed; in the latter case it also attempts to destroy the session.
func (l *Lock) renewSession(id string, doneCh chan struct{}) {
	session := l.c.Session()

	ttl, err := time.ParseDuration(l.opts.SessionTTL)
	if err != nil || ttl <= 0 {
		// Without a usable TTL there is no sensible renewal interval, and
		// time.After with a zero duration would hammer the server in a
		// tight loop. Skip renewals and only handle the shutdown.
		// NOTE(review): presumably a zero TTL means the session does not
		// expire on its own; confirm against the server's session semantics.
		<-doneCh
		// Best-effort cleanup; a failure here is deliberately ignored.
		session.Destroy(id, nil)
		return
	}

	for {
		select {
		case <-time.After(ttl / 2):
			entry, _, err := session.Renew(id, nil)
			if err != nil || entry == nil {
				return
			}

			// Handle the server updating the TTL. An empty, unparsable or
			// non-positive value is ignored and the previous interval is
			// kept, so that the loop can never degrade into a busy loop.
			if next, perr := time.ParseDuration(entry.TTL); perr == nil && next > 0 {
				ttl = next
			}

		case <-doneCh:
			// Attempt a session destroy. This is best-effort, so the
			// result is deliberately ignored.
			session.Destroy(id, nil)
			return
		}
	}
}
// monitorLock is a long running routine that watches ownership of the lock
// key. It closes stopCh as soon as the key can no longer be read, no longer
// exists, or is no longer held by the given session.
func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
	defer close(stopCh)

	store := l.c.KV()
	query := &QueryOptions{RequireConsistent: true}
	for {
		current, qm, err := store.Get(l.opts.Key, query)
		if err != nil {
			return
		}
		if current == nil || current.Session != session {
			// The key is gone or is held by someone else: leadership lost.
			return
		}

		// Still ours; block on the next read until the key changes.
		query.WaitIndex = qm.LastIndex
	}
}