consul: Avoid loading all KV pairs during a snapshot

pull/17/head
Armon Dadgar 2014-04-01 11:55:25 -07:00
parent 8172526801
commit 9473bbe7bf
3 changed files with 63 additions and 19 deletions

View File

@ -290,11 +290,30 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
nodes = nil
// Dump the KVS entries
dirents := s.state.KVSDump()
for _, ent := range dirents {
// Register the node itself
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(ent); err != nil {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.KVSDump(streamCh); err != nil {
errorCh <- err
}
}()
OUTER:
for {
select {
case raw := <-streamCh:
if raw == nil {
break OUTER
}
ent := raw.(*structs.DirEntry)
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(ent); err != nil {
sink.Cancel()
return err
}
case err := <-errorCh:
sink.Cancel()
return err
}

View File

@ -923,16 +923,9 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
return checks
}
// KVSDump is used to list all KV entries
func (s *StateSnapshot) KVSDump() structs.DirEntries {
res, err := s.store.kvsTable.GetTxn(s.tx, "id")
if err != nil {
s.store.logger.Printf("[ERR] consul.state: Failed to get KVS entries: %v", err)
return nil
}
ents := make(structs.DirEntries, len(res))
for idx, r := range res {
ents[idx] = r.(*structs.DirEntry)
}
return ents
// KVSDump is used to list all KV entries. It takes a channel and streams
// back *struct.DirEntry objects. This will block and should be invoked
// in a goroutine.
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
}

View File

@ -602,7 +602,23 @@ func TestStoreSnapshot(t *testing.T) {
}
// Check we have the entries
ents := snap.KVSDump()
streamCh := make(chan interface{}, 64)
doneCh := make(chan struct{})
var ents []*structs.DirEntry
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.KVSDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 2 {
t.Fatalf("missing KVS entries!")
}
@ -661,7 +677,23 @@ func TestStoreSnapshot(t *testing.T) {
}
// Check we have the entries
ents = snap.KVSDump()
streamCh = make(chan interface{}, 64)
doneCh = make(chan struct{})
ents = nil
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.KVSDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 2 {
t.Fatalf("missing KVS entries!")
}