mirror of https://github.com/hashicorp/consul
agent/cache: always fetch with minimum index of 1 at least
parent
e02a5fc2cc
commit
cf9b377c78
|
@ -168,12 +168,18 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
// reached (on timeout ErrTimeout is returned on the channel).
|
||||
func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
|
||||
timeout time.Duration) {
|
||||
// We always want to block on at least an initial value. If this isn't
|
||||
minIndex := atomic.LoadUint64(&c.caIndex)
|
||||
if minIndex == 0 {
|
||||
minIndex = 1
|
||||
}
|
||||
|
||||
// Fetch some new roots. This will block until our MinQueryIndex is
|
||||
// matched or the timeout is reached.
|
||||
rawRoots, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: datacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: atomic.LoadUint64(&c.caIndex),
|
||||
MinQueryIndex: minIndex,
|
||||
MaxQueryTime: timeout,
|
||||
},
|
||||
})
|
||||
|
|
|
@ -326,6 +326,14 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
|||
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
|
||||
}
|
||||
|
||||
// We always specify an index greater than zero since index of zero
|
||||
// means to always return immediately and we want to block if possible.
|
||||
// Index 1 is always safe since Consul's own initialization always results
|
||||
// in a higher index (around 10 or above).
|
||||
if entry.Index == 0 {
|
||||
entry.Index = 1
|
||||
}
|
||||
|
||||
// Set that we're fetching to true, which makes it so that future
|
||||
// identical calls to fetch will return the same waiter rather than
|
||||
// perform multiple fetches.
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -413,6 +414,36 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
|
|||
require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual))
|
||||
}
|
||||
|
||||
// Test that fetching with no index actually sets to index to one, including
|
||||
// for background refreshes. This ensures we don't end up with any index 0
|
||||
// loops.
|
||||
func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestType(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, &RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
RefreshTimeout: 5 * time.Minute,
|
||||
})
|
||||
|
||||
// Configure the type
|
||||
fetchErr := fmt.Errorf("test fetch error")
|
||||
typ.Static(FetchResult{Value: 42, Index: 0}, fetchErr).Run(func(args mock.Arguments) {
|
||||
opts := args.Get(0).(FetchOptions)
|
||||
assert.True(t, opts.MinIndex > 0, "minIndex > 0")
|
||||
})
|
||||
|
||||
// Fetch
|
||||
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||
TestCacheGetChResult(t, resultCh, 42)
|
||||
|
||||
// Sleep a bit so background refresh happens
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Test that the backend fetch sets the proper timeout.
|
||||
func TestCacheGet_fetchTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
|
Loading…
Reference in New Issue