mirror of https://github.com/hashicorp/consul
chore: update raft to v1.2.0 (#8822)
parent
38f5ddce2a
commit
708957a982
2
go.mod
2
go.mod
|
@ -54,7 +54,7 @@ require (
|
|||
github.com/hashicorp/hil v0.0.0-20160711231837-1e86c6b523c5
|
||||
github.com/hashicorp/memberlist v0.2.2
|
||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
||||
github.com/hashicorp/raft v1.1.2
|
||||
github.com/hashicorp/raft v1.2.0
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/serf v0.9.5
|
||||
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
|
||||
|
|
4
go.sum
4
go.sum
|
@ -284,8 +284,8 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
|
|||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
|
||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
|
||||
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||
github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c=
|
||||
github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
|
||||
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
|
||||
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=
|
||||
|
|
|
@ -1,3 +1,22 @@
|
|||
# UNRELEASED
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
* Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)]
|
||||
* Allow futures to react to `Shutdown()` to prevent a deadlock with `takeSnapshot()` [[GH-390](https://github.com/hashicorp/raft/pull/390)]
|
||||
* Prevent non-voters from becoming eligible for leadership elections [[GH-398](https://github.com/hashicorp/raft/pull/398)]
|
||||
* Remove an unneeded `io.Copy` from snapshot writes [[GH-399](https://github.com/hashicorp/raft/pull/399)]
|
||||
* Log decoded candidate address in `duplicate requestVote` warning [[GH-400](https://github.com/hashicorp/raft/pull/400)]
|
||||
* Prevent starting a TCP transport when IP address is `nil` [[GH-403](https://github.com/hashicorp/raft/pull/403)]
|
||||
* Reject leadership transfer requests when in candidate state to prevent indefinite blocking while unable to elect a leader [[GH-413](https://github.com/hashicorp/raft/pull/413)]
|
||||
* Add labels for metric metadata to reduce cardinality of metric names [[GH-409](https://github.com/hashicorp/raft/pull/409)]
|
||||
* Add peers metric [[GH-413](https://github.com/hashicorp/raft/pull/431)]
|
||||
|
||||
BUG FIXES
|
||||
|
||||
* Make `LeaderCh` always deliver the latest leadership transition [[GH-384](https://github.com/hashicorp/raft/pull/384)]
|
||||
* Handle updating an existing peer in `startStopReplication` [[GH-419](https://github.com/hashicorp/raft/pull/419)]
|
||||
|
||||
# 1.1.2 (January 17th, 2020)
|
||||
|
||||
FEATURES
|
||||
|
|
|
@ -16,8 +16,8 @@ endif
|
|||
TEST_RESULTS_DIR?=/tmp/test-results
|
||||
|
||||
test:
|
||||
go test $(TESTARGS) -timeout=60s -race .
|
||||
go test $(TESTARGS) -timeout=60s -tags batchtest -race .
|
||||
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race .
|
||||
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race .
|
||||
|
||||
integ: test
|
||||
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis-ci.org/hashicorp/raft) [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft)
|
||||
raft [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft)
|
||||
====
|
||||
|
||||
raft is a [Go](http://www.golang.org) library that manages a replicated
|
||||
|
|
|
@ -503,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
|
|||
fsm: fsm,
|
||||
fsmMutateCh: make(chan interface{}, 128),
|
||||
fsmSnapshotCh: make(chan *reqSnapshotFuture),
|
||||
leaderCh: make(chan bool),
|
||||
leaderCh: make(chan bool, 1),
|
||||
localID: localID,
|
||||
localAddr: localAddr,
|
||||
logger: logger,
|
||||
|
@ -527,13 +527,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
|
|||
// Initialize as a follower.
|
||||
r.setState(Follower)
|
||||
|
||||
// Start as leader if specified. This should only be used
|
||||
// for testing purposes.
|
||||
if conf.StartAsLeader {
|
||||
r.setState(Leader)
|
||||
r.setLeader(r.localAddr)
|
||||
}
|
||||
|
||||
// Restore the current term and the last log.
|
||||
r.setCurrentTerm(currentTerm)
|
||||
r.setLastLog(lastLog.Index, lastLog.Term)
|
||||
|
@ -959,10 +952,17 @@ func (r *Raft) State() RaftState {
|
|||
return r.getState()
|
||||
}
|
||||
|
||||
// LeaderCh is used to get a channel which delivers signals on
|
||||
// acquiring or losing leadership. It sends true if we become
|
||||
// the leader, and false if we lose it. The channel is not buffered,
|
||||
// and does not block on writes.
|
||||
// LeaderCh is used to get a channel which delivers signals on acquiring or
|
||||
// losing leadership. It sends true if we become the leader, and false if we
|
||||
// lose it.
|
||||
//
|
||||
// Receivers can expect to receive a notification only if leadership
|
||||
// transition has occured.
|
||||
//
|
||||
// If receivers aren't ready for the signal, signals may drop and only the
|
||||
// latest leadership transition. For example, if a receiver receives subsequent
|
||||
// `true` values, they may deduce that leadership was lost and regained while
|
||||
// the the receiver was processing first leadership transition.
|
||||
func (r *Raft) LeaderCh() <-chan bool {
|
||||
return r.leaderCh
|
||||
}
|
||||
|
|
|
@ -178,10 +178,6 @@ type Config struct {
|
|||
// step down as leader.
|
||||
LeaderLeaseTimeout time.Duration
|
||||
|
||||
// StartAsLeader forces Raft to start in the leader state. This should
|
||||
// never be used except for testing purposes, as it can cause a split-brain.
|
||||
StartAsLeader bool
|
||||
|
||||
// The unique ID for this server across all time. When running with
|
||||
// ProtocolVersion < 3, you must set this to be the same as the network
|
||||
// address of your transport.
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"hash"
|
||||
"hash/crc64"
|
||||
"io"
|
||||
|
@ -16,6 +15,8 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -32,6 +33,10 @@ type FileSnapshotStore struct {
|
|||
path string
|
||||
retain int
|
||||
logger hclog.Logger
|
||||
|
||||
// noSync, if true, skips crash-safe file fsync api calls.
|
||||
// It's a private field, only used in testing
|
||||
noSync bool
|
||||
}
|
||||
|
||||
type snapMetaSlice []*fileSnapshotMeta
|
||||
|
@ -44,6 +49,8 @@ type FileSnapshotSink struct {
|
|||
parentDir string
|
||||
meta fileSnapshotMeta
|
||||
|
||||
noSync bool
|
||||
|
||||
stateFile *os.File
|
||||
stateHash hash.Hash64
|
||||
buffered *bufio.Writer
|
||||
|
@ -172,6 +179,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
|
|||
logger: f.logger,
|
||||
dir: path,
|
||||
parentDir: f.path,
|
||||
noSync: f.noSync,
|
||||
meta: fileSnapshotMeta{
|
||||
SnapshotMeta: SnapshotMeta{
|
||||
Version: version,
|
||||
|
@ -414,7 +422,7 @@ func (s *FileSnapshotSink) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
|
||||
if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
|
||||
parentFH, err := os.Open(s.parentDir)
|
||||
defer parentFH.Close()
|
||||
if err != nil {
|
||||
|
@ -462,8 +470,10 @@ func (s *FileSnapshotSink) finalize() error {
|
|||
}
|
||||
|
||||
// Sync to force fsync to disk
|
||||
if err := s.stateFile.Sync(); err != nil {
|
||||
return err
|
||||
if !s.noSync {
|
||||
if err := s.stateFile.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Get the file size
|
||||
|
@ -510,8 +520,10 @@ func (s *FileSnapshotSink) writeMeta() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err = fh.Sync(); err != nil {
|
||||
return err
|
||||
if !s.noSync {
|
||||
if err = fh.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -84,9 +84,10 @@ func (e errorFuture) Index() uint64 {
|
|||
// deferError can be embedded to allow a future
|
||||
// to provide an error in the future.
|
||||
type deferError struct {
|
||||
err error
|
||||
errCh chan error
|
||||
responded bool
|
||||
err error
|
||||
errCh chan error
|
||||
responded bool
|
||||
ShutdownCh chan struct{}
|
||||
}
|
||||
|
||||
func (d *deferError) init() {
|
||||
|
@ -103,7 +104,11 @@ func (d *deferError) Error() error {
|
|||
if d.errCh == nil {
|
||||
panic("waiting for response on nil channel")
|
||||
}
|
||||
d.err = <-d.errCh
|
||||
select {
|
||||
case d.err = <-d.errCh:
|
||||
case <-d.ShutdownCh:
|
||||
d.err = ErrRaftShutdown
|
||||
}
|
||||
return d.err
|
||||
}
|
||||
|
||||
|
|
|
@ -90,9 +90,9 @@ func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, erro
|
|||
|
||||
// Write appends the given bytes to the snapshot contents
|
||||
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
|
||||
written, err := io.Copy(s.contents, bytes.NewReader(p))
|
||||
s.meta.Size += written
|
||||
return int(written), err
|
||||
written, err := s.contents.Write(p)
|
||||
s.meta.Size += int64(written)
|
||||
return written, err
|
||||
}
|
||||
|
||||
// Close updates the Size and is otherwise a no-op
|
||||
|
|
|
@ -63,7 +63,7 @@ func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (Se
|
|||
// NewInmemTransport is used to initialize a new transport
|
||||
// and generates a random local address if none is specified
|
||||
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
|
||||
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
|
||||
return NewInmemTransportWithTimeout(addr, 500*time.Millisecond)
|
||||
}
|
||||
|
||||
// SetHeartbeatHandler is used to set optional fast-path for
|
||||
|
@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
|
|||
}
|
||||
|
||||
// Send the RPC over
|
||||
respCh := make(chan RPCResponse)
|
||||
respCh := make(chan RPCResponse, 1)
|
||||
req := RPC{
|
||||
Command: args,
|
||||
Reader: r,
|
||||
|
|
|
@ -319,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
|
|||
if n.serverAddressProvider != nil {
|
||||
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
|
||||
if err != nil {
|
||||
n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err)
|
||||
n.logger.Warn("unable to get address for server, using fallback address", "id", id, "fallback", target, "error", err)
|
||||
} else {
|
||||
return serverAddressOverride
|
||||
}
|
||||
|
|
|
@ -311,6 +311,10 @@ func (r *Raft) runCandidate() {
|
|||
// Reject any restores since we are not the leader
|
||||
r.respond(ErrNotLeader)
|
||||
|
||||
case r := <-r.leadershipTransferCh:
|
||||
// Reject any operations since we are not the leader
|
||||
r.respond(ErrNotLeader)
|
||||
|
||||
case c := <-r.configurationsCh:
|
||||
c.configurations = r.configurations.Clone()
|
||||
c.respond(nil)
|
||||
|
@ -364,7 +368,7 @@ func (r *Raft) runLeader() {
|
|||
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
|
||||
|
||||
// Notify that we are the leader
|
||||
asyncNotifyBool(r.leaderCh, true)
|
||||
overrideNotifyBool(r.leaderCh, true)
|
||||
|
||||
// Push to the notify channel if given
|
||||
if notify := r.conf.NotifyCh; notify != nil {
|
||||
|
@ -420,7 +424,7 @@ func (r *Raft) runLeader() {
|
|||
r.leaderLock.Unlock()
|
||||
|
||||
// Notify that we are not the leader
|
||||
asyncNotifyBool(r.leaderCh, false)
|
||||
overrideNotifyBool(r.leaderCh, false)
|
||||
|
||||
// Push to the notify channel if given
|
||||
if notify := r.conf.NotifyCh; notify != nil {
|
||||
|
@ -469,10 +473,13 @@ func (r *Raft) startStopReplication() {
|
|||
if server.ID == r.localID {
|
||||
continue
|
||||
}
|
||||
|
||||
inConfig[server.ID] = true
|
||||
if _, ok := r.leaderState.replState[server.ID]; !ok {
|
||||
|
||||
s, ok := r.leaderState.replState[server.ID]
|
||||
if !ok {
|
||||
r.logger.Info("added peer, starting replication", "peer", server.ID)
|
||||
s := &followerReplication{
|
||||
s = &followerReplication{
|
||||
peer: server,
|
||||
commitment: r.leaderState.commitment,
|
||||
stopCh: make(chan uint64, 1),
|
||||
|
@ -485,10 +492,14 @@ func (r *Raft) startStopReplication() {
|
|||
notifyCh: make(chan struct{}, 1),
|
||||
stepDown: r.leaderState.stepDown,
|
||||
}
|
||||
|
||||
r.leaderState.replState[server.ID] = s
|
||||
r.goFunc(func() { r.replicate(s) })
|
||||
asyncNotifyCh(s.triggerCh)
|
||||
r.observe(PeerObservation{Peer: server, Removed: false})
|
||||
} else if ok && s.peer.Address != server.Address {
|
||||
r.logger.Info("updating peer", "peer", server.ID)
|
||||
s.peer = server
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,6 +515,9 @@ func (r *Raft) startStopReplication() {
|
|||
delete(r.leaderState.replState, serverID)
|
||||
r.observe(PeerObservation{Peer: repl.peer, Removed: true})
|
||||
}
|
||||
|
||||
// Update peers metric
|
||||
metrics.SetGauge([]string{"raft", "peers"}, float32(len(r.configurations.latest.Servers)))
|
||||
}
|
||||
|
||||
// configurationChangeChIfStable returns r.configurationChangeCh if it's safe
|
||||
|
@ -982,6 +996,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
|
|||
// Restore the snapshot into the FSM. If this fails we are in a
|
||||
// bad state so we panic to take ourselves out.
|
||||
fsm := &restoreFuture{ID: sink.ID()}
|
||||
fsm.ShutdownCh = r.shutdownCh
|
||||
fsm.init()
|
||||
select {
|
||||
case r.fsmMutateCh <- fsm:
|
||||
|
@ -1451,7 +1466,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
|
|||
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
|
||||
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
|
||||
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
|
||||
r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate)
|
||||
r.logger.Warn("duplicate requestVote from", "candidate", candidate)
|
||||
resp.Granted = true
|
||||
}
|
||||
return
|
||||
|
@ -1576,6 +1591,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
|
|||
|
||||
// Restore snapshot
|
||||
future := &restoreFuture{ID: sink.ID()}
|
||||
future.ShutdownCh = r.shutdownCh
|
||||
future.init()
|
||||
select {
|
||||
case r.fsmMutateCh <- future:
|
||||
|
@ -1732,13 +1748,13 @@ func (r *Raft) lookupServer(id ServerID) *Server {
|
|||
return nil
|
||||
}
|
||||
|
||||
// pickServer returns the follower that is most up to date. Because it accesses
|
||||
// leaderstate, it should only be called from the leaderloop.
|
||||
// pickServer returns the follower that is most up to date and participating in quorum.
|
||||
// Because it accesses leaderstate, it should only be called from the leaderloop.
|
||||
func (r *Raft) pickServer() *Server {
|
||||
var pick *Server
|
||||
var current uint64
|
||||
for _, server := range r.configurations.latest.Servers {
|
||||
if server.ID == r.localID {
|
||||
if server.ID == r.localID || server.Suffrage != Voter {
|
||||
continue
|
||||
}
|
||||
state, ok := r.leaderState.replState[server.ID]
|
||||
|
|
|
@ -326,6 +326,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
|
|||
s.failures++
|
||||
return false, err
|
||||
}
|
||||
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
|
||||
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
|
||||
// Duplicated information. Kept for backward compatibility.
|
||||
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
|
||||
|
||||
// Check for a newer term, stop running
|
||||
|
@ -386,6 +389,9 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
|
|||
} else {
|
||||
s.setLastContact()
|
||||
failures = 0
|
||||
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
|
||||
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
|
||||
// Duplicated information. Kept for backward compatibility.
|
||||
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
|
||||
s.notifyAll(resp.Success)
|
||||
}
|
||||
|
@ -572,6 +578,10 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
|
|||
|
||||
// appendStats is used to emit stats about an AppendEntries invocation.
|
||||
func appendStats(peer string, start time.Time, logs float32) {
|
||||
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
|
||||
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
|
||||
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
|
||||
// Duplicated information. Kept for backward compatibility.
|
||||
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
|
||||
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
|
||||
}
|
||||
|
|
|
@ -146,6 +146,7 @@ func (r *Raft) takeSnapshot() (string, error) {
|
|||
// We have to use the future here to safely get this information since
|
||||
// it is owned by the main thread.
|
||||
configReq := &configurationsFuture{}
|
||||
configReq.ShutdownCh = r.shutdownCh
|
||||
configReq.init()
|
||||
select {
|
||||
case r.configurationsCh <- configReq:
|
||||
|
|
|
@ -81,7 +81,7 @@ func newTCPTransport(bindAddr string,
|
|||
list.Close()
|
||||
return nil, errNotTCP
|
||||
}
|
||||
if addr.IP.IsUnspecified() {
|
||||
if addr.IP == nil || addr.IP.IsUnspecified() {
|
||||
list.Close()
|
||||
return nil, errNotAdvertisable
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package raft
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -276,19 +277,14 @@ func (c *cluster) Close() {
|
|||
// or a timeout occurs. It is possible to set a filter to look for specific
|
||||
// observations. Setting timeout to 0 means that it will wait forever until a
|
||||
// non-filtered observation is made.
|
||||
func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan struct{} {
|
||||
func (c *cluster) WaitEventChan(ctx context.Context, filter FilterFn) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
var timeoutCh <-chan time.Time
|
||||
if timeout > 0 {
|
||||
timeoutCh = time.After(timeout)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-timeoutCh:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case o, ok := <-c.observationCh:
|
||||
if !ok || filter == nil || filter(&o) {
|
||||
return
|
||||
|
@ -304,11 +300,13 @@ func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan s
|
|||
// observations. Setting timeout to 0 means that it will wait forever until a
|
||||
// non-filtered observation is made or a test failure is signaled.
|
||||
func (c *cluster) WaitEvent(filter FilterFn, timeout time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
eventCh := c.WaitEventChan(ctx, filter)
|
||||
select {
|
||||
case <-c.failedCh:
|
||||
c.t.FailNow()
|
||||
|
||||
case <-c.WaitEventChan(filter, timeout):
|
||||
case <-eventCh:
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -319,7 +317,9 @@ func (c *cluster) WaitForReplication(fsmLength int) {
|
|||
|
||||
CHECK:
|
||||
for {
|
||||
ch := c.WaitEventChan(nil, c.conf.CommitTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.conf.CommitTimeout)
|
||||
defer cancel()
|
||||
ch := c.WaitEventChan(ctx, nil)
|
||||
select {
|
||||
case <-c.failedCh:
|
||||
c.t.FailNow()
|
||||
|
@ -415,6 +415,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
|
|||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
eventCh := c.WaitEventChan(ctx, filter)
|
||||
select {
|
||||
case <-c.failedCh:
|
||||
c.t.FailNow()
|
||||
|
@ -422,7 +425,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
|
|||
case <-limitCh:
|
||||
c.FailNowf("timeout waiting for stable %s state", s)
|
||||
|
||||
case <-c.WaitEventChan(filter, 0):
|
||||
case <-eventCh:
|
||||
c.logger.Debug("resetting stability timeout")
|
||||
|
||||
case t, ok := <-timer.C:
|
||||
|
@ -805,5 +808,6 @@ func FileSnapTest(t *testing.T) (string, *FileSnapshotStore) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
snap.noSync = true
|
||||
return dir, snap
|
||||
}
|
||||
|
|
|
@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// overrideNotifyBool is used to notify on a bool channel
|
||||
// but override existing value if value is present.
|
||||
// ch must be 1-item buffered channel.
|
||||
//
|
||||
// This method does not support multiple concurrent calls.
|
||||
func overrideNotifyBool(ch chan bool, v bool) {
|
||||
select {
|
||||
case ch <- v:
|
||||
// value sent, all done
|
||||
case <-ch:
|
||||
// channel had an old value
|
||||
select {
|
||||
case ch <- v:
|
||||
default:
|
||||
panic("race: channel was sent concurrently")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Decode reverses the encode operation on a byte slice input.
|
||||
func decodeMsgPack(buf []byte, out interface{}) error {
|
||||
r := bytes.NewBuffer(buf)
|
||||
|
|
|
@ -276,7 +276,7 @@ github.com/hashicorp/mdns
|
|||
github.com/hashicorp/memberlist
|
||||
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
||||
github.com/hashicorp/net-rpc-msgpackrpc
|
||||
# github.com/hashicorp/raft v1.1.2
|
||||
# github.com/hashicorp/raft v1.2.0
|
||||
github.com/hashicorp/raft
|
||||
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/raft-boltdb
|
||||
|
|
Loading…
Reference in New Issue