|
|
|
@ -213,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|
|
|
|
}
|
|
|
|
|
c.applyRegister(&req, header.LastIndex)
|
|
|
|
|
|
|
|
|
|
case structs.KVSRequestType:
|
|
|
|
|
var req structs.DirEntry
|
|
|
|
|
if err := dec.Decode(&req); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := c.state.KVSSet(req.CreateIndex, &req); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
|
|
|
|
}
|
|
|
|
@ -276,6 +285,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Enable GC of the ndoes
|
|
|
|
|
nodes = nil
|
|
|
|
|
|
|
|
|
|
// Dump the KVS entries
|
|
|
|
|
dirents := s.state.KVSDump()
|
|
|
|
|
for _, ent := range dirents {
|
|
|
|
|
// Register the node itself
|
|
|
|
|
sink.Write([]byte{byte(structs.KVSRequestType)})
|
|
|
|
|
if err := encoder.Encode(ent); err != nil {
|
|
|
|
|
sink.Cancel()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|