mirror of https://github.com/hashicorp/consul
Merge pull request #12109 from hashicorp/dnephin/blocking-query-1
rpc: make blockingQuery easier to readpull/12202/head
commit
db0478265b
|
@ -919,108 +919,74 @@ type queryFn func(memdb.WatchSet, *state.Store) error
|
||||||
|
|
||||||
// blockingQuery is used to process a potentially blocking query operation.
|
// blockingQuery is used to process a potentially blocking query operation.
|
||||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||||
var cancel func()
|
|
||||||
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
||||||
|
|
||||||
var queriesBlocking uint64
|
|
||||||
var queryTimeout time.Duration
|
|
||||||
|
|
||||||
// Instrument all queries run
|
|
||||||
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
||||||
|
|
||||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||||
// Fast path right to the non-blocking query.
|
// Perform a non-blocking query
|
||||||
if minQueryIndex == 0 {
|
if minQueryIndex == 0 {
|
||||||
goto RUN_QUERY
|
if queryOpts.GetRequireConsistent() {
|
||||||
|
if err := s.consistentRead(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var ws memdb.WatchSet
|
||||||
|
err := fn(ws, s.fsm.State())
|
||||||
|
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
queryTimeout = queryOpts.GetMaxQueryTime()
|
timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
|
||||||
// Restrict the max query time, and ensure there is always one.
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
if queryTimeout > s.config.MaxQueryTime {
|
|
||||||
queryTimeout = s.config.MaxQueryTime
|
|
||||||
} else if queryTimeout <= 0 {
|
|
||||||
queryTimeout = s.config.DefaultQueryTime
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply a small amount of jitter to the request.
|
|
||||||
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
|
||||||
|
|
||||||
// wrap the base context with a deadline
|
|
||||||
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// instrument blockingQueries
|
count := atomic.AddUint64(&s.queriesBlocking, 1)
|
||||||
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count))
|
||||||
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
|
// decrement the count when the function returns.
|
||||||
// atomic dec when we return from blockingQuery()
|
|
||||||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||||
// set the gauge directly to the new value of s.blockingQueries
|
|
||||||
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))
|
|
||||||
|
|
||||||
RUN_QUERY:
|
for {
|
||||||
// Setup blocking loop
|
if queryOpts.GetRequireConsistent() {
|
||||||
|
if err := s.consistentRead(); err != nil {
|
||||||
// Validate
|
return err
|
||||||
// If the read must be consistent we verify that we are still the leader.
|
}
|
||||||
if queryOpts.GetRequireConsistent() {
|
|
||||||
if err := s.consistentRead(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Run query
|
// Operate on a consistent set of state. This makes sure that the
|
||||||
|
// abandon channel goes with the state that the caller is using to
|
||||||
// Operate on a consistent set of state. This makes sure that the
|
// build watches.
|
||||||
// abandon channel goes with the state that the caller is using to
|
state := s.fsm.State()
|
||||||
// build watches.
|
|
||||||
state := s.fsm.State()
|
|
||||||
|
|
||||||
// We can skip all watch tracking if this isn't a blocking query.
|
|
||||||
var ws memdb.WatchSet
|
|
||||||
if minQueryIndex > 0 {
|
|
||||||
ws = memdb.NewWatchSet()
|
|
||||||
|
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
// This channel will be closed if a snapshot is restored and the
|
// This channel will be closed if a snapshot is restored and the
|
||||||
// whole state store is abandoned.
|
// whole state store is abandoned.
|
||||||
ws.Add(state.AbandonCh())
|
ws.Add(state.AbandonCh())
|
||||||
}
|
|
||||||
|
|
||||||
// Execute the queryFn
|
err := fn(ws, state)
|
||||||
err := fn(ws, state)
|
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Update the query metadata.
|
if queryMeta.GetIndex() > minQueryIndex {
|
||||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
// block until something changes, or the timeout
|
||||||
// blocking was requested by client, NOT meta.Index since the state function
|
if err := ws.WatchCtx(ctx); err != nil {
|
||||||
// might return zero if something is not initialized and care wasn't taken to
|
// exit if we've reached the timeout, or other cancellation
|
||||||
// handle that special case (in practice this happened a lot so fixing it
|
return nil
|
||||||
// systematically here beats trying to remember to add zero checks in every
|
}
|
||||||
// state method). We also need to ensure that unless there is an error, we
|
|
||||||
// return an index > 0 otherwise the client will never block and burn CPU and
|
|
||||||
// requests.
|
|
||||||
if err == nil && queryMeta.GetIndex() < 1 {
|
|
||||||
queryMeta.SetIndex(1)
|
|
||||||
}
|
|
||||||
// block up to the timeout if we don't see anything fresh.
|
|
||||||
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
|
||||||
if err := ws.WatchCtx(ctx); err == nil {
|
|
||||||
// a non-nil error only occurs when the context is cancelled
|
|
||||||
|
|
||||||
// If a restore may have woken us up then bail out from
|
// exit if the state store has been abandoned
|
||||||
// the query immediately. This is slightly race-ey since
|
select {
|
||||||
// this might have been interrupted for other reasons,
|
case <-state.AbandonCh():
|
||||||
// but it's OK to kick it back to the caller in either
|
return nil
|
||||||
// case.
|
default:
|
||||||
select {
|
|
||||||
case <-state.AbandonCh():
|
|
||||||
default:
|
|
||||||
// loop back and look for an update again
|
|
||||||
goto RUN_QUERY
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||||
|
@ -1035,6 +1001,17 @@ func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
|
||||||
m.SetKnownLeader(s.raft.Leader() != "")
|
m.SetKnownLeader(s.raft.Leader() != "")
|
||||||
}
|
}
|
||||||
maskResultsFilteredByACLs(token, m)
|
maskResultsFilteredByACLs(token, m)
|
||||||
|
|
||||||
|
// Always set a non-zero QueryMeta.Index. Generally we expect the
|
||||||
|
// QueryMeta.Index to be set to structs.RaftIndex.ModifyIndex. If the query
|
||||||
|
// returned no results we expect it to be set to the max index of the table,
|
||||||
|
// however we can't guarantee this always happens.
|
||||||
|
// To prevent a client from accidentally performing many non-blocking queries
|
||||||
|
// (which causes lots of unnecessary load), we always set a default value of 1.
|
||||||
|
// This is sufficient to prevent the unnecessary load in most cases.
|
||||||
|
if m.GetIndex() < 1 {
|
||||||
|
m.SetIndex(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// consistentRead is used to ensure we do not perform a stale
|
// consistentRead is used to ensure we do not perform a stale
|
||||||
|
@ -1070,6 +1047,22 @@ func (s *Server) consistentRead() error {
|
||||||
return structs.ErrNotReadyForConsistentReads
|
return structs.ErrNotReadyForConsistentReads
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rpcQueryTimeout calculates the timeout for the query, ensures it is
|
||||||
|
// constrained to the configured limit, and adds jitter to prevent multiple
|
||||||
|
// blocking queries from all timing out at the same time.
|
||||||
|
func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
|
||||||
|
// Restrict the max query time, and ensure there is always one.
|
||||||
|
if queryTimeout > s.config.MaxQueryTime {
|
||||||
|
queryTimeout = s.config.MaxQueryTime
|
||||||
|
} else if queryTimeout <= 0 {
|
||||||
|
queryTimeout = s.config.DefaultQueryTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply a small amount of jitter to the request.
|
||||||
|
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
||||||
|
return queryTimeout
|
||||||
|
}
|
||||||
|
|
||||||
// maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the
|
// maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the
|
||||||
// request is unauthenticated, to limit information leaking.
|
// request is unauthenticated, to limit information leaking.
|
||||||
//
|
//
|
||||||
|
|
|
@ -236,7 +236,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
// Perform a non-blocking query. Note that it's significant that the meta has
|
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||||
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||||
// this should not block still.
|
// this should not block still.
|
||||||
{
|
t.Run("non-blocking query", func(t *testing.T) {
|
||||||
var opts structs.QueryOptions
|
var opts structs.QueryOptions
|
||||||
var meta structs.QueryMeta
|
var meta structs.QueryMeta
|
||||||
var calls int
|
var calls int
|
||||||
|
@ -244,16 +244,13 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
calls++
|
calls++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
t.Fatalf("err: %v", err)
|
require.NoError(t, err)
|
||||||
}
|
require.Equal(t, 1, calls)
|
||||||
if calls != 1 {
|
})
|
||||||
t.Fatalf("bad: %d", calls)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform a blocking query that gets woken up and loops around once.
|
// Perform a blocking query that gets woken up and loops around once.
|
||||||
{
|
t.Run("blocking query - single loop", func(t *testing.T) {
|
||||||
opts := structs.QueryOptions{
|
opts := structs.QueryOptions{
|
||||||
MinQueryIndex: 3,
|
MinQueryIndex: 3,
|
||||||
}
|
}
|
||||||
|
@ -272,13 +269,10 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
calls++
|
calls++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
t.Fatalf("err: %v", err)
|
require.NoError(t, err)
|
||||||
}
|
require.Equal(t, 2, calls)
|
||||||
if calls != 2 {
|
})
|
||||||
t.Fatalf("bad: %d", calls)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
||||||
// no state yet). This should still return an empty response immediately, but
|
// no state yet). This should still return an empty response immediately, but
|
||||||
|
@ -289,7 +283,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
// covered by tests but eventually when hit in the wild causes blocking
|
// covered by tests but eventually when hit in the wild causes blocking
|
||||||
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
||||||
// systematically does the right thing to prevent future bugs like that.
|
// systematically does the right thing to prevent future bugs like that.
|
||||||
{
|
t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) {
|
||||||
opts := structs.QueryOptions{
|
opts := structs.QueryOptions{
|
||||||
MinQueryIndex: 0,
|
MinQueryIndex: 0,
|
||||||
}
|
}
|
||||||
|
@ -327,11 +321,11 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
assert.True(t, t1.Sub(t0) > 20*time.Millisecond,
|
assert.True(t, t1.Sub(t0) > 20*time.Millisecond,
|
||||||
"should have actually blocked waiting for timeout")
|
"should have actually blocked waiting for timeout")
|
||||||
|
|
||||||
}
|
})
|
||||||
|
|
||||||
// Perform a query that blocks and gets interrupted when the state store
|
// Perform a query that blocks and gets interrupted when the state store
|
||||||
// is abandoned.
|
// is abandoned.
|
||||||
{
|
t.Run("blocking query interrupted by abandonCh", func(t *testing.T) {
|
||||||
opts := structs.QueryOptions{
|
opts := structs.QueryOptions{
|
||||||
MinQueryIndex: 3,
|
MinQueryIndex: 3,
|
||||||
}
|
}
|
||||||
|
@ -360,13 +354,10 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
calls++
|
calls++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
t.Fatalf("err: %v", err)
|
require.NoError(t, err)
|
||||||
}
|
require.Equal(t, 1, calls)
|
||||||
if calls != 1 {
|
})
|
||||||
t.Fatalf("bad: %d", calls)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) {
|
t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) {
|
||||||
opts := structs.QueryOptions{
|
opts := structs.QueryOptions{
|
||||||
|
|
|
@ -817,6 +817,7 @@ func TestTxn_Read(t *testing.T) {
|
||||||
},
|
},
|
||||||
QueryMeta: structs.QueryMeta{
|
QueryMeta: structs.QueryMeta{
|
||||||
KnownLeader: true,
|
KnownLeader: true,
|
||||||
|
Index: 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
require.Equal(t, expected, out)
|
require.Equal(t, expected, out)
|
||||||
|
|
|
@ -370,7 +370,9 @@ func (q QueryBackend) String() string {
|
||||||
// QueryMeta allows a query response to include potentially
|
// QueryMeta allows a query response to include potentially
|
||||||
// useful metadata about a query
|
// useful metadata about a query
|
||||||
type QueryMeta struct {
|
type QueryMeta struct {
|
||||||
// Index in the raft log of the latest item returned by the query.
|
// Index in the raft log of the latest item returned by the query. If the
|
||||||
|
// query did not return any results the Index will be a value that will
|
||||||
|
// change when a new item is added.
|
||||||
Index uint64
|
Index uint64
|
||||||
|
|
||||||
// If AllowStale is used, this is time elapsed since
|
// If AllowStale is used, this is time elapsed since
|
||||||
|
|
|
@ -10,11 +10,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
|
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
|
||||||
|
@ -385,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
|
||||||
},
|
},
|
||||||
QueryMeta: structs.QueryMeta{
|
QueryMeta: structs.QueryMeta{
|
||||||
KnownLeader: true,
|
KnownLeader: true,
|
||||||
|
Index: 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, txnResp)
|
assert.Equal(t, expected, txnResp)
|
||||||
|
|
Loading…
Reference in New Issue