diff --git a/consul/fsm.go b/consul/fsm.go index 0332cf6772..b6bde95e61 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -317,6 +317,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.TombstoneRequestType: + var req structs.DirEntry + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.TombstoneRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -357,6 +366,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + + if err := s.persistTombstones(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -462,6 +476,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink, } } +func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.TombstoneDump(streamCh); err != nil { + errorCh <- err + } + }() + + for { + select { + case raw := <-streamCh: + if raw == nil { + return nil + } + sink.Write([]byte{byte(structs.TombstoneRequestType)}) + if err := encoder.Encode(raw); err != nil { + return err + } + + case err := <-errorCh: + return err + } + } +} + func (s *consulSnapshot) Release() { s.state.Close() }