Merge pull request #3313 from hashicorp/delete_prefix

This fixes #1278
pull/3328/head
preetapan 7 years ago committed by GitHub
commit f2bc2b8d07

@ -6,7 +6,7 @@ IMPROVEMENTS:
BUG FIXES:
* agent: Clean up temporary files during disk write errors when persisting services and checks. [GH-3207]
* api: Implemented a much faster recursive delete algorithm for the KV store. It has been bench-marked to be up to 100X faster on recursive deletes that affect millions of nodes.
## 0.9.0 (July 20, 2017)
BREAKING CHANGES:

@ -420,36 +420,21 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
// Get an iterator over all of the keys with the given prefix.
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Go over all of the keys and remove them. We call the delete
// directly so that we only update the index once. We also add
// tombstones as we go.
var modified bool
var objs []interface{}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
objs = append(objs, entry)
modified = true
}
// For prefix deletes, only insert one tombstone and delete the entire subtree
// Do the actual deletes in a separate loop so we don't trash the
// iterator as we go.
for _, obj := range objs {
if err := tx.Delete("kvs", obj); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed recursive deleting kvs entry: %s", err)
}
// Update the index
if modified {
if deleted {
if prefix != "" { // don't insert a tombstone if the entire tree is deleted, all watchers on keys will see the max_index of the tree
if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}

@ -1022,6 +1022,107 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
}
}
func TestStateStore_Watches_PrefixDelete(t *testing.T) {
s := testStateStore(t)
// Create some KVS entries
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 5, "foo/bar/zip/zap", "zap")
testSetKey(t, s, 6, "foo/nope", "nope")
ws := memdb.NewWatchSet()
got, _, err := s.KVSList(ws, "foo/bar")
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
var wantIndex uint64 = 5
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", wantIndex, got)
}
// Delete a key and make sure the index comes from the tombstone.
if err := s.KVSDeleteTree(7, "foo/bar/zip"); err != nil {
t.Fatalf("unexpected err: %s", err)
}
// Make sure watch fires
if !watchFired(ws) {
t.Fatalf("expected watch to fire but it did not")
}
//Verify index matches tombstone
got, _, err = s.KVSList(ws, "foo/bar")
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
wantIndex = 7
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", got, wantIndex)
}
// Make sure watch fires
if !watchFired(ws) {
t.Fatalf("expected watch to fire but it did not")
}
// Reap tombstone and verify list on the same key reverts its index value
if err := s.ReapTombstones(wantIndex); err != nil {
t.Fatalf("err: %s", err)
}
got, _, err = s.KVSList(nil, "foo/bar")
wantIndex = 2
if err != nil {
t.Fatalf("err: %s", err)
}
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", got, wantIndex)
}
// Set a different key to bump the index. This shouldn't fire the
// watch since there's a different prefix.
testSetKey(t, s, 8, "some/other/key", "")
// Now ask for the index for a node within the prefix that was deleted
// We expect to get the max index in the tree
wantIndex = 8
ws = memdb.NewWatchSet()
got, _, err = s.KVSList(ws, "foo/bar/baz")
if err != nil {
t.Fatalf("err: %s", err)
}
if watchFired(ws) {
t.Fatalf("Watch should not have fired")
}
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", got, wantIndex)
}
// List all the keys to make sure the index returned is the max index
got, _, err = s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", got, wantIndex)
}
// Delete all the keys, special case where tombstones are not inserted
if err := s.KVSDeleteTree(9, ""); err != nil {
t.Fatalf("unexpected err: %s", err)
}
wantIndex = 9
got, _, err = s.KVSList(nil, "/foo/bar")
if err != nil {
t.Fatalf("err: %s", err)
}
if got != wantIndex {
t.Fatalf("bad index: %d, expected %d", got, wantIndex)
}
}
func TestStateStore_KVSLockDelay(t *testing.T) {
s := testStateStore(t)

@ -698,7 +698,7 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
expected := []string{
"cannot insert in read-only transaction",
"cannot insert in read-only transaction",
"cannot insert in read-only transaction",
"failed recursive deleting kvs entry",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))

