|
|
|
@ -244,8 +244,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
|
|
|
|
|
} |
|
|
|
|
r.setCurrentTerm(1) |
|
|
|
|
r.setLastLog(entry.Index, entry.Term) |
|
|
|
|
r.processConfigurationLogEntry(&entry) |
|
|
|
|
return nil |
|
|
|
|
return r.processConfigurationLogEntry(&entry) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// runCandidate runs the FSM for a candidate.
|
|
|
|
@ -1383,7 +1382,13 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
|
|
|
|
|
|
|
|
|
|
// Handle any new configuration changes
|
|
|
|
|
for _, newEntry := range newEntries { |
|
|
|
|
r.processConfigurationLogEntry(newEntry) |
|
|
|
|
if err := r.processConfigurationLogEntry(newEntry); err != nil { |
|
|
|
|
r.logger.Warn("failed to append entry", |
|
|
|
|
"index", newEntry.Index, |
|
|
|
|
"error", err) |
|
|
|
|
rpcErr = err |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Update the lastLog
|
|
|
|
@ -1415,14 +1420,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
|
|
|
|
|
// processConfigurationLogEntry takes a log entry and updates the latest
|
|
|
|
|
// configuration if the entry results in a new configuration. This must only be
|
|
|
|
|
// called from the main thread, or from NewRaft() before any threads have begun.
|
|
|
|
|
func (r *Raft) processConfigurationLogEntry(entry *Log) { |
|
|
|
|
if entry.Type == LogConfiguration { |
|
|
|
|
func (r *Raft) processConfigurationLogEntry(entry *Log) error { |
|
|
|
|
switch entry.Type { |
|
|
|
|
case LogConfiguration: |
|
|
|
|
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) |
|
|
|
|
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index) |
|
|
|
|
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { |
|
|
|
|
|
|
|
|
|
case LogAddPeerDeprecated, LogRemovePeerDeprecated: |
|
|
|
|
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) |
|
|
|
|
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index) |
|
|
|
|
conf, err := decodePeers(entry.Data, r.trans) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
r.setLatestConfiguration(conf, entry.Index) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// requestVote is invoked when we get an request vote RPC call.
|
|
|
|
@ -1574,7 +1586,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
|
|
|
|
|
reqConfiguration = DecodeConfiguration(req.Configuration) |
|
|
|
|
reqConfigurationIndex = req.ConfigurationIndex |
|
|
|
|
} else { |
|
|
|
|
reqConfiguration = decodePeers(req.Peers, r.trans) |
|
|
|
|
reqConfiguration, rpcErr = decodePeers(req.Peers, r.trans) |
|
|
|
|
if rpcErr != nil { |
|
|
|
|
r.logger.Error("failed to install snapshot", "error", rpcErr) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
reqConfigurationIndex = req.LastLogIndex |
|
|
|
|
} |
|
|
|
|
version := getSnapshotVersion(r.protocolVersion) |
|
|
|
|