From f92dc11002c9697dcf1bd30fd9995ab5ce8f13b4 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 12 Jan 2022 17:46:12 -0500 Subject: [PATCH] rpc: refactor blocking query To remove the TODO, and make it more readable. In general this reduces the scope of variables, making them easier to reason about. It also introduces more early returns so that we can see the flow from the structure of the function. --- agent/consul/rpc.go | 101 ++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index c8fa6846cd..ddf4299508 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -919,22 +919,26 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { - var cancel func() var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} - var queriesBlocking uint64 - var queryTimeout time.Duration - - // Instrument all queries run metrics.IncrCounter([]string{"rpc", "query"}, 1) minQueryIndex := queryOpts.GetMinQueryIndex() - // Fast path right to the non-blocking query. + // Perform a non-blocking query if minQueryIndex == 0 { - goto RUN_QUERY + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } + } + + var ws memdb.WatchSet + err := fn(ws, s.fsm.State()) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + return err } - queryTimeout = queryOpts.GetMaxQueryTime() + queryTimeout := queryOpts.GetMaxQueryTime() // Restrict the max query time, and ensure there is always one. if queryTimeout > s.config.MaxQueryTime { queryTimeout = s.config.MaxQueryTime @@ -946,64 +950,57 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) // wrap the base context with a deadline - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() // instrument blockingQueries // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) + queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1) // atomic dec when we return from blockingQuery() defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) // set the gauge directly to the new value of s.blockingQueries metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) -RUN_QUERY: - // Setup blocking loop - - // Validate - // If the read must be consistent we verify that we are still the leader. - if queryOpts.GetRequireConsistent() { - if err := s.consistentRead(); err != nil { - return err + for { + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } } - } - // Run query - - // Operate on a consistent set of state. This makes sure that the - // abandon channel goes with the state that the caller is using to - // build watches. - state := s.fsm.State() - - // We can skip all watch tracking if this isn't a blocking query. - var ws memdb.WatchSet - if minQueryIndex > 0 { - ws = memdb.NewWatchSet() + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + ws := memdb.NewWatchSet() // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. ws.Add(state.AbandonCh()) - } - // Execute the queryFn - err := fn(ws, state) + err := fn(ws, state) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + if err != nil { + return err + } - // Update the query metadata. - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + // Note we check queryOpts.MinQueryIndex is greater than zero to determine if + // blocking was requested by client, NOT meta.Index since the state function + // might return zero if something is not initialized and care wasn't taken to + // handle that special case (in practice this happened a lot so fixing it + // systematically here beats trying to remember to add zero checks in every + // state method). We also need to ensure that unless there is an error, we + // return an index > 0 otherwise the client will never block and burn CPU and + // requests. + if queryMeta.GetIndex() < 1 { + queryMeta.SetIndex(1) + } - // Note we check queryOpts.MinQueryIndex is greater than zero to determine if - // blocking was requested by client, NOT meta.Index since the state function - // might return zero if something is not initialized and care wasn't taken to - // handle that special case (in practice this happened a lot so fixing it - // systematically here beats trying to remember to add zero checks in every - // state method). We also need to ensure that unless there is an error, we - // return an index > 0 otherwise the client will never block and burn CPU and - // requests. - if err == nil && queryMeta.GetIndex() < 1 { - queryMeta.SetIndex(1) - } - // block up to the timeout if we don't see anything fresh. - if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { + if queryMeta.GetIndex() > minQueryIndex { + return nil + } + + // block up to the timeout if we don't see anything fresh. if err := ws.WatchCtx(ctx); err == nil { // a non-nil error only occurs when the context is cancelled @@ -1014,13 +1011,15 @@ RUN_QUERY: // case. select { case <-state.AbandonCh(): + return nil default: - // loop back and look for an update again - goto RUN_QUERY } } + + if ctx.Err() != nil { + return nil + } } - return err } // setQueryMeta is used to populate the QueryMeta data for an RPC call