api: Detect conflicting existing values for lock/semaphore

pull/619/head
Armon Dadgar 2015-01-19 15:32:19 -10:00
parent 08c0a81e4f
commit 14691f7e29
5 changed files with 111 additions and 1 deletions

View File

@ -22,6 +22,7 @@ var consulConfig = `{
"serf_wan": 18400,
"server": 18000
},
"bind_addr": "127.0.0.1",
"data_dir": "%s",
"bootstrap": true,
"log_level": "debug",

View File

@ -24,6 +24,11 @@ const (
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second
// LockFlagValue is a magic flag we set to indicate a key
// is being used for a semaphore. It is used to detect a potential
// conflict with a lock.
LockFlagValue = 0x2ddccbc058a50c18
)
var (
@ -37,6 +42,10 @@ var (
// ErrLockInUse is returned if we attempt to destroy a lock
// that is in use.
ErrLockInUse = fmt.Errorf("Lock in use")
// ErrLockConflict is returned if the flags on a key
// used for a lock do not match expectation
ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
)
// Lock is used to implement client-side leader election. It is follows the
@ -152,6 +161,9 @@ WAIT:
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
if pair != nil && pair.Flags != LockFlagValue {
return nil, ErrLockConflict
}
if pair != nil && pair.Session != "" {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
@ -245,6 +257,11 @@ func (l *Lock) Destroy() error {
return nil
}
// Check for possible flag conflict
if pair.Flags != LockFlagValue {
return ErrLockConflict
}
// Check if it is in use
if pair.Session != "" {
return ErrLockInUse
@ -281,6 +298,7 @@ func (l *Lock) lockEntry(session string) *KVPair {
Key: l.opts.Key,
Value: l.opts.Value,
Session: session,
Flags: LockFlagValue,
}
}

View File

@ -250,3 +250,40 @@ func TestLock_Destroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestLock_Conflict(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
sema, err := c.SemaphorePrefix("test/lock/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not hold")
}
defer sema.Release()
lock, err := c.LockKey("test/lock/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
_, err = lock.Lock(nil)
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
err = lock.Destroy()
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
}

View File

@ -30,6 +30,11 @@ const (
// DefaultSemaphoreKey is the key used within the prefix to
// use for coordination between all the contenders.
DefaultSemaphoreKey = ".lock"
// SemaphoreFlagValue is a magic flag we set to indicate a key
// is being used for a semaphore. It is used to detect a potential
// conflict with a lock.
SemaphoreFlagValue = 0xe0f69a2baa414de0
)
var (
@ -43,6 +48,10 @@ var (
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
// that is in use.
ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")
// ErrSemaphoreConflict is returned if the flags on a key
// used for a semaphore do not match expectation
ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use")
)
// Semaphore is used to implement a distributed semaphore
@ -186,6 +195,9 @@ WAIT:
// Decode the lock
lockPair := s.findLock(pairs)
if lockPair.Flags != SemaphoreFlagValue {
return nil, ErrSemaphoreConflict
}
lock, err := s.decodeLock(lockPair)
if err != nil {
return nil, err
@ -328,6 +340,9 @@ func (s *Semaphore) Destroy() error {
if lockPair.ModifyIndex == 0 {
return nil
}
if lockPair.Flags != SemaphoreFlagValue {
return ErrSemaphoreConflict
}
// Decode the lock
lock, err := s.decodeLock(lockPair)
@ -399,6 +414,7 @@ func (s *Semaphore) contenderEntry(session string) *KVPair {
Key: path.Join(s.opts.Prefix, session),
Value: s.opts.Value,
Session: session,
Flags: SemaphoreFlagValue,
}
}
@ -410,7 +426,7 @@ func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
return pair
}
}
return &KVPair{}
return &KVPair{Flags: SemaphoreFlagValue}
}
// decodeLock is used to decode a semaphoreLock from an
@ -441,6 +457,7 @@ func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, erro
pair := &KVPair{
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
Value: enc,
Flags: SemaphoreFlagValue,
ModifyIndex: oldIndex,
}
return pair, nil

View File

@ -267,3 +267,40 @@ func TestSemaphore_Destroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestSemaphore_Conflict(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
lock, err := c.LockKey("test/sema/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
sema, err := c.SemaphorePrefix("test/sema/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
_, err = sema.Acquire(nil)
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
err = sema.Destroy()
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
}