mirror of https://github.com/hashicorp/consul
Implement prepared query upstreams watching for envoy (#5224)
Fixes #4969 This implements non-blocking request polling at the cache layer which is currently only used for prepared queries. Additionally this enables the proxycfg manager to poll prepared queries for use in envoy proxy upstreams.pull/5242/head
parent
8c87238b99
commit
7e6b3e6a0c
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
//go:generate mockery -all -inpkg
|
||||
|
@ -616,7 +617,7 @@ func backOffWait(failures uint) time.Duration {
|
|||
if waitTime > CacheRefreshMaxWait {
|
||||
waitTime = CacheRefreshMaxWait
|
||||
}
|
||||
return waitTime
|
||||
return waitTime + lib.RandomStagger(waitTime)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
|
|
@ -3,7 +3,10 @@ package cache
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
// UpdateEvent is a struct summarising an update to a cache entry
|
||||
|
@ -57,66 +60,175 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request,
|
|||
if !ok {
|
||||
return fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
if !tEntry.Type.SupportsBlocking() {
|
||||
return fmt.Errorf("watch requires the type to support blocking")
|
||||
}
|
||||
|
||||
// Always start at 0 index to deliver the inital (possibly currently cached
|
||||
// value).
|
||||
index := uint64(0)
|
||||
|
||||
go func() {
|
||||
var failures uint
|
||||
|
||||
for {
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Blocking request
|
||||
res, meta, err := c.getWithIndex(t, r, index)
|
||||
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check the index of the value returned in the cache entry to be sure it
|
||||
// changed
|
||||
if index < meta.Index {
|
||||
u := UpdateEvent{correlationID, res, meta, err}
|
||||
select {
|
||||
case ch <- u:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// Update index for next request
|
||||
index = meta.Index
|
||||
}
|
||||
|
||||
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||||
// a zero index are considered as failures since we need to not get stuck
|
||||
// in a busy loop.
|
||||
if err == nil && meta.Index > 0 {
|
||||
failures = 0
|
||||
} else {
|
||||
failures++
|
||||
}
|
||||
if wait := backOffWait(failures); wait > 0 {
|
||||
select {
|
||||
case <-time.After(wait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
// Sanity check we always request blocking on second pass
|
||||
if index < 1 {
|
||||
index = 1
|
||||
}
|
||||
if tEntry.Type.SupportsBlocking() {
|
||||
go c.notifyBlockingQuery(ctx, t, r, correlationID, ch)
|
||||
} else {
|
||||
info := r.CacheInfo()
|
||||
if info.MaxAge == 0 {
|
||||
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
|
||||
}
|
||||
}()
|
||||
go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
|
||||
// Always start at 0 index to deliver the initial (possibly currently cached
|
||||
// value).
|
||||
index := uint64(0)
|
||||
failures := uint(0)
|
||||
|
||||
for {
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Blocking request
|
||||
res, meta, err := c.getWithIndex(t, r, index)
|
||||
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check the index of the value returned in the cache entry to be sure it
|
||||
// changed
|
||||
if index < meta.Index {
|
||||
u := UpdateEvent{correlationID, res, meta, err}
|
||||
select {
|
||||
case ch <- u:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// Update index for next request
|
||||
index = meta.Index
|
||||
}
|
||||
|
||||
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||||
// a zero index are considered as failures since we need to not get stuck
|
||||
// in a busy loop.
|
||||
wait := 0 * time.Second
|
||||
if err == nil && meta.Index > 0 {
|
||||
failures = 0
|
||||
} else {
|
||||
failures++
|
||||
wait = backOffWait(failures)
|
||||
}
|
||||
|
||||
if wait > 0 {
|
||||
select {
|
||||
case <-time.After(wait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
// Sanity check we always request blocking on second pass
|
||||
if index < 1 {
|
||||
index = 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
|
||||
index := uint64(0)
|
||||
failures := uint(0)
|
||||
|
||||
var lastValue interface{} = nil
|
||||
|
||||
for {
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Make the request
|
||||
res, meta, err := c.getWithIndex(t, r, index)
|
||||
|
||||
// Check context hasn't been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check for a change in the value or an index change
|
||||
if index < meta.Index || !reflect.DeepEqual(lastValue, res) {
|
||||
u := UpdateEvent{correlationID, res, meta, err}
|
||||
select {
|
||||
case ch <- u:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// Update index and lastValue
|
||||
lastValue = res
|
||||
index = meta.Index
|
||||
}
|
||||
|
||||
// Reset or increment failure counter
|
||||
if err == nil {
|
||||
failures = 0
|
||||
} else {
|
||||
failures++
|
||||
}
|
||||
|
||||
// Determining how long to wait before the next poll is complicated.
|
||||
// First off the happy path and the error path waits are handled distinctly
|
||||
//
|
||||
// Once fetching the data through the cache returns an error (and until a
|
||||
// non-error value is returned) the wait time between each round of the loop
|
||||
// gets controlled by the backOffWait function. Because we would have waited
|
||||
// at least until the age of the cached data was too old the error path should
|
||||
// immediately retry the fetch and backoff on the time as needed for persistent
|
||||
// failures which potentially will wait much longer than the MaxAge of the request
|
||||
//
|
||||
// When on the happy path we just need to fetch from the cache often enough to ensure
|
||||
// that the data is not older than the MaxAge. Therefore after fetching the data from
|
||||
// the cache we can sleep until the age of that data would exceed the MaxAge. Sometimes
|
||||
// this will be for the MaxAge duration (like when only a single notify was executed so
|
||||
// only 1 go routine is keeping the cache updated). Other times this will be some smaller
|
||||
// duration than MaxAge (when multiple notify calls were executed and this go routine just
|
||||
// got data back from the cache that was a cache hit after the other go routine fetched it
|
||||
// without a hit). We cannot just set MustRevalidate on the request and always sleep for MaxAge
|
||||
// as this would eliminate the single-flighting of these requests in the cache and
|
||||
// the efficiencies gained by it.
|
||||
if failures > 0 {
|
||||
|
||||
errWait := backOffWait(failures)
|
||||
select {
|
||||
case <-time.After(errWait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Default to immediately re-poll. This only will happen if the data
|
||||
// we just got out of the cache is already too stale
|
||||
pollWait := 0 * time.Second
|
||||
|
||||
// Calculate when the cached data's Age will get too stale and
|
||||
// need to be re-queried. When the data's Age already exceeds the
|
||||
// maxAge the pollWait value is left at 0 to immediately re-poll
|
||||
if meta.Age <= maxAge {
|
||||
pollWait = maxAge - meta.Age
|
||||
}
|
||||
|
||||
// Add a small amount of random jitter to the polling time. One
|
||||
// purpose of the jitter is to ensure that the next time
|
||||
// we fetch from the cache the data will be stale (unless another
|
||||
// notify go routine has updated it while this one is sleeping).
|
||||
// Without this it would be possible to wake up, fetch the data
|
||||
// again where the age of the data is strictly equal to the MaxAge
|
||||
// and then immediately have to re-fetch again. That wouldn't
|
||||
// be terrible but it would expend a bunch more cpu cycles when
|
||||
// we can definitely avoid it.
|
||||
pollWait += lib.RandomStagger(maxAge / 16)
|
||||
|
||||
select {
|
||||
case <-time.After(pollWait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ func TestCacheNotify(t *testing.T) {
|
|||
Err: nil,
|
||||
})
|
||||
|
||||
// Registere a second observer using same chan and request. Note that this is
|
||||
// Register a second observer using same chan and request. Note that this is
|
||||
// testing a few things implicitly:
|
||||
// - that multiple watchers on the same cache entity are de-duped in their
|
||||
// requests to the "backend"
|
||||
|
@ -144,6 +144,120 @@ func TestCacheNotify(t *testing.T) {
|
|||
// important things to get working.
|
||||
}
|
||||
|
||||
func TestCacheNotifyPolling(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestTypeNonBlocking(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, &RegisterOptions{
|
||||
Refresh: false,
|
||||
})
|
||||
|
||||
// Configure the type
|
||||
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
|
||||
// Assert the right request type - all real Fetch implementations do this so
|
||||
// it keeps us honest that Watch doesn't require type mangling which will
|
||||
// break in real life (hint: it did on the first attempt)
|
||||
_, ok := args.Get(1).(*MockRequest)
|
||||
require.True(t, ok)
|
||||
})
|
||||
typ.Static(FetchResult{Value: 12, Index: 1}, nil).Once()
|
||||
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan UpdateEvent)
|
||||
|
||||
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
|
||||
require.NoError(err)
|
||||
|
||||
// Should receive the first result pretty soon
|
||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||
CorrelationID: "test",
|
||||
Result: 1,
|
||||
Meta: ResultMeta{Hit: false, Index: 1},
|
||||
Err: nil,
|
||||
})
|
||||
|
||||
// There should be no more updates delivered yet
|
||||
require.Len(ch, 0)
|
||||
|
||||
// make sure the updates do not come too quickly
|
||||
select {
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
case <-ch:
|
||||
require.Fail("Received update too early")
|
||||
}
|
||||
|
||||
// make sure we get the update not too far out.
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
require.Fail("Didn't receive the notification")
|
||||
case result := <-ch:
|
||||
require.Equal(result.Result, 12)
|
||||
require.Equal(result.CorrelationID, "test")
|
||||
require.Equal(result.Meta.Hit, false)
|
||||
require.Equal(result.Meta.Index, uint64(1))
|
||||
// pretty conservative check it should be even newer because without a second
|
||||
// notifier each value returned will have been executed just then and not served
|
||||
// from the cache.
|
||||
require.True(result.Meta.Age < 50*time.Millisecond)
|
||||
require.NoError(result.Err)
|
||||
}
|
||||
|
||||
require.Len(ch, 0)
|
||||
|
||||
// Register a second observer using same chan and request. Note that this is
|
||||
// testing a few things implicitly:
|
||||
// - that multiple watchers on the same cache entity are de-duped in their
|
||||
// requests to the "backend"
|
||||
// - that multiple watchers can distinguish their results using correlationID
|
||||
err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test2", ch)
|
||||
require.NoError(err)
|
||||
|
||||
// Should get test2 notify immediately, and it should be a cache hit
|
||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||
CorrelationID: "test2",
|
||||
Result: 12,
|
||||
Meta: ResultMeta{Hit: true, Index: 1},
|
||||
Err: nil,
|
||||
})
|
||||
|
||||
require.Len(ch, 0)
|
||||
|
||||
// wait for the next batch of responses
|
||||
events := make([]UpdateEvent, 0)
|
||||
// 110 is needed to allow for the jitter
|
||||
timeout := time.After(110 * time.Millisecond)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-timeout:
|
||||
require.Fail("UpdateEvent not received in time")
|
||||
case eve := <-ch:
|
||||
events = append(events, eve)
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(events[0].Result, 42)
|
||||
require.Equal(events[0].Meta.Hit, false)
|
||||
require.Equal(events[0].Meta.Index, uint64(1))
|
||||
require.True(events[0].Meta.Age < 50*time.Millisecond)
|
||||
require.NoError(events[0].Err)
|
||||
require.Equal(events[1].Result, 42)
|
||||
// Sometimes this would be a hit and others not. It all depends on when the various getWithIndex calls got fired.
|
||||
// If both are done concurrently then it will not be a cache hit but the request gets single flighted and both
|
||||
// get notified at the same time.
|
||||
// require.Equal(events[1].Meta.Hit, true)
|
||||
require.Equal(events[1].Meta.Index, uint64(1))
|
||||
require.True(events[1].Meta.Age < 100*time.Millisecond)
|
||||
require.NoError(events[1].Err)
|
||||
}
|
||||
|
||||
// Test that a refresh performs a backoff.
|
||||
func TestCacheWatch_ErrorBackoff(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -186,7 +300,7 @@ func TestCacheWatch_ErrorBackoff(t *testing.T) {
|
|||
// was running as fast as it could go we'd expect this to be huge. We have to
|
||||
// be a little careful here because the watch chan ch doesn't have a large
|
||||
// buffer so we could be artificially slowing down the loop without the
|
||||
// backoff actualy taking affect. We can validate that by ensuring this test
|
||||
// backoff actually taking effect. We can validate that by ensuring this test
|
||||
// fails without the backoff code reliably.
|
||||
timeoutC := time.After(500 * time.Millisecond)
|
||||
OUT:
|
||||
|
@ -206,3 +320,69 @@ OUT:
|
|||
actual := atomic.LoadUint32(&retries)
|
||||
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
|
||||
}
|
||||
|
||||
// Test that a refresh performs a backoff.
|
||||
func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestTypeNonBlocking(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, &RegisterOptions{
|
||||
Refresh: false,
|
||||
})
|
||||
|
||||
// Configure the type
|
||||
var retries uint32
|
||||
fetchErr := fmt.Errorf("test fetch error")
|
||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
|
||||
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
|
||||
atomic.AddUint32(&retries, 1)
|
||||
})
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan UpdateEvent)
|
||||
|
||||
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
|
||||
require.NoError(err)
|
||||
|
||||
// Should receive the first result pretty soon
|
||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||
CorrelationID: "test",
|
||||
Result: 1,
|
||||
Meta: ResultMeta{Hit: false, Index: 4},
|
||||
Err: nil,
|
||||
})
|
||||
|
||||
numErrors := 0
|
||||
// Loop for a little while and count how many errors we see reported. If this
|
||||
// was running as fast as it could go we'd expect this to be huge. We have to
|
||||
// be a little careful here because the watch chan ch doesn't have a large
|
||||
// buffer so we could be artificially slowing down the loop without the
|
||||
// backoff actually taking effect. We can validate that by ensuring this test
|
||||
// fails without the backoff code reliably.
|
||||
//
|
||||
// 100 + 500 milliseconds. 100 because the first retry will not happen until
|
||||
// the 100 + jitter milliseconds have elapsed.
|
||||
timeoutC := time.After(600 * time.Millisecond)
|
||||
OUT:
|
||||
for {
|
||||
select {
|
||||
case <-timeoutC:
|
||||
break OUT
|
||||
case u := <-ch:
|
||||
numErrors++
|
||||
require.Error(u.Err)
|
||||
}
|
||||
}
|
||||
// Must be fewer than 10 failures in that time
|
||||
require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors))
|
||||
|
||||
// Check the number of RPCs as a sanity check too
|
||||
actual := atomic.LoadUint32(&retries)
|
||||
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
|
||||
}
|
||||
|
|
|
@ -16,12 +16,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
coalesceTimeout = 200 * time.Millisecond
|
||||
rootsWatchID = "roots"
|
||||
leafWatchID = "leaf"
|
||||
intentionsWatchID = "intentions"
|
||||
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
|
||||
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
|
||||
coalesceTimeout = 200 * time.Millisecond
|
||||
rootsWatchID = "roots"
|
||||
leafWatchID = "leaf"
|
||||
intentionsWatchID = "intentions"
|
||||
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
|
||||
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
|
||||
defaultPreparedQueryPollInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// state holds all the state needed to maintain the config for a registered
|
||||
|
@ -164,15 +165,12 @@ func (s *state) initWatches() error {
|
|||
|
||||
switch u.DestinationType {
|
||||
case structs.UpstreamDestTypePreparedQuery:
|
||||
// TODO(banks): prepared queries don't support blocking. We need to come
|
||||
// up with an alternative to Notify that will poll at a sensible rate.
|
||||
|
||||
// err = c.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
||||
// Datacenter: dc,
|
||||
// QueryOptions: structs.QueryOptions{Token: token},
|
||||
// QueryIDOrName: u.DestinationName,
|
||||
// Connect: true,
|
||||
// }, u.Identifier(), ch)
|
||||
err = s.cache.Notify(s.ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
||||
QueryIDOrName: u.DestinationName,
|
||||
Connect: true,
|
||||
}, u.Identifier(), s.ch)
|
||||
case structs.UpstreamDestTypeService:
|
||||
fallthrough
|
||||
case "": // Treat unset as the default Service type
|
||||
|
|
Loading…
Reference in New Issue