mirror of https://github.com/hashicorp/consul
rpc: refactor blocking query
To remove the TODO, and make it more readable. In general this reduces the scope of variables, making them easier to reason about. It also introduces more early returns so that we can see the flow from the structure of the function.pull/12109/head
parent
f3ac9dafa6
commit
f92dc11002
|
@ -919,22 +919,26 @@ type queryFn func(memdb.WatchSet, *state.Store) error
|
|||
|
||||
// blockingQuery is used to process a potentially blocking query operation.
|
||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||
var cancel func()
|
||||
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
||||
|
||||
var queriesBlocking uint64
|
||||
var queryTimeout time.Duration
|
||||
|
||||
// Instrument all queries run
|
||||
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
||||
|
||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||
// Fast path right to the non-blocking query.
|
||||
// Perform a non-blocking query
|
||||
if minQueryIndex == 0 {
|
||||
goto RUN_QUERY
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
queryTimeout = queryOpts.GetMaxQueryTime()
|
||||
var ws memdb.WatchSet
|
||||
err := fn(ws, s.fsm.State())
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
return err
|
||||
}
|
||||
|
||||
queryTimeout := queryOpts.GetMaxQueryTime()
|
||||
// Restrict the max query time, and ensure there is always one.
|
||||
if queryTimeout > s.config.MaxQueryTime {
|
||||
queryTimeout = s.config.MaxQueryTime
|
||||
|
@ -946,50 +950,39 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
|||
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
||||
|
||||
// wrap the base context with a deadline
|
||||
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
|
||||
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
|
||||
defer cancel()
|
||||
|
||||
// instrument blockingQueries
|
||||
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
||||
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
|
||||
queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1)
|
||||
// atomic dec when we return from blockingQuery()
|
||||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||
// set the gauge directly to the new value of s.blockingQueries
|
||||
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))
|
||||
|
||||
RUN_QUERY:
|
||||
// Setup blocking loop
|
||||
|
||||
// Validate
|
||||
// If the read must be consistent we verify that we are still the leader.
|
||||
for {
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Run query
|
||||
|
||||
// Operate on a consistent set of state. This makes sure that the
|
||||
// abandon channel goes with the state that the caller is using to
|
||||
// build watches.
|
||||
state := s.fsm.State()
|
||||
|
||||
// We can skip all watch tracking if this isn't a blocking query.
|
||||
var ws memdb.WatchSet
|
||||
if minQueryIndex > 0 {
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
// This channel will be closed if a snapshot is restored and the
|
||||
// whole state store is abandoned.
|
||||
ws.Add(state.AbandonCh())
|
||||
}
|
||||
|
||||
// Execute the queryFn
|
||||
err := fn(ws, state)
|
||||
|
||||
// Update the query metadata.
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
||||
// blocking was requested by client, NOT meta.Index since the state function
|
||||
|
@ -999,11 +992,15 @@ RUN_QUERY:
|
|||
// state method). We also need to ensure that unless there is an error, we
|
||||
// return an index > 0 otherwise the client will never block and burn CPU and
|
||||
// requests.
|
||||
if err == nil && queryMeta.GetIndex() < 1 {
|
||||
if queryMeta.GetIndex() < 1 {
|
||||
queryMeta.SetIndex(1)
|
||||
}
|
||||
|
||||
if queryMeta.GetIndex() > minQueryIndex {
|
||||
return nil
|
||||
}
|
||||
|
||||
// block up to the timeout if we don't see anything fresh.
|
||||
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
||||
if err := ws.WatchCtx(ctx); err == nil {
|
||||
// a non-nil error only occurs when the context is cancelled
|
||||
|
||||
|
@ -1014,13 +1011,15 @@ RUN_QUERY:
|
|||
// case.
|
||||
select {
|
||||
case <-state.AbandonCh():
|
||||
return nil
|
||||
default:
|
||||
// loop back and look for an update again
|
||||
goto RUN_QUERY
|
||||
}
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||
|
|
Loading…
Reference in New Issue