mirror of https://github.com/hashicorp/consul
Fix hot loop in cache for RPC returning zero index.
parent
43b48bc06b
commit
2e223ea2b7
|
@ -325,14 +325,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||||
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
|
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We always specify an index greater than zero since index of zero
|
|
||||||
// means to always return immediately and we want to block if possible.
|
|
||||||
// Index 1 is always safe since Consul's own initialization always results
|
|
||||||
// in a higher index (around 10 or above).
|
|
||||||
if entry.Index == 0 {
|
|
||||||
entry.Index = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set that we're fetching to true, which makes it so that future
|
// Set that we're fetching to true, which makes it so that future
|
||||||
// identical calls to fetch will return the same waiter rather than
|
// identical calls to fetch will return the same waiter rather than
|
||||||
// perform multiple fetches.
|
// perform multiple fetches.
|
||||||
|
@ -355,6 +347,17 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||||
// A new value was given, so we create a brand new entry.
|
// A new value was given, so we create a brand new entry.
|
||||||
newEntry.Value = result.Value
|
newEntry.Value = result.Value
|
||||||
newEntry.Index = result.Index
|
newEntry.Index = result.Index
|
||||||
|
if newEntry.Index < 1 {
|
||||||
|
// Less than one is invalid unless there was an error and in this case
|
||||||
|
// there wasn't since a value was returned. If a badly behaved RPC
|
||||||
|
// returns 0 when it has no data, we might get into a busy loop here. We
|
||||||
|
// set this to minimum of 1 which is safe because no valid user data can
|
||||||
|
// ever be written at raft index 1 due to the bootstrap process for
|
||||||
|
// raft. This insure that any subsequent background refresh request will
|
||||||
|
// always block, but allows the initial request to return immediately
|
||||||
|
// even if there is no data.
|
||||||
|
newEntry.Index = 1
|
||||||
|
}
|
||||||
|
|
||||||
// This is a valid entry with a result
|
// This is a valid entry with a result
|
||||||
newEntry.Valid = true
|
newEntry.Valid = true
|
||||||
|
@ -365,8 +368,25 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
|
||||||
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
|
||||||
|
|
||||||
// Reset the attempts counter so we don't have any backoff
|
if result.Index > 0 {
|
||||||
attempt = 0
|
// Reset the attempts counter so we don't have any backoff
|
||||||
|
attempt = 0
|
||||||
|
} else {
|
||||||
|
// Result having a zero index is an implicit error case. There was no
|
||||||
|
// actual error but it implies the RPC found in index (nothing written
|
||||||
|
// yet for that type) but didn't take care to return safe "1" index. We
|
||||||
|
// don't want to actually treat it like an error by setting
|
||||||
|
// newEntry.Error to something non-nil, but we should guard against 100%
|
||||||
|
// CPU burn hot loops caused by that case which will never block but
|
||||||
|
// also won't backoff either. So we treat it as a failed attempt so that
|
||||||
|
// at least the failure backoff will save our CPU while still
|
||||||
|
// periodically refreshing so normal service can resume when the servers
|
||||||
|
// actually have something to return from the RPC. If we get in this
|
||||||
|
// state it can be considered a bug in the RPC implementation (to ever
|
||||||
|
// return a zero index) however since it can happen this is a safety net
|
||||||
|
// for the future.
|
||||||
|
attempt++
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
|
||||||
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
|
||||||
|
|
|
@ -471,10 +471,8 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
|
||||||
require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual))
|
require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that fetching with no index actually sets to index to one, including
|
// Test that a badly behaved RPC that returns 0 index will perform a backoff.
|
||||||
// for background refreshes. This ensures we don't end up with any index 0
|
func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
|
||||||
// loops.
|
|
||||||
func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
|
@ -487,18 +485,91 @@ func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
fetchErr := fmt.Errorf("test fetch error")
|
var retries uint32
|
||||||
typ.Static(FetchResult{Value: 42, Index: 0}, fetchErr).Run(func(args mock.Arguments) {
|
typ.Static(FetchResult{Value: 0, Index: 0}, nil).Run(func(args mock.Arguments) {
|
||||||
opts := args.Get(0).(FetchOptions)
|
atomic.AddUint32(&retries, 1)
|
||||||
assert.True(t, opts.MinIndex > 0, "minIndex > 0")
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Fetch
|
// Fetch
|
||||||
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
TestCacheGetChResult(t, resultCh, 42)
|
TestCacheGetChResult(t, resultCh, 0)
|
||||||
|
|
||||||
// Sleep a bit so background refresh happens
|
// Sleep a bit. The refresh will quietly fail in the background. What we
|
||||||
time.Sleep(100 * time.Millisecond)
|
// want to verify is that it doesn't retry too much. "Too much" is hard
|
||||||
|
// to measure since its CPU dependent if this test is failing. But due
|
||||||
|
// to the short sleep below, we can calculate about what we'd expect if
|
||||||
|
// backoff IS working.
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
|
// Fetch should work, we should get a 0 still. Errors are ignored.
|
||||||
|
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
|
TestCacheGetChResult(t, resultCh, 0)
|
||||||
|
|
||||||
|
// Check the number
|
||||||
|
actual := atomic.LoadUint32(&retries)
|
||||||
|
require.True(t, actual < 10, fmt.Sprintf("%d retries, should be < 10", actual))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that fetching with no index makes an initial request with no index, but
|
||||||
|
// then ensures all background refreshes have > 0. This ensures we don't end up
|
||||||
|
// with any index 0 loops from background refreshed while also returning
|
||||||
|
// immediately on the initial request if there is no data written to that table
|
||||||
|
// yet.
|
||||||
|
func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
typ := TestType(t)
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ, &RegisterOptions{
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0,
|
||||||
|
RefreshTimeout: 5 * time.Minute,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Simulate "well behaved" RPC with no data yet but returning 1
|
||||||
|
{
|
||||||
|
first := int32(1)
|
||||||
|
|
||||||
|
typ.Static(FetchResult{Value: 0, Index: 1}, nil).Run(func(args mock.Arguments) {
|
||||||
|
opts := args.Get(0).(FetchOptions)
|
||||||
|
isFirst := atomic.SwapInt32(&first, 0)
|
||||||
|
if isFirst == 1 {
|
||||||
|
assert.Equal(t, uint64(0), opts.MinIndex)
|
||||||
|
} else {
|
||||||
|
assert.True(t, opts.MinIndex > 0, "minIndex > 0")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
|
TestCacheGetChResult(t, resultCh, 0)
|
||||||
|
|
||||||
|
// Sleep a bit so background refresh happens
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Same for "badly behaved" RPC that returns 0 index and no data
|
||||||
|
{
|
||||||
|
first := int32(1)
|
||||||
|
|
||||||
|
typ.Static(FetchResult{Value: 0, Index: 0}, nil).Run(func(args mock.Arguments) {
|
||||||
|
opts := args.Get(0).(FetchOptions)
|
||||||
|
isFirst := atomic.SwapInt32(&first, 0)
|
||||||
|
if isFirst == 1 {
|
||||||
|
assert.Equal(t, uint64(0), opts.MinIndex)
|
||||||
|
} else {
|
||||||
|
assert.True(t, opts.MinIndex > 0, "minIndex > 0")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
|
TestCacheGetChResult(t, resultCh, 0)
|
||||||
|
|
||||||
|
// Sleep a bit so background refresh happens
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the backend fetch sets the proper timeout.
|
// Test that the backend fetch sets the proper timeout.
|
||||||
|
|
|
@ -132,6 +132,9 @@ func (s *Store) Intentions(ws memdb.WatchSet) (uint64, structs.Intentions, error
|
||||||
|
|
||||||
// Get the index
|
// Get the index
|
||||||
idx := maxIndexTxn(tx, intentionsTableName)
|
idx := maxIndexTxn(tx, intentionsTableName)
|
||||||
|
if idx < 1 {
|
||||||
|
idx = 1
|
||||||
|
}
|
||||||
|
|
||||||
// Get all intentions
|
// Get all intentions
|
||||||
iter, err := tx.Get(intentionsTableName, "id")
|
iter, err := tx.Get(intentionsTableName, "id")
|
||||||
|
@ -228,6 +231,9 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Int
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexTxn(tx, intentionsTableName)
|
idx := maxIndexTxn(tx, intentionsTableName)
|
||||||
|
if idx < 1 {
|
||||||
|
idx = 1
|
||||||
|
}
|
||||||
|
|
||||||
// Look up by its ID.
|
// Look up by its ID.
|
||||||
watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id)
|
watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id)
|
||||||
|
@ -295,6 +301,9 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexTxn(tx, intentionsTableName)
|
idx := maxIndexTxn(tx, intentionsTableName)
|
||||||
|
if idx < 1 {
|
||||||
|
idx = 1
|
||||||
|
}
|
||||||
|
|
||||||
// Make all the calls and accumulate the results
|
// Make all the calls and accumulate the results
|
||||||
results := make([]structs.Intentions, len(args.Entries))
|
results := make([]structs.Intentions, len(args.Entries))
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestStore_IntentionGet_none(t *testing.T) {
|
||||||
// Querying with no results returns nil.
|
// Querying with no results returns nil.
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
idx, res, err := s.IntentionGet(ws, testUUID())
|
idx, res, err := s.IntentionGet(ws, testUUID())
|
||||||
assert.Equal(idx, uint64(0))
|
assert.Equal(uint64(1), idx)
|
||||||
assert.Nil(res)
|
assert.Nil(res)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ func TestStore_IntentionsList(t *testing.T) {
|
||||||
idx, res, err := s.Intentions(ws)
|
idx, res, err := s.Intentions(ws)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Nil(res)
|
assert.Nil(res)
|
||||||
assert.Equal(idx, uint64(0))
|
assert.Equal(uint64(1), idx)
|
||||||
|
|
||||||
// Create some intentions
|
// Create some intentions
|
||||||
ixns := structs.Intentions{
|
ixns := structs.Intentions{
|
||||||
|
|
|
@ -352,6 +352,16 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig {
|
||||||
ca_config {
|
ca_config {
|
||||||
cluster_id = "` + connect.TestClusterID + `"
|
cluster_id = "` + connect.TestClusterID + `"
|
||||||
}
|
}
|
||||||
|
proxy_defaults {
|
||||||
|
// Generally we don't actually need to test real proxy startup except
|
||||||
|
// in the Daemon package which explicitly manages how it starts things
|
||||||
|
// so making this a no-op long running command like /bin/sleep would
|
||||||
|
// be fine, but there is no such thing on windows etc. We hackily rely
|
||||||
|
// on the fact that if the executable doesn't exist, the Daemon
|
||||||
|
// manager will get an exec error and retry it with a backoff beningly
|
||||||
|
// until tests pass.
|
||||||
|
daemon_command = ["/bin/sleep", "3600"]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
performance {
|
performance {
|
||||||
raft_multiplier = 1
|
raft_multiplier = 1
|
||||||
|
|
Loading…
Reference in New Issue