Vendors first stage branch of the v2 Raft library.

pull/2222/head
James Phillips 2016-08-08 19:18:43 -07:00
parent 9803c03461
commit cc1f709333
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
30 changed files with 3016 additions and 1708 deletions

View File

@ -1,23 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test

View File

@ -1,14 +0,0 @@
language: go
go:
- 1.2
- tip
install: make deps
script:
- make integ
notifications:
flowdock:
secure: fZrcf9rlh2IrQrlch1sHkn3YI7SKvjGnAl/zyV5D6NROe1Bbr6d3QRMuCXWWdhJHzjKmXk5rIzbqJhUc0PNF7YjxGNKSzqWMQ56KcvN1k8DzlqxpqkcA3Jbs6fXCWo2fssRtZ7hj/wOP1f5n6cc7kzHDt9dgaYJ6nO2fqNPJiTc=

View File

@ -1,7 +1,7 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
test:
go test -timeout=5s ./...
go test -timeout=45s ./...
integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...

View File

@ -24,7 +24,7 @@ go version
For complete documentation, see the associated [Godoc](http://godoc.org/github.com/hashicorp/raft).
To prevent complications with cgo, the primary backend `MDBStore` is in a separate repositoy,
To prevent complications with cgo, the primary backend `MDBStore` is in a separate repository,
called [raft-mdb](http://github.com/hashicorp/raft-mdb). That is the recommended implementation
for the `LogStore` and `StableStore`.

935
vendor/github.com/hashicorp/raft/api.go generated vendored Normal file
View File

@ -0,0 +1,935 @@
package raft
import (
"errors"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/armon/go-metrics"
)
var (
// ErrLeader is returned when an operation can't be completed on a
// leader node.
ErrLeader = errors.New("node is the leader")
// ErrNotLeader is returned when an operation can't be completed on a
// follower or candidate node.
ErrNotLeader = errors.New("node is not the leader")
// ErrLeadershipLost is returned when a leader fails to commit a log entry
// because it's been deposed in the process.
ErrLeadershipLost = errors.New("leadership lost while committing log")
// ErrRaftShutdown is returned when operations are requested against an
// inactive Raft.
ErrRaftShutdown = errors.New("raft is already shutdown")
// ErrEnqueueTimeout is returned when a command fails due to a timeout.
ErrEnqueueTimeout = errors.New("timed out enqueuing operation")
// ErrNothingNewToSnapshot is returned when trying to create a snapshot
// but there's nothing new commited to the FSM since we started.
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")
// ErrUnsupportedProtocol is returned when an operation is attempted
// that's not supported by the current protocol version.
ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version")
// ErrCantBootstrap is returned when attempt is made to bootstrap a
// cluster that already has state present.
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")
)
// Raft implements a Raft node.
type Raft struct {
raftState
// protocolVersion is used to inter-operate with Raft servers running
// different versions of the library. See comments in config.go for more
// details.
protocolVersion ProtocolVersion
// applyCh is used to async send logs to the main thread to
// be committed and applied to the FSM.
applyCh chan *logFuture
// Configuration provided at Raft initialization
conf Config
// FSM is the client state machine to apply commands to
fsm FSM
// fsmCommitCh is used to trigger async application of logs to the fsm
fsmCommitCh chan commitTuple
// fsmRestoreCh is used to trigger a restore from snapshot
fsmRestoreCh chan *restoreFuture
// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture
// lastContact is the last time we had contact from the
// leader node. This can be used to gauge staleness.
lastContact time.Time
lastContactLock sync.RWMutex
// Leader is the current cluster leader
leader ServerAddress
leaderLock sync.RWMutex
// leaderCh is used to notify of leadership changes
leaderCh chan bool
// leaderState used only while state is leader
leaderState leaderState
// Stores our local server ID, used to avoid sending RPCs to ourself
localID ServerID
// Stores our local addr
localAddr ServerAddress
// Used for our logging
logger *log.Logger
// LogStore provides durable storage for logs
logs LogStore
// Used to request the leader to make configuration changes.
configurationChangeCh chan *configurationChangeFuture
// Tracks the latest configuration and latest committed configuration from
// the log/snapshot.
configurations configurations
// RPC chan comes from the transport layer
rpcCh <-chan RPC
// Shutdown channel to exit, protected to prevent concurrent exits
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
// snapshots is used to store and retrieve snapshots
snapshots SnapshotStore
// snapshotCh is used for user triggered snapshots
snapshotCh chan *snapshotFuture
// stable is a StableStore implementation for durable state
// It provides stable storage for many fields in raftState
stable StableStore
// The transport layer we use
trans Transport
// verifyCh is used to async send verify futures to the main thread
// to verify we are still the leader
verifyCh chan *verifyFuture
// configurationsCh is used to get the configuration data safely from
// outside of the main thread.
configurationsCh chan *configurationsFuture
// bootstrapCh is used to attempt an initial bootstrap from outside of
// the main thread.
bootstrapCh chan *bootstrapFuture
// List of observers and the mutex that protects them. The observers list
// is indexed by an artificial ID which is used for deregistration.
observersLock sync.RWMutex
observers map[uint64]*Observer
}
// BootstrapCluster initializes a server's storage with the given cluster
// configuration. This should only be called at the beginning of time for the
// cluster, and you absolutely must make sure that you call it with the same
// configuration on all the Voter servers. There is no need to bootstrap
// Nonvoter and Staging servers.
//
// One sane approach is to boostrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
}
// Sanity check the Raft peer configuration.
if err := checkConfiguration(configuration); err != nil {
return err
}
// Make sure the cluster is in a clean state.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if hasState {
return ErrCantBootstrap
}
// Set current term to 1.
if err := stable.SetUint64(keyCurrentTerm, 1); err != nil {
return fmt.Errorf("failed to save current term: %v", err)
}
// Append configuration entry to log.
entry := &Log{
Index: 1,
Term: 1,
}
if conf.ProtocolVersion < 3 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return fmt.Errorf("failed to append configuration entry to log: %v", err)
}
return nil
}
// RecoverCluster is used to manually force a new configuration in order to
// recover from a loss of quorum where the current configuration cannot be
// restored, such as when several servers die at the same time. This works by
// reading all the current state for this server, creating a snapshot with the
// supplied configuration, and then truncating the Raft log. This is the only
// safe way to force a given configuration without actually altering the log to
// insert any new entries, which could cause conflicts with other servers with
// different state.
//
// WARNING! This operation implicitly commits all entries in the Raft log, so
// in general this is an extremely unsafe operation. If you've lost your other
// servers and are performing a manual recovery, then you've also lost the
// commit information, so this is likely the best you can do, but you should be
// aware that calling this can cause Raft log entries that were in the process
// of being replicated but not yet be committed to be committed.
//
// Note the FSM passed here is used for the snapshot operations and will be
// left in a state that should not be used by the application. Be sure to
// discard this FSM and any associated state and provide a fresh one when
// calling NewRaft later.
//
// A typical way to recover the cluster is to shut down all servers and then
// run RecoverCluster on every server using an identical configuration. When
// the cluster is then restarted, and election should occur and then Raft will
// resume normal operation. If it's desired to make a particular server the
// leader, this can be used to inject a new configuration with that server as
// the sole voter, and then join up other new clean-state peer servers using
// the usual APIs in order to bring the cluster back into a known state.
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
}
// Sanity check the Raft peer configuration.
if err := checkConfiguration(configuration); err != nil {
return err
}
// Refuse to recover if there's no existing state. This would be safe to
// do, but it is likely an indication of an operator error where they
// expect data to be there and it's not. By refusing, we force them
// to show intent to start a cluster fresh by explicitly doing a
// bootstrap, rather than quietly fire up a fresh cluster here.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if !hasState {
return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
}
// Attempt to restore any snapshots we find, newest to oldest.
var snapshotIndex uint64
var snapshotTerm uint64
snapshots, err := snaps.List()
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}
for _, snapshot := range snapshots {
_, source, err := snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
defer source.Close()
if err := fsm.Restore(source); err != nil {
// Same here, skip and try the next one.
continue
}
snapshotIndex = snapshot.Index
snapshotTerm = snapshot.Term
break
}
if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
return fmt.Errorf("failed to restore any of the available snapshots")
}
// The snapshot information is the best known end point for the data
// until we play back the Raft log entries.
lastIndex := snapshotIndex
lastTerm := snapshotTerm
// Apply any Raft log entries past the snapshot.
lastLogIndex, err := logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to find last log: %v", err)
}
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
var entry Log
if err := logs.GetLog(index, &entry); err != nil {
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == LogCommand {
_ = fsm.Apply(&entry)
}
lastIndex = entry.Index
lastTerm = entry.Term
}
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
snapshot, err := fsm.Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot FSM: %v", err)
}
version := getSnapshotVersion(conf.ProtocolVersion)
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
if err := snapshot.Persist(sink); err != nil {
return fmt.Errorf("failed to persist snapshot: %v", err)
}
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
// Compact the log so that we don't get bad interference from any
// configuration change log entries that might be there.
firstLogIndex, err := logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %v", err)
}
if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
return fmt.Errorf("log compaction failed: %v", err)
}
return nil
}
// HasExistingState returns true if the server has any existing state (logs,
// knowledge of a current term, or any snapshots).
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) {
// Make sure we don't have a current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err == nil {
if currentTerm > 0 {
return true, nil
}
} else {
if err.Error() != "not found" {
return false, fmt.Errorf("failed to read current term: %v", err)
}
}
// Make sure we have an empty log.
lastIndex, err := logs.LastIndex()
if err != nil {
return false, fmt.Errorf("failed to get last log index: %v", err)
}
if lastIndex > 0 {
return true, nil
}
// Make sure we have no snapshots
snapshots, err := snaps.List()
if err != nil {
return false, fmt.Errorf("failed to list snapshots: %v", err)
}
if len(snapshots) > 0 {
return true, nil
}
return false, nil
}
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// Validate the configuration.
if err := ValidateConfig(conf); err != nil {
return nil, err
}
// Ensure we have a LogOutput.
var logger *log.Logger
if conf.Logger != nil {
logger = conf.Logger
} else {
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}
logger = log.New(conf.LogOutput, "", log.LstdFlags)
}
// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err != nil && err.Error() != "not found" {
return nil, fmt.Errorf("failed to load current term: %v", err)
}
// Read the index of the last log entry.
lastIndex, err := logs.LastIndex()
if err != nil {
return nil, fmt.Errorf("failed to find last log: %v", err)
}
// Get the last log entry.
var lastLog Log
if lastIndex > 0 {
if err = logs.GetLog(lastIndex, &lastLog); err != nil {
return nil, fmt.Errorf("failed to get last log at index %d: %v", lastIndex, err)
}
}
// Make sure we have a valid server address and ID.
protocolVersion := conf.ProtocolVersion
localAddr := ServerAddress(trans.LocalAddr())
localID := conf.LocalID
// TODO (slackpad) - When we deprecate protocol version 2, remove this
// along with the AddPeer() and RemovePeer() APIs.
if protocolVersion < 3 && string(localID) != string(localAddr) {
return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
}
// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
snapshotCh: make(chan *snapshotFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
}
// Initialize as a follower.
r.setState(Follower)
// Start as leader if specified. This should only be used
// for testing purposes.
if conf.StartAsLeader {
r.setState(Leader)
r.setLeader(r.localAddr)
}
// Restore the current term and the last log.
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)
// Attempt to restore a snapshot if there are any.
if err := r.restoreSnapshot(); err != nil {
return nil, err
}
// Scan through the log for any configuration change entries.
snapshotIndex, _ := r.getLastSnapshot()
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
if err := r.logs.GetLog(index, &entry); err != nil {
r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", index, err)
panic(err)
}
r.processConfigurationLogEntry(&entry)
}
r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v",
r.configurations.latestIndex, r.configurations.latest.Servers)
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
}
// restoreSnapshot attempts to restore the latest snapshots, and fails if none
// of them can be restored. This is called at initialization time, and is
// completely unsafe to call at any other time.
func (r *Raft) restoreSnapshot() error {
snapshots, err := r.snapshots.List()
if err != nil {
r.logger.Printf("[ERR] raft: Failed to list snapshots: %v", err)
return err
}
// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Printf("[ERR] raft: Failed to open snapshot %v: %v", snapshot.ID, err)
continue
}
defer source.Close()
if err := r.fsm.Restore(source); err != nil {
r.logger.Printf("[ERR] raft: Failed to restore snapshot %v: %v", snapshot.ID, err)
continue
}
// Log success
r.logger.Printf("[INFO] raft: Restored from snapshot %v", snapshot.ID)
// Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index)
// Update the last stable snapshot info
r.setLastSnapshot(snapshot.Index, snapshot.Term)
// Update the configuration
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
}
// Success!
return nil
}
// If we had snapshots and failed to load them, its an error
if len(snapshots) > 0 {
return fmt.Errorf("failed to load any existing snapshots")
}
return nil
}
// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster, and you
// absolutely must make sure that you call it with the same configuration on all
// the Voter servers. There is no need to bootstrap Nonvoter and Staging
// servers.
func (r *Raft) BootstrapCluster(configuration Configuration) Future {
bootstrapReq := &bootstrapFuture{}
bootstrapReq.init()
bootstrapReq.configuration = configuration
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.bootstrapCh <- bootstrapReq:
return bootstrapReq
}
}
// Leader is used to return the current leader of the cluster.
// It may return empty string if there is no current leader
// or the leader is unknown.
func (r *Raft) Leader() ServerAddress {
r.leaderLock.RLock()
leader := r.leader
r.leaderLock.RUnlock()
return leader
}
// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: cmd,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
// Barrier is used to issue a command that blocks until all preceeding
// operations have been applied to the FSM. It can be used to ensure the
// FSM reflects all queued writes. An optional timeout can be provided to
// limit the amount of time we wait for the command to be started. This
// must be run on the leader or it will fail.
func (r *Raft) Barrier(timeout time.Duration) Future {
metrics.IncrCounter([]string{"raft", "barrier"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogBarrier,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
// VerifyLeader is used to ensure the current node is still
// the leader. This can be done to prevent stale reads when a
// new leader has potentially been elected.
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
return verifyFuture
}
}
// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
}
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
// run on the leader or it will fail. Use AddVoter/AddNonvoter instead.
func (r *Raft) AddPeer(peer ServerAddress) Future {
if r.protocolVersion > 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: ServerID(peer),
serverAddress: peer,
prevIndex: 0,
}, 0)
}
// RemovePeer (deprecated) is used to remove a peer from the cluster. If the
// current leader is being removed, it will cause a new election
// to occur. This must be run on the leader or it will fail.
// Use RemoveServer instead.
func (r *Raft) RemovePeer(peer ServerAddress) Future {
if r.protocolVersion > 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: ServerID(peer),
prevIndex: 0,
}, 0)
}
// AddVoter will add the given server to the cluster as a staging server. If the
// server is already in the cluster as a voter, this does nothing. This must be
// run on the leader or it will fail. The leader will promote the staging server
// to a voter once that server is ready. If nonzero, prevIndex is the index of
// the only configuration upon which this change may be applied; if another
// configuration entry has been added in the meantime, this request will fail.
// If nonzero, timeout is how long this server should wait before the
// configuration change log entry is appended.
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: id,
serverAddress: address,
prevIndex: prevIndex,
}, timeout)
}
// AddNonvoter will add the given server to the cluster but won't assign it a
// vote. The server will receive log entries, but it won't participate in
// elections or log entry commitment. If the server is already in the cluster as
// a staging server or voter, this does nothing. This must be run on the leader
// or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddNonvoter,
serverID: id,
serverAddress: address,
prevIndex: prevIndex,
}, timeout)
}
// RemoveServer will remove the given server from the cluster. If the current
// leader is being removed, it will cause a new election to occur. This must be
// run on the leader or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: id,
prevIndex: prevIndex,
}, timeout)
}
// DemoteVoter will take away a server's vote, if it has one. If present, the
// server will continue to receive log entries, but it won't participate in
// elections or log entry commitment. If the server is not in the cluster, this
// does nothing. This must be run on the leader or it will fail. For prevIndex
// and timeout, see AddVoter.
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: DemoteVoter,
serverID: id,
prevIndex: prevIndex,
}, timeout)
}
// Shutdown is used to stop the Raft background routines.
// This is not a graceful operation. Provides a future that
// can be used to block until all background routines have exited.
func (r *Raft) Shutdown() Future {
r.shutdownLock.Lock()
defer r.shutdownLock.Unlock()
if !r.shutdown {
close(r.shutdownCh)
r.shutdown = true
r.setState(Shutdown)
return &shutdownFuture{r}
}
// avoid closing transport twice
return &shutdownFuture{nil}
}
// Snapshot is used to manually force Raft to take a snapshot.
// Returns a future that can be used to block until complete.
func (r *Raft) Snapshot() Future {
snapFuture := &snapshotFuture{}
snapFuture.init()
select {
case r.snapshotCh <- snapFuture:
return snapFuture
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
}
}
// State is used to return the current raft state.
func (r *Raft) State() RaftState {
return r.getState()
}
// LeaderCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. The channel is not buffered,
// and does not block on writes.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}
// String returns a string representation of this Raft node.
func (r *Raft) String() string {
return fmt.Sprintf("Node at %s [%v]", r.localAddr, r.getState())
}
// LastContact returns the time of last contact by a leader.
// This only makes sense if we are currently a follower.
func (r *Raft) LastContact() time.Time {
r.lastContactLock.RLock()
last := r.lastContact
r.lastContactLock.RUnlock()
return last
}
// Stats is used to return a map of various internal stats. This
// should only be used for informative purposes or debugging.
//
// Keys are: "state", "term", "last_log_index", "last_log_term",
// "commit_index", "applied_index", "fsm_pending",
// "last_snapshot_index", "last_snapshot_term",
// "latest_configuration", "last_contact", and "num_peers".
//
// The value of "state" is a numerical value representing a
// RaftState const.
//
// The value of "latest_configuration" is a string which contains
// the id of each server, its suffrage status, and its address.
//
// The value of "last_contact" is either "never" if there
// has been no contact with a leader, "0" if the node is in the
// leader state, or the time since last contact with a leader
// formatted as a string.
//
// The value of "num_peers" is the number of other voting servers in the
// cluster, not including this node. If this node isn't part of the
// configuration then this will be "0".
//
// All other values are uint64s, formatted as strings.
func (r *Raft) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
lastLogIndex, lastLogTerm := r.getLastLog()
lastSnapIndex, lastSnapTerm := r.getLastSnapshot()
s := map[string]string{
"state": r.getState().String(),
"term": toString(r.getCurrentTerm()),
"last_log_index": toString(lastLogIndex),
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"protocol_version": toString(uint64(r.protocolVersion)),
"protocol_version_min": toString(uint64(ProtocolVersionMin)),
"protocol_version_max": toString(uint64(ProtocolVersionMax)),
"snapshot_version_min": toString(uint64(SnapshotVersionMin)),
"snapshot_version_max": toString(uint64(SnapshotVersionMax)),
}
future := r.GetConfiguration()
if err := future.Error(); err != nil {
r.logger.Printf("[WARN] raft: could not get configuration for Stats: %v", err)
} else {
configuration := future.Configuration()
s["latest_configuration_index"] = toString(future.Index())
s["latest_configuration"] = fmt.Sprintf("%+v", configuration.Servers)
// This is a legacy metric that we've seen people use in the wild.
hasUs := false
numPeers := 0
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
hasUs = true
} else {
numPeers++
}
}
}
if !hasUs {
numPeers = 0
}
s["num_peers"] = toString(uint64(numPeers))
}
last := r.LastContact()
if last.IsZero() {
s["last_contact"] = "never"
} else if r.getState() == Leader {
s["last_contact"] = "0"
} else {
s["last_contact"] = fmt.Sprintf("%v", time.Now().Sub(last))
}
return s
}
// LastIndex returns the last index in stable storage,
// either from the last log or from the last snapshot.
func (r *Raft) LastIndex() uint64 {
return r.getLastIndex()
}
// AppliedIndex returns the last index applied to the FSM. This is generally
// lagging behind the last index, especially for indexes that are persisted but
// have not yet been considered committed by the leader. NOTE - this reflects
// the last index that was sent to the application's FSM over the apply channel
// but DOES NOT mean that the application's FSM has yet consumed it and applied
// it to its internal state. Thus, the application's state may lag behind this
// index.
func (r *Raft) AppliedIndex() uint64 {
return r.getLastApplied()
}