@ -183,6 +183,31 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
return nc
}
// Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
// Returns the size of the subtree visited
func (t *Txn) trackChannelsAndCount(n *Node) int {
// Count only leaf nodes
leaves := 0
if n.leaf != nil {
leaves = 1
}
// Mark this node as being mutated.
if t.trackMutate {
t.trackChannel(n.mutateCh)
}
// Mark its leaf as being mutated, if appropriate.
if t.trackMutate && n.leaf != nil {
t.trackChannel(n.leaf.mutateCh)
}
// Recurse on the children
for _, e := range n.edges {
leaves += t.trackChannelsAndCount(e.node)
}
return leaves
}
// mergeChild is called to collapse the given node with its child. This is only
// called when the given node is not a leaf and has a single edge.
func (t *Txn) mergeChild(n *Node) {
@ -357,6 +382,56 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
return nc, leaf
}
// delete does a recursive deletion
func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
// Check for key exhaustion
if len(search) == 0 {
nc := t.writeNode(n, true)
if n.isLeaf() {
nc.leaf = nil
}
nc.edges = nil
return nc, t.trackChannelsAndCount(n)
}
// Look for an edge
label := search[0]
idx, child := n.getEdge(label)
// We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
// Need to do both so that we can delete prefixes that don't correspond to any node in the tree
if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
return nil, 0
}
// Consume the search prefix
if len(child.prefix) > len(search) {
search = []byte("")
} else {
search = search[len(child.prefix):]
}
newChild, numDeletions := t.deletePrefix(n, child, search)
if newChild == nil {
return nil, 0
}
// Copy this node. WATCH OUT - it's safe to pass "false" here because we
// will only ADD a leaf via nc.mergeChild() if there isn't one due to
// the !nc.isLeaf() check in the logic just below. This is pretty subtle,
// so be careful if you change any of the logic here.
nc := t.writeNode(n, false)
// Delete the edge if the node has no edges
if newChild.leaf == nil && len(newChild.edges) == 0 {
nc.delEdge(label)
if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
t.mergeChild(nc)
}
} else {
nc.edges[idx].node = newChild
}
return nc, numDeletions
}
// Insert is used to add or update a given key. The return provides
// the previous value and a bool indicating if any was set.
func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
@ -384,6 +459,19 @@ func (t *Txn) Delete(k []byte) (interface{}, bool) {
return nil, false
}
// DeletePrefix is used to delete an entire subtree that matches the prefix
// This will delete all nodes under that prefix
func (t *Txn) DeletePrefix(prefix []byte) bool {
newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
if newRoot != nil {
t.root = newRoot
t.size = t.size - numDeletions
return true
}
return false
}
// Root returns the current root of the radix tree within this
// transaction. The root is not safe across insert and delete operations,
// but can be used to read the current state during a transaction.
@ -524,6 +612,14 @@ func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
return txn.Commit(), old, ok
}
// DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
// and a bool indicating if the prefix matched any nodes
func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
txn := t.Txn()
ok := txn.DeletePrefix(k)
return txn.Commit(), ok
}
// Root returns the root node of the tree which can be used for richer
// query operations.
func (t *Tree) Root() *Node {

@ -21,6 +21,11 @@ The database provides the following:
a single field index, or more advanced compound field indexes. Certain types like
UUID can be efficiently compressed from strings into byte indexes for reduced
storage requirements.
* Watches - Callers can populate a watch set as part of a query, which can be used to
detect when a modification has been made to the database which affects the query
results. This lets callers easily watch for changes in the database in a very general
way.
For the underlying immutable radix trees, see [go-immutable-radix](https://github.com/hashicorp/go-immutable-radix).

@ -1,6 +1,7 @@
package memdb
import (
"encoding/binary"
"encoding/hex"
"fmt"
"reflect"
@ -249,6 +250,79 @@ func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
return []byte(key), nil
}
// UintFieldIndex is used to extract a uint field from an object using
// reflection and builds an index on that field.
type UintFieldIndex struct {
Field string
}
func (u *UintFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(u.Field)
if !fv.IsValid() {
return false, nil,
fmt.Errorf("field '%s' for %#v is invalid", u.Field, obj)
}
// Check the type
k := fv.Kind()
size, ok := IsUintType(k)
if !ok {
return false, nil, fmt.Errorf("field %q is of type %v; want a uint", u.Field, k)
}
// Get the value and encode it
val := fv.Uint()
buf := make([]byte, size)
binary.PutUvarint(buf, val)
return true, buf, nil
}
func (u *UintFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
v := reflect.ValueOf(args[0])
if !v.IsValid() {
return nil, fmt.Errorf("%#v is invalid", args[0])
}
k := v.Kind()
size, ok := IsUintType(k)
if !ok {
return nil, fmt.Errorf("arg is of type %v; want a uint", k)
}
val := v.Uint()
buf := make([]byte, size)
binary.PutUvarint(buf, val)
return buf, nil
}
// IsUintType returns whether the passed type is a type of uint and the number
// of bytes needed to encode the type.
func IsUintType(k reflect.Kind) (size int, okay bool) {
switch k {
case reflect.Uint:
return binary.MaxVarintLen64, true
case reflect.Uint8:
return 2, true
case reflect.Uint16:
return binary.MaxVarintLen16, true
case reflect.Uint32:
return binary.MaxVarintLen32, true
case reflect.Uint64:
return binary.MaxVarintLen64, true
default:
return 0, false
}
}
// UUIDFieldIndex is used to extract a field from an object
// using reflection and builds an index on that field by treating
// it as a UUID. This is an optimization to using a StringFieldIndex

@ -13,8 +13,8 @@ import (
// on values. The database makes use of immutable radix trees to provide
// transactions and MVCC.
type MemDB struct {
schema *DBSchema
root unsafe.Pointer // *iradix.Tree underneath
schema *DBSchema
root unsafe.Pointer // *iradix.Tree underneath
primary bool
// There can only be a single writter at once
@ -30,8 +30,8 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
// Create the MemDB
db := &MemDB{
schema: schema,
root: unsafe.Pointer(iradix.New()),
schema: schema,
root: unsafe.Pointer(iradix.New()),
primary: true,
}
if err := db.initialize(); err != nil {
@ -65,8 +65,8 @@ func (db *MemDB) Txn(write bool) *Txn {
// operations to the existing DB.
func (db *MemDB) Snapshot() *MemDB {
clone := &MemDB{
schema: db.schema,
root: unsafe.Pointer(db.getRoot()),
schema: db.schema,
root: unsafe.Pointer(db.getRoot()),
primary: false,
}
return clone

@ -117,14 +117,23 @@ func (txn *Txn) Commit() {
// Commit each sub-transaction scoped to (table, index)
for key, subTxn := range txn.modified {
path := indexPath(key.Table, key.Index)
final := subTxn.Commit()
final := subTxn.CommitOnly()
txn.rootTxn.Insert(path, final)
}
// Update the root of the DB
newRoot := txn.rootTxn.Commit()
newRoot := txn.rootTxn.CommitOnly()
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
// Now issue all of the mutation updates (this is safe to call
// even if mutation tracking isn't enabled); we do this after
// the root pointer is swapped so that waking responders will
// see the new state.
for _, subTxn := range txn.modified {
subTxn.Notify()
}
txn.rootTxn.Notify()
// Clear the txn
txn.rootTxn = nil
txn.modified = nil
@ -321,6 +330,96 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
return nil
}
// DeletePrefix is used to delete an entire subtree based on a prefix.
// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
if !txn.write {
return false, fmt.Errorf("cannot delete in read-only transaction")
}
if !strings.HasSuffix(prefix_index, "_prefix") {
return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
}
deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
// Get an iterator over all of the keys with the given prefix.
entries, err := txn.Get(table, prefix_index, prefix)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Get the table schema
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return false, fmt.Errorf("invalid table '%s'", table)
}
foundAny := false
for entry := entries.Next(); entry != nil; entry = entries.Next() {
if !foundAny {
foundAny = true
}
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(entry)
if err != nil {
return false, fmt.Errorf("failed to build primary index: %v", err)
}
if !ok {
return false, fmt.Errorf("object missing primary index")
}
// Remove the object from all the indexes except the given prefix index
for name, indexSchema := range tableSchema.Indexes {
if name == deletePrefixIndex {
continue
}
indexTxn := txn.writableIndex(table, name)
// Handle the update by deleting from the index first
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(entry)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(entry)
}
if err != nil {
return false, fmt.Errorf("failed to build index '%s': %v", name, err)
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
for _, val := range vals {
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Delete(val)
}
}
}
}
if foundAny {
indexTxn := txn.writableIndex(table, deletePrefixIndex)
ok = indexTxn.DeletePrefix([]byte(prefix))
if !ok {
panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
}
return true, nil
}
return false, nil
}
// DeleteAll is used to delete all the objects in a given table
// matching the constraints on the index
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {

@ -63,8 +63,8 @@
{"checksumSHA1":"cdOCt0Yb+hdErz8NAQqayxPmRsY=","path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55","revisionTime":"2014-10-28T05:47:10Z"},
{"checksumSHA1":"nd3S1qkFv7zZxA9be0bw4nT0pe0=","path":"github.com/hashicorp/go-checkpoint","revision":"e4b2dc34c0f698ee04750bf2035d8b9384233e1b","revisionTime":"2015-10-22T18:15:14Z"},
{"checksumSHA1":"b8F628srIitj5p7Y130xc9k0QWs=","path":"github.com/hashicorp/go-cleanhttp","revision":"3573b8b52aa7b37b9358d966a898feb387f62437","revisionTime":"2017-02-11T01:34:15Z"},
{"checksumSHA1":"zvmksNyW6g+Fd/bywd4vcn8rp+M=","path":"github.com/hashicorp/go-immutable-radix","revision":"d0852f9e7b91ec9633735052bdab00bf802b353c","revisionTime":"2017-02-14T00:45:45Z"},
{"checksumSHA1":"K8Fsgt1llTXP0EwqdBzvSGdKOKc=","path":"github.com/hashicorp/go-memdb","revision":"c01f56b44823e8ba697e23c18d12dca984b85aca","revisionTime":"2017-01-23T15:32:28Z"},
{"checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","path":"github.com/hashicorp/go-immutable-radix","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"},
{"checksumSHA1":"T65qvYBTy4rYks7oN+U0muEqtRw=","path":"github.com/hashicorp/go-memdb","revision":"2b2d6c35e14e7557ea1003e707d5e179fa315028","revisionTime":"2017-07-25T22:15:03Z"},
{"checksumSHA1":"TNlVzNR1OaajcNi3CbQ3bGbaLGU=","path":"github.com/hashicorp/go-msgpack/codec","revision":"fa3f63826f7c23912c15263591e65d54d080b458","revisionTime":"2015-05-18T23:42:57Z"},
{"checksumSHA1":"lrSl49G23l6NhfilxPM0XFs5rZo=","path":"github.com/hashicorp/go-multierror","revision":"d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5","revisionTime":"2015-09-16T20:57:42Z"},
{"checksumSHA1":"ErJHGU6AVPZM9yoY/xV11TwSjQs=","path":"github.com/hashicorp/go-retryablehttp","revision":"6e85be8fee1dcaa02c0eaaac2df5a8fbecf94145","revisionTime":"2016-09-30T03:51:02Z"},

@ -68,6 +68,66 @@ func TestKeyWatch(t *testing.T) {
}
}
func TestKeyWatch_With_PrefixDelete(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
invoke := 0
deletedKeyWatchInvoked := 0
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil && deletedKeyWatchInvoked == 0 {
deletedKeyWatchInvoked++
return
}
if invoke == 0 {
v, ok := raw.(*consulapi.KVPair)
if !ok || v == nil || string(v.Value) != "test" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
kv := plan.client.KV()
pair := &consulapi.KVPair{
Key: "foo/bar/baz",
Value: []byte("test"),
}
_, err := kv.Put(pair, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the query to run
time.Sleep(20 * time.Millisecond)
// Delete the key
_, err = kv.DeleteTree("foo/bar", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
plan.Stop()
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke != 1 {
t.Fatalf("expected watch plan to be invoked once but got %v", invoke)
}
if deletedKeyWatchInvoked != 1 {
t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked)
}
}
func TestKeyPrefixWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()

Loading…
Cancel
Save