From e0abf2e92cce6404610cc07599e70b83a9603126 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 May 2014 12:50:29 -0700 Subject: [PATCH] consul: Adding support for lock-delay in sessions --- consul/kvs_endpoint.go | 17 ++++++++++ consul/kvs_endpoint_test.go | 68 +++++++++++++++++++++++++++++++++++++ consul/state_store.go | 67 ++++++++++++++++++++++++++++++++---- consul/state_store_test.go | 9 ++++- consul/structs/structs.go | 9 ++++- 5 files changed, 162 insertions(+), 8 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index fde952432f..91d8f3bdea 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -25,6 +25,23 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { return fmt.Errorf("Must provide key") } + // If this is a lock, we must check for a lock-delay. Since lock-delay + // is based on wall-time, each peer expire the lock-delay at a slightly + // different time. This means the enforcement of lock-delay cannot be done + // after the raft log is committed as it would lead to inconsistent FSMs. + // Instead, the lock-delay must be enforced before commit. This means that + // only the wall-time of the leader node is used, preventing any inconsistencies. + if args.Op == structs.KVSLock { + state := k.srv.fsm.State() + expires := state.KVSLockDelay(args.DirEnt.Key) + if expires.After(time.Now()) { + k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v", + args.DirEnt.Key, expires) + *reply = false + return nil + } + } + // Apply the update resp, err := k.srv.raftApply(structs.KVSRequestType, args) if err != nil { diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 69b8fdfbaa..c4131fdcbd 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/consul/testutil" "os" "testing" + "time" ) func TestKVS_Apply(t *testing.T) { @@ -224,5 +225,72 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { if dirent.Keys[2] != "/test/sub/" { t.Fatalf("Bad: %v", dirent.Keys) } +} + +func TestKVS_Apply_LockDelay(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + // Create and invalidate a session with a lock + state := s1.fsm.State() + if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{ + Node: "foo", + LockDelay: 50 * time.Millisecond, + } + if err := state.SessionCreate(2, session); err != nil { + t.Fatalf("err: %v", err) + } + id := session.ID + d := &structs.DirEntry{ + Key: "test", + Session: id, + } + if ok, err := state.KVSLock(3, d); err != nil || !ok { + t.Fatalf("err: %v", err) + } + if err := state.SessionDestroy(4, id); err != nil { + t.Fatalf("err: %v", err) + } + // Make a new session that is valid + if err := state.SessionCreate(5, session); err != nil { + t.Fatalf("err: %v", err) + } + validId := session.ID + + // Make a lock request + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "test", + Session: validId, + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if out != false { + t.Fatalf("should not acquire") + } + + // Wait for lock-delay + time.Sleep(50 * time.Millisecond) + + // Should acquire + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if out != true { + t.Fatalf("should acquire") + } } diff --git a/consul/state_store.go b/consul/state_store.go index eb27e62fa5..1b137f33ad 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -10,6 +10,8 @@ import ( "os" "runtime" "strings" + "sync" + "time" ) const ( @@ -54,6 +56,22 @@ type StateStore struct { tables MDBTables watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables + + // lockDelay is used to mark certain locks as unacquirable. + // When a lock is forcefully released (failing health + // check, destroyed session, etc), it is subject to the LockDelay + // impossed by the session. This prevents another session from + // acquiring the lock for some period of time as a protection against + // split-brains. This is inspired by the lock-delay in Chubby. + // Because this relies on wall-time, we cannot assume all peers + // perceive time as flowing uniformly. This means KVSLock MUST ignore + // lockDelay, since the lockDelay may have expired on the leader, + // but not on the follower. Rejecting the lock could result in + // inconsistencies in the FSMs due to the rate time progresses. Instead, + // only the opinion of the leader is respected, and the Raft log + // is never questioned. + lockDelay map[string]time.Time + lockDelayLock sync.RWMutex } // StateSnapshot is used to provide a point-in-time snapshot @@ -94,10 +112,11 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { } s := &StateStore{ - logger: log.New(logOutput, "", log.LstdFlags), - path: path, - env: env, - watch: make(map[*MDBTable]*NotifyGroup), + logger: log.New(logOutput, "", log.LstdFlags), + path: path, + env: env, + watch: make(map[*MDBTable]*NotifyGroup), + lockDelay: make(map[string]time.Time), } // Ensure we can initialize @@ -1076,6 +1095,17 @@ func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) return s.kvsSet(index, d, kvUnlock) } +// KVSLockDelay returns the expiration time of a key lock delay. A key may +// have a lock delay if it was unlocked due to a session invalidation instead +// of a graceful unlock. This must be checked on the leader node, and not in +// KVSLock due to the variability of clocks. +func (s *StateStore) KVSLockDelay(key string) time.Time { + s.lockDelayLock.RLock() + expires := s.lockDelay[key] + s.lockDelayLock.RUnlock() + return expires +} + // kvsSet is the internal setter func (s *StateStore) kvsSet( index uint64, @@ -1367,8 +1397,14 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro } session := res[0].(*structs.Session) + // Enforce the MaxLockDelay + delay := session.LockDelay + if delay > structs.MaxLockDelay { + delay = structs.MaxLockDelay + } + // Invalidate any held locks - if err := s.invalidateLocks(index, tx, id); err != nil { + if err := s.invalidateLocks(index, tx, delay, id); err != nil { return err } @@ -1395,11 +1431,20 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro // invalidateLocks is used to invalidate all the locks held by a session // within a given txn. All tables should be locked in the tx. -func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error { +func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, + lockDelay time.Duration, id string) error { pairs, err := s.kvsTable.GetTxn(tx, "session", id) if err != nil { return err } + + var expires time.Time + if lockDelay > 0 { + s.lockDelayLock.Lock() + defer s.lockDelayLock.Unlock() + expires = time.Now().Add(lockDelay) + } + for _, pair := range pairs { kv := pair.(*structs.DirEntry) kv.Session = "" // Clear the lock @@ -1407,6 +1452,16 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error if err := s.kvsTable.InsertTxn(tx, kv); err != nil { return err } + // If there is a lock delay, prevent acquisition + // for at least lockDelay period + if lockDelay > 0 { + s.lockDelay[kv.Key] = expires + time.AfterFunc(lockDelay, func() { + s.lockDelayLock.Lock() + delete(s.lockDelay, kv.Key) + s.lockDelayLock.Unlock() + }) + } } if len(pairs) > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index d3bc1667cd..308335826c 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -6,6 +6,7 @@ import ( "reflect" "sort" "testing" + "time" ) func testStateStore() (*StateStore, error) { @@ -2059,7 +2060,7 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) { if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - session := &structs.Session{Node: "foo"} + session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond} if err := store.SessionCreate(4, session); err != nil { t.Fatalf("err: %v", err) } @@ -2095,4 +2096,10 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) { if d2.Session != "" { t.Fatalf("bad: %v", *d2) } + + // Key should have a lock delay + expires := store.KVSLockDelay("/foo") + if expires.Before(time.Now().Add(30 * time.Millisecond)) { + t.Fatalf("Bad: %v", expires) + } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 0673ba39f7..b08a1c8e46 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -29,6 +29,12 @@ const ( HealthCritical = "critical" ) +const ( + // MaxLockDelay provides a maximum LockDelay value for + // a session. Any value above this will not be respected. + MaxLockDelay = 60 * time.Second +) + // RPCInfo is used to describe common information about query type RPCInfo interface { RequestDatacenter() string @@ -343,10 +349,11 @@ type IndexedKeyList struct { // Session is used to represent an open session in the KV store. // This issued to associate node checks with acquired locks. type Session struct { + CreateIndex uint64 ID string Node string Checks []string - CreateIndex uint64 + LockDelay time.Duration } type Sessions []*Session