View File

@ -1,8 +1,25 @@
package raft
// RPCHeader is a common sub-structure used to pass along protocol version and
// other information about the cluster. For older Raft implementations before
// versioning was added this will default to a zero-valued structure when read
// by newer Raft versions.
type RPCHeader struct {
// ProtocolVersion is the version of the protocol the sender is
// speaking.
ProtocolVersion ProtocolVersion
}
// WithRPCHeader is an interface that exposes the RPC header.
type WithRPCHeader interface {
GetRPCHeader() RPCHeader
}
// AppendEntriesRequest is the command used to append entries to the
// replicated log.
type AppendEntriesRequest struct {
RPCHeader
// Provide the current term and leader
Term uint64
Leader []byte
@ -18,9 +35,16 @@ type AppendEntriesRequest struct {
LeaderCommitIndex uint64
}
// See WithRPCHeader.
func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
// AppendEntriesResponse is the response returned from an
// AppendEntriesRequest.
type AppendEntriesResponse struct {
RPCHeader
// Newer term if leader is out of date
Term uint64
@ -35,9 +59,16 @@ type AppendEntriesResponse struct {
NoRetryBackoff bool
}
// See WithRPCHeader.
func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
// RequestVoteRequest is the command used by a candidate to ask a Raft peer
// for a vote in an election.
type RequestVoteRequest struct {
RPCHeader
// Provide the term and our id
Term uint64
Candidate []byte
@ -47,21 +78,38 @@ type RequestVoteRequest struct {
LastLogTerm uint64
}
// See WithRPCHeader.
func (r *RequestVoteRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
// RequestVoteResponse is the response returned from a RequestVoteRequest.
type RequestVoteResponse struct {
// Newer term if leader is out of date
RPCHeader
// Newer term if leader is out of date.
Term uint64
// Return the peers, so that a node can shutdown on removal
// Peers is deprecated, but required by servers that only understand
// protocol version 0. This is not populated in protocol version 2
// and later.
Peers []byte
// Is the vote granted
// Is the vote granted.
Granted bool
}
// See WithRPCHeader.
func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
// InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its
// log (and state machine) from a snapshot on another peer.
type InstallSnapshotRequest struct {
RPCHeader
SnapshotVersion SnapshotVersion
Term uint64
Leader []byte
@ -69,16 +117,35 @@ type InstallSnapshotRequest struct {
LastLogIndex uint64
LastLogTerm uint64
// Peer Set in the snapshot
// Peer Set in the snapshot. This is deprecated in favor of Configuration
// but remains here in case we receive an InstallSnapshot from a leader
// that's running old code.
Peers []byte
// Cluster membership.
Configuration []byte
// Log index where 'Configuration' entry was originally written.
ConfigurationIndex uint64
// Size of the snapshot
Size int64
}
// See WithRPCHeader.
func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
// InstallSnapshotResponse is the response returned from an
// InstallSnapshotRequest.
type InstallSnapshotResponse struct {
RPCHeader
Term uint64
Success bool
}
// See WithRPCHeader.
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

101
vendor/github.com/hashicorp/raft/commitment.go generated vendored Normal file
View File

@ -0,0 +1,101 @@
package raft
import (
"sort"
"sync"
)
// Commitment is used to advance the leader's commit index. The leader and
// replication goroutines report in newly written entries with Match(), and
// this notifies on commitCh when the commit index has advanced.
type commitment struct {
// protectes matchIndexes and commitIndex
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
// voter ID to log index: the server stores up through this log entry
matchIndexes map[ServerID]uint64
// a quorum stores up through this log entry. monotonically increases.
commitIndex uint64
// the first index of this leader's term: this needs to be replicated to a
// majority of the cluster before this leader may mark anything committed
// (per Raft's commitment rule)
startIndex uint64
}
// newCommitment returns an commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term.
// 'configuration' is the servers in the cluster.
// 'startIndex' is the first index created in this term (see
// its description above).
func newCommitment(commitCh chan struct{}, configuration Configuration, startIndex uint64) *commitment {
matchIndexes := make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
matchIndexes[server.ID] = 0
}
}
return &commitment{
commitCh: commitCh,
matchIndexes: matchIndexes,
commitIndex: 0,
startIndex: startIndex,
}
}
// Called when a new cluster membership configuration is created: it will be
// used to determine commitment from now on. 'configuration' is the servers in
// the cluster.
func (c *commitment) setConfiguration(configuration Configuration) {
c.Lock()
defer c.Unlock()
oldMatchIndexes := c.matchIndexes
c.matchIndexes = make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
c.matchIndexes[server.ID] = oldMatchIndexes[server.ID] // defaults to 0
}
}
c.recalculate()
}
// Called by leader after commitCh is notified
func (c *commitment) getCommitIndex() uint64 {
c.Lock()
defer c.Unlock()
return c.commitIndex
}
// Match is called once a server completes writing entries to disk: either the
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
c.matchIndexes[server] = matchIndex
c.recalculate()
}
}
// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
if len(c.matchIndexes) == 0 {
return
}
matched := make([]uint64, 0, len(c.matchIndexes))
for _, idx := range c.matchIndexes {
matched = append(matched, idx)
}
sort.Sort(uint64Slice(matched))
quorumMatchIndex := matched[(len(matched)-1)/2]
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
c.commitIndex = quorumMatchIndex
asyncNotifyCh(c.commitCh)
}
}

