diff --git a/consul/state/kvs.go b/consul/state/kvs.go index 577dbb89b6..43d08e33cb 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -123,6 +123,12 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { tx := s.db.Txn(false) defer tx.Abort() + return s.kvsGetTxn(tx, key) +} + +// kvsGetTxn is the inner method that gets a KVS entry inside an existing +// transaction. +func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) { // Get the table index. idx := maxIndexTxn(tx, "kvs", "tombstones") @@ -313,6 +319,18 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() + set, err := s.kvsDeleteCASTxn(tx, idx, cidx, key) + if !set || err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing +// transaction. +func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) { // Retrieve the existing kvs entry, if any exists. entry, err := tx.First("kvs", "id", key) if err != nil { @@ -331,8 +349,6 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { if err := s.kvsDeleteTxn(tx, idx, key); err != nil { return false, err } - - tx.Commit() return true, nil } @@ -344,6 +360,18 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error tx := s.db.Txn(true) defer tx.Abort() + set, err := s.kvsSetCASTxn(tx, idx, entry) + if !set || err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// kvsSetCASTxn is the inner method used to do a CAS inside an existing +// transaction. +func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Retrieve the existing entry. existing, err := tx.First("kvs", "id", entry.Key) if err != nil { @@ -367,8 +395,6 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { return false, err } - - tx.Commit() return true, nil } @@ -379,6 +405,17 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { tx := s.db.Txn(true) defer tx.Abort() + if err := s.kvsDeleteTreeTxn(tx, idx, prefix); err != nil { + return err + } + + tx.Commit() + return nil +} + +// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an +// existing transaction. +func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error { // Get an iterator over all of the keys with the given prefix. entries, err := tx.Get("kvs", "id_prefix", prefix) if err != nil { @@ -414,8 +451,6 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { return fmt.Errorf("failed updating index: %s", err) } } - - tx.Commit() return nil } @@ -431,6 +466,18 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) tx := s.db.Txn(true) defer tx.Abort() + locked, err := s.kvsLockTxn(tx, idx, entry) + if !locked || err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// kvsLockTxn is the inner method that does a lock inside an existing +// transaction. +func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -476,8 +523,6 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { return false, err } - - tx.Commit() return true, nil } @@ -487,6 +532,18 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error tx := s.db.Txn(true) defer tx.Abort() + unlocked, err := s.kvsUnlockTxn(tx, idx, entry) + if !unlocked || err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// kvsUnlockTxn is the inner method that does an unlock inside an existing +// transaction. +func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -519,7 +576,5 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { return false, err } - - tx.Commit() return true, nil }