// Mirror of https://github.com/hashicorp/consul (Go source, 531 lines, 14 KiB).

package api

import (
	"encoding/json"
	"fmt"
	"path"
	"sync"
	"time"
)

const (
	// DefaultSemaphoreSessionName is the Session Name we assign if none is provided.
	DefaultSemaphoreSessionName = "Consul API Semaphore"

	// DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided
	// when creating a new Semaphore. This is used because we do not have any
	// other check to depend upon.
	DefaultSemaphoreSessionTTL = "15s"

	// DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore
	// acquisition is possible. This affects the minimum time it takes to cancel
	// a Semaphore acquisition.
	DefaultSemaphoreWaitTime = 15 * time.Second

	// DefaultSemaphoreKey is the key used within the prefix for
	// coordination between all the contenders.
	DefaultSemaphoreKey = ".lock"

	// SemaphoreFlagValue is a magic flag we set to indicate a key
	// is being used for a semaphore. It is used to detect a potential
	// conflict with a lock.
	SemaphoreFlagValue = 0xe0f69a2baa414de0
)

var (
	// ErrSemaphoreHeld is returned if we attempt to double lock.
	ErrSemaphoreHeld = fmt.Errorf("Semaphore already held")

	// ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore
	// that we do not hold.
	ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held")

	// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
	// that is in use.
	ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")

	// ErrSemaphoreConflict is returned if the flags on a key
	// used for a semaphore do not match expectation.
	ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use")
)

// Semaphore is used to implement a distributed semaphore
|
|
// using the Consul KV primitives.
|
|
type Semaphore struct {
|
|
c *Client
|
|
opts *SemaphoreOptions
|
|
|
|
isHeld bool
|
|
sessionRenew chan struct{}
|
|
lockSession string
|
|
l sync.Mutex
|
|
}
|
|
|
|
// SemaphoreOptions is used to parameterize the Semaphore.
type SemaphoreOptions struct {
	Prefix            string        // Must be set and have write permissions
	Limit             int           // Must be set, and be positive
	Value             []byte        // Optional, value to associate with the contender entry
	Session           string        // Optional, created if not specified
	SessionName       string        // Optional, defaults to DefaultSemaphoreSessionName
	SessionTTL        string        // Optional, defaults to DefaultSemaphoreSessionTTL
	MonitorRetries    int           // Optional, defaults to 0 which means no retries
	MonitorRetryTime  time.Duration // Optional, defaults to DefaultMonitorRetryTime
	SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
	SemaphoreTryOnce  bool          // Optional, defaults to false which means try forever
	Namespace         string        // Optional, defaults to API client config, namespace of ACL token, or "default" namespace
}

// semaphoreLock is written under the DefaultSemaphoreKey and
// is used to coordinate between all the contenders.
type semaphoreLock struct {
	// Limit is the integer limit of holders. This is used to
	// verify that all the holders agree on the value.
	Limit int

	// Holders is the set of all the semaphore holders: it maps
	// each holder's session ID to true.
	Holders map[string]bool
}

