mirror of https://github.com/hashicorp/consul
consul: Ensure KVS List handles tombstones
parent
eb2df41171
commit
b4292640a5
|
@ -135,32 +135,36 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
state.QueryTables("KVSList"),
|
state.QueryTables("KVSList"),
|
||||||
func() error {
|
func() error {
|
||||||
index, ent, err := state.KVSList(args.Key)
|
tombIndex, index, ent, err := state.KVSList(args.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if acl != nil {
|
if acl != nil {
|
||||||
ent = FilterDirEnt(acl, ent)
|
ent = FilterDirEnt(acl, ent)
|
||||||
}
|
}
|
||||||
if len(ent) == 0 {
|
|
||||||
// Must provide non-zero index to prevent blocking
|
|
||||||
// Index 1 is impossible anyways (due to Raft internals)
|
|
||||||
if index == 0 {
|
|
||||||
reply.Index = 1
|
|
||||||
} else {
|
|
||||||
reply.Index = index
|
|
||||||
}
|
|
||||||
reply.Entries = nil
|
|
||||||
} else {
|
|
||||||
// Determine the maximum affected index
|
|
||||||
var maxIndex uint64
|
|
||||||
for _, e := range ent {
|
|
||||||
if e.ModifyIndex > maxIndex {
|
|
||||||
maxIndex = e.ModifyIndex
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
reply.Index = maxIndex
|
// Determine the maximum affected index
|
||||||
|
var maxIndex uint64
|
||||||
|
for _, e := range ent {
|
||||||
|
if e.ModifyIndex > maxIndex {
|
||||||
|
maxIndex = e.ModifyIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if tombIndex > maxIndex {
|
||||||
|
maxIndex = tombIndex
|
||||||
|
}
|
||||||
|
// Must provide non-zero index to prevent blocking
|
||||||
|
// Index 1 is impossible anyways (due to Raft internals)
|
||||||
|
if maxIndex == 0 {
|
||||||
|
if index > 0 {
|
||||||
|
maxIndex = index
|
||||||
|
} else {
|
||||||
|
maxIndex = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reply.Index = maxIndex
|
||||||
|
|
||||||
|
if len(ent) != 0 {
|
||||||
reply.Entries = ent
|
reply.Entries = ent
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1115,13 +1115,39 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVSList is used to list all KV entries with a prefix
|
// KVSList is used to list all KV entries with a prefix
|
||||||
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) {
|
||||||
idx, res, err := s.kvsTable.Get("id_prefix", prefix)
|
tables := MDBTables{s.kvsTable, s.tombstoneTable}
|
||||||
|
tx, err := tables.StartTxn(true)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
idx, err := tables.LastIndexTxn(tx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
ents := make(structs.DirEntries, len(res))
|
ents := make(structs.DirEntries, len(res))
|
||||||
for idx, r := range res {
|
for idx, r := range res {
|
||||||
ents[idx] = r.(*structs.DirEntry)
|
ents[idx] = r.(*structs.DirEntry)
|
||||||
}
|
}
|
||||||
return idx, ents, err
|
|
||||||
|
// Check for the higest index in the tombstone table
|
||||||
|
var maxIndex uint64
|
||||||
|
res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix)
|
||||||
|
for _, r := range res {
|
||||||
|
ent := r.(*structs.DirEntry)
|
||||||
|
if ent.ModifyIndex > maxIndex {
|
||||||
|
maxIndex = ent.ModifyIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxIndex, idx, ents, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVSListKeys is used to list keys with a prefix, and up to a given seperator
|
// KVSListKeys is used to list keys with a prefix, and up to a given seperator
|
||||||
|
|
|
@ -1588,7 +1588,7 @@ func TestKVS_List(t *testing.T) {
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
// Should not exist
|
// Should not exist
|
||||||
idx, ents, err := store.KVSList("/web")
|
_, idx, ents, err := store.KVSList("/web")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1614,7 +1614,7 @@ func TestKVS_List(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should list
|
// Should list
|
||||||
idx, ents, err = store.KVSList("/web")
|
_, idx, ents, err = store.KVSList("/web")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1636,6 +1636,55 @@ func TestKVS_List(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKVSList_TombstoneIndex(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// Create the entries
|
||||||
|
d := &structs.DirEntry{Key: "/web/a", Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1000, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1001, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1002, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nuke the last node
|
||||||
|
err = store.KVSDeleteTree(1003, "/web/c")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add another node
|
||||||
|
d = &structs.DirEntry{Key: "/other", Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1004, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// List should properly reflect tombstoned value
|
||||||
|
tombIdx, idx, ents, err := store.KVSList("/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1004 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if tombIdx != 1003 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if len(ents) != 2 {
|
||||||
|
t.Fatalf("bad: %v", ents)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestKVS_ListKeys(t *testing.T) {
|
func TestKVS_ListKeys(t *testing.T) {
|
||||||
store, err := testStateStore()
|
store, err := testStateStore()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1852,13 +1901,16 @@ func TestKVSDeleteTree(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nothing should list
|
// Nothing should list
|
||||||
idx, ents, err := store.KVSList("/web")
|
tombIdx, idx, ents, err := store.KVSList("/web")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if idx != 1010 {
|
if idx != 1010 {
|
||||||
t.Fatalf("bad: %v", idx)
|
t.Fatalf("bad: %v", idx)
|
||||||
}
|
}
|
||||||
|
if tombIdx != 1010 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
if len(ents) != 0 {
|
if len(ents) != 0 {
|
||||||
t.Fatalf("bad: %v", ents)
|
t.Fatalf("bad: %v", ents)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue