Add additional raft metrics (#5628)

* Add documentation for new raft metrics
* Revendor raft from master
pull/5639/head
Freddy 2019-04-09 16:09:22 -06:00 committed by GitHub
parent 8e07c63c54
commit a03392c906
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 124 additions and 30 deletions

2
go.mod
View File

@ -74,7 +74,7 @@ require (
github.com/hashicorp/mdns v1.0.1 // indirect github.com/hashicorp/mdns v1.0.1 // indirect
github.com/hashicorp/memberlist v0.1.3 github.com/hashicorp/memberlist v0.1.3
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1 github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1
github.com/hashicorp/serf v0.8.2 github.com/hashicorp/serf v0.8.2
github.com/hashicorp/vault v0.10.3 github.com/hashicorp/vault v0.10.3

4
go.sum
View File

@ -186,8 +186,8 @@ github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG67
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c h1:V0ncQQu4St5edf3p566gGAU8jxvrWdjny9dIFl2aV/s= github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472 h1:9EPzHJ1bJFaFbGOz3UV3DDFmGYANr+SF+eapmiK5zV4=
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI= github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1 h1:LHTrLUnNkk+2YkO5EMG49q0lHdR9AZhDbCpu0+M3e0E= github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1 h1:LHTrLUnNkk+2YkO5EMG49q0lHdR9AZhDbCpu0+M3e0E=
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=

17
vendor/github.com/hashicorp/raft/CHANGELOG.md generated vendored Normal file
View File

@ -0,0 +1,17 @@
# UNRELEASED
IMPROVEMENTS
* InMemTransport: Add timeout for sending a message [[GH-313](https://github.com/hashicorp/raft/pull/313)]
* ensure 'make deps' downloads test dependencies like testify [[GH-310](https://github.com/hashicorp/raft/pull/310)]
* Clarifies function of CommitTimeout [[GH-309](https://github.com/hashicorp/raft/pull/309)]
# 1.0.0 (October 3rd, 2017)
v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version manages server identities using a UUID, so introduces some breaking API changes. It also versions the Raft protocol, and requires some special steps when interoperating with Raft servers running older versions of the library (see the detailed comment in config.go about version compatibility). You can reference https://github.com/hashicorp/consul/pull/2222 for an idea of what was required to port Consul to these new interfaces.
# 0.1.0 (September 29th, 2017)
v0.1.0 is the original stable version of the library that was in master and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0.

View File

@ -10,7 +10,7 @@ fuzz:
go test -timeout=300s ./fuzzy go test -timeout=300s ./fuzzy
deps: deps:
go get -d -v ./... go get -t -d -v ./...
echo $(DEPS) | xargs -n1 go get -d echo $(DEPS) | xargs -n1 go get -d
cov: cov:

View File

@ -34,7 +34,7 @@ and `StableStore`.
## Tagged Releases ## Tagged Releases
As of September 2017, Hashicorp will start using tags for this library to clearly indicate As of September 2017, HashiCorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library. major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained * v0.1.0 is the original stable version of the library that was in master and has been maintained

View File

@ -164,7 +164,7 @@ type Raft struct {
// configuration on all the Voter servers. There is no need to bootstrap // configuration on all the Voter servers. There is no need to bootstrap
// Nonvoter and Staging servers. // Nonvoter and Staging servers.
// //
// One sane approach is to boostrap a single server with a configuration // One sane approach is to bootstrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other // listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster. // servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,

View File

@ -9,7 +9,7 @@ import (
// replication goroutines report in newly written entries with Match(), and // replication goroutines report in newly written entries with Match(), and
// this notifies on commitCh when the commit index has advanced. // this notifies on commitCh when the commit index has advanced.
type commitment struct { type commitment struct {
// protectes matchIndexes and commitIndex // protects matchIndexes and commitIndex
sync.Mutex sync.Mutex
// notified when commitIndex increases // notified when commitIndex increases
commitCh chan struct{} commitCh chan struct{}

View File

@ -115,7 +115,7 @@ type configurationChangeRequest struct {
// prior one has been committed). // prior one has been committed).
// //
// One downside to storing just two configurations is that if you try to take a // One downside to storing just two configurations is that if you try to take a
// snahpsot when your state machine hasn't yet applied the committedIndex, we // snapshot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that // have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case now. An alternative approach, // snapshot. We disallow snapshots in that case now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the // which LogCabin uses, is to track every configuration change in the
@ -198,7 +198,7 @@ func nextConfiguration(current Configuration, currentIndex uint64, change config
// TODO: barf on new address? // TODO: barf on new address?
newServer := Server{ newServer := Server{
// TODO: This should add the server as Staging, to be automatically // TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promoton to Voter is not yet // promoted to Voter later. However, the promotion to Voter is not yet
// implemented, and doing so is not trivial with the way the leader loop // implemented, and doing so is not trivial with the way the leader loop
// coordinates with the replication goroutines today. So, for now, the // coordinates with the replication goroutines today. So, for now, the
// server will have a vote right away, and the Promote case below is // server will have a vote right away, and the Promote case below is

View File

@ -1,6 +1,7 @@
package raft package raft
import ( import (
"errors"
"sync" "sync"
) )
@ -106,7 +107,11 @@ func (i *InmemStore) Set(key []byte, val []byte) error {
func (i *InmemStore) Get(key []byte) ([]byte, error) { func (i *InmemStore) Get(key []byte) ([]byte, error) {
i.l.RLock() i.l.RLock()
defer i.l.RUnlock() defer i.l.RUnlock()
return i.kv[string(key)], nil val := i.kv[string(key)]
if val == nil {
return nil, errors.New("not found")
}
return val, nil
} }
// SetUint64 implements the StableStore interface. // SetUint64 implements the StableStore interface.

View File

@ -43,9 +43,11 @@ type InmemTransport struct {
timeout time.Duration timeout time.Duration
} }
// NewInmemTransport is used to initialize a new transport // NewInmemTransportWithTimeout is used to initialize a new transport and
// and generates a random local address if none is specified // generates a random local address if none is specified. The given timeout
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) { // will be used to decide how long to wait for a connected peer to process the
// RPCs that we're sending it. See also Connect() and Consumer().
func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (ServerAddress, *InmemTransport) {
if string(addr) == "" { if string(addr) == "" {
addr = NewInmemAddr() addr = NewInmemAddr()
} }
@ -53,11 +55,17 @@ func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
consumerCh: make(chan RPC, 16), consumerCh: make(chan RPC, 16),
localAddr: addr, localAddr: addr,
peers: make(map[ServerAddress]*InmemTransport), peers: make(map[ServerAddress]*InmemTransport),
timeout: 50 * time.Millisecond, timeout: timeout,
} }
return addr, trans return addr, trans
} }
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
}
// SetHeartbeatHandler is used to set optional fast-path for // SetHeartbeatHandler is used to set optional fast-path for
// heartbeats, not supported for this transport. // heartbeats, not supported for this transport.
func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) { func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) {
@ -76,16 +84,15 @@ func (i *InmemTransport) LocalAddr() ServerAddress {
// AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests. // AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) { func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
i.RLock() i.Lock()
defer i.Unlock()
peer, ok := i.peers[target] peer, ok := i.peers[target]
i.RUnlock()
if !ok { if !ok {
return nil, fmt.Errorf("failed to connect to peer: %v", target) return nil, fmt.Errorf("failed to connect to peer: %v", target)
} }
pipeline := newInmemPipeline(i, peer, target) pipeline := newInmemPipeline(i, peer, target)
i.Lock()
i.pipelines = append(i.pipelines, pipeline) i.pipelines = append(i.pipelines, pipeline)
i.Unlock()
return pipeline, nil return pipeline, nil
} }
@ -140,11 +147,17 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
// Send the RPC over // Send the RPC over
respCh := make(chan RPCResponse) respCh := make(chan RPCResponse)
peer.consumerCh <- RPC{ req := RPC{
Command: args, Command: args,
Reader: r, Reader: r,
RespChan: respCh, RespChan: respCh,
} }
select {
case peer.consumerCh <- req:
case <-time.After(timeout):
err = fmt.Errorf("send timed out")
return
}
// Wait for a response // Wait for a response
select { select {

View File

@ -461,16 +461,38 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
// listen is used to handling incoming connections. // listen is used to handling incoming connections.
func (n *NetworkTransport) listen() { func (n *NetworkTransport) listen() {
const baseDelay = 5 * time.Millisecond
const maxDelay = 1 * time.Second
var loopDelay time.Duration
for { for {
// Accept incoming connections // Accept incoming connections
conn, err := n.stream.Accept() conn, err := n.stream.Accept()
if err != nil { if err != nil {
if n.IsShutdown() { if loopDelay == 0 {
return loopDelay = baseDelay
} else {
loopDelay *= 2
} }
if loopDelay > maxDelay {
loopDelay = maxDelay
}
if !n.IsShutdown() {
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
}
select {
case <-n.shutdownCh:
return
case <-time.After(loopDelay):
continue continue
} }
}
// No error, reset loop delay
loopDelay = 0
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
// Handle the connection in dedicated routine // Handle the connection in dedicated routine

View File

@ -520,6 +520,9 @@ func (r *Raft) leaderLoop() {
} }
} }
var numProcessed int
start := time.Now()
for { for {
e := r.leaderState.inflight.Front() e := r.leaderState.inflight.Front()
if e == nil { if e == nil {
@ -532,10 +535,19 @@ func (r *Raft) leaderLoop() {
} }
// Measure the commit time // Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
r.processLogs(idx, commitLog) r.processLogs(idx, commitLog)
r.leaderState.inflight.Remove(e) r.leaderState.inflight.Remove(e)
numProcessed++
} }
// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed))
if stepDown { if stepDown {
if r.conf.ShutdownOnRemove { if r.conf.ShutdownOnRemove {
r.logger.Printf("[INFO] raft: Removed ourself, shutting down") r.logger.Printf("[INFO] raft: Removed ourself, shutting down")
@ -848,7 +860,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
term := r.getCurrentTerm() term := r.getCurrentTerm()
lastIndex := r.getLastIndex() lastIndex := r.getLastIndex()
logs := make([]*Log, len(applyLogs))
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
for idx, applyLog := range applyLogs { for idx, applyLog := range applyLogs {
applyLog.dispatch = now applyLog.dispatch = now
@ -879,10 +894,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
} }
} }
// processLogs is used to apply all the committed entires that haven't been // processLogs is used to apply all the committed entries that haven't been
// applied up to the given index limit. // applied up to the given index limit.
// This can be called from both leaders and followers. // This can be called from both leaders and followers.
// Followers call this from AppendEntires, for n entires at a time, and always // Followers call this from AppendEntries, for n entries at a time, and always
// pass future=nil. // pass future=nil.
// Leaders call this once per inflight when entries are committed. They pass // Leaders call this once per inflight when entries are committed. They pass
// the future from inflights. // the future from inflights.
@ -899,7 +914,6 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
// Get the log, either from the future or from our log store // Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx { if future != nil && future.log.Index == idx {
r.processLog(&future.log, future) r.processLog(&future.log, future)
} else { } else {
l := new(Log) l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil { if err := r.logs.GetLog(idx, l); err != nil {

View File

@ -31,7 +31,7 @@ type followerReplication struct {
peer Server peer Server
// commitment tracks the entries acknowledged by followers so that the // commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successsful // leader's commit index can advance. It is updated on successful
// AppendEntries responses. // AppendEntries responses.
commitment *commitment commitment *commitment
@ -137,7 +137,12 @@ RPC:
case <-s.triggerCh: case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx) shouldStop = r.replicateTo(s, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout): // TODO: what is this? // This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx) shouldStop = r.replicateTo(s, lastLogIdx)
} }

2
vendor/modules.txt vendored
View File

@ -264,7 +264,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 # github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c # github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
github.com/hashicorp/raft github.com/hashicorp/raft
# github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1 # github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1
github.com/hashicorp/raft-boltdb github.com/hashicorp/raft-boltdb

View File

@ -379,6 +379,18 @@ These metrics are used to monitor the health of the Consul servers.
<td>commit logs / interval</td> <td>commit logs / interval</td>
<td>counter</td> <td>counter</td>
</tr> </tr>
<tr>
<td>`consul.raft.commitNumLogs`</td>
<td>This metric measures the count of logs processed for application to the FSM in a single batch.</td>
<td>logs</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.raft.fsm.enqueue`</td>
<td>This metric measures the amount of time to enqueue a batch of logs for the FSM to apply.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr> <tr>
<td>`consul.raft.fsm.restore`</td> <td>`consul.raft.fsm.restore`</td>
<td>This metric measures the time taken by the FSM to restore its state from a snapshot.</td> <td>This metric measures the time taken by the FSM to restore its state from a snapshot.</td>
@ -470,6 +482,12 @@ These metrics are used to monitor the health of the Consul servers.
<td>ms</td> <td>ms</td>
<td>timer</td> <td>timer</td>
</tr> </tr>
<tr>
<td>`consul.raft.leader.dispatchNumLogs`</td>
<td>This metric measures the number of logs committed to disk in a batch.</td>
<td>logs</td>
<td>gauge</td>
</tr>
<tr> <tr>
<td>`consul.raft.replication.appendEntries`</td> <td>`consul.raft.replication.appendEntries`</td>
<td>This measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers.</td> <td>This measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers.</td>