// SemaphorePrefix is used to created a Semaphore which will operate
|
|
// at the given KV prefix and uses the given limit for the semaphore.
|
|
// The prefix must have write privileges, and the limit must be agreed
|
|
// upon by all contenders.
|
|
func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) {
|
|
opts := &SemaphoreOptions{
|
|
Prefix: prefix,
|
|
Limit: limit,
|
|
}
|
|
return c.SemaphoreOpts(opts)
|
|
}
|
|
|
|
// SemaphoreOpts is used to create a Semaphore with the given options.
|
|
// The prefix must have write privileges, and the limit must be agreed
|
|
// upon by all contenders. If a Session is not provided, one will be created.
|
|
func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
|
|
if opts.Prefix == "" {
|
|
return nil, fmt.Errorf("missing prefix")
|
|
}
|
|
if opts.Limit <= 0 {
|
|
return nil, fmt.Errorf("semaphore limit must be positive")
|
|
}
|
|
if opts.SessionName == "" {
|
|
opts.SessionName = DefaultSemaphoreSessionName
|
|
}
|
|
if opts.SessionTTL == "" {
|
|
opts.SessionTTL = DefaultSemaphoreSessionTTL
|
|
} else {
|
|
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
|
|
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
|
|
}
|
|
}
|
|
if opts.MonitorRetryTime == 0 {
|
|
opts.MonitorRetryTime = DefaultMonitorRetryTime
|
|
}
|
|
if opts.SemaphoreWaitTime == 0 {
|
|
opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime
|
|
}
|
|
s := &Semaphore{
|
|
c: c,
|
|
opts: opts,
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Acquire attempts to reserve a slot in the semaphore, blocking until
|
|
// success, interrupted via the stopCh or an error is encountered.
|
|
// Providing a non-nil stopCh can be used to abort the attempt.
|
|
// On success, a channel is returned that represents our slot.
|
|
// This channel could be closed at any time due to session invalidation,
|
|
// communication errors, operator intervention, etc. It is NOT safe to
|
|
// assume that the slot is held until Release() unless the Session is specifically
|
|
// created without any associated health checks. By default Consul sessions
|
|
// prefer liveness over safety and an application must be able to handle
|
|
// the session being lost.
|
|
func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|
// Hold the lock as we try to acquire
|
|
s.l.Lock()
|
|
defer s.l.Unlock()
|
|
|
|
// Check if we already hold the semaphore
|
|
if s.isHeld {
|
|
return nil, ErrSemaphoreHeld
|
|
}
|
|
|
|
// Check if we need to create a session first
|
|
s.lockSession = s.opts.Session
|
|
if s.lockSession == "" {
|
|
sess, err := s.createSession()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create session: %v", err)
|
|
}
|
|
|
|
s.sessionRenew = make(chan struct{})
|
|
s.lockSession = sess
|
|
session := s.c.Session()
|
|
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
|
|
|
|
// If we fail to acquire the lock, cleanup the session
|
|
defer func() {
|
|
if !s.isHeld {
|
|
close(s.sessionRenew)
|
|
s.sessionRenew = nil
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Create the contender entry
|
|
kv := s.c.KV()
|
|
wOpts := WriteOptions{Namespace: s.opts.Namespace}
|
|
|
|
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), &wOpts)
|
|
if err != nil || !made {
|
|
return nil, fmt.Errorf("failed to make contender entry: %v", err)
|
|
}
|
|
|
|
// Setup the query options
|
|
qOpts := QueryOptions{
|
|
WaitTime: s.opts.SemaphoreWaitTime,
|
|
Namespace: s.opts.Namespace,
|
|
}
|
|
|
|
start := time.Now()
|
|
attempts := 0
|
|
WAIT:
|
|
// Check if we should quit
|
|
select {
|
|
case <-stopCh:
|
|
return nil, nil
|
|
default:
|
|
}
|
|
|
|
// Handle the one-shot mode.
|
|
if s.opts.SemaphoreTryOnce && attempts > 0 {
|
|
elapsed := time.Since(start)
|
|
if elapsed > s.opts.SemaphoreWaitTime {
|
|
return nil, nil
|
|
}
|
|
|
|
// Query wait time should not exceed the semaphore wait time
|
|
qOpts.WaitTime = s.opts.SemaphoreWaitTime - elapsed
|
|
}
|
|
attempts++
|
|
|
|
// Read the prefix
|
|
pairs, meta, err := kv.List(s.opts.Prefix, &qOpts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read prefix: %v", err)
|
|
}
|
|
|
|
// Decode the lock
|
|
lockPair := s.findLock(pairs)
|
|
if lockPair.Flags != SemaphoreFlagValue {
|
|
return nil, ErrSemaphoreConflict
|
|
}
|
|
lock, err := s.decodeLock(lockPair)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Verify we agree with the limit
|
|
if lock.Limit != s.opts.Limit {
|
|
return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)",
|
|
lock.Limit, s.opts.Limit)
|
|
}
|
|
|
|
// Prune the dead holders
|
|
s.pruneDeadHolders(lock, pairs)
|
|
|
|
// Check if the lock is held
|
|
if len(lock.Holders) >= lock.Limit {
|
|
qOpts.WaitIndex = meta.LastIndex
|
|
goto WAIT
|
|
}
|
|
|
|
// Create a new lock with us as a holder
|
|
lock.Holders[s.lockSession] = true
|
|
newLock, err := s.encodeLock(lock, lockPair.ModifyIndex)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Attempt the acquisition
|
|
didSet, _, err := kv.CAS(newLock, &wOpts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to update lock: %v", err)
|
|
}
|
|
if !didSet {
|
|
// Update failed, could have been a race with another contender,
|
|
// retry the operation
|
|
goto WAIT
|
|
}
|
|
|
|
// Watch to ensure we maintain ownership of the slot
|
|
lockCh := make(chan struct{})
|
|
go s.monitorLock(s.lockSession, lockCh)
|
|
|
|
// Set that we own the lock
|
|
s.isHeld = true
|
|
|
|
// Acquired! All done
|
|
return lockCh, nil
|
|
}
|
|
|
|
// Release is used to voluntarily give up our semaphore slot. It is
|
|
// an error to call this if the semaphore has not been acquired.
|
|
func (s *Semaphore) Release() error {
|
|
// Hold the lock as we try to release
|
|
s.l.Lock()
|
|
defer s.l.Unlock()
|
|
|
|
// Ensure the lock is actually held
|
|
if !s.isHeld {
|
|
return ErrSemaphoreNotHeld
|
|
}
|
|
|
|
// Set that we no longer own the lock
|
|
s.isHeld = false
|
|
|
|
// Stop the session renew
|
|
if s.sessionRenew != nil {
|
|
defer func() {
|
|
close(s.sessionRenew)
|
|
s.sessionRenew = nil
|
|
}()
|
|
}
|
|
|
|
// Get and clear the lock session
|
|
lockSession := s.lockSession
|
|
s.lockSession = ""
|
|
|
|
// Remove ourselves as a lock holder
|
|
kv := s.c.KV()
|
|
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
|
|
|
|
wOpts := WriteOptions{Namespace: s.opts.Namespace}
|
|
qOpts := QueryOptions{Namespace: s.opts.Namespace}
|
|
|
|
READ:
|
|
pair, _, err := kv.Get(key, &qOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if pair == nil {
|
|
pair = &KVPair{}
|
|
}
|
|
lock, err := s.decodeLock(pair)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a new lock without us as a holder
|
|
if _, ok := lock.Holders[lockSession]; ok {
|
|
delete(lock.Holders, lockSession)
|
|
newLock, err := s.encodeLock(lock, pair.ModifyIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Swap the locks
|
|
didSet, _, err := kv.CAS(newLock, &wOpts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update lock: %v", err)
|
|
}
|
|
if !didSet {
|
|
goto READ
|
|
}
|
|
}
|
|
|
|
// Destroy the contender entry
|
|
contenderKey := path.Join(s.opts.Prefix, lockSession)
|
|
if _, err := kv.Delete(contenderKey, &wOpts); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Destroy is used to cleanup the semaphore entry. It is not necessary
|
|
// to invoke. It will fail if the semaphore is in use.
|
|
func (s *Semaphore) Destroy() error {
|
|
// Hold the lock as we try to acquire
|
|
s.l.Lock()
|
|
defer s.l.Unlock()
|
|
|
|
// Check if we already hold the semaphore
|
|
if s.isHeld {
|
|
return ErrSemaphoreHeld
|
|
}
|
|
|
|
// List for the semaphore
|
|
kv := s.c.KV()
|
|
|
|
q := QueryOptions{Namespace: s.opts.Namespace}
|
|
pairs, _, err := kv.List(s.opts.Prefix, &q)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read prefix: %v", err)
|
|
}
|
|
|
|
// Find the lock pair, bail if it doesn't exist
|
|
lockPair := s.findLock(pairs)
|
|
if lockPair.ModifyIndex == 0 {
|
|
return nil
|
|
}
|
|
if lockPair.Flags != SemaphoreFlagValue {
|
|
return ErrSemaphoreConflict
|
|
}
|
|
|
|
// Decode the lock
|
|
lock, err := s.decodeLock(lockPair)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Prune the dead holders
|
|
s.pruneDeadHolders(lock, pairs)
|
|
|
|
// Check if there are any holders
|
|
if len(lock.Holders) > 0 {
|
|
return ErrSemaphoreInUse
|
|
}
|
|
|
|
// Attempt the delete
|
|
w := WriteOptions{Namespace: s.opts.Namespace}
|
|
didRemove, _, err := kv.DeleteCAS(lockPair, &w)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to remove semaphore: %v", err)
|
|
}
|
|
if !didRemove {
|
|
return ErrSemaphoreInUse
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// createSession is used to create a new managed session
|
|
func (s *Semaphore) createSession() (string, error) {
|
|
session := s.c.Session()
|
|
se := &SessionEntry{
|
|
Name: s.opts.SessionName,
|
|
TTL: s.opts.SessionTTL,
|
|
Behavior: SessionBehaviorDelete,
|
|
}
|
|
|
|
w := WriteOptions{Namespace: s.opts.Namespace}
|
|
id, _, err := session.Create(se, &w)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
// contenderEntry returns a formatted KVPair for the contender
|
|
func (s *Semaphore) contenderEntry(session string) *KVPair {
|
|
return &KVPair{
|
|
Key: path.Join(s.opts.Prefix, session),
|
|
Value: s.opts.Value,
|
|
Session: session,
|
|
Flags: SemaphoreFlagValue,
|
|
}
|
|
}
|
|
|
|
// findLock is used to find the KV Pair which is used for coordination
|
|
func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
|
|
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
|
|
for _, pair := range pairs {
|
|
if pair.Key == key {
|
|
return pair
|
|
}
|
|
}
|
|
return &KVPair{Flags: SemaphoreFlagValue}
|
|
}
|
|
|
|
// decodeLock is used to decode a semaphoreLock from an
|
|
// entry in Consul
|
|
func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) {
|
|
// Handle if there is no lock
|
|
if pair == nil || pair.Value == nil {
|
|
return &semaphoreLock{
|
|
Limit: s.opts.Limit,
|
|
Holders: make(map[string]bool),
|
|
}, nil
|
|
}
|
|
|
|
l := &semaphoreLock{}
|
|
if err := json.Unmarshal(pair.Value, l); err != nil {
|
|
return nil, fmt.Errorf("lock decoding failed: %v", err)
|
|
}
|
|
return l, nil
|
|
}
|
|
|
|
// encodeLock is used to encode a semaphoreLock into a KVPair
|
|
// that can be PUT
|
|
func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) {
|
|
enc, err := json.Marshal(l)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("lock encoding failed: %v", err)
|
|
}
|
|
pair := &KVPair{
|
|
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
|
|
Value: enc,
|
|
Flags: SemaphoreFlagValue,
|
|
ModifyIndex: oldIndex,
|
|
}
|
|
return pair, nil
|
|
}
|
|
|
|
// pruneDeadHolders is used to remove all the dead lock holders
|
|
func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) {
|
|
// Gather all the live holders
|
|
alive := make(map[string]struct{}, len(pairs))
|
|
for _, pair := range pairs {
|
|
if pair.Session != "" {
|
|
alive[pair.Session] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Remove any holders that are dead
|
|
for holder := range lock.Holders {
|
|
if _, ok := alive[holder]; !ok {
|
|
delete(lock.Holders, holder)
|
|
}
|
|
}
|
|
}
|
|
|
|
// monitorLock is a long running routine to monitor a semaphore ownership
|
|
// It closes the stopCh if we lose our slot.
|
|
func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
|
|
defer close(stopCh)
|
|
kv := s.c.KV()
|
|
opts := QueryOptions{
|
|
RequireConsistent: true,
|
|
Namespace: s.opts.Namespace,
|
|
}
|
|
WAIT:
|
|
retries := s.opts.MonitorRetries
|
|
RETRY:
|
|
pairs, meta, err := kv.List(s.opts.Prefix, &opts)
|
|
if err != nil {
|
|
// If configured we can try to ride out a brief Consul unavailability
|
|
// by doing retries. Note that we have to attempt the retry in a non-
|
|
// blocking fashion so that we have a clean place to reset the retry
|
|
// counter if service is restored.
|
|
if retries > 0 && IsRetryableError(err) {
|
|
time.Sleep(s.opts.MonitorRetryTime)
|
|
retries--
|
|
opts.WaitIndex = 0
|
|
goto RETRY
|
|
}
|
|
return
|
|
}
|
|
lockPair := s.findLock(pairs)
|
|
lock, err := s.decodeLock(lockPair)
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.pruneDeadHolders(lock, pairs)
|
|
if _, ok := lock.Holders[session]; ok {
|
|
opts.WaitIndex = meta.LastIndex
|
|
goto WAIT
|
|
}
|
|
}
|