From 36acf8d6a48cb61b2a68d1898a2f6033de831468 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 20 Jul 2017 22:21:14 -0500 Subject: [PATCH 1/8] Use new DeletePrefixMethod for implementing KVSDeleteTree operation. This makes deletes on sub trees larger than one million nodes about 100 times faster. Added unit tests. --- agent/consul/state/kvs.go | 35 +++----- agent/consul/state/kvs_test.go | 159 +++++++++++++++++++++++++++++++++ watch/funcs_test.go | 60 +++++++++++++ 3 files changed, 229 insertions(+), 25 deletions(-) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 87845b3daf..fbba471c1d 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -420,36 +420,21 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error { // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // existing transaction. func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error { - // Get an iterator over all of the keys with the given prefix. - entries, err := tx.Get("kvs", "id_prefix", prefix) + + // For prefix deletes, only insert one tombstone and delete the entire subtree + + deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) + if err != nil { - return fmt.Errorf("failed kvs lookup: %s", err) + return fmt.Errorf("failed recursive deleting kvs entry: %s", err) } - // Go over all of the keys and remove them. We call the delete - // directly so that we only update the index once. We also add - // tombstones as we go. - var modified bool - var objs []interface{} - for entry := entries.Next(); entry != nil; entry = entries.Next() { - e := entry.(*structs.DirEntry) - if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil { + // Update the index if the delete was successful. + // Missing prefixes don't result in an index update + if deleted { + if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil { return fmt.Errorf("failed adding to graveyard: %s", err) } - objs = append(objs, entry) - modified = true - } - - // Do the actual deletes in a separate loop so we don't trash the - // iterator as we go. - for _, obj := range objs { - if err := tx.Delete("kvs", obj); err != nil { - return fmt.Errorf("failed deleting kvs entry: %s", err) - } - } - - // Update the index - if modified { if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index a12778d398..9fde6ae687 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "fmt" + "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -1022,6 +1024,163 @@ func TestStateStore_KVSDeleteTree(t *testing.T) { } } +func TestStateStore_Watches_PrefixDelete(t *testing.T) { + s := testStateStore(t) + + // Create some KVS entries + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") + testSetKey(t, s, 5, "foo/bar/baz", "baz") + + // Delete a key and make sure the index comes from the tombstone. + ws := memdb.NewWatchSet() + idx, _, err := s.KVSList(ws, "foo/bar/baz") + if err != nil { + t.Fatalf("unexpected err: %s", err) + } + if err := s.KVSDeleteTree(6, "foo/bar"); err != nil { + t.Fatalf("unexpected err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("expected watch to fire but it did not") + } + ws = memdb.NewWatchSet() + idx, _, err = s.KVSList(ws, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d, expected %d", idx, 6) + } + + // Set a different key to bump the index. This shouldn't fire the + // watch since there's a different prefix. + testSetKey(t, s, 7, "some/other/key", "") + if watchFired(ws) { + t.Fatalf("bad") + } + + // Make sure we get the right index from the tombstone for the prefix + idx, _, err = s.KVSList(nil, "foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + + if idx != 6 { + t.Fatalf("bad index: %d, expected %v", idx, 7) + } + + // Now ask for the index for a node within the prefix that was deleted + // We expect to get the max index in the tree because the tombstone contains the parent foo/bar + idx, _, err = s.KVSList(nil, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + + if idx != 7 { + t.Fatalf("bad index: %d, expected %v", idx, 7) + } + + // Now reap the tombstones and make sure we get the latest index + // since there are no matching keys. + if err := s.ReapTombstones(6); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList(nil, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // List all the keys to make sure the index is also correct. + idx, _, err = s.KVSList(nil, "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSDeleteTreePrefix(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store. + for i := 0; i < 120; i++ { + ind := uint64(i + 1) + key := "foo/bar" + fmt.Sprintf("%d", ind) + testSetKey(t, s, ind, key, "bar") + } + testSetKey(t, s, 121, "foo/zorp", "zorp") + + // Calling tree deletion which affects nothing does not + // modify the table index. + if err := s.KVSDeleteTree(129, "bar"); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("kvs"); idx != 121 { + t.Fatalf("bad index: %d", idx) + } + + // Call tree deletion with a nested prefix. + if err := s.KVSDeleteTree(122, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + + // Check that all the matching keys were deleted + tx := s.db.Txn(false) + defer tx.Abort() + + entries, err := tx.Get("kvs", "id") + if err != nil { + t.Fatalf("err: %s", err) + } + + num := 0 + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if entry.(*structs.DirEntry).Key != "foo/zorp" { + t.Fatalf("unexpected kvs entry: %#v", entry) + } + num++ + } + + if num != 1 { + t.Fatalf("expected 1 key, got: %d", num) + } + + // Index should be updated if modifications are made + if idx := s.maxIndex("kvs"); idx != 122 { + t.Fatalf("bad index: %d", idx) + } + + // Check that the tombstones ware created and that prevents the index + // from sliding backwards. + idx, _, err := s.KVSList(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 122 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstones and watch the index revert to the remaining + // foo/zorp key's index. + if err := s.ReapTombstones(122); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 121 { + t.Fatalf("bad index: %d", idx) + } +} + func TestStateStore_KVSLockDelay(t *testing.T) { s := testStateStore(t) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index e0991e49c5..c048a0256f 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -68,6 +68,66 @@ func TestKeyWatch(t *testing.T) { } } +func TestKeyWatch_With_PrefixDelete(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) + invoke := 0 + deletedKeyWatchInvoked := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil && deletedKeyWatchInvoked == 0 { + deletedKeyWatchInvoked++ + return + } + if invoke == 0 { + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil || string(v.Value) != "test" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar/baz", + Value: []byte("test"), + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + + // Delete the key + _, err = kv.DeleteTree("foo/bar", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + if invoke != 1 { + t.Fatalf("expected watch plan to be invoked once but got %v", invoke) + } + + if deletedKeyWatchInvoked != 1 { + t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked) + } +} + func TestKeyPrefixWatch(t *testing.T) { if consulAddr == "" { t.Skip() From b841c99b87307e126c3b8c2f431699c20aea4648 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jul 2017 17:28:43 -0500 Subject: [PATCH 2/8] Govendor update go-memdb and go-immutable-radix to pick up changes for DeletePrefix --- .../hashicorp/go-immutable-radix/iradix.go | 96 ++++++++++++++++ .../github.com/hashicorp/go-memdb/README.md | 5 + vendor/github.com/hashicorp/go-memdb/index.go | 74 +++++++++++++ vendor/github.com/hashicorp/go-memdb/memdb.go | 12 +- vendor/github.com/hashicorp/go-memdb/txn.go | 103 +++++++++++++++++- vendor/vendor.json | 4 +- 6 files changed, 284 insertions(+), 10 deletions(-) diff --git a/vendor/github.com/hashicorp/go-immutable-radix/iradix.go b/vendor/github.com/hashicorp/go-immutable-radix/iradix.go index 551ccbde77..c7172c406b 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/iradix.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/iradix.go @@ -183,6 +183,31 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node { return nc } +// Visit all the nodes in the tree under n, and add their mutateChannels to the transaction +// Returns the size of the subtree visited +func (t *Txn) trackChannelsAndCount(n *Node) int { + // Count only leaf nodes + leaves := 0 + if n.leaf != nil { + leaves = 1 + } + // Mark this node as being mutated. + if t.trackMutate { + t.trackChannel(n.mutateCh) + } + + // Mark its leaf as being mutated, if appropriate. + if t.trackMutate && n.leaf != nil { + t.trackChannel(n.leaf.mutateCh) + } + + // Recurse on the children + for _, e := range n.edges { + leaves += t.trackChannelsAndCount(e.node) + } + return leaves +} + // mergeChild is called to collapse the given node with its child. This is only // called when the given node is not a leaf and has a single edge. func (t *Txn) mergeChild(n *Node) { @@ -357,6 +382,56 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) { return nc, leaf } +// delete does a recursive deletion +func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) { + // Check for key exhaustion + if len(search) == 0 { + nc := t.writeNode(n, true) + if n.isLeaf() { + nc.leaf = nil + } + nc.edges = nil + return nc, t.trackChannelsAndCount(n) + } + + // Look for an edge + label := search[0] + idx, child := n.getEdge(label) + // We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix + // Need to do both so that we can delete prefixes that don't correspond to any node in the tree + if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) { + return nil, 0 + } + + // Consume the search prefix + if len(child.prefix) > len(search) { + search = []byte("") + } else { + search = search[len(child.prefix):] + } + newChild, numDeletions := t.deletePrefix(n, child, search) + if newChild == nil { + return nil, 0 + } + // Copy this node. WATCH OUT - it's safe to pass "false" here because we + // will only ADD a leaf via nc.mergeChild() if there isn't one due to + // the !nc.isLeaf() check in the logic just below. This is pretty subtle, + // so be careful if you change any of the logic here. + + nc := t.writeNode(n, false) + + // Delete the edge if the node has no edges + if newChild.leaf == nil && len(newChild.edges) == 0 { + nc.delEdge(label) + if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() { + t.mergeChild(nc) + } + } else { + nc.edges[idx].node = newChild + } + return nc, numDeletions +} + // Insert is used to add or update a given key. The return provides // the previous value and a bool indicating if any was set. func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) { @@ -384,6 +459,19 @@ func (t *Txn) Delete(k []byte) (interface{}, bool) { return nil, false } +// DeletePrefix is used to delete an entire subtree that matches the prefix +// This will delete all nodes under that prefix +func (t *Txn) DeletePrefix(prefix []byte) bool { + newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix) + if newRoot != nil { + t.root = newRoot + t.size = t.size - numDeletions + return true + } + return false + +} + // Root returns the current root of the radix tree within this // transaction. The root is not safe across insert and delete operations, // but can be used to read the current state during a transaction. @@ -524,6 +612,14 @@ func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) { return txn.Commit(), old, ok } +// DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree, +// and a bool indicating if the prefix matched any nodes +func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) { + txn := t.Txn() + ok := txn.DeletePrefix(k) + return txn.Commit(), ok +} + // Root returns the root node of the tree which can be used for richer // query operations. func (t *Tree) Root() *Node { diff --git a/vendor/github.com/hashicorp/go-memdb/README.md b/vendor/github.com/hashicorp/go-memdb/README.md index 675044beb3..4e051c81ab 100644 --- a/vendor/github.com/hashicorp/go-memdb/README.md +++ b/vendor/github.com/hashicorp/go-memdb/README.md @@ -21,6 +21,11 @@ The database provides the following: a single field index, or more advanced compound field indexes. Certain types like UUID can be efficiently compressed from strings into byte indexes for reduced storage requirements. + +* Watches - Callers can populate a watch set as part of a query, which can be used to + detect when a modification has been made to the database which affects the query + results. This lets callers easily watch for changes in the database in a very general + way. For the underlying immutable radix trees, see [go-immutable-radix](https://github.com/hashicorp/go-immutable-radix). diff --git a/vendor/github.com/hashicorp/go-memdb/index.go b/vendor/github.com/hashicorp/go-memdb/index.go index 17aa026999..d1fb951466 100644 --- a/vendor/github.com/hashicorp/go-memdb/index.go +++ b/vendor/github.com/hashicorp/go-memdb/index.go @@ -1,6 +1,7 @@ package memdb import ( + "encoding/binary" "encoding/hex" "fmt" "reflect" @@ -249,6 +250,79 @@ func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) { return []byte(key), nil } +// UintFieldIndex is used to extract a uint field from an object using +// reflection and builds an index on that field. +type UintFieldIndex struct { + Field string +} + +func (u *UintFieldIndex) FromObject(obj interface{}) (bool, []byte, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fv := v.FieldByName(u.Field) + if !fv.IsValid() { + return false, nil, + fmt.Errorf("field '%s' for %#v is invalid", u.Field, obj) + } + + // Check the type + k := fv.Kind() + size, ok := IsUintType(k) + if !ok { + return false, nil, fmt.Errorf("field %q is of type %v; want a uint", u.Field, k) + } + + // Get the value and encode it + val := fv.Uint() + buf := make([]byte, size) + binary.PutUvarint(buf, val) + + return true, buf, nil +} + +func (u *UintFieldIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + + v := reflect.ValueOf(args[0]) + if !v.IsValid() { + return nil, fmt.Errorf("%#v is invalid", args[0]) + } + + k := v.Kind() + size, ok := IsUintType(k) + if !ok { + return nil, fmt.Errorf("arg is of type %v; want a uint", k) + } + + val := v.Uint() + buf := make([]byte, size) + binary.PutUvarint(buf, val) + + return buf, nil +} + +// IsUintType returns whether the passed type is a type of uint and the number +// of bytes needed to encode the type. +func IsUintType(k reflect.Kind) (size int, okay bool) { + switch k { + case reflect.Uint: + return binary.MaxVarintLen64, true + case reflect.Uint8: + return 2, true + case reflect.Uint16: + return binary.MaxVarintLen16, true + case reflect.Uint32: + return binary.MaxVarintLen32, true + case reflect.Uint64: + return binary.MaxVarintLen64, true + default: + return 0, false + } +} + // UUIDFieldIndex is used to extract a field from an object // using reflection and builds an index on that field by treating // it as a UUID. This is an optimization to using a StringFieldIndex diff --git a/vendor/github.com/hashicorp/go-memdb/memdb.go b/vendor/github.com/hashicorp/go-memdb/memdb.go index 13817547be..994a352323 100644 --- a/vendor/github.com/hashicorp/go-memdb/memdb.go +++ b/vendor/github.com/hashicorp/go-memdb/memdb.go @@ -13,8 +13,8 @@ import ( // on values. The database makes use of immutable radix trees to provide // transactions and MVCC. type MemDB struct { - schema *DBSchema - root unsafe.Pointer // *iradix.Tree underneath + schema *DBSchema + root unsafe.Pointer // *iradix.Tree underneath primary bool // There can only be a single writter at once @@ -30,8 +30,8 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) { // Create the MemDB db := &MemDB{ - schema: schema, - root: unsafe.Pointer(iradix.New()), + schema: schema, + root: unsafe.Pointer(iradix.New()), primary: true, } if err := db.initialize(); err != nil { @@ -65,8 +65,8 @@ func (db *MemDB) Txn(write bool) *Txn { // operations to the existing DB. func (db *MemDB) Snapshot() *MemDB { clone := &MemDB{ - schema: db.schema, - root: unsafe.Pointer(db.getRoot()), + schema: db.schema, + root: unsafe.Pointer(db.getRoot()), primary: false, } return clone diff --git a/vendor/github.com/hashicorp/go-memdb/txn.go b/vendor/github.com/hashicorp/go-memdb/txn.go index a069a9fd99..c4273648e1 100644 --- a/vendor/github.com/hashicorp/go-memdb/txn.go +++ b/vendor/github.com/hashicorp/go-memdb/txn.go @@ -117,14 +117,23 @@ func (txn *Txn) Commit() { // Commit each sub-transaction scoped to (table, index) for key, subTxn := range txn.modified { path := indexPath(key.Table, key.Index) - final := subTxn.Commit() + final := subTxn.CommitOnly() txn.rootTxn.Insert(path, final) } // Update the root of the DB - newRoot := txn.rootTxn.Commit() + newRoot := txn.rootTxn.CommitOnly() atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot)) + // Now issue all of the mutation updates (this is safe to call + // even if mutation tracking isn't enabled); we do this after + // the root pointer is swapped so that waking responders will + // see the new state. + for _, subTxn := range txn.modified { + subTxn.Notify() + } + txn.rootTxn.Notify() + // Clear the txn txn.rootTxn = nil txn.modified = nil @@ -321,6 +330,96 @@ func (txn *Txn) Delete(table string, obj interface{}) error { return nil } +// DeletePrefix is used to delete an entire subtree based on a prefix. +// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete. +// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation. +// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects. +func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) { + if !txn.write { + return false, fmt.Errorf("cannot delete in read-only transaction") + } + + if !strings.HasSuffix(prefix_index, "_prefix") { + return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index) + } + + deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix") + + // Get an iterator over all of the keys with the given prefix. + entries, err := txn.Get(table, prefix_index, prefix) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + // Get the table schema + tableSchema, ok := txn.db.schema.Tables[table] + if !ok { + return false, fmt.Errorf("invalid table '%s'", table) + } + + foundAny := false + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if !foundAny { + foundAny = true + } + // Get the primary ID of the object + idSchema := tableSchema.Indexes[id] + idIndexer := idSchema.Indexer.(SingleIndexer) + ok, idVal, err := idIndexer.FromObject(entry) + if err != nil { + return false, fmt.Errorf("failed to build primary index: %v", err) + } + if !ok { + return false, fmt.Errorf("object missing primary index") + } + // Remove the object from all the indexes except the given prefix index + for name, indexSchema := range tableSchema.Indexes { + if name == deletePrefixIndex { + continue + } + indexTxn := txn.writableIndex(table, name) + + // Handle the update by deleting from the index first + var ( + ok bool + vals [][]byte + err error + ) + switch indexer := indexSchema.Indexer.(type) { + case SingleIndexer: + var val []byte + ok, val, err = indexer.FromObject(entry) + vals = [][]byte{val} + case MultiIndexer: + ok, vals, err = indexer.FromObject(entry) + } + if err != nil { + return false, fmt.Errorf("failed to build index '%s': %v", name, err) + } + + if ok { + // Handle non-unique index by computing a unique index. + // This is done by appending the primary key which must + // be unique anyways. + for _, val := range vals { + if !indexSchema.Unique { + val = append(val, idVal...) + } + indexTxn.Delete(val) + } + } + } + } + if foundAny { + indexTxn := txn.writableIndex(table, deletePrefixIndex) + ok = indexTxn.DeletePrefix([]byte(prefix)) + if !ok { + panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix)) + } + return true, nil + } + return false, nil +} + // DeleteAll is used to delete all the objects in a given table // matching the constraints on the index func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) { diff --git a/vendor/vendor.json b/vendor/vendor.json index 70ecbebbe9..d3cb208b50 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -63,8 +63,8 @@ {"checksumSHA1":"cdOCt0Yb+hdErz8NAQqayxPmRsY=","path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55","revisionTime":"2014-10-28T05:47:10Z"}, {"checksumSHA1":"nd3S1qkFv7zZxA9be0bw4nT0pe0=","path":"github.com/hashicorp/go-checkpoint","revision":"e4b2dc34c0f698ee04750bf2035d8b9384233e1b","revisionTime":"2015-10-22T18:15:14Z"}, {"checksumSHA1":"b8F628srIitj5p7Y130xc9k0QWs=","path":"github.com/hashicorp/go-cleanhttp","revision":"3573b8b52aa7b37b9358d966a898feb387f62437","revisionTime":"2017-02-11T01:34:15Z"}, - {"checksumSHA1":"zvmksNyW6g+Fd/bywd4vcn8rp+M=","path":"github.com/hashicorp/go-immutable-radix","revision":"d0852f9e7b91ec9633735052bdab00bf802b353c","revisionTime":"2017-02-14T00:45:45Z"}, - {"checksumSHA1":"K8Fsgt1llTXP0EwqdBzvSGdKOKc=","path":"github.com/hashicorp/go-memdb","revision":"c01f56b44823e8ba697e23c18d12dca984b85aca","revisionTime":"2017-01-23T15:32:28Z"}, + {"checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","path":"github.com/hashicorp/go-immutable-radix","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"}, + {"checksumSHA1":"T65qvYBTy4rYks7oN+U0muEqtRw=","path":"github.com/hashicorp/go-memdb","revision":"2b2d6c35e14e7557ea1003e707d5e179fa315028","revisionTime":"2017-07-25T22:15:03Z"}, {"checksumSHA1":"TNlVzNR1OaajcNi3CbQ3bGbaLGU=","path":"github.com/hashicorp/go-msgpack/codec","revision":"fa3f63826f7c23912c15263591e65d54d080b458","revisionTime":"2015-05-18T23:42:57Z"}, {"checksumSHA1":"lrSl49G23l6NhfilxPM0XFs5rZo=","path":"github.com/hashicorp/go-multierror","revision":"d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5","revisionTime":"2015-09-16T20:57:42Z"}, {"checksumSHA1":"ErJHGU6AVPZM9yoY/xV11TwSjQs=","path":"github.com/hashicorp/go-retryablehttp","revision":"6e85be8fee1dcaa02c0eaaac2df5a8fbecf94145","revisionTime":"2016-09-30T03:51:02Z"}, From ae443e21d68140903fa9180e6236a85177b85f87 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jul 2017 19:17:40 -0500 Subject: [PATCH 3/8] Improved unit test per code review --- agent/consul/state/kvs_test.go | 87 ++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index 9fde6ae687..a278f7ca2c 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -1,13 +1,12 @@ package state import ( + "fmt" "reflect" "strings" "testing" "time" - "fmt" - "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -1032,77 +1031,85 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { testSetKey(t, s, 2, "foo/bar", "bar") testSetKey(t, s, 3, "foo/bar/zip", "zip") testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") - testSetKey(t, s, 5, "foo/bar/baz", "baz") + testSetKey(t, s, 5, "foo/bar/zip/zap", "zap") + testSetKey(t, s, 6, "foo/nope", "nope") - // Delete a key and make sure the index comes from the tombstone. ws := memdb.NewWatchSet() - idx, _, err := s.KVSList(ws, "foo/bar/baz") + got, _, err := s.KVSList(ws, "foo/bar") if err != nil { t.Fatalf("unexpected err: %s", err) } - if err := s.KVSDeleteTree(6, "foo/bar"); err != nil { + var wantIndex uint64 = 5 + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", wantIndex, got) + } + + // Delete a key and make sure the index comes from the tombstone. + if err := s.KVSDeleteTree(7, "foo/bar/zip"); err != nil { t.Fatalf("unexpected err: %s", err) } + // Make sure watch fires if !watchFired(ws) { t.Fatalf("expected watch to fire but it did not") } - ws = memdb.NewWatchSet() - idx, _, err = s.KVSList(ws, "foo/bar/baz") + + //Verify index matches tombstone + got, _, err = s.KVSList(ws, "foo/bar") if err != nil { - t.Fatalf("err: %s", err) + t.Fatalf("unexpected err: %s", err) } - if idx != 6 { - t.Fatalf("bad index: %d, expected %d", idx, 6) + wantIndex = 7 + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", got, wantIndex) } - - // Set a different key to bump the index. This shouldn't fire the - // watch since there's a different prefix. - testSetKey(t, s, 7, "some/other/key", "") - if watchFired(ws) { - t.Fatalf("bad") + // Make sure watch fires + if !watchFired(ws) { + t.Fatalf("expected watch to fire but it did not") } - // Make sure we get the right index from the tombstone for the prefix - idx, _, err = s.KVSList(nil, "foo/bar") - if err != nil { + // Reap tombstone and verify list on the same key reverts its index value + if err := s.ReapTombstones(wantIndex); err != nil { t.Fatalf("err: %s", err) } - if idx != 6 { - t.Fatalf("bad index: %d, expected %v", idx, 7) - } - - // Now ask for the index for a node within the prefix that was deleted - // We expect to get the max index in the tree because the tombstone contains the parent foo/bar - idx, _, err = s.KVSList(nil, "foo/bar/baz") + got, _, err = s.KVSList(nil, "foo/bar") + wantIndex = 2 if err != nil { t.Fatalf("err: %s", err) } - - if idx != 7 { - t.Fatalf("bad index: %d, expected %v", idx, 7) + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", got, wantIndex) } - // Now reap the tombstones and make sure we get the latest index - // since there are no matching keys. - if err := s.ReapTombstones(6); err != nil { + if err := s.ReapTombstones(7); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList(nil, "foo/bar/baz") + ws = memdb.NewWatchSet() + // Set a different key to bump the index. This shouldn't fire the + // watch since there's a different prefix. + testSetKey(t, s, 8, "some/other/key", "") + + // Now ask for the index for a node within the prefix that was deleted + // We expect to get the max index in the tree + wantIndex = 8 + got, _, err = s.KVSList(ws, "foo/bar/baz") if err != nil { t.Fatalf("err: %s", err) } - if idx != 7 { - t.Fatalf("bad index: %d", idx) + if watchFired(ws) { + t.Fatalf("Watch should not have fired") + } + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", got, wantIndex) } - // List all the keys to make sure the index is also correct. - idx, _, err = s.KVSList(nil, "") + // List all the keys to make sure the index returned is the max index + got, _, err = s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } - if idx != 7 { - t.Fatalf("bad index: %d", idx) + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", got, wantIndex) } } From b772c477c29c28d7afed3366ec40fbbb8e101411 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jul 2017 19:39:05 -0500 Subject: [PATCH 4/8] Removed redundant call to reap tombstone from unit test --- agent/consul/state/kvs_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index a278f7ca2c..6d5eca6443 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -1081,9 +1081,6 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { t.Fatalf("bad index: %d, expected %d", got, wantIndex) } - if err := s.ReapTombstones(7); err != nil { - t.Fatalf("err: %s", err) - } ws = memdb.NewWatchSet() // Set a different key to bump the index. This shouldn't fire the // watch since there's a different prefix. From fee418d37850be2582236177c0e8deaa491bd6de Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jul 2017 20:39:33 -0500 Subject: [PATCH 5/8] Removed redundant comments and unit test --- agent/consul/state/kvs.go | 2 - agent/consul/state/kvs_test.go | 78 +--------------------------------- 2 files changed, 1 insertion(+), 79 deletions(-) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index fbba471c1d..909126cc46 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -429,8 +429,6 @@ func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error return fmt.Errorf("failed recursive deleting kvs entry: %s", err) } - // Update the index if the delete was successful. - // Missing prefixes don't result in an index update if deleted { if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil { return fmt.Errorf("failed adding to graveyard: %s", err) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index 6d5eca6443..1b17d65850 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -1,7 +1,6 @@ package state import ( - "fmt" "reflect" "strings" "testing" @@ -1081,7 +1080,6 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { t.Fatalf("bad index: %d, expected %d", got, wantIndex) } - ws = memdb.NewWatchSet() // Set a different key to bump the index. This shouldn't fire the // watch since there's a different prefix. testSetKey(t, s, 8, "some/other/key", "") @@ -1089,6 +1087,7 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { // Now ask for the index for a node within the prefix that was deleted // We expect to get the max index in the tree wantIndex = 8 + ws = memdb.NewWatchSet() got, _, err = s.KVSList(ws, "foo/bar/baz") if err != nil { t.Fatalf("err: %s", err) @@ -1110,81 +1109,6 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { } } -func TestStateStore_KVSDeleteTreePrefix(t *testing.T) { - s := testStateStore(t) - - // Create kvs entries in the state store. - for i := 0; i < 120; i++ { - ind := uint64(i + 1) - key := "foo/bar" + fmt.Sprintf("%d", ind) - testSetKey(t, s, ind, key, "bar") - } - testSetKey(t, s, 121, "foo/zorp", "zorp") - - // Calling tree deletion which affects nothing does not - // modify the table index. - if err := s.KVSDeleteTree(129, "bar"); err != nil { - t.Fatalf("err: %s", err) - } - if idx := s.maxIndex("kvs"); idx != 121 { - t.Fatalf("bad index: %d", idx) - } - - // Call tree deletion with a nested prefix. - if err := s.KVSDeleteTree(122, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - - // Check that all the matching keys were deleted - tx := s.db.Txn(false) - defer tx.Abort() - - entries, err := tx.Get("kvs", "id") - if err != nil { - t.Fatalf("err: %s", err) - } - - num := 0 - for entry := entries.Next(); entry != nil; entry = entries.Next() { - if entry.(*structs.DirEntry).Key != "foo/zorp" { - t.Fatalf("unexpected kvs entry: %#v", entry) - } - num++ - } - - if num != 1 { - t.Fatalf("expected 1 key, got: %d", num) - } - - // Index should be updated if modifications are made - if idx := s.maxIndex("kvs"); idx != 122 { - t.Fatalf("bad index: %d", idx) - } - - // Check that the tombstones ware created and that prevents the index - // from sliding backwards. - idx, _, err := s.KVSList(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 122 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstones and watch the index revert to the remaining - // foo/zorp key's index. - if err := s.ReapTombstones(122); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 121 { - t.Fatalf("bad index: %d", idx) - } -} - func TestStateStore_KVSLockDelay(t *testing.T) { s := testStateStore(t) From 4498814843e242d7bb9b13c600f4fab5b6ba9182 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jul 2017 21:54:11 -0500 Subject: [PATCH 6/8] Don't insert tombstone for empty prefix delete. Other minor unit test fixes --- agent/consul/state/kvs.go | 8 +++++--- agent/consul/state/txn_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 909126cc46..d282ec3a42 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -426,12 +426,14 @@ func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) if err != nil { - return fmt.Errorf("failed recursive deleting kvs entry: %s", err) + return fmt.Errorf("failed recursive deleting kvs entry: %s", err) } if deleted { - if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil { - return fmt.Errorf("failed adding to graveyard: %s", err) + if prefix != "" { // don't insert a tombstone if the entire tree is deleted, all watchers on keys will see the max_index of the tree + if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil { + return fmt.Errorf("failed adding to graveyard: %s", err) + } } if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index 8c9936961e..17cd1da581 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -698,7 +698,7 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) { expected := []string{ "cannot insert in read-only transaction", "cannot insert in read-only transaction", - "cannot insert in read-only transaction", + "failed recursive deleting kvs entry", } if len(errors) != len(expected) { t.Fatalf("bad len: %d != %d", len(errors), len(expected)) From b94617b281572df55296bf1d012373027c53fbf7 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jul 2017 09:42:07 -0500 Subject: [PATCH 7/8] Add extra test case for deleting entire tree with empty prefix --- agent/consul/state/kvs_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index 1b17d65850..36db7ac7b9 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -1107,6 +1107,20 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) { if got != wantIndex { t.Fatalf("bad index: %d, expected %d", got, wantIndex) } + + // Delete all the keys, special case where tombstones are not inserted + if err := s.KVSDeleteTree(9, ""); err != nil { + t.Fatalf("unexpected err: %s", err) + } + wantIndex = 9 + got, _, err = s.KVSList(nil, "/foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if got != wantIndex { + t.Fatalf("bad index: %d, expected %d", got, wantIndex) + } + } func TestStateStore_KVSLockDelay(t *testing.T) { From a4487b2742d567d389cf7aceb3589085f469dc19 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jul 2017 09:42:28 -0500 Subject: [PATCH 8/8] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index adfbb3c1d6..bae7874112 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ IMPROVEMENTS: BUG FIXES: * agent: Clean up temporary files during disk write errors when persisting services and checks. [GH-3207] - +* api: Implemented a much faster recursive delete algorithm for the KV store. It has been bench-marked to be up to 100X faster on recursive deletes that affect millions of nodes. ## 0.9.0 (July 20, 2017) BREAKING CHANGES: