From 019d511fe7e52385580bbe973d954a9f8663876c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 23:39:57 -0800 Subject: [PATCH] consul: Fixing tombstone creation and hinting of GC --- consul/state_store.go | 79 ++++++++++++++++++++------------------ consul/state_store_test.go | 38 +++++++++++++++++- 2 files changed, 79 insertions(+), 38 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index fcdb546496..e3b43b801a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -78,6 +78,10 @@ type StateStore struct { // is never questioned. lockDelay map[string]time.Time lockDelayLock sync.RWMutex + + // GC is when we create tombstones to track their time-to-live. + // The GC is consumed upstream to manage clearing of tombstones. + gc *TombstoneGC } // StateSnapshot is used to provide a point-in-time snapshot @@ -104,18 +108,18 @@ func (s *StateSnapshot) Close() error { } // NewStateStore is used to create a new state store -func NewStateStore(logOutput io.Writer) (*StateStore, error) { +func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { // Create a new temp dir path, err := ioutil.TempDir("", "consul") if err != nil { return nil, err } - return NewStateStorePath(path, logOutput) + return NewStateStorePath(gc, path, logOutput) } // NewStateStorePath is used to create a new state store at a given path // The path is cleared on closing. -func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) { +func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { // Open the env env, err := mdb.NewEnv() if err != nil { @@ -1203,7 +1207,7 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { - tx, err := s.kvsTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return err } @@ -1213,50 +1217,51 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . // kvsDeleteWithIndexTxn does a delete within an existing transaction func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { - // Create the appropriate tombstone entries - streamCh := make(chan interface{}, 128) - doneCh := make(chan struct{}) - var tombstoneErr error - go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr) - err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...) - <-doneCh - if err != nil { - return err - } - if tombstoneErr != nil { - return tombstoneErr - } + num := 0 + for { + // Get some number of entries to delete + pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...) + if err != nil { + return err + } - num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) - if err != nil { - return err + // Create the tombstones and delete + for _, raw := range pairs { + ent := raw.(*structs.DirEntry) + ent.ModifyIndex = index // Update the index + ent.Value = nil // Reduce storage required + ent.Session = "" + if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { + return err + } + if _, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { + return err + } + } + + // Increment the total number + num += len(pairs) + if len(pairs) == 0 { + break + } } if num > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + tx.Defer(func() { + s.watch[s.kvsTable].Notify() + if s.gc != nil { + // If GC is configured, then we hint that this index + // required expiration. + s.gc.Hint(index) + } + }) } return tx.Commit() } -// kvsTombstoneEntries is used to consume KVS entries over a stream -// and commit them as tombstones within a given transaction and index. -func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) { - defer close(doneCh) - for raw := range streamCh { - ent := raw.(*structs.DirEntry) - ent.ModifyIndex = index - ent.Value = nil - ent.Session = "" - if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { - s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err) - *errOut = err - } - } -} - // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvCAS) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 888b4a3432..621734ef3a 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -11,7 +11,7 @@ import ( ) func testStateStore() (*StateStore, error) { - return NewStateStore(os.Stderr) + return NewStateStore(nil, os.Stderr) } func TestEnsureRegistration(t *testing.T) { @@ -1413,6 +1413,14 @@ func TestKVSDelete(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + store.gc = gc + // Create the entry d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} if err := store.KVSSet(1000, d); err != nil { @@ -1435,6 +1443,16 @@ func TestKVSDelete(t *testing.T) { if d != nil { t.Fatalf("bad: %v", d) } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1020 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } } func TestKVSCheckAndSet(t *testing.T) { @@ -1737,6 +1755,14 @@ func TestKVSDeleteTree(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + store.gc = gc + // Should not exist err = store.KVSDeleteTree(1000, "/web") if err != nil { @@ -1774,6 +1800,16 @@ func TestKVSDeleteTree(t *testing.T) { if len(ents) != 0 { t.Fatalf("bad: %v", ents) } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1010 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } } func TestSessionCreate(t *testing.T) {