View File

@ -7,18 +7,136 @@ import (
"time"
)
// Config provides any necessary configuration to
// the Raft server
// These are the versions of the protocol (which includes RPC messages as
// well as Raft-specific log entries) that this server can _understand_. Use
// the ProtocolVersion member of the Config object to control the version of
// the protocol to use when _speaking_ to other servers. Note that depending on
// the protocol version being spoken, some otherwise understood RPC messages
// may be refused. See dispositionRPC for details of this logic.
//
// There are notes about the upgrade path in the description of the versions
// below. If you are starting a fresh cluster then there's no reason not to
// jump right to the latest protocol version. If you need to interoperate with
// older, version 0 Raft servers you'll need to drive the cluster through the
// different versions in order.
//
// The version details are complicated, but here's a summary of what's required
// to get from a version 0 cluster to version 3:
//
// 1. In version N of your app that starts using the new Raft library with
// versioning, set ProtocolVersion to 1.
// 2. Make version N+1 of your app require version N as a prerequisite (all
// servers must be upgraded). For version N+1 of your app set ProtocolVersion
// to 2.
// 3. Similarly, make version N+2 of your app require version N+1 as a
// prerequisite. For version N+2 of your app, set ProtocolVersion to 3.
//
// During this upgrade, older cluster members will still have Server IDs equal
// to their network addresses. To upgrade an older member and give it an ID, it
// needs to leave the cluster and re-enter:
//
// 1. Remove the server from the cluster with RemoveServer, using its network
// address as its ServerID.
// 2. Update the server's config to a better ID (restarting the server).
// 3. Add the server back to the cluster with AddVoter, using its new ID.
//
// You can do this during the rolling upgrade from N+1 to N+2 of your app, or
// as a rolling change at any time after the upgrade.
//
// Version History
//
// 0: Original Raft library before versioning was added. Servers running this
// version of the Raft library use AddPeerDeprecated/RemovePeerDeprecated
// for all configuration changes, and have no support for LogConfiguration.
// 1: First versioned protocol, used to interoperate with old servers, and begin
// the migration path to newer versions of the protocol. Under this version
// all configuration changes are propagated using the now-deprecated
// RemovePeerDeprecated Raft log entry. This means that server IDs are always
// set to be the same as the server addresses (since the old log entry type
// cannot transmit an ID), and only AddPeer/RemovePeer APIs are supported.
// Servers running this version of the protocol can understand the new
// LogConfiguration Raft log entry but will never generate one so they can
// remain compatible with version 0 Raft servers in the cluster.
// 2: Transitional protocol used when migrating an existing cluster to the new
// server ID system. Server IDs are still set to be the same as server
// addresses, but all configuration changes are propagated using the new
// LogConfiguration Raft log entry type, which can carry full ID information.
// This version supports the old AddPeer/RemovePeer APIs as well as the new
// ID-based AddVoter/RemoveServer APIs which should be used when adding
// version 3 servers to the cluster later. This version sheds all
// interoperability with version 0 servers, but can interoperate with newer
// Raft servers running with protocol version 1 since they can understand the
// new LogConfiguration Raft log entry, and this version can still understand
// their RemovePeerDeprecated Raft log entries. We need this protocol version
// as an intermediate step between 1 and 3 so that servers will propagate the
// ID information that will come from newly-added (or -rolled) servers using
// protocol version 3, but since they are still using their address-based IDs
// from the previous step they will still be able to track commitments and
// their own voting status properly. If we skipped this step, servers would
// be started with their new IDs, but they wouldn't see themselves in the old
// address-based configuration, so none of the servers would think they had a
// vote.
// 3: Protocol adding full support for server IDs and new ID-based server APIs
// (AddVoter, AddNonvoter, etc.), old AddPeer/RemovePeer APIs are no longer
// supported. Version 2 servers should be swapped out by removing them from
// the cluster one-by-one and re-adding them with updated configuration for
// this protocol version, along with their server ID. The remove/add cycle
// is required to populate their server ID. Note that removing must be done
// by ID, which will be the old server's address.
type ProtocolVersion int
const (
ProtocolVersionMin ProtocolVersion = 0
ProtocolVersionMax = 3
)
// These are versions of snapshots that this server can _understand_. Currently,
// it is always assumed that this server generates the latest version, though
// this may be changed in the future to include a configurable version.
//
// Version History
//
// 0: Original Raft library before versioning was added. The peers portion of
// these snapshots is encoded in the legacy format which requires decodePeers
// to parse. This version of snapshots should only be produced by the
// unversioned Raft library.
// 1: New format which adds support for a full configuration structure and its
// associated log index, with support for server IDs and non-voting server
// modes. To ease upgrades, this also includes the legacy peers structure but
// that will never be used by servers that understand version 1 snapshots.
// Since the original Raft library didn't enforce any versioning, we must
// include the legacy peers structure for this version, but we can deprecate
// it in the next snapshot version.
type SnapshotVersion int
const (
SnapshotVersionMin SnapshotVersion = 0
SnapshotVersionMax = 1
)
// Config provides any necessary configuration for the Raft server.
type Config struct {
// Time in follower state without a leader before we attempt an election.
// ProtocolVersion allows a Raft server to inter-operate with older
// Raft servers running an older version of the code. This is used to
// version the wire protocol as well as Raft-specific log entries that
// the server uses when _speaking_ to other servers. There is currently
// no auto-negotiation of versions so all servers must be manually
// configured with compatible versions. See ProtocolVersionMin and
// ProtocolVersionMax for the versions of the protocol that this server
// can _understand_.
ProtocolVersion ProtocolVersion
// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration
// Time in candidate state without a leader before we attempt an election.
// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration
// Time without an Apply() operation before we heartbeat to ensure
// a timely commit. Due to random staggering, may be delayed as much as
// 2x this value.
// CommitTimeout controls the time without an Apply() operation
// before we heartbeat to ensure a timely commit. Due to random
// staggering, may be delayed as much as 2x this value.
CommitTimeout time.Duration
// MaxAppendEntries controls the maximum number of append entries
@ -33,13 +151,6 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool
// DisableBootstrapAfterElect is used to turn off EnableSingleNode
// after the node is elected. This is used to prevent self-election
// if the node is removed from the Raft cluster via RemovePeer. Setting
// it to false will keep the bootstrap mode, allowing the node to self-elect
// and potentially bootstrap a separate cluster.
DisableBootstrapAfterElect bool
// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
@ -55,11 +166,6 @@ type Config struct {
// just replay a small set of logs.
SnapshotThreshold uint64
// EnableSingleNode allows for a single node mode of operation. This
// is false by default, which prevents a lone node from electing itself.
// leader.
EnableSingleNode bool
// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
@ -70,6 +176,11 @@ type Config struct {
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool
// The unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.
LocalID ServerID
// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
@ -87,22 +198,35 @@ type Config struct {
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
ProtocolVersion: ProtocolVersionMax,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
DisableBootstrapAfterElect: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
EnableSingleNode: false,
LeaderLeaseTimeout: 500 * time.Millisecond,
}
}
// ValidateConfig is used to validate a sane configuration
func ValidateConfig(config *Config) error {
// We don't actually support running as 0 in the library any more, but
// we do understand it.
protocolMin := ProtocolVersionMin
if protocolMin == 0 {
protocolMin = 1
}
if config.ProtocolVersion < protocolMin ||
config.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version %d must be >= %d and <= %d",
config.ProtocolVersion, protocolMin, ProtocolVersionMax)
}
if len(config.LocalID) == 0 {
return fmt.Errorf("LocalID cannot be empty")
}
if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("Heartbeat timeout is too low")
}

343
vendor/github.com/hashicorp/raft/configuration.go generated vendored Normal file
View File

