You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/blockingquery/blockingquery.go

212 lines
6.7 KiB

[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package blockingquery
import (
"context"
"errors"
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/lib"
)
// Sentinel errors that must be used with blockingQuery
var (
ErrNotFound = fmt.Errorf("no data found for query")
ErrNotChanged = fmt.Errorf("data did not change for query")
)
// QueryFn is used to perform a query operation. See Server.blockingQuery for
// the requirements of this function.
type QueryFn func(memdb.WatchSet, *state.Store) error
// RequestOptions are options used by Server.blockingQuery to modify the
// behaviour of the query operation, or to populate response metadata.
type RequestOptions interface {
GetToken() string
GetMinQueryIndex() uint64
GetMaxQueryTime() (time.Duration, error)
GetRequireConsistent() bool
}
// ResponseMeta is an interface used to populate the response struct
// with metadata about the query and the state of the server.
type ResponseMeta interface {
SetLastContact(time.Duration)
SetKnownLeader(bool)
GetIndex() uint64
SetIndex(uint64)
SetResultsFilteredByACLs(bool)
}
// FSMServer is interface into the stateful components of a Consul server, such
// as memdb or raft leadership.
type FSMServer interface {
ConsistentRead() error
DecrementBlockingQueries() uint64
GetShutdownChannel() chan struct{}
GetState() *state.Store
IncrementBlockingQueries() uint64
RPCQueryTimeout(time.Duration) time.Duration
SetQueryMeta(ResponseMeta, string)
}
// Query performs a blocking query if opts.GetMinQueryIndex is
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
// after performing the query.
//
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
// still the cluster leader before performing the query.
//
// The query function is expected to be a closure that has access to responseMeta
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
//
// The query function can return ErrNotFound, which is a sentinel error. Returning
// ErrNotFound indicates that the query found no results, which allows
// blockingQuery to keep blocking until the query returns a non-nil error.
// The query function must take care to set the actual result of the query to
// nil in these cases, otherwise when blockingQuery times out it may return
// a previous result. ErrNotFound will never be returned to the caller, it is
// converted to nil before returning.
//
// The query function can return ErrNotChanged, which is a sentinel error. This
// can only be returned on calls AFTER the first call, as it would not be
// possible to detect the absence of a change on the first call. Returning
// ErrNotChanged indicates that the query results are identical to the prior
// results which allows blockingQuery to keep blocking until the query returns
// a real changed result.
//
// The query function must take care to ensure the actual result of the query
// is either left unmodified or explicitly left in a good state before
// returning, otherwise when blockingQuery times out it may return an
// incomplete or unexpected result. ErrNotChanged will never be returned to the
// caller, it is converted to nil before returning.
//
// If query function returns any other error, the error is returned to the caller
// immediately.
//
// The query function must follow these rules:
//
// 1. to access data it must use the passed in state.Store.
// 2. it must set the responseMeta.Index to an index greater than
// opts.GetMinQueryIndex if the results return by the query have changed.
// 3. any channels added to the memdb.WatchSet must unblock when the results
// returned by the query have changed.
//
// To ensure optimal performance of the query, the query function should make a
// best-effort attempt to follow these guidelines:
//
// 1. only set responseMeta.Index to an index greater than
// opts.GetMinQueryIndex when the results returned by the query have changed.
// 2. any channels added to the memdb.WatchSet should only unblock when the
// results returned by the query have changed.
func Query(
fsmServer FSMServer,
requestOpts RequestOptions,
responseMeta ResponseMeta,
query QueryFn,
) error {
var ctx context.Context = &lib.StopChannelContext{StopCh: fsmServer.GetShutdownChannel()}
metrics.IncrCounter([]string{"rpc", "query"}, 1)
minQueryIndex := requestOpts.GetMinQueryIndex()
// Perform a non-blocking query
if minQueryIndex == 0 {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}
var ws memdb.WatchSet
err := query(ws, fsmServer.GetState())
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotChanged) {
return nil
}
return err
}
maxQueryTimeout, err := requestOpts.GetMaxQueryTime()
if err != nil {
return err
}
timeout := fsmServer.RPCQueryTimeout(maxQueryTimeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
count := fsmServer.IncrementBlockingQueries()
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count))
// decrement the count when the function returns.
defer fsmServer.DecrementBlockingQueries()
var (
notFound bool
ranOnce bool
)
for {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}
// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
store := fsmServer.GetState()
ws := memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(store.AbandonCh())
err := query(ws, store)
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())
switch {
case errors.Is(err, ErrNotFound):
if notFound {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
notFound = true
case errors.Is(err, ErrNotChanged):
if ranOnce {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
case err != nil:
return err
}
ranOnce = true
if responseMeta.GetIndex() > minQueryIndex {
return nil
}
// block until something changes, or the timeout
if err := ws.WatchCtx(ctx); err != nil {
// exit if we've reached the timeout, or other cancellation
return nil
}
// exit if the state store has been abandoned
select {
case <-store.AbandonCh():
return nil
default:
}
}
}