consul: Persist tombstones

pull/577/head
Armon Dadgar 2014-12-15 15:31:13 -08:00
parent bf74361fa7
commit 4492ad0ab4
1 changed files with 41 additions and 0 deletions

View File

@ -317,6 +317,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}
case structs.TombstoneRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.TombstoneRestore(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -357,6 +366,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistTombstones(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@ -462,6 +476,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
}
}
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.TombstoneDump(streamCh); err != nil {
errorCh <- err
}
}()
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
return err
}
}
}
func (s *consulSnapshot) Release() {
s.state.Close()
}