@ -0,0 +1,343 @@
package raft
import "fmt"
// ServerSuffrage determines whether a Server in a Configuration gets a vote.
type ServerSuffrage int
// Note: Don't renumber these, since the numbers are written into the log.
const (
// Voter is a server whose vote is counted in elections and whose match index
// is used in advancing the leader's commit index.
Voter ServerSuffrage = iota
// Nonvoter is a server that receives log entries but is not considered for
// elections or commitment purposes.
Nonvoter
// Staging is a server that acts like a nonvoter with one exception: once a
// staging server receives enough log entries to be sufficiently caught up to
// the leader's log, the leader will invoke a membership change to change
// the Staging server to a Voter.
Staging
)
func (s ServerSuffrage) String() string {
switch s {
case Voter:
return "Voter"
case Nonvoter:
return "Nonvoter"
case Staging:
return "Staging"
}
return "ServerSuffrage"
}
// ServerID is a unique string identifying a server for all time.
type ServerID string
// ServerAddress is a network address for a server that a transport can contact.
type ServerAddress string
// Server tracks the information about a single server in a configuration.
type Server struct {
// Suffrage determines whether the server gets a vote.
Suffrage ServerSuffrage
// ID is a unique string identifying this server for all time.
ID ServerID
// Address is its network address that a transport can contact.
Address ServerAddress
}
// Configuration tracks which servers are in the cluster, and whether they have
// votes. This should include the local server, if it's a member of the cluster.
// The servers are listed no particular order, but each should only appear once.
// These entries are appended to the log during membership changes.
type Configuration struct {
Servers []Server
}
// Clone makes a deep copy of a Configuration.
func (c *Configuration) Clone() (copy Configuration) {
copy.Servers = append(copy.Servers, c.Servers...)
return
}
// ConfigurationChangeCommand is the different ways to change the cluster
// configuration.
type ConfigurationChangeCommand uint8
const (
// AddStaging makes a server Staging unless its Voter.
AddStaging ConfigurationChangeCommand = iota
// AddNonvoter makes a server Nonvoter unless its Staging or Voter.
AddNonvoter
// DemoteVoter makes a server Nonvoter unless its absent.
DemoteVoter
// RemoveServer removes a server entirely from the cluster membership.
RemoveServer
// Promote is created automatically by a leader; it turns a Staging server
// into a Voter.
Promote
)
func (c ConfigurationChangeCommand) String() string {
switch c {
case AddStaging:
return "AddStaging"
case AddNonvoter:
return "AddNonvoter"
case DemoteVoter:
return "DemoteVoter"
case RemoveServer:
return "RemoveServer"
case Promote:
return "Promote"
}
return "ConfigurationChangeCommand"
}
// configurationChangeRequest describes a change that a leader would like to
// make to its current configuration. It's used only within a single server
// (never serialized into the log), as part of `configurationChangeFuture`.
type configurationChangeRequest struct {
command ConfigurationChangeCommand
serverID ServerID
serverAddress ServerAddress // only present for AddStaging, AddNonvoter
// prevIndex, if nonzero, is the index of the only configuration upon which
// this change may be applied; if another configuration entry has been
// added in the meantime, this request will fail.
prevIndex uint64
}
// configurations is state tracked on every server about its Configurations.
// Note that, per Diego's dissertation, there can be at most one uncommitted
// configuration at a time (the next configuration may not be created until the
// prior one has been committed).
//
// One downside to storing just two configurations is that if you try to take a
// snahpsot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the
// log.
type configurations struct {
// committed is the latest configuration in the log/snapshot that has been
// committed (the one with the largest index).
committed Configuration
// committedIndex is the log index where 'committed' was written.
committedIndex uint64
// latest is the latest configuration in the log/snapshot (may be committed
// or uncommitted)
latest Configuration
// latestIndex is the log index where 'latest' was written.
latestIndex uint64
}
// Clone makes a deep copy of a configurations object.
func (c *configurations) Clone() (copy configurations) {
copy.committed = c.committed.Clone()
copy.committedIndex = c.committedIndex
copy.latest = c.latest.Clone()
copy.latestIndex = c.latestIndex
return
}
// hasVote returns true if the server identified by 'id' is a Voter in the
// provided Configuration.
func hasVote(configuration Configuration, id ServerID) bool {
for _, server := range configuration.Servers {
if server.ID == id {
return server.Suffrage == Voter
}
}
return false
}
// checkConfiguration tests a cluster membership configuration for common
// errors.
func checkConfiguration(configuration Configuration) error {
idSet := make(map[ServerID]bool)
addressSet := make(map[ServerAddress]bool)
var voters int
for _, server := range configuration.Servers {
if server.ID == "" {
return fmt.Errorf("Empty ID in configuration: %v", configuration)
}
if server.Address == "" {
return fmt.Errorf("Empty address in configuration: %v", server)
}
if idSet[server.ID] {
return fmt.Errorf("Found duplicate ID in configuration: %v", server.ID)
}
idSet[server.ID] = true
if addressSet[server.Address] {
return fmt.Errorf("Found duplicate address in configuration: %v", server.Address)
}
addressSet[server.Address] = true
if server.Suffrage == Voter {
voters++
}
}
if voters == 0 {
return fmt.Errorf("Need at least one voter in configuration: %v", configuration)
}
return nil
}
// nextConfiguration generates a new Configuration from the current one and a
// configuration change request. It's split from appendConfigurationEntry so
// that it can be unit tested easily.
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
if change.prevIndex > 0 && change.prevIndex != currentIndex {
return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
}
configuration := current.Clone()
switch change.command {
case AddStaging:
// TODO: barf on new address?
newServer := Server{
// TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promoton to Voter is not yet
// implemented, and doing so is not trivial with the way the leader loop
// coordinates with the replication goroutines today. So, for now, the
// server will have a vote right away, and the Promote case below is
// unused.
Suffrage: Voter,
ID: change.serverID,
Address: change.serverAddress,
}
found := false
for i, server := range configuration.Servers {
if server.ID == change.serverID {
if server.Suffrage == Voter {
configuration.Servers[i].Address = change.serverAddress
} else {
configuration.Servers[i] = newServer
}
found = true
break
}
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
}
case AddNonvoter:
newServer := Server{
Suffrage: Nonvoter,
ID: change.serverID,
Address: change.serverAddress,
}
found := false
for i, server := range configuration.Servers {
if server.ID == change.serverID {
if server.Suffrage != Nonvoter {
configuration.Servers[i].Address = change.serverAddress
} else {
configuration.Servers[i] = newServer
}
found = true
break
}
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
}
case DemoteVoter:
for i, server := range configuration.Servers {
if server.ID == change.serverID {
configuration.Servers[i].Suffrage = Nonvoter
break
}
}
case RemoveServer:
for i, server := range configuration.Servers {
if server.ID == change.serverID {
configuration.Servers = append(configuration.Servers[:i], configuration.Servers[i+1:]...)
break
}
}
case Promote:
for i, server := range configuration.Servers {
if server.ID == change.serverID && server.Suffrage == Staging {
configuration.Servers[i].Suffrage = Voter
break
}
}
}
// Make sure we didn't do something bad like remove the last voter
if err := checkConfiguration(configuration); err != nil {
return Configuration{}, err
}
return configuration, nil
}
// encodePeers is used to serialize a Configuration into the old peers format.
// This is here for backwards compatibility when operating with a mix of old
// servers and should be removed once we deprecate support for protocol version 1.
func encodePeers(configuration Configuration, trans Transport) []byte {
// Gather up all the voters, other suffrage types are not supported by
// this data format.
var encPeers [][]byte
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
encPeers = append(encPeers, trans.EncodePeer(server.Address))
}
}
// Encode the entire array.
buf, err := encodeMsgPack(encPeers)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}
return buf.Bytes()
}
// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
func decodePeers(buf []byte, trans Transport) Configuration {
// Decode the buffer first.
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
}
// Deserialize each peer.
var servers []Server
for _, enc := range encPeers {
p := trans.DecodePeer(enc)
servers = append(servers, Server{
Suffrage: Voter,
ID: ServerID(p),
Address: ServerAddress(p),
})
}
return Configuration{
Servers: servers,
}
}
// encodeConfiguration serializes a Configuration using MsgPack, or panics on
// errors.
func encodeConfiguration(configuration Configuration) []byte {
buf, err := encodeMsgPack(configuration)
if err != nil {
panic(fmt.Errorf("failed to encode configuration: %v", err))
}
return buf.Bytes()
}
// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
// errors.
func decodeConfiguration(buf []byte) Configuration {
var configuration Configuration
if err := decodeMsgPack(buf, &configuration); err != nil {
panic(fmt.Errorf("failed to decode configuration: %v", err))
}
return configuration
}

View File

@ -19,7 +19,8 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore {
return &DiscardSnapshotStore{}
}
func (d *DiscardSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) {
func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
return &DiscardSnapshotSink{}, nil
}

View File

@ -119,8 +119,14 @@ func (f *FileSnapshotStore) testPermissions() error {
if err != nil {
return err
}
fh.Close()
os.Remove(path)
if err = fh.Close(); err != nil {
return err
}
if err = os.Remove(path); err != nil {
return err
}
return nil
}
@ -132,7 +138,13 @@ func snapshotName(term, index uint64) string {
}
// Create is used to start a new snapshot
func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) {
func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
// We only support version 1 snapshots at this time.
if version != 1 {
return nil, fmt.Errorf("unsupported snapshot version %d", version)
}
// Create a new path
name := snapshotName(term, index)
path := filepath.Join(f.path, name+tmpSuffix)
@ -151,10 +163,13 @@ func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSi
dir: path,
meta: fileSnapshotMeta{
SnapshotMeta: SnapshotMeta{
Version: version,
ID: name,
Index: index,
Term: term,
Peers: peers,
Peers: encodePeers(configuration, trans),
Configuration: configuration,
ConfigurationIndex: configurationIndex,
},
CRC: nil,
},
@ -236,6 +251,12 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
continue
}
// Make sure we can understand this version.
if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax {
f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version)
continue
}
// Append, but only return up to the retain count
snapMeta = append(snapMeta, meta)
}
@ -380,7 +401,10 @@ func (s *FileSnapshotSink) Close() error {
}
// Reap any old snapshots
s.store.ReapSnapshots()
if err := s.store.ReapSnapshots(); err != nil {
return err
}
return nil
}

View File

@ -1,13 +1,20 @@
package raft
import (
"fmt"
"io"
"time"
"github.com/armon/go-metrics"
)
// FSM provides an interface that can be implemented by
// clients to make use of the replicated log.
type FSM interface {
// Apply log is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
// method was called on the same Raft node as the FSM.
Apply(*Log) interface{}
// Snapshot is used to support log compaction. This call should
@ -35,3 +42,75 @@ type FSMSnapshot interface {
// Release is invoked when we are finished with the snapshot.
Release()
}
// runFSM is a long running goroutine responsible for applying logs
// to the FSM. This is done async of other logs since we don't want
// the FSM to block our internal operations.
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
for {
select {
case req := <-r.fsmRestoreCh:
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
continue
}
// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
continue
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
case req := <-r.fsmSnapshotCh:
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
continue
}
// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
case commitEntry := <-r.fsmCommitCh:
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Invoke the future if given
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
}
case <-r.shutdownCh:
return
}
}
}

View File

