From 36acf8d6a48cb61b2a68d1898a2f6033de831468 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 20 Jul 2017 22:21:14 -0500 Subject: [PATCH] Use new DeletePrefixMethod for implementing KVSDeleteTree operation. This makes deletes on sub trees larger than one million nodes about 100 times faster. Added unit tests. --- agent/consul/state/kvs.go | 35 +++----- agent/consul/state/kvs_test.go | 159 +++++++++++++++++++++++++++++++++ watch/funcs_test.go | 60 +++++++++++++ 3 files changed, 229 insertions(+), 25 deletions(-) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 87845b3daf..fbba471c1d 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -420,36 +420,21 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error { // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // existing transaction. func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error { - // Get an iterator over all of the keys with the given prefix. - entries, err := tx.Get("kvs", "id_prefix", prefix) + + // For prefix deletes, only insert one tombstone and delete the entire subtree + + deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) + if err != nil { - return fmt.Errorf("failed kvs lookup: %s", err) + return fmt.Errorf("failed recursive deleting kvs entry: %s", err) } - // Go over all of the keys and remove them. We call the delete - // directly so that we only update the index once. We also add - // tombstones as we go. - var modified bool - var objs []interface{} - for entry := entries.Next(); entry != nil; entry = entries.Next() { - e := entry.(*structs.DirEntry) - if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil { + // Update the index if the delete was successful. + // Missing prefixes don't result in an index update + if deleted { + if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil { return fmt.Errorf("failed adding to graveyard: %s", err) } - objs = append(objs, entry) - modified = true - } - - // Do the actual deletes in a separate loop so we don't trash the - // iterator as we go. - for _, obj := range objs { - if err := tx.Delete("kvs", obj); err != nil { - return fmt.Errorf("failed deleting kvs entry: %s", err) - } - } - - // Update the index - if modified { if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index a12778d398..9fde6ae687 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "fmt" + "github.com/hashicorp/consul/agent/consul/structs" "github.com/hashicorp/go-memdb" ) @@ -1022,6 +1024,163 @@ func TestStateStore_KVSDeleteTree(t *testing.T) { } } +func TestStateStore_Watches_PrefixDelete(t *testing.T) { + s := testStateStore(t) + + // Create some KVS entries + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") + testSetKey(t, s, 5, "foo/bar/baz", "baz") + + // Delete a key and make sure the index comes from the tombstone. + ws := memdb.NewWatchSet() + idx, _, err := s.KVSList(ws, "foo/bar/baz") + if err != nil { + t.Fatalf("unexpected err: %s", err) + } + if err := s.KVSDeleteTree(6, "foo/bar"); err != nil { + t.Fatalf("unexpected err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("expected watch to fire but it did not") + } + ws = memdb.NewWatchSet() + idx, _, err = s.KVSList(ws, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d, expected %d", idx, 6) + } + + // Set a different key to bump the index. This shouldn't fire the + // watch since there's a different prefix. + testSetKey(t, s, 7, "some/other/key", "") + if watchFired(ws) { + t.Fatalf("bad") + } + + // Make sure we get the right index from the tombstone for the prefix + idx, _, err = s.KVSList(nil, "foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + + if idx != 6 { + t.Fatalf("bad index: %d, expected %v", idx, 7) + } + + // Now ask for the index for a node within the prefix that was deleted + // We expect to get the max index in the tree because the tombstone contains the parent foo/bar + idx, _, err = s.KVSList(nil, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + + if idx != 7 { + t.Fatalf("bad index: %d, expected %v", idx, 7) + } + + // Now reap the tombstones and make sure we get the latest index + // since there are no matching keys. + if err := s.ReapTombstones(6); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList(nil, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // List all the keys to make sure the index is also correct. + idx, _, err = s.KVSList(nil, "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSDeleteTreePrefix(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store. + for i := 0; i < 120; i++ { + ind := uint64(i + 1) + key := "foo/bar" + fmt.Sprintf("%d", ind) + testSetKey(t, s, ind, key, "bar") + } + testSetKey(t, s, 121, "foo/zorp", "zorp") + + // Calling tree deletion which affects nothing does not + // modify the table index. + if err := s.KVSDeleteTree(129, "bar"); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("kvs"); idx != 121 { + t.Fatalf("bad index: %d", idx) + } + + // Call tree deletion with a nested prefix. + if err := s.KVSDeleteTree(122, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + + // Check that all the matching keys were deleted + tx := s.db.Txn(false) + defer tx.Abort() + + entries, err := tx.Get("kvs", "id") + if err != nil { + t.Fatalf("err: %s", err) + } + + num := 0 + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if entry.(*structs.DirEntry).Key != "foo/zorp" { + t.Fatalf("unexpected kvs entry: %#v", entry) + } + num++ + } + + if num != 1 { + t.Fatalf("expected 1 key, got: %d", num) + } + + // Index should be updated if modifications are made + if idx := s.maxIndex("kvs"); idx != 122 { + t.Fatalf("bad index: %d", idx) + } + + // Check that the tombstones ware created and that prevents the index + // from sliding backwards. + idx, _, err := s.KVSList(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 122 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstones and watch the index revert to the remaining + // foo/zorp key's index. + if err := s.ReapTombstones(122); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 121 { + t.Fatalf("bad index: %d", idx) + } +} + func TestStateStore_KVSLockDelay(t *testing.T) { s := testStateStore(t) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index e0991e49c5..c048a0256f 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -68,6 +68,66 @@ func TestKeyWatch(t *testing.T) { } } +func TestKeyWatch_With_PrefixDelete(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) + invoke := 0 + deletedKeyWatchInvoked := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil && deletedKeyWatchInvoked == 0 { + deletedKeyWatchInvoked++ + return + } + if invoke == 0 { + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil || string(v.Value) != "test" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar/baz", + Value: []byte("test"), + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + + // Delete the key + _, err = kv.DeleteTree("foo/bar", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + if invoke != 1 { + t.Fatalf("expected watch plan to be invoked once but got %v", invoke) + } + + if deletedKeyWatchInvoked != 1 { + t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked) + } +} + func TestKeyPrefixWatch(t *testing.T) { if consulAddr == "" { t.Skip()