consul: Fixing tombstone creation and hinting of GC

pull/577/head
Armon Dadgar 2014-12-10 23:39:57 -08:00
parent b79be042d4
commit 019d511fe7
2 changed files with 79 additions and 38 deletions

View File

@ -78,6 +78,10 @@ type StateStore struct {
// is never questioned.
lockDelay map[string]time.Time
lockDelayLock sync.RWMutex
// GC is when we create tombstones to track their time-to-live.
// The GC is consumed upstream to manage clearing of tombstones.
gc *TombstoneGC
}
// StateSnapshot is used to provide a point-in-time snapshot
@ -104,18 +108,18 @@ func (s *StateSnapshot) Close() error {
}
// NewStateStore is used to create a new state store
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) {
// Create a new temp dir
path, err := ioutil.TempDir("", "consul")
if err != nil {
return nil, err
}
return NewStateStorePath(path, logOutput)
return NewStateStorePath(gc, path, logOutput)
}
// NewStateStorePath is used to create a new state store at a given path
// The path is cleared on closing.
func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) {
func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) {
// Open the env
env, err := mdb.NewEnv()
if err != nil {
@ -1203,7 +1207,7 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
// kvsDeleteWithIndex does a delete with either the id or id_prefix
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
tx, err := s.kvsTable.StartTxn(false, nil)
tx, err := s.tables.StartTxn(false)
if err != nil {
return err
}
@ -1213,50 +1217,51 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts .
// kvsDeleteWithIndexTxn does a delete within an existing transaction
func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error {
// Create the appropriate tombstone entries
streamCh := make(chan interface{}, 128)
doneCh := make(chan struct{})
var tombstoneErr error
go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr)
err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...)
<-doneCh
if err != nil {
return err
}
if tombstoneErr != nil {
return tombstoneErr
}
num := 0
for {
// Get some number of entries to delete
pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...)
if err != nil {
return err
}
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
if err != nil {
return err
// Create the tombstones and delete
for _, raw := range pairs {
ent := raw.(*structs.DirEntry)
ent.ModifyIndex = index // Update the index
ent.Value = nil // Reduce storage required
ent.Session = ""
if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil {
return err
}
if _, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil {
return err
}
}
// Increment the total number
num += len(pairs)
if len(pairs) == 0 {
break
}
}
if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
tx.Defer(func() {
s.watch[s.kvsTable].Notify()
if s.gc != nil {
// If GC is configured, then we hint that this index
// required expiration.
s.gc.Hint(index)
}
})
}
return tx.Commit()
}
// kvsTombstoneEntries is used to consume KVS entries over a stream
// and commit them as tombstones within a given transaction and index.
func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) {
defer close(doneCh)
for raw := range streamCh {
ent := raw.(*structs.DirEntry)
ent.ModifyIndex = index
ent.Value = nil
ent.Session = ""
if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil {
s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err)
*errOut = err
}
}
}
// KVSCheckAndSet is used to perform an atomic check-and-set
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvCAS)

View File

@ -11,7 +11,7 @@ import (
)
func testStateStore() (*StateStore, error) {
return NewStateStore(os.Stderr)
return NewStateStore(nil, os.Stderr)
}
func TestEnsureRegistration(t *testing.T) {
@ -1413,6 +1413,14 @@ func TestKVSDelete(t *testing.T) {
}
defer store.Close()
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("err: %v", err)
}
store.gc = gc
// Create the entry
d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
@ -1435,6 +1443,16 @@ func TestKVSDelete(t *testing.T) {
if d != nil {
t.Fatalf("bad: %v", d)
}
// Check that we get a delete
select {
case idx := <-gc.ExpireCh():
if idx != 1020 {
t.Fatalf("bad %d", idx)
}
case <-time.After(20 * time.Millisecond):
t.Fatalf("should expire")
}
}
func TestKVSCheckAndSet(t *testing.T) {
@ -1737,6 +1755,14 @@ func TestKVSDeleteTree(t *testing.T) {
}
defer store.Close()
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("err: %v", err)
}
store.gc = gc
// Should not exist
err = store.KVSDeleteTree(1000, "/web")
if err != nil {
@ -1774,6 +1800,16 @@ func TestKVSDeleteTree(t *testing.T) {
if len(ents) != 0 {
t.Fatalf("bad: %v", ents)
}
// Check that we get a delete
select {
case idx := <-gc.ExpireCh():
if idx != 1010 {
t.Fatalf("bad %d", idx)
}
case <-time.After(20 * time.Millisecond):
t.Fatalf("should expire")
}
}
func TestSessionCreate(t *testing.T) {