@ -7,16 +7,45 @@ import (
// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
Error() error
}
// ApplyFuture is used for Apply() and can returns the FSM response.
type ApplyFuture interface {
// IndexFuture is used for future actions that can result in a raft log entry
// being created.
type IndexFuture interface {
Future
Response() interface{}
// Index holds the index of the newly applied log entry.
// This must not be called until after the Error method has returned.
Index() uint64
}
// ApplyFuture is used for Apply and can return the FSM response.
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
Response() interface{}
}
// ConfigurationFuture is used for GetConfiguration and can return the
// latest configuration in use by Raft.
type ConfigurationFuture interface {
IndexFuture
// Configuration contains the latest configuration. This must
// not be called until after the Error method has returned.
Configuration() Configuration
}
// errorFuture is used to return a static error.
type errorFuture struct {
err error
@ -48,6 +77,9 @@ func (d *deferError) init() {
func (d *deferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
// send so we'll still return nil below.
return d.err
}
if d.errCh == nil {
@ -69,12 +101,28 @@ func (d *deferError) respond(err error) {
d.responded = true
}
// There are several types of requests that cause a configuration entry to
// be appended to the log. These are encoded here for leaderLoop() to process.
// This is internal to a single server.
type configurationChangeFuture struct {
logFuture
req configurationChangeRequest
}
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
// Raft object's BootstrapCluster member function for more details.
type bootstrapFuture struct {
deferError
// configuration is the proposed bootstrap configuration to apply.
configuration Configuration
}
// logFuture is used to apply a log entry and waits until
// the log is considered committed.
type logFuture struct {
deferError
log Log
policy quorumPolicy
response interface{}
dispatch time.Time
}
@ -87,18 +135,17 @@ func (l *logFuture) Index() uint64 {
return l.log.Index
}
type peerFuture struct {
deferError
peers []string
}
type shutdownFuture struct {
raft *Raft
}
func (s *shutdownFuture) Error() error {
for s.raft.getRoutines() > 0 {
time.Sleep(5 * time.Millisecond)
if s.raft == nil {
return nil
}
s.raft.waitShutdown()
if closeable, ok := s.raft.trans.(WithClose); ok {
closeable.Close()
}
return nil
}
@ -116,7 +163,6 @@ type reqSnapshotFuture struct {
// snapshot details provided by the FSM runner before responding
index uint64
term uint64
peers []string
snapshot FSMSnapshot
}
@ -137,6 +183,23 @@ type verifyFuture struct {
voteLock sync.Mutex
}
// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
deferError
configurations configurations
}
// Configuration returns the latest configuration in use by Raft.
func (c *configurationsFuture) Configuration() Configuration {
return c.configurations.latest
}
// Index returns the index of the latest configuration in use by Raft.
func (c *configurationsFuture) Index() uint64 {
return c.configurations.latestIndex
}
// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {

View File

@ -1,213 +0,0 @@
package raft
import (
"container/list"
"sync"
)
// QuorumPolicy allows individual logFutures to have different
// commitment rules while still using the inflight mechanism.
type quorumPolicy interface {
// Checks if a commit from a given peer is enough to
// satisfy the commitment rules
Commit() bool
// Checks if a commit is committed
IsCommitted() bool
}
// MajorityQuorum is used by Apply transactions and requires
// a simple majority of nodes.
type majorityQuorum struct {
count int
votesNeeded int
}
func newMajorityQuorum(clusterSize int) *majorityQuorum {
votesNeeded := (clusterSize / 2) + 1
return &majorityQuorum{count: 0, votesNeeded: votesNeeded}
}
func (m *majorityQuorum) Commit() bool {
m.count++
return m.count >= m.votesNeeded
}
func (m *majorityQuorum) IsCommitted() bool {
return m.count >= m.votesNeeded
}
// Inflight is used to track operations that are still in-flight.
type inflight struct {
sync.Mutex
committed *list.List
commitCh chan struct{}
minCommit uint64
maxCommit uint64
operations map[uint64]*logFuture
stopCh chan struct{}
}
// NewInflight returns an inflight struct that notifies
// the provided channel when logs are finished committing.
func newInflight(commitCh chan struct{}) *inflight {
return &inflight{
committed: list.New(),
commitCh: commitCh,
minCommit: 0,
maxCommit: 0,
operations: make(map[uint64]*logFuture),
stopCh: make(chan struct{}),
}
}
// Start is used to mark a logFuture as being inflight. It
// also commits the entry, as it is assumed the leader is
// starting.
func (i *inflight) Start(l *logFuture) {
i.Lock()
defer i.Unlock()
i.start(l)
}
// StartAll is used to mark a list of logFuture's as being
// inflight. It also commits each entry as the leader is
// assumed to be starting.
func (i *inflight) StartAll(logs []*logFuture) {
i.Lock()
defer i.Unlock()
for _, l := range logs {
i.start(l)
}
}
// start is used to mark a single entry as inflight,
// must be invoked with the lock held.
func (i *inflight) start(l *logFuture) {
idx := l.log.Index
i.operations[idx] = l
if idx > i.maxCommit {
i.maxCommit = idx
}
if i.minCommit == 0 {
i.minCommit = idx
}
i.commit(idx)
}
// Cancel is used to cancel all in-flight operations.
// This is done when the leader steps down, and all futures
// are sent the given error.
func (i *inflight) Cancel(err error) {
// Close the channel first to unblock any pending commits
close(i.stopCh)
// Lock after close to avoid deadlock
i.Lock()
defer i.Unlock()
// Respond to all inflight operations
for _, op := range i.operations {
op.respond(err)
}
// Clear all the committed but not processed
for e := i.committed.Front(); e != nil; e = e.Next() {
e.Value.(*logFuture).respond(err)
}
// Clear the map
i.operations = make(map[uint64]*logFuture)
// Clear the list of committed
i.committed = list.New()
// Close the commmitCh
close(i.commitCh)
// Reset indexes
i.minCommit = 0
i.maxCommit = 0
}
// Committed returns all the committed operations in order.
func (i *inflight) Committed() (l *list.List) {
i.Lock()
l, i.committed = i.committed, list.New()
i.Unlock()
return l
}
// Commit is used by leader replication routines to indicate that
// a follower was finished committing a log to disk.
func (i *inflight) Commit(index uint64) {
i.Lock()
defer i.Unlock()
i.commit(index)
}
// CommitRange is used to commit a range of indexes inclusively.
// It is optimized to avoid commits for indexes that are not tracked.
func (i *inflight) CommitRange(minIndex, maxIndex uint64) {
i.Lock()
defer i.Unlock()
// Update the minimum index
minIndex = max(i.minCommit, minIndex)
// Commit each index
for idx := minIndex; idx <= maxIndex; idx++ {
i.commit(idx)
}
}
// commit is used to commit a single index. Must be called with the lock held.
func (i *inflight) commit(index uint64) {
op, ok := i.operations[index]
if !ok {
// Ignore if not in the map, as it may be committed already
return
}
// Check if we've satisfied the commit
if !op.policy.Commit() {
return
}
// Cannot commit if this is not the minimum inflight. This can happen
// if the quorum size changes, meaning a previous commit requires a larger
// quorum that this commit. We MUST block until the previous log is committed,
// otherwise logs will be applied out of order.
if index != i.minCommit {
return
}
NOTIFY:
// Add the operation to the committed list
i.committed.PushBack(op)
// Stop tracking since it is committed
delete(i.operations, index)
// Update the indexes
if index == i.maxCommit {
i.minCommit = 0
i.maxCommit = 0
} else {
i.minCommit++
}
// Check if the next in-flight operation is ready
if i.minCommit != 0 {
op = i.operations[i.minCommit]
if op.policy.IsCommitted() {
index = i.minCommit
goto NOTIFY
}
}
// Async notify of ready operations
asyncNotifyCh(i.commitCh)
}

View File

@ -81,7 +81,16 @@ func (i *InmemStore) DeleteRange(min, max uint64) error {
for j := min; j <= max; j++ {
delete(i.logs, j)
}
if min <= i.lowIndex {
i.lowIndex = max + 1
}
if max >= i.highIndex {
i.highIndex = min - 1
}
if i.lowIndex > i.highIndex {
i.lowIndex = 0
i.highIndex = 0
}
return nil
}

View File

@ -9,15 +9,15 @@ import (
// NewInmemAddr returns a new in-memory addr with
// a randomly generate UUID as the ID.
func NewInmemAddr() string {
return generateUUID()
func NewInmemAddr() ServerAddress {
return ServerAddress(generateUUID())
}
// inmemPipeline is used to pipeline requests for the in-mem transport.
type inmemPipeline struct {
trans *InmemTransport
peer *InmemTransport
peerAddr string
peerAddr ServerAddress
doneCh chan AppendFuture
inprogressCh chan *inmemPipelineInflight
@ -37,20 +37,22 @@ type inmemPipelineInflight struct {
type InmemTransport struct {
sync.RWMutex
consumerCh chan RPC
localAddr string
peers map[string]*InmemTransport
localAddr ServerAddress
peers map[ServerAddress]*InmemTransport
pipelines []*inmemPipeline
timeout time.Duration
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address.
func NewInmemTransport() (string, *InmemTransport) {
addr := NewInmemAddr()
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
if string(addr) == "" {
addr = NewInmemAddr()
}
trans := &InmemTransport{
consumerCh: make(chan RPC, 16),
localAddr: addr,
peers: make(map[string]*InmemTransport),
peers: make(map[ServerAddress]*InmemTransport),
timeout: 50 * time.Millisecond,
}
return addr, trans
@ -67,13 +69,13 @@ func (i *InmemTransport) Consumer() <-chan RPC {
}
// LocalAddr implements the Transport interface.
func (i *InmemTransport) LocalAddr() string {
func (i *InmemTransport) LocalAddr() ServerAddress {
return i.localAddr
}
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
i.RLock()
peer, ok := i.peers[target]
i.RUnlock()
@ -88,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, e
}
// AppendEntries implements the Transport interface.
func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
@ -101,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest
}
// RequestVote implements the Transport interface.
func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
@ -114,7 +116,7 @@ func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, re
}
// InstallSnapshot implements the Transport interface.
func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout)
if err != nil {
return err
@ -126,7 +128,7 @@ func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotReq
return nil
}
func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
i.RLock()
peer, ok := i.peers[target]
i.RUnlock()
@ -156,28 +158,27 @@ func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, t
return
}
// EncodePeer implements the Transport interface. It uses the UUID as the
// address directly.
func (i *InmemTransport) EncodePeer(p string) []byte {
// EncodePeer implements the Transport interface.
func (i *InmemTransport) EncodePeer(p ServerAddress) []byte {
return []byte(p)
}
// DecodePeer implements the Transport interface. It wraps the UUID in an
// InmemAddr.
func (i *InmemTransport) DecodePeer(buf []byte) string {
return string(buf)
// DecodePeer implements the Transport interface.
func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress {
return ServerAddress(buf)
}
// Connect is used to connect this transport to another transport for
// a given peer name. This allows for local routing.
func (i *InmemTransport) Connect(peer string, trans *InmemTransport) {
func (i *InmemTransport) Connect(peer ServerAddress, t Transport) {
trans := t.(*InmemTransport)
i.Lock()
defer i.Unlock()
i.peers[peer] = trans
}
// Disconnect is used to remove the ability to route to a given peer.
func (i *InmemTransport) Disconnect(peer string) {
func (i *InmemTransport) Disconnect(peer ServerAddress) {
i.Lock()
defer i.Unlock()
delete(i.peers, peer)
@ -199,7 +200,7 @@ func (i *InmemTransport) Disconnect(peer string) {
func (i *InmemTransport) DisconnectAll() {
i.Lock()
defer i.Unlock()
i.peers = make(map[string]*InmemTransport)
i.peers = make(map[ServerAddress]*InmemTransport)
// Handle pipelines
for _, pipeline := range i.pipelines {
@ -208,7 +209,13 @@ func (i *InmemTransport) DisconnectAll() {
i.pipelines = nil
}
func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr string) *inmemPipeline {
// Close is used to permanently disable the transport
func (i *InmemTransport) Close() error {
i.DisconnectAll()
return nil
}
func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr ServerAddress) *inmemPipeline {
i := &inmemPipeline{
trans: trans,
peer: peer,

View File

@ -10,11 +10,15 @@ const (
// LogNoop is used to assert leadership.
LogNoop
// LogAddPeer is used to add a new peer.
LogAddPeer
// LogAddPeer is used to add a new peer. This should only be used with
// older protocol versions designed to be compatible with unversioned
// Raft servers. See comments in config.go for details.
LogAddPeerDeprecated
// LogRemovePeer is used to remove an existing peer.
LogRemovePeer
// LogRemovePeer is used to remove an existing peer. This should only be
// used with older protocol versions designed to be compatible with
// unversioned Raft servers. See comments in config.go for details.
LogRemovePeerDeprecated
// LogBarrier is used to ensure all preceding operations have been
// applied to the FSM. It is similar to LogNoop, but instead of returning
@ -22,39 +26,47 @@ const (
// it is possible there are operations committed but not yet applied to
// the FSM.
LogBarrier
// LogConfiguration establishes a membership change configuration. It is
// created when a server is added, removed, promoted, etc. Only used
// when protocol version 1 or greater is in use.
LogConfiguration
)
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
// Index holds the index of the log entry.
Index uint64
Term uint64
Type LogType
Data []byte
// peer is not exported since it is not transmitted, only used
// internally to construct the Data field.
peer string
// Term holds the election term of the log entry.
Term uint64
// Type holds the type of the log entry.
Type LogType
// Data holds the log entry's type-specific data.
Data []byte
}
// LogStore is used to provide an interface for storing
// and retrieving logs in a durable fashion.
type LogStore interface {
// Returns the first index written. 0 for no entries.
// FirstIndex returns the first index written. 0 for no entries.
FirstIndex() (uint64, error)
// Returns the last index written. 0 for no entries.
// LastIndex returns the last index written. 0 for no entries.
LastIndex() (uint64, error)
// Gets a log entry at a given index.
// GetLog gets a log entry at a given index.
GetLog(index uint64, log *Log) error
// Stores a log entry.
// StoreLog stores a log entry.
StoreLog(log *Log) error
// Stores multiple log entries.
// StoreLogs stores multiple log entries.
StoreLogs(logs []*Log) error
// Deletes a range of log entries. The range is inclusive.
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

83
vendor/github.com/hashicorp/raft/membership.md generated vendored Normal file
View File

@ -0,0 +1,83 @@
Simon (@superfell) and I (@ongardie) talked through reworking this library's cluster membership changes last Friday. We don't see a way to split this into independent patches, so we're taking the next best approach: submitting the plan here for review, then working on an enormous PR. Your feedback would be appreciated. (@superfell is out this week, however, so don't expect him to respond quickly.)
These are the main goals:
- Bringing things in line with the description in my PhD dissertation;
- Catching up new servers prior to granting them a vote, as well as allowing permanent non-voting members; and
- Eliminating the `peers.json` file, to avoid issues of consistency between that and the log/snapshot.
## Data-centric view
We propose to re-define a *configuration* as a set of servers, where each server includes an address (as it does today) and a mode that is either:
- *Voter*: a server whose vote is counted in elections and whose match index is used in advancing the leader's commit index.
- *Nonvoter*: a server that receives log entries but is not considered for elections or commitment purposes.
- *Staging*: a server that acts like a nonvoter with one exception: once a staging server receives enough log entries to catch up sufficiently to the leader's log, the leader will invoke a membership change to change the staging server to a voter.
All changes to the configuration will be done by writing a new configuration to the log. The new configuration will be in affect as soon as it is appended to the log (not when it is committed like a normal state machine command). Note that, per my dissertation, there can be at most one uncommitted configuration at a time (the next configuration may not be created until the prior one has been committed). It's not strictly necessary to follow these same rules for the nonvoter/staging servers, but we think its best to treat all changes uniformly.
Each server will track two configurations:
1. its *committed configuration*: the latest configuration in the log/snapshot that has been committed, along with its index.
2. its *latest configuration*: the latest configuration in the log/snapshot (may be committed or uncommitted), along with its index.
When there's no membership change happening, these two will be the same. The latest configuration is almost always the one used, except:
- When followers truncate the suffix of their logs, they may need to fall back to the committed configuration.
- When snapshotting, the committed configuration is written, to correspond with the committed log prefix that is being snapshotted.
## Application API
We propose the following operations for clients to manipulate the cluster configuration:
- AddVoter: server becomes staging unless voter,
- AddNonvoter: server becomes nonvoter unless staging or voter,
- DemoteVoter: server becomes nonvoter unless absent,
- RemovePeer: server removed from configuration,
- GetConfiguration: waits for latest config to commit, returns committed config.
This diagram, of which I'm quite proud, shows the possible transitions:
```
+-----------------------------------------------------------------------------+
| |
| Start -> +--------+ |
| ,------<------------| | |
| / | absent | |
| / RemovePeer--> | | <---RemovePeer |
| / | +--------+ \ |
| / | | \ |
| AddNonvoter | AddVoter \ |
| | ,->---' `--<-. | \ |
| v / \ v \ |
| +----------+ +----------+ +----------+ |
| | | ---AddVoter--> | | -log caught up --> | | |
| | nonvoter | | staging | | voter | |
| | | <-DemoteVoter- | | ,- | | |
| +----------+ \ +----------+ / +----------+ |
| \ / |
| `--------------<---------------' |
| |
+-----------------------------------------------------------------------------+
```
While these operations aren't quite symmetric, we think they're a good set to capture
the possible intent of the user. For example, if I want to make sure a server doesn't have a vote, but the server isn't part of the configuration at all, it probably shouldn't be added as a nonvoting server.
Each of these application-level operations will be interpreted by the leader and, if it has an effect, will cause the leader to write a new configuration entry to its log. Which particular application-level operation caused the log entry to be written need not be part of the log entry.
## Code implications
This is a non-exhaustive list, but we came up with a few things:
- Remove the PeerStore: the `peers.json` file introduces the possibility of getting out of sync with the log and snapshot, and it's hard to maintain this atomically as the log changes. It's not clear whether it's meant to track the committed or latest configuration, either.
- Servers will have to search their snapshot and log to find the committed configuration and the latest configuration on startup.
- Bootstrap will no longer use `peers.json` but should initialize the log or snapshot with an application-provided configuration entry.
- Snapshots should store the index of their configuration along with the configuration itself. In my experience with LogCabin, the original log index of the configuration is very useful to include in debug log messages.
- As noted in hashicorp/raft#84, configuration change requests should come in via a separate channel, and one may not proceed until the last has been committed.
- As to deciding when a log is sufficiently caught up, implementing a sophisticated algorithm *is* something that can be done in a separate PR. An easy and decent placeholder is: once the staging server has reached 95% of the leader's commit index, promote it.
## Feedback
Again, we're looking for feedback here before we start working on this. Here are some questions to think about:
- Does this seem like where we want things to go?
- Is there anything here that should be left out?
- Is there anything else we're forgetting about?
- Is there a good way to break this up?
- What do we need to worry about in terms of backwards compatibility?
- What implication will this have on current tests?
- What's the best way to test this code, in particular the small changes that will be sprinkled all over the library?

View File

@ -56,7 +56,7 @@ is not known if there is an error.
*/
type NetworkTransport struct {
connPool map[string][]*netConn
connPool map[ServerAddress][]*netConn
connPoolLock sync.Mutex
consumeCh chan RPC
@ -84,11 +84,11 @@ type StreamLayer interface {
net.Listener
// Dial is used to create a new outgoing connection
Dial(address string, timeout time.Duration) (net.Conn, error)
Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
}
type netConn struct {
target string
target ServerAddress
conn net.Conn
r *bufio.Reader
w *bufio.Writer
@ -142,7 +142,7 @@ func NewNetworkTransportWithLogger(
logger = log.New(os.Stderr, "", log.LstdFlags)
}
trans := &NetworkTransport{
connPool: make(map[string][]*netConn),
connPool: make(map[ServerAddress][]*netConn),
consumeCh: make(chan RPC),
logger: logger,
maxPool: maxPool,
@ -183,8 +183,8 @@ func (n *NetworkTransport) Consumer() <-chan RPC {
}
// LocalAddr implements the Transport interface.
func (n *NetworkTransport) LocalAddr() string {
return n.stream.Addr().String()
func (n *NetworkTransport) LocalAddr() ServerAddress {
return ServerAddress(n.stream.Addr().String())
}
// IsShutdown is used to check if the transport is shutdown.
@ -198,7 +198,7 @@ func (n *NetworkTransport) IsShutdown() bool {
}
// getExistingConn is used to grab a pooled connection.
func (n *NetworkTransport) getPooledConn(target string) *netConn {
func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn {
n.connPoolLock.Lock()
defer n.connPoolLock.Unlock()
@ -215,7 +215,7 @@ func (n *NetworkTransport) getPooledConn(target string) *netConn {
}
// getConn is used to get a connection from the pool.
func (n *NetworkTransport) getConn(target string) (*netConn, error) {
func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
// Check for a pooled conn
if conn := n.getPooledConn(target); conn != nil {
return conn, nil
@ -260,7 +260,7 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
// Get a connection
conn, err := n.getConn(target)
if err != nil {
@ -272,17 +272,17 @@ func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline,
}
// AppendEntries implements the Transport interface.
func (n *NetworkTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
return n.genericRPC(target, rpcAppendEntries, args, resp)
}
// RequestVote implements the Transport interface.
func (n *NetworkTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
return n.genericRPC(target, rpcRequestVote, args, resp)
}
// genericRPC handles a simple request/response RPC.
func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interface{}, resp interface{}) error {
func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error {
// Get a conn
conn, err := n.getConn(target)
if err != nil {
@ -295,7 +295,7 @@ func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interfa
}
// Send the RPC
if err := sendRPC(conn, rpcType, args); err != nil {
if err = sendRPC(conn, rpcType, args); err != nil {
return err
}
@ -308,7 +308,7 @@ func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interfa
}
// InstallSnapshot implements the Transport interface.
func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
// Get a conn, always close for InstallSnapshot
conn, err := n.getConn(target)
if err != nil {
@ -326,17 +326,17 @@ func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotR
}
// Send the RPC
if err := sendRPC(conn, rpcInstallSnapshot, args); err != nil {
if err = sendRPC(conn, rpcInstallSnapshot, args); err != nil {
return err
}
// Stream the state
if _, err := io.Copy(conn.w, data); err != nil {
if _, err = io.Copy(conn.w, data); err != nil {
return err
}
// Flush
if err := conn.w.Flush(); err != nil {
if err = conn.w.Flush(); err != nil {
return err
}
@ -346,13 +346,13 @@ func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotR
}
// EncodePeer implements the Transport interface.
func (n *NetworkTransport) EncodePeer(p string) []byte {
func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte {
return []byte(p)
}
// DecodePeer implements the Transport interface.
func (n *NetworkTransport) DecodePeer(buf []byte) string {
return string(buf)
func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
return ServerAddress(buf)
}
// listen is used to handling incoming connections.

115
vendor/github.com/hashicorp/raft/observer.go generated vendored Normal file
View File

@ -0,0 +1,115 @@
package raft
import (
"sync/atomic"
)
// Observation is sent along the given channel to observers when an event occurs.
type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// *RequestVoteRequest and RaftState.
Data interface{}
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64
// FilterFn is a function that can be registered in order to filter observations.
// The function reports whether the observation should be included - if
// it returns false, the observation will be filtered out.
type FilterFn func(o *Observation) bool
// Observer describes what to do with a given observation.
type Observer struct {
// channel receives observations.
channel chan Observation
// blocking, if true, will cause Raft to block when sending an observation
// to this observer. This should generally be set to false.
blocking bool
// filter will be called to determine if an observation should be sent to
// the channel.
filter FilterFn
// id is the ID of this observer in the Raft map.
id uint64
// numObserved and numDropped are performance counters for this observer.
numObserved uint64
numDropped uint64
}
// NewObserver creates a new observer that can be registered
// to make observations on a Raft instance. Observations
// will be sent on the given channel if they satisfy the
// given filter.
//
// If blocking is true, the observer will block when it can't
// send on the channel, otherwise it may discard events.
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
return &Observer{
channel: channel,
blocking: blocking,
filter: filter,
id: atomic.AddUint64(&nextObserverID, 1),
}
}
// GetNumObserved returns the number of observations.
func (or *Observer) GetNumObserved() uint64 {
return atomic.LoadUint64(&or.numObserved)
}
// GetNumDropped returns the number of dropped observations due to blocking.
func (or *Observer) GetNumDropped() uint64 {
return atomic.LoadUint64(&or.numDropped)
}
// RegisterObserver registers a new observer.
func (r *Raft) RegisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
r.observers[or.id] = or
}
// DeregisterObserver deregisters an observer.
func (r *Raft) DeregisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
delete(r.observers, or.id)
}
// observe sends an observation to every observer.
func (r *Raft) observe(o interface{}) {
// In general observers should not block. But in any case this isn't
// disastrous as we only hold a read lock, which merely prevents
// registration / deregistration of observers.
r.observersLock.RLock()
defer r.observersLock.RUnlock()
for _, or := range r.observers {
// It's wasteful to do this in the loop, but for the common case
// where there are no observers we won't create any objects.
ob := Observation{Raft: r, Data: o}
if or.filter != nil && !or.filter(&ob) {
continue
}
if or.channel == nil {
continue
}
if or.blocking {
or.channel <- ob
atomic.AddUint64(&or.numObserved, 1)
} else {
select {
case or.channel <- ob:
atomic.AddUint64(&or.numObserved, 1)
default:
atomic.AddUint64(&or.numDropped, 1)
}
}
}
}

View File

@ -1,122 +0,0 @@
package raft
import (
"bytes"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"sync"
)
const (
jsonPeerPath = "peers.json"
)
// PeerStore provides an interface for persistent storage and
// retrieval of peers. We use a separate interface than StableStore
// since the peers may need to be edited by a human operator. For example,
// in a two node cluster, the failure of either node requires human intervention
// since consensus is impossible.
type PeerStore interface {
// Peers returns the list of known peers.
Peers() ([]string, error)
// SetPeers sets the list of known peers. This is invoked when a peer is
// added or removed.
SetPeers([]string) error
}
// StaticPeers is used to provide a static list of peers.
type StaticPeers struct {
StaticPeers []string
l sync.Mutex
}
// Peers implements the PeerStore interface.
func (s *StaticPeers) Peers() ([]string, error) {
s.l.Lock()
peers := s.StaticPeers
s.l.Unlock()
return peers, nil
}
// SetPeers implements the PeerStore interface.
func (s *StaticPeers) SetPeers(p []string) error {
s.l.Lock()
s.StaticPeers = p
s.l.Unlock()
return nil
}
// JSONPeers is used to provide peer persistence on disk in the form
// of a JSON file. This allows human operators to manipulate the file.
type JSONPeers struct {
l sync.Mutex
path string
trans Transport
}
// NewJSONPeers creates a new JSONPeers store. Requires a transport
// to handle the serialization of network addresses.
func NewJSONPeers(base string, trans Transport) *JSONPeers {
path := filepath.Join(base, jsonPeerPath)
store := &JSONPeers{
path: path,
trans: trans,
}
return store
}
// Peers implements the PeerStore interface.
func (j *JSONPeers) Peers() ([]string, error) {
j.l.Lock()
defer j.l.Unlock()
// Read the file
buf, err := ioutil.ReadFile(j.path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Check for no peers
if len(buf) == 0 {
return nil, nil
}
// Decode the peers
var peerSet []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peerSet); err != nil {
return nil, err
}
// Deserialize each peer
var peers []string
for _, p := range peerSet {
peers = append(peers, j.trans.DecodePeer([]byte(p)))
}
return peers, nil
}
// SetPeers implements the PeerStore interface.
func (j *JSONPeers) SetPeers(peers []string) error {
j.l.Lock()
defer j.l.Unlock()
// Encode each peer
var peerSet []string
for _, p := range peers {
peerSet = append(peerSet, string(j.trans.EncodePeer(p)))
}
// Convert to JSON
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
if err := enc.Encode(peerSet); err != nil {
return err
}
// Write out as JSON
return ioutil.WriteFile(j.path, buf.Bytes(), 0755)
}

46
vendor/github.com/hashicorp/raft/peersjson.go generated vendored Normal file
View File

@ -0,0 +1,46 @@
package raft
import (
"bytes"
"encoding/json"
"io/ioutil"
)
// ReadPeersJSON consumes a legacy peers.json file in the format of the old JSON
// peer store and creates a new-style configuration structure. This can be used
// to migrate this data or perform manual recovery when running protocol versions
// that can interoperate with older, unversioned Raft servers. This should not be
// used once server IDs are in use, because the old peers.json file didn't have
// support for these, nor non-voter suffrage types.
func ReadPeersJSON(path string) (Configuration, error) {
// Read in the file.
buf, err := ioutil.ReadFile(path)
if err != nil {
return Configuration{}, err
}
// Parse it as JSON.
var peers []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peers); err != nil {
return Configuration{}, err
}
// Map it into the new-style configuration structure. We can only specify
// voter roles here, and the ID has to be the same as the address.
var configuration Configuration
for _, peer := range peers {
server := Server{
Suffrage: Voter,
ID: ServerID(peer),
Address: ServerAddress(peer),
}
configuration.Servers = append(configuration.Servers, server)
}
// We should only ingest valid configurations.
if err := checkConfiguration(configuration); err != nil {
return Configuration{}, err
}
return configuration, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -24,32 +24,58 @@ var (
ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
)
// followerReplication is in charge of sending snapshots and log entries from
// this leader during this particular term to a remote follower.
type followerReplication struct {
peer string
inflight *inflight
// peer contains the network address and ID of the remote follower.
peer Server
// commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successsful
// AppendEntries responses.
commitment *commitment
// stopCh is notified/closed when this leader steps down or the follower is
// removed from the cluster. In the follower removed case, it carries a log
// index; replication should be attempted with a best effort up through that
// index, before exiting.
stopCh chan uint64
// triggerCh is notified every time new entries are appended to the log.
triggerCh chan struct{}
// currentTerm is the term of this leader, to be included in AppendEntries
// requests.
currentTerm uint64
matchIndex uint64
// nextIndex is the index of the next log entry to send to the follower,
// which may fall past the end of the log.
nextIndex uint64
// lastContact is updated to the current time whenever any response is
// received from the follower (successful or not). This is used to check
// whether the leader should step down (Raft.checkLeaderLease()).
lastContact time.Time
// lastContactLock protects 'lastContact'.
lastContactLock sync.RWMutex
// failures counts the number of failed RPCs since the last success, which is
// used to apply backoff.
failures uint64
// notifyCh is notified to send out a heartbeat, which is used to check that
// this server is still leader.
notifyCh chan struct{}
// notify is a list of futures to be resolved upon receipt of an
// acknowledgement, then cleared from this list.
notify []*verifyFuture
// notifyLock protects 'notify'.
notifyLock sync.Mutex
// stepDown is used to indicate to the leader that we
// should step down based on information from a follower.
stepDown chan struct{}
// allowPipeline is used to control it seems like
// pipeline replication should be enabled.
// allowPipeline is used to determine when to pipeline the AppendEntries RPCs.
// It is private to this replication goroutine.
allowPipeline bool
}
@ -83,8 +109,8 @@ func (s *followerReplication) setLastContact() {
s.lastContactLock.Unlock()
}
// replicate is a long running routine that is used to manage
// the process of replicating logs to our followers.
// replicate is a long running routine that replicates log entries to a single
// follower.
func (r *Raft) replicate(s *followerReplication) {
// Start an async heartbeating routing
stopHeartbeat := make(chan struct{})
@ -102,9 +128,11 @@ RPC:
}
return
case <-s.triggerCh:
shouldStop = r.replicateTo(s, r.getLastLogIndex())
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.replicateTo(s, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout): // TODO: what is this?
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
// If things looks healthy, switch to pipeline mode
@ -129,7 +157,8 @@ PIPELINE:
goto RPC
}
// replicateTo is used to replicate the logs up to a given last index.
// replicateTo is a hepler to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
// Create the base request
@ -154,12 +183,12 @@ START:
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err)
s.failures++
return
}
appendStats(s.peer, start, float32(len(req.Entries)))
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
@ -180,7 +209,6 @@ START:
s.allowPipeline = true
} else {
s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1)
s.matchIndex = s.nextIndex - 1
if resp.NoRetryBackoff {
s.failures = 0
} else {
@ -190,6 +218,17 @@ START:
}
CHECK_MORE:
// Poll the stop channel here in case we are looping and have been asked
// to stop, or have stepped down as leader. Even for the best effort case
// where we are asked to replicate to a given index and then shutdown,
// it's better to not loop in here to send lots of entries to a straggler
// that's leaving the cluster anyways.
select {
case <-s.stopCh:
return true
default:
}
// Check if there are more logs to replicate
if s.nextIndex <= lastIndex {
goto START
@ -236,23 +275,27 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Setup the request
req := InstallSnapshotRequest{
RPCHeader: r.getRPCHeader(),
SnapshotVersion: meta.Version,
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
LastLogIndex: meta.Index,
LastLogTerm: meta.Term,
Peers: meta.Peers,
Size: meta.Size,
Configuration: encodeConfiguration(meta.Configuration),
ConfigurationIndex: meta.ConfigurationIndex,
}
// Make the call
start := time.Now()
var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer, &req, &resp, snapshot); err != nil {
if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err)
s.failures++
return false, err
}
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", s.peer}, start)
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
// Check for a newer term, stop running
if resp.Term > req.Term {
@ -265,12 +308,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Check for success
if resp.Success {
// Mark any inflight logs as committed
s.inflight.CommitRange(s.matchIndex+1, meta.Index)
// Update the indexes
s.matchIndex = meta.Index
s.nextIndex = s.matchIndex + 1
s.nextIndex = meta.Index + 1
s.commitment.match(s.peer.ID, meta.Index)
// Clear any failures
s.failures = 0
@ -290,6 +330,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
var failures uint64
req := AppendEntriesRequest{
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
}
@ -304,8 +345,8 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer, err)
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err)
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@ -314,7 +355,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
} else {
s.setLastContact()
failures = 0
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", s.peer}, start)
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
@ -326,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
// back to the standard replication which can handle more complex situations.
func (r *Raft) pipelineReplicate(s *followerReplication) error {
// Create a new pipeline
pipeline, err := r.trans.AppendEntriesPipeline(s.peer)
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address)
if err != nil {
return err
}
@ -353,14 +394,17 @@ SEND:
case <-finishCh:
break SEND
case maxIndex := <-s.stopCh:
// Make a best effort to replicate up to this index
if maxIndex > 0 {
r.pipelineSend(s, pipeline, &nextIndex, maxIndex)
}
break SEND
case <-s.triggerCh:
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
}
}
@ -373,7 +417,8 @@ SEND:
return nil
}
// pipelineSend is used to send data over a pipeline.
// pipelineSend is used to send data over a pipeline. It is a helper to
// pipelineReplicate.
func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *uint64, lastIndex uint64) (shouldStop bool) {
// Create a new append request
req := new(AppendEntriesRequest)
@ -403,7 +448,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
select {
case ready := <-respCh:
req, resp := ready.Request(), ready.Response()
appendStats(s.peer, ready.Start(), float32(len(req.Entries)))
appendStats(string(s.peer.ID), ready.Start(), float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
@ -429,6 +474,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
// setupAppendEntries is used to setup an append entries request.
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
req.RPCHeader = r.getRPCHeader()
req.Term = s.currentTerm
req.Leader = r.trans.EncodePeer(r.localAddr)
req.LeaderCommitIndex = r.getCommitIndex()
@ -446,13 +492,14 @@ func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequ
func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error {
// Guard for the first index, since there is no 0 log entry
// Guard against the previous index being a snapshot as well
lastSnapIdx, lastSnapTerm := r.getLastSnapshot()
if nextIndex == 1 {
req.PrevLogEntry = 0
req.PrevLogTerm = 0
} else if (nextIndex - 1) == r.getLastSnapshotIndex() {
req.PrevLogEntry = r.getLastSnapshotIndex()
req.PrevLogTerm = r.getLastSnapshotTerm()
} else if (nextIndex - 1) == lastSnapIdx {
req.PrevLogEntry = lastSnapIdx
req.PrevLogTerm = lastSnapTerm
} else {
var l Log
@ -498,18 +545,15 @@ func (r *Raft) handleStaleTerm(s *followerReplication) {
asyncNotifyCh(s.stepDown)
}
// updateLastAppended is used to update follower replication state after a successful
// AppendEntries RPC.
// updateLastAppended is used to update follower replication state after a
// successful AppendEntries RPC.
// TODO: This isn't used during InstallSnapshot, but the code there is similar.
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
// Mark any inflight logs as committed
if logs := req.Entries; len(logs) > 0 {
first := logs[0]
last := logs[len(logs)-1]
s.inflight.CommitRange(first.Index, last.Index)
// Update the indexes
s.matchIndex = last.Index
s.nextIndex = last.Index + 1
s.commitment.match(s.peer.ID, last.Index)
}
// Notify still leader

View File

@ -1,26 +1,50 @@
package raft
import (
"fmt"
"io"
"time"
"github.com/armon/go-metrics"
)
// SnapshotMeta is for metadata of a snapshot.
type SnapshotMeta struct {
ID string // ID is opaque to the store, and is used for opening
// Version is the version number of the snapshot metadata. This does not cover
// the application's data in the snapshot, that should be versioned
// separately.
Version SnapshotVersion
// ID is opaque to the store, and is used for opening.
ID string
// Index and Term store when the snapshot was taken.
Index uint64
Term uint64
// Peers is deprecated and used to support version 0 snapshots, but will
// be populated in version 1 snapshots as well to help with upgrades.
Peers []byte
// Configuration and ConfigurationIndex are present in version 1
// snapshots and later.
Configuration Configuration
ConfigurationIndex uint64
// Size is the size of the snapshot in bytes.
Size int64
}
// SnapshotStore interface is used to allow for flexible implementations
// of snapshot storage and retrieval. For example, a client could implement
// a shared state store such as S3, allowing new nodes to restore snapshots
// without steaming from the leader.
// without streaming from the leader.
type SnapshotStore interface {
// Create is used to begin a snapshot at a given index and term,
// with the current peer set already encoded.
Create(index, term uint64, peers []byte) (SnapshotSink, error)
// Create is used to begin a snapshot at a given index and term, and with
// the given committed configuration. The version parameter controls
// which snapshot version to create.
Create(version SnapshotVersion, index, term uint64, configuration Configuration,
configurationIndex uint64, trans Transport) (SnapshotSink, error)
// List is used to list the available snapshots in the store.
// It should return then in descending order, with the highest index first.
@ -38,3 +62,173 @@ type SnapshotSink interface {
ID() string
Cancel() error
}
// runSnapshots is a long running goroutine used to manage taking
// new snapshots of the FSM. It runs in parallel to the FSM and
// main goroutines, so that snapshots do not block normal operation.
func (r *Raft) runSnapshots() {
for {
select {
case <-randomTimeout(r.conf.SnapshotInterval):
// Check if we should snapshot
if !r.shouldSnapshot() {
continue
}
// Trigger a snapshot
if err := r.takeSnapshot(); err != nil {
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
}
case future := <-r.snapshotCh:
// User-triggered, run immediately
err := r.takeSnapshot()
if err != nil {
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
}
future.respond(err)
case <-r.shutdownCh:
return
}
}
}
// shouldSnapshot checks if we meet the conditions to take
// a new snapshot.
func (r *Raft) shouldSnapshot() bool {
// Check the last snapshot index
lastSnap, _ := r.getLastSnapshot()
// Check the last log index
lastIdx, err := r.logs.LastIndex()
if err != nil {
r.logger.Printf("[ERR] raft: Failed to get last log index: %v", err)
return false
}
// Compare the delta to the threshold
delta := lastIdx - lastSnap
return delta >= r.conf.SnapshotThreshold
}
// takeSnapshot is used to take a new snapshot. This must only be called from
// the snapshot thread, never the main thread.
func (r *Raft) takeSnapshot() error {
defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
// Create a request for the FSM to perform a snapshot.
snapReq := &reqSnapshotFuture{}
snapReq.init()
// Wait for dispatch or shutdown.
select {
case r.fsmSnapshotCh <- snapReq:
case <-r.shutdownCh:
return ErrRaftShutdown
}
// Wait until we get a response
if err := snapReq.Error(); err != nil {
if err != ErrNothingNewToSnapshot {
err = fmt.Errorf("failed to start snapshot: %v", err)
}
return err
}
defer snapReq.snapshot.Release()
// Make a request for the configurations and extract the committed info.
// We have to use the future here to safely get this information since
// it is owned by the main thread.
configReq := &configurationsFuture{}
configReq.init()
select {
case r.configurationsCh <- configReq:
case <-r.shutdownCh:
return ErrRaftShutdown
}
if err := configReq.Error(); err != nil {
return err
}
committed := configReq.configurations.committed
committedIndex := configReq.configurations.committedIndex
// We don't support snapshots while there's a config change outstanding
// since the snapshot doesn't have a means to represent this state. This
// is a little weird because we need the FSM to apply an index that's
// past the configuration change, even though the FSM itself doesn't see
// the configuration changes. It should be ok in practice with normal
// application traffic flowing through the FSM. If there's none of that
// then it's not crucial that we snapshot, since there's not much going
// on Raft-wise.
if snapReq.index < committedIndex {
return fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
committedIndex, snapReq.index)
}
// Create a new snapshot.
r.logger.Printf("[INFO] raft: Starting snapshot up to %d", snapReq.index)
start := time.Now()
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
// Try to persist the snapshot.
start = time.Now()
if err := snapReq.snapshot.Persist(sink); err != nil {
sink.Cancel()
return fmt.Errorf("failed to persist snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
// Close and check for error.
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err)
}
// Update the last stable snapshot info.
r.setLastSnapshot(snapReq.index, snapReq.term)
// Compact the logs.
if err := r.compactLogs(snapReq.index); err != nil {
return err
}
r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
return nil
}
// compactLogs takes the last inclusive index of a snapshot
// and trims the logs that are no longer needed.
func (r *Raft) compactLogs(snapIdx uint64) error {
defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())
// Determine log ranges to compact
minLog, err := r.logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %v", err)
}
// Check if we have enough logs to truncate
lastLogIdx, _ := r.getLastLog()
if lastLogIdx <= r.conf.TrailingLogs {
return nil
}
// Truncate up to the end of the snapshot, or `TrailingLogs`
// back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
// Log this
r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog)
// Compact the logs
if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
return fmt.Errorf("log compaction failed: %v", err)
}
return nil
}

View File

@ -1,6 +1,7 @@
package raft
import (
"sync"
"sync/atomic"
)
@ -44,22 +45,25 @@ type raftState struct {
// The current term, cache of StableStore
currentTerm uint64
// Cache the latest log from LogStore
LastLogIndex uint64
LastLogTerm uint64
// Highest committed log entry
commitIndex uint64
// Last applied log to the FSM
lastApplied uint64
// protects 4 next fields
lastLock sync.Mutex
// Cache the latest snapshot index/term
lastSnapshotIndex uint64
lastSnapshotTerm uint64
// Tracks the number of live routines
runningRoutines int32
// Cache the latest log from LogStore
lastLogIndex uint64
lastLogTerm uint64
// Tracks running goroutines
routinesGroup sync.WaitGroup
// The current state
state RaftState
@ -83,87 +87,81 @@ func (r *raftState) setCurrentTerm(term uint64) {
atomic.StoreUint64(&r.currentTerm, term)
}
func (r *raftState) getLastLogIndex() uint64 {
return atomic.LoadUint64(&r.LastLogIndex)
func (r *raftState) getLastLog() (index, term uint64) {
r.lastLock.Lock()
index = r.lastLogIndex
term = r.lastLogTerm
r.lastLock.Unlock()
return
}
func (r *raftState) setLastLogIndex(term uint64) {
atomic.StoreUint64(&r.LastLogIndex, term)
func (r *raftState) setLastLog(index, term uint64) {
r.lastLock.Lock()
r.lastLogIndex = index
r.lastLogTerm = term
r.lastLock.Unlock()
}
func (r *raftState) getLastLogTerm() uint64 {
return atomic.LoadUint64(&r.LastLogTerm)
func (r *raftState) getLastSnapshot() (index, term uint64) {
r.lastLock.Lock()
index = r.lastSnapshotIndex
term = r.lastSnapshotTerm
r.lastLock.Unlock()
return
}
func (r *raftState) setLastLogTerm(term uint64) {
atomic.StoreUint64(&r.LastLogTerm, term)
func (r *raftState) setLastSnapshot(index, term uint64) {
r.lastLock.Lock()
r.lastSnapshotIndex = index
r.lastSnapshotTerm = term
r.lastLock.Unlock()
}
func (r *raftState) getCommitIndex() uint64 {
return atomic.LoadUint64(&r.commitIndex)
}
func (r *raftState) setCommitIndex(term uint64) {
atomic.StoreUint64(&r.commitIndex, term)
func (r *raftState) setCommitIndex(index uint64) {
atomic.StoreUint64(&r.commitIndex, index)
}
func (r *raftState) getLastApplied() uint64 {
return atomic.LoadUint64(&r.lastApplied)
}
func (r *raftState) setLastApplied(term uint64) {
atomic.StoreUint64(&r.lastApplied, term)
}
func (r *raftState) getLastSnapshotIndex() uint64 {
return atomic.LoadUint64(&r.lastSnapshotIndex)
}
func (r *raftState) setLastSnapshotIndex(term uint64) {
atomic.StoreUint64(&r.lastSnapshotIndex, term)
}
func (r *raftState) getLastSnapshotTerm() uint64 {
return atomic.LoadUint64(&r.lastSnapshotTerm)
}
func (r *raftState) setLastSnapshotTerm(term uint64) {
atomic.StoreUint64(&r.lastSnapshotTerm, term)
}
func (r *raftState) incrRoutines() {
atomic.AddInt32(&r.runningRoutines, 1)
}
func (r *raftState) decrRoutines() {
atomic.AddInt32(&r.runningRoutines, -1)
}
func (r *raftState) getRoutines() int32 {
return atomic.LoadInt32(&r.runningRoutines)
func (r *raftState) setLastApplied(index uint64) {
atomic.StoreUint64(&r.lastApplied, index)
}
// Start a goroutine and properly handle the race between a routine
// starting and incrementing, and exiting and decrementing.
func (r *raftState) goFunc(f func()) {
r.incrRoutines()
r.routinesGroup.Add(1)
go func() {
defer r.decrRoutines()
defer r.routinesGroup.Done()
f()
}()
}
func (r *raftState) waitShutdown() {
r.routinesGroup.Wait()
}
// getLastIndex returns the last index in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastIndex() uint64 {
return max(r.getLastLogIndex(), r.getLastSnapshotIndex())
r.lastLock.Lock()
defer r.lastLock.Unlock()
return max(r.lastLogIndex, r.lastSnapshotIndex)
}
// getLastEntry returns the last index and term in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastEntry() (uint64, uint64) {
if r.getLastLogIndex() >= r.getLastSnapshotIndex() {
return r.getLastLogIndex(), r.getLastLogTerm()
r.lastLock.Lock()
defer r.lastLock.Unlock()
if r.lastLogIndex >= r.lastSnapshotIndex {
return r.lastLogIndex, r.lastLogTerm
}
return r.getLastSnapshotIndex(), r.getLastSnapshotTerm()
return r.lastSnapshotIndex, r.lastSnapshotTerm
}

View File

@ -81,8 +81,8 @@ func newTCPTransport(bindAddr string,
}
// Dial implements the StreamLayer interface.
func (t *TCPStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", address, timeout)
func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", string(address), timeout)
}
// Accept implements the net.Listener interface.

View File

@ -31,27 +31,27 @@ type Transport interface {
Consumer() <-chan RPC
// LocalAddr is used to return our local address to distinguish from our peers.
LocalAddr() string
LocalAddr() ServerAddress
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
AppendEntriesPipeline(target string) (AppendPipeline, error)
AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error)
// AppendEntries sends the appropriate RPC to the target node.
AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
// RequestVote sends the appropriate RPC to the target node.
RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error
RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
// the ReadCloser and streamed to the client.
InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
// EncodePeer is used to serialize a peer name.
EncodePeer(string) []byte
// EncodePeer is used to serialize a peer's address.
EncodePeer(ServerAddress) []byte
// DecodePeer is used to deserialize a peer name.
DecodePeer([]byte) string
// DecodePeer is used to deserialize a peer's address.
DecodePeer([]byte) ServerAddress
// SetHeartbeatHandler is used to setup a heartbeat handler
// as a fast-pass. This is to avoid head-of-line blocking from
@ -60,6 +60,35 @@ type Transport interface {
SetHeartbeatHandler(cb func(rpc RPC))
}
// WithClose is an interface that a transport may provide which
// allows a transport to be shut down cleanly when a Raft instance
// shuts down.
//
// It is defined separately from Transport as unfortunately it wasn't in the
// original interface specification.
type WithClose interface {
// Close permanently closes a transport, stopping
// any associated goroutines and freeing other resources.
Close() error
}
// LoopbackTransport is an interface that provides a loopback transport suitable for testing
// e.g. InmemTransport. It's there so we don't have to rewrite tests.
type LoopbackTransport interface {
Transport // Embedded transport reference
WithPeers // Embedded peer management
WithClose // with a close routine
}
// WithPeers is an interface that a transport may provide which allows for connection and
// disconnection. Unless the transport is a loopback transport, the transport specified to
// "Connect" is likely to be nil.
type WithPeers interface {
Connect(peer ServerAddress, t Transport) // Connect a peer
Disconnect(peer ServerAddress) // Disconnect a given peer
DisconnectAll() // Disconnect all peers, possibly to reconnect them later
}
// AppendPipeline is used for pipelining AppendEntries requests. It is used
// to increase the replication throughput by masking latency and better
// utilizing bandwidth.
@ -72,14 +101,24 @@ type AppendPipeline interface {
// response futures when they are ready.
Consumer() <-chan AppendFuture
// Closes pipeline and cancels all inflight RPCs
// Close closes the pipeline and cancels all inflight RPCs
Close() error
}
// AppendFuture is used to return information about a pipelined AppendEntries request.
type AppendFuture interface {
Future
// Start returns the time that the append request was started.
// It is always OK to call this method.
Start() time.Time
// Request holds the parameters of the AppendEntries call.
// It is always OK to call this method.
Request() *AppendEntriesRequest
// Response holds the results of the AppendEntries call.
// This method must only be called after the Error
// method returns, and will only be valid on success.
Response() *AppendEntriesResponse
}

View File

@ -3,7 +3,6 @@ package raft
import (
"bytes"
crand "crypto/rand"
"encoding/binary"
"fmt"
"math"
"math/big"
@ -68,14 +67,6 @@ func generateUUID() string {
buf[10:16])
}
// asyncNotify is used to do an async channel send to
// a list of channels. This will not block.
func asyncNotify(chans []chan struct{}) {
for _, ch := range chans {
asyncNotifyCh(ch)
}
}
// asyncNotifyCh is used to do an async channel send
// to a single channel without blocking.
func asyncNotifyCh(ch chan struct{}) {
@ -85,6 +76,17 @@ func asyncNotifyCh(ch chan struct{}) {
}
}
// drainNotifyCh empties out a single-item notification channel without
// blocking, and returns whether it received anything.
func drainNotifyCh(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
// asyncNotifyBool is used to do an async notification
// on a bool channel.
func asyncNotifyBool(ch chan bool, v bool) {
@ -94,70 +96,6 @@ func asyncNotifyBool(ch chan bool, v bool) {
}
}
// ExcludePeer is used to exclude a single peer from a list of peers.
func ExcludePeer(peers []string, peer string) []string {
otherPeers := make([]string, 0, len(peers))
for _, p := range peers {
if p != peer {
otherPeers = append(otherPeers, p)
}
}
return otherPeers
}
// PeerContained checks if a given peer is contained in a list.
func PeerContained(peers []string, peer string) bool {
for _, p := range peers {
if p == peer {
return true
}
}
return false
}
// AddUniquePeer is used to add a peer to a list of existing
// peers only if it is not already contained.
func AddUniquePeer(peers []string, peer string) []string {
if PeerContained(peers, peer) {
return peers
}
return append(peers, peer)
}
// encodePeers is used to serialize a list of peers.
func encodePeers(peers []string, trans Transport) []byte {
// Encode each peer
var encPeers [][]byte
for _, p := range peers {
encPeers = append(encPeers, trans.EncodePeer(p))
}
// Encode the entire array
buf, err := encodeMsgPack(encPeers)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}
return buf.Bytes()
}
// decodePeers is used to deserialize a list of peers.
func decodePeers(buf []byte, trans Transport) []string {
// Decode the buffer first
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
}
// Deserialize each peer
var peers []string
for _, enc := range encPeers {
peers = append(peers, trans.DecodePeer(enc))
}
return peers
}
// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)
@ -175,18 +113,6 @@ func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
return buf, err
}
// Converts bytes to an integer.
func bytesToUint64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
// Converts a uint64 to a byte slice.
func uint64ToBytes(u uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, u)
return buf
}
// backoff is used to compute an exponential backoff
// duration. Base time is scaled by the current round,
// up to some maximum scale factor.
@ -198,3 +124,10 @@ func backoff(base time.Duration, round, limit uint64) time.Duration {
}
return base
}
// Needed for sorting []uint64, used to determine commitment
type uint64Slice []uint64
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

4
vendor/vendor.json vendored
View File

@ -256,8 +256,10 @@
"revision": "a14192a58a694c123d8fe5481d4a4727d6ae82f3"
},
{
"checksumSHA1": "zMgiTV0dfJIQNRCJDF50bLomDvg=",
"path": "github.com/hashicorp/raft",
"revision": "057b893fd996696719e98b6c44649ea14968c811"
"revision": "c69c15dd73b6695ba75b3502ce6b332cc0042c83",
"revisionTime": "2016-08-01T21:27:18Z"
},
{
"path": "github.com/hashicorp/raft-boltdb",