diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index c8fa6846cd..ddf4299508 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -919,22 +919,26 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { - var cancel func() var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} - var queriesBlocking uint64 - var queryTimeout time.Duration - - // Instrument all queries run metrics.IncrCounter([]string{"rpc", "query"}, 1) minQueryIndex := queryOpts.GetMinQueryIndex() - // Fast path right to the non-blocking query. + // Perform a non-blocking query if minQueryIndex == 0 { - goto RUN_QUERY + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } + } + + var ws memdb.WatchSet + err := fn(ws, s.fsm.State()) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + return err } - queryTimeout = queryOpts.GetMaxQueryTime() + queryTimeout := queryOpts.GetMaxQueryTime() // Restrict the max query time, and ensure there is always one. if queryTimeout > s.config.MaxQueryTime { queryTimeout = s.config.MaxQueryTime @@ -946,64 +950,57 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) // wrap the base context with a deadline - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() // instrument blockingQueries // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) + queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1) // atomic dec when we return from blockingQuery() defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) // set the gauge directly to the new value of s.blockingQueries metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) -RUN_QUERY: - // Setup blocking loop - - // Validate - // If the read must be consistent we verify that we are still the leader. - if queryOpts.GetRequireConsistent() { - if err := s.consistentRead(); err != nil { - return err + for { + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } } - } - // Run query - - // Operate on a consistent set of state. This makes sure that the - // abandon channel goes with the state that the caller is using to - // build watches. - state := s.fsm.State() - - // We can skip all watch tracking if this isn't a blocking query. - var ws memdb.WatchSet - if minQueryIndex > 0 { - ws = memdb.NewWatchSet() + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + ws := memdb.NewWatchSet() // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. ws.Add(state.AbandonCh()) - } - // Execute the queryFn - err := fn(ws, state) + err := fn(ws, state) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + if err != nil { + return err + } - // Update the query metadata. - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + // Note we check queryOpts.MinQueryIndex is greater than zero to determine if + // blocking was requested by client, NOT meta.Index since the state function + // might return zero if something is not initialized and care wasn't taken to + // handle that special case (in practice this happened a lot so fixing it + // systematically here beats trying to remember to add zero checks in every + // state method). We also need to ensure that unless there is an error, we + // return an index > 0 otherwise the client will never block and burn CPU and + // requests. + if queryMeta.GetIndex() < 1 { + queryMeta.SetIndex(1) + } - // Note we check queryOpts.MinQueryIndex is greater than zero to determine if - // blocking was requested by client, NOT meta.Index since the state function - // might return zero if something is not initialized and care wasn't taken to - // handle that special case (in practice this happened a lot so fixing it - // systematically here beats trying to remember to add zero checks in every - // state method). We also need to ensure that unless there is an error, we - // return an index > 0 otherwise the client will never block and burn CPU and - // requests. - if err == nil && queryMeta.GetIndex() < 1 { - queryMeta.SetIndex(1) - } - // block up to the timeout if we don't see anything fresh. - if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { + if queryMeta.GetIndex() > minQueryIndex { + return nil + } + + // block up to the timeout if we don't see anything fresh. if err := ws.WatchCtx(ctx); err == nil { // a non-nil error only occurs when the context is cancelled @@ -1014,13 +1011,15 @@ RUN_QUERY: // case. select { case <-state.AbandonCh(): + return nil default: - // loop back and look for an update again - goto RUN_QUERY } } + + if ctx.Err() != nil { + return nil + } } - return err } // setQueryMeta is used to populate the QueryMeta data for an RPC call