consul: Adding support for lock-delay in sessions

pull/162/head
Armon Dadgar 11 years ago
parent c071932f92
commit e0abf2e92c

@ -25,6 +25,23 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return fmt.Errorf("Must provide key")
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if args.Op == structs.KVSLock {
state := k.srv.fsm.State()
expires := state.KVSLockDelay(args.DirEnt.Key)
if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
args.DirEnt.Key, expires)
*reply = false
return nil
}
}
// Apply the update
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {

@ -5,6 +5,7 @@ import (
"github.com/hashicorp/consul/testutil"
"os"
"testing"
"time"
)
func TestKVS_Apply(t *testing.T) {
@ -224,5 +225,72 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
if dirent.Keys[2] != "/test/sub/" {
t.Fatalf("Bad: %v", dirent.Keys)
}
}
func TestKVS_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Create and invalidate a session with a lock
state := s1.fsm.State()
if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != false {
t.Fatalf("should not acquire")
}
// Wait for lock-delay
time.Sleep(50 * time.Millisecond)
// Should acquire
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != true {
t.Fatalf("should acquire")
}
}

@ -10,6 +10,8 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"
)
const (
@ -54,6 +56,22 @@ type StateStore struct {
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
// lockDelay is used to mark certain locks as unacquirable.
// When a lock is forcefully released (failing health
// check, destroyed session, etc), it is subject to the LockDelay
// impossed by the session. This prevents another session from
// acquiring the lock for some period of time as a protection against
// split-brains. This is inspired by the lock-delay in Chubby.
// Because this relies on wall-time, we cannot assume all peers
// perceive time as flowing uniformly. This means KVSLock MUST ignore
// lockDelay, since the lockDelay may have expired on the leader,
// but not on the follower. Rejecting the lock could result in
// inconsistencies in the FSMs due to the rate time progresses. Instead,
// only the opinion of the leader is respected, and the Raft log
// is never questioned.
lockDelay map[string]time.Time
lockDelayLock sync.RWMutex
}
// StateSnapshot is used to provide a point-in-time snapshot
@ -98,6 +116,7 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
path: path,
env: env,
watch: make(map[*MDBTable]*NotifyGroup),
lockDelay: make(map[string]time.Time),
}
// Ensure we can initialize
@ -1076,6 +1095,17 @@ func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error)
return s.kvsSet(index, d, kvUnlock)
}
// KVSLockDelay returns the expiration time of a key lock delay. A key may
// have a lock delay if it was unlocked due to a session invalidation instead
// of a graceful unlock. This must be checked on the leader node, and not in
// KVSLock due to the variability of clocks.
func (s *StateStore) KVSLockDelay(key string) time.Time {
s.lockDelayLock.RLock()
expires := s.lockDelay[key]
s.lockDelayLock.RUnlock()
return expires
}
// kvsSet is the internal setter
func (s *StateStore) kvsSet(
index uint64,
@ -1367,8 +1397,14 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
}
session := res[0].(*structs.Session)
// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Invalidate any held locks
if err := s.invalidateLocks(index, tx, id); err != nil {
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err
}
@ -1395,11 +1431,20 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
// invalidateLocks is used to invalidate all the locks held by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error {
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
lockDelay time.Duration, id string) error {
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
if err != nil {
return err
}
var expires time.Time
if lockDelay > 0 {
s.lockDelayLock.Lock()
defer s.lockDelayLock.Unlock()
expires = time.Now().Add(lockDelay)
}
for _, pair := range pairs {
kv := pair.(*structs.DirEntry)
kv.Session = "" // Clear the lock
@ -1407,6 +1452,16 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
return err
}
// If there is a lock delay, prevent acquisition
// for at least lockDelay period
if lockDelay > 0 {
s.lockDelay[kv.Key] = expires
time.AfterFunc(lockDelay, func() {
s.lockDelayLock.Lock()
delete(s.lockDelay, kv.Key)
s.lockDelayLock.Unlock()
})
}
}
if len(pairs) > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {

@ -6,6 +6,7 @@ import (
"reflect"
"sort"
"testing"
"time"
)
func testStateStore() (*StateStore, error) {
@ -2059,7 +2060,7 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
@ -2095,4 +2096,10 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
if d2.Session != "" {
t.Fatalf("bad: %v", *d2)
}
// Key should have a lock delay
expires := store.KVSLockDelay("/foo")
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}
}

@ -29,6 +29,12 @@ const (
HealthCritical = "critical"
)
const (
// MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second
)
// RPCInfo is used to describe common information about query
type RPCInfo interface {
RequestDatacenter() string
@ -343,10 +349,11 @@ type IndexedKeyList struct {
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
CreateIndex uint64
ID string
Node string
Checks []string
CreateIndex uint64
LockDelay time.Duration
}
type Sessions []*Session

Loading…
Cancel
Save