mirror of https://github.com/hashicorp/consul
consul/state: basic k/v operations
parent
f05a322dc7
commit
08d41224a3
|
@ -702,3 +702,55 @@ func (s *StateStore) parseNodes(
|
||||||
}
|
}
|
||||||
return lindex, results, nil
|
return lindex, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVSSet is used to store a key/value pair.
|
||||||
|
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
return s.kvsSetTxn(idx, entry, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// kvsSetTxn is used to insert or update a key/value pair in the state
|
||||||
|
// store. It is the inner method used and handles only the actual storage.
|
||||||
|
func (s *StateStore) kvsSetTxn(
|
||||||
|
idx uint64, entry *structs.DirEntry,
|
||||||
|
tx *memdb.Txn) error {
|
||||||
|
|
||||||
|
// Retrieve an existing KV pair
|
||||||
|
existing, err := tx.First("kvs", "id", entry.Key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed key lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the indexes
|
||||||
|
if existing != nil {
|
||||||
|
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
|
||||||
|
entry.ModifyIndex = idx
|
||||||
|
} else {
|
||||||
|
entry.CreateIndex = idx
|
||||||
|
entry.ModifyIndex = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the kv pair in the state store and update the index
|
||||||
|
if err := tx.Insert("kvs", entry); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting kv entry: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSGet is used to retrieve a key/value pair from the state store.
|
||||||
|
func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
entry, err := tx.First("kvs", "id", key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed key lookup: %s", err)
|
||||||
|
}
|
||||||
|
return entry.(*structs.DirEntry), nil
|
||||||
|
}
|
||||||
|
|
|
@ -775,3 +775,56 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||||
t.Fatalf("bad: %#v", dump[0].Services[0])
|
t.Fatalf("bad: %#v", dump[0].Services[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_KVSSet(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Write a new K/V entry to the store
|
||||||
|
entry := &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}
|
||||||
|
if err := s.KVSSet(1, entry); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the K/V entry again
|
||||||
|
result, err := s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result == nil {
|
||||||
|
t.Fatalf("expected k/v pair, got nothing")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was injected into the result
|
||||||
|
if result.CreateIndex != 1 || result.ModifyIndex != 1 {
|
||||||
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the value matches
|
||||||
|
if v := string(result.Value); v != "bar" {
|
||||||
|
t.Fatalf("expected 'bar', got: '%s'", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updating the entry works and changes the index
|
||||||
|
update := &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("baz"),
|
||||||
|
}
|
||||||
|
if err := s.KVSSet(2, update); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the kv pair and check
|
||||||
|
result, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result.CreateIndex != 1 || result.ModifyIndex != 2 {
|
||||||
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
|
}
|
||||||
|
if v := string(result.Value); v != "baz" {
|
||||||
|
t.Fatalf("expected 'baz', got '%s'", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -348,13 +348,13 @@ type IndexedNodeDump struct {
|
||||||
// DirEntry is used to represent a directory entry. This is
|
// DirEntry is used to represent a directory entry. This is
|
||||||
// used for values in our Key-Value store.
|
// used for values in our Key-Value store.
|
||||||
type DirEntry struct {
|
type DirEntry struct {
|
||||||
CreateIndex uint64
|
LockIndex uint64
|
||||||
ModifyIndex uint64
|
Key string
|
||||||
LockIndex uint64
|
Flags uint64
|
||||||
Key string
|
Value []byte
|
||||||
Flags uint64
|
Session string `json:",omitempty"`
|
||||||
Value []byte
|
|
||||||
Session string `json:",omitempty"`
|
RaftIndex
|
||||||
}
|
}
|
||||||
type DirEntries []*DirEntry
|
type DirEntries []*DirEntry
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue