diff --git a/consul/state/graveyard.go b/consul/state/graveyard.go index a500159cdf..487691f8d2 100644 --- a/consul/state/graveyard.go +++ b/consul/state/graveyard.go @@ -39,16 +39,16 @@ func (g *Graveyard) InsertTxn(tx *memdb.Txn, context string, idx uint64) error { // GetMaxIndexTxn returns the highest index tombstone whose key matches the // given context, using a prefix match. func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, context string) (uint64, error) { - stones, err := tx.Get(g.Table, "id", context) + stones, err := tx.Get(g.Table, "id_prefix", context) if err != nil { return 0, fmt.Errorf("failed querying tombstones: %s", err) } var lindex uint64 for stone := stones.Next(); stone != nil; stone = stones.Next() { - r := stone.(*Tombstone) - if r.Index > lindex { - lindex = r.Index + s := stone.(*Tombstone) + if s.Index > lindex { + lindex = s.Index } } return lindex, nil @@ -56,7 +56,7 @@ func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, context string) (uint64, error // DumpTxn returns all the tombstones. func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) { - stones, err := tx.Get(g.Table, "id", "") + stones, err := tx.Get(g.Table, "id") if err != nil { return nil, fmt.Errorf("failed querying tombstones: %s", err) } @@ -87,7 +87,7 @@ func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error { // This does a full table scan since we currently can't index on a // numeric value. Since this is all in-memory and done infrequently // this pretty reasonable. - stones, err := tx.Get(g.Table, "id", "") + stones, err := tx.Get(g.Table, "id") if err != nil { return fmt.Errorf("failed querying tombstones: %s", err) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 2d9db05fac..8fd275e3f6 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -149,6 +149,63 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) { } } +func TestStateStore_ReapTombstones(t *testing.T) { + s := testStateStore(t) + + // Create some KV pairs. + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/baz", "bar") + testSetKey(t, s, 4, "foo/moo", "bar") + testSetKey(t, s, 5, "foo/zoo", "bar") + + // Call a delete on some specific keys. + if err := s.KVSDelete(6, "foo/baz"); err != nil { + t.Fatalf("err: %s", err) + } + if err := s.KVSDelete(7, "foo/moo"); err != nil { + t.Fatalf("err: %s", err) + } + + // Pull out the list and check the index, which should come from the + // tombstones. + idx, _, err := s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Reap the tombstones <= 6. + if err := s.ReapTombstones(6); err != nil { + t.Fatalf("err: %s", err) + } + + // Should still be good because 7 is in there. + idx, _, err = s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap them all. + if err := s.ReapTombstones(7); err != nil { + t.Fatalf("err: %s", err) + } + + // At this point the index will slide backwards. + idx, _, err = s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } +} + func TestStateStore_EnsureNode(t *testing.T) { s := testStateStore(t)