mirror of https://github.com/hashicorp/consul
rpc: measure blocking queries (#7224)
* agent: measure blocking queries * agent.rpc: update docs to mention we only record blocking queries * agent.rpc: make go fmt happy * agent.rpc: fix non-atomic read and decrement with bitwise xor of uint64 0 * agent.rpc: clarify review question * agent.rpc: today I learned that one must declare all variables before interacting with goto labels * Update agent/consul/server.go agent.rpc: more precise comment on `Server.queriesBlocking` Co-Authored-By: Paul Banks <banks@banksco.de> * Update website/source/docs/agent/telemetry.html.md agent.rpc: improve queries_blocking description Co-Authored-By: Paul Banks <banks@banksco.de> * agent.rpc: fix some bugs found in review * add a note about the updated counter behavior to telemetry.md * docs: add upgrade-specific note on consul.rpc.quer{y,ies_blocking} behavior Co-authored-by: Paul Banks <banks@banksco.de>pull/7252/head
parent
b42735f710
commit
55f19a9eb2
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
@ -519,8 +520,12 @@ type queryFn func(memdb.WatchSet, *state.Store) error
|
||||||
// blockingQuery is used to process a potentially blocking query operation.
|
// blockingQuery is used to process a potentially blocking query operation.
|
||||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||||
var timeout *time.Timer
|
var timeout *time.Timer
|
||||||
|
var queriesBlocking uint64
|
||||||
var queryTimeout time.Duration
|
var queryTimeout time.Duration
|
||||||
|
|
||||||
|
// Instrument all queries run
|
||||||
|
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
||||||
|
|
||||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||||
// Fast path right to the non-blocking query.
|
// Fast path right to the non-blocking query.
|
||||||
if minQueryIndex == 0 {
|
if minQueryIndex == 0 {
|
||||||
|
@ -542,10 +547,20 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
||||||
timeout = time.NewTimer(queryTimeout)
|
timeout = time.NewTimer(queryTimeout)
|
||||||
defer timeout.Stop()
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
// instrument blockingQueries
|
||||||
|
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
||||||
|
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
|
||||||
|
// atomic dec when we return from blockingQuery()
|
||||||
|
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||||
|
// set the gauge directly to the new value of s.blockingQueries
|
||||||
|
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))
|
||||||
|
|
||||||
RUN_QUERY:
|
RUN_QUERY:
|
||||||
|
// Setup blocking loop
|
||||||
// Update the query metadata.
|
// Update the query metadata.
|
||||||
s.setQueryMeta(queryMeta)
|
s.setQueryMeta(queryMeta)
|
||||||
|
|
||||||
|
// Validate
|
||||||
// If the read must be consistent we verify that we are still the leader.
|
// If the read must be consistent we verify that we are still the leader.
|
||||||
if queryOpts.GetRequireConsistent() {
|
if queryOpts.GetRequireConsistent() {
|
||||||
if err := s.consistentRead(); err != nil {
|
if err := s.consistentRead(); err != nil {
|
||||||
|
@ -553,8 +568,7 @@ RUN_QUERY:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the query.
|
// Run query
|
||||||
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
|
||||||
|
|
||||||
// Operate on a consistent set of state. This makes sure that the
|
// Operate on a consistent set of state. This makes sure that the
|
||||||
// abandon channel goes with the state that the caller is using to
|
// abandon channel goes with the state that the caller is using to
|
||||||
|
@ -571,7 +585,7 @@ RUN_QUERY:
|
||||||
ws.Add(state.AbandonCh())
|
ws.Add(state.AbandonCh())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block up to the timeout if we didn't see anything fresh.
|
// Execute the queryFn
|
||||||
err := fn(ws, state)
|
err := fn(ws, state)
|
||||||
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
||||||
// blocking was requested by client, NOT meta.Index since the state function
|
// blocking was requested by client, NOT meta.Index since the state function
|
||||||
|
@ -584,6 +598,7 @@ RUN_QUERY:
|
||||||
if err == nil && queryMeta.GetIndex() < 1 {
|
if err == nil && queryMeta.GetIndex() < 1 {
|
||||||
queryMeta.SetIndex(1)
|
queryMeta.SetIndex(1)
|
||||||
}
|
}
|
||||||
|
// block up to the timeout if we don't see anything fresh.
|
||||||
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
||||||
if expired := ws.Watch(timeout.C); !expired {
|
if expired := ws.Watch(timeout.C); !expired {
|
||||||
// If a restore may have woken us up then bail out from
|
// If a restore may have woken us up then bail out from
|
||||||
|
@ -594,6 +609,7 @@ RUN_QUERY:
|
||||||
select {
|
select {
|
||||||
case <-state.AbandonCh():
|
case <-state.AbandonCh():
|
||||||
default:
|
default:
|
||||||
|
// loop back and look for an update again
|
||||||
goto RUN_QUERY
|
goto RUN_QUERY
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,6 +110,12 @@ var (
|
||||||
// Server is Consul server which manages the service discovery,
|
// Server is Consul server which manages the service discovery,
|
||||||
// health checking, DC forwarding, Raft, and multiple Serf pools.
|
// health checking, DC forwarding, Raft, and multiple Serf pools.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
|
// queriesBlocking is a counter that we incr and decr atomically in
|
||||||
|
// rpc calls to provide telemetry on how many blocking queries are running.
|
||||||
|
// We interact with queriesBlocking atomically, do not move without ensuring it is
|
||||||
|
// correctly 64-byte aligned in the struct layout
|
||||||
|
queriesBlocking uint64
|
||||||
|
|
||||||
// aclConfig is the configuration for the ACL system
|
// aclConfig is the configuration for the ACL system
|
||||||
aclConfig *acl.Config
|
aclConfig *acl.Config
|
||||||
|
|
||||||
|
|
|
@ -765,10 +765,16 @@ These metrics are used to monitor the health of the Consul servers.
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>`consul.rpc.query`</td>
|
<td>`consul.rpc.query`</td>
|
||||||
<td>This increments when a server sends a (potentially blocking) RPC query.</td>
|
<td>This increments when a server receives a new blocking RPC request, indicating the rate of new blocking query calls. See consul.rpc.queries_blocking for the current number of in-flight blocking RPC calls. This metric changed in 1.7.0 to only increment on the the start of a query. The rate of queries will appear lower, but is more accurate.</td>
|
||||||
<td>queries</td>
|
<td>queries</td>
|
||||||
<td>counter</td>
|
<td>counter</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>`consul.rpc.queries_blocking`</td>
|
||||||
|
<td>This shows the current number of in-flight blocking queries the server is handling.</td>
|
||||||
|
<td>queries</td>
|
||||||
|
<td>gauge</td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>`consul.rpc.cross-dc`</td>
|
<td>`consul.rpc.cross-dc`</td>
|
||||||
<td>This increments when a server sends a (potentially blocking) cross datacenter RPC query.</td>
|
<td>This increments when a server sends a (potentially blocking) cross datacenter RPC query.</td>
|
||||||
|
|
|
@ -35,6 +35,13 @@ users, both the datacenter and the services namespace will be present. For examp
|
||||||
PTR record would previously have contained `web.service.consul`, it will now be `web.service.dc1.consul`
|
PTR record would previously have contained `web.service.consul`, it will now be `web.service.dc1.consul`
|
||||||
in OSS or `web.service.ns1.dc1.consul` for Enterprise.
|
in OSS or `web.service.ns1.dc1.consul` for Enterprise.
|
||||||
|
|
||||||
|
### Telemetry: semantics of `consul.rpc.query` changed, see `consul.rpc.queries_blocking`
|
||||||
|
|
||||||
|
Consul has changed the semantics of query counts in its [telemetry](/docs/agent/telemetry.html#metrics-reference).
|
||||||
|
`consul.rpc.query` now only increments on the *start* of a query (blocking or non-blocking), whereas before it would
|
||||||
|
measure when blocking queries polled for more data. The gauge `consul.rpc.queries_blocking` has been added for a more
|
||||||
|
to more precisely capture the view of *active* blocking queries.
|
||||||
|
|
||||||
## Consul 1.6.0
|
## Consul 1.6.0
|
||||||
|
|
||||||
#### Removal of Deprecated Features
|
#### Removal of Deprecated Features
|
||||||
|
|
Loading…
Reference in New Issue