mirror of https://github.com/hashicorp/consul
agent/cache: send the RefreshTimeout into the backend fetch
parent
db4c47df27
commit
1df99514ca
|
@ -134,6 +134,10 @@ type RegisterOptions struct {
|
||||||
// This makes the type available for Get but does not automatically perform
|
// This makes the type available for Get but does not automatically perform
|
||||||
// any prefetching. In order to populate the cache, Get must be called.
|
// any prefetching. In order to populate the cache, Get must be called.
|
||||||
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
|
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
|
||||||
|
if opts == nil {
|
||||||
|
opts = &RegisterOptions{}
|
||||||
|
}
|
||||||
|
|
||||||
c.typesLock.Lock()
|
c.typesLock.Lock()
|
||||||
defer c.typesLock.Unlock()
|
defer c.typesLock.Unlock()
|
||||||
c.types[n] = typeEntry{Type: typ, Opts: opts}
|
c.types[n] = typeEntry{Type: typ, Opts: opts}
|
||||||
|
@ -290,6 +294,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
|
||||||
// Start building the new entry by blocking on the fetch.
|
// Start building the new entry by blocking on the fetch.
|
||||||
result, err := tEntry.Type.Fetch(FetchOptions{
|
result, err := tEntry.Type.Fetch(FetchOptions{
|
||||||
MinIndex: entry.Index,
|
MinIndex: entry.Index,
|
||||||
|
Timeout: tEntry.Opts.RefreshTimeout,
|
||||||
}, r)
|
}, r)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -336,7 +341,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
|
||||||
|
|
||||||
// If refresh is enabled, run the refresh in due time. The refresh
|
// If refresh is enabled, run the refresh in due time. The refresh
|
||||||
// below might block, but saves us from spawning another goroutine.
|
// below might block, but saves us from spawning another goroutine.
|
||||||
if tEntry.Opts != nil && tEntry.Opts.Refresh {
|
if tEntry.Opts.Refresh {
|
||||||
c.refresh(tEntry.Opts, t, key, r)
|
c.refresh(tEntry.Opts, t, key, r)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -336,6 +336,39 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
|
||||||
TestCacheGetChResult(t, resultCh, 12)
|
TestCacheGetChResult(t, resultCh, 12)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that the backend fetch sets the proper timeout.
|
||||||
|
func TestCacheGet_fetchTimeout(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
typ := TestType(t)
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
|
||||||
|
// Register the type with a timeout
|
||||||
|
timeout := 10 * time.Minute
|
||||||
|
c.RegisterType("t", typ, &RegisterOptions{
|
||||||
|
RefreshTimeout: timeout,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Configure the type
|
||||||
|
var actual time.Duration
|
||||||
|
typ.Static(FetchResult{Value: 42}, nil).Times(1).Run(func(args mock.Arguments) {
|
||||||
|
opts := args.Get(0).(FetchOptions)
|
||||||
|
actual = opts.Timeout
|
||||||
|
})
|
||||||
|
|
||||||
|
// Get, should fetch
|
||||||
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
|
result, err := c.Get("t", req)
|
||||||
|
require.Nil(err)
|
||||||
|
require.Equal(42, result)
|
||||||
|
|
||||||
|
// Test the timeout
|
||||||
|
require.Equal(timeout, actual)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that Get partitions the caches based on DC so two equivalent requests
|
// Test that Get partitions the caches based on DC so two equivalent requests
|
||||||
// to different datacenters are automatically cached even if their keys are
|
// to different datacenters are automatically cached even if their keys are
|
||||||
// the same.
|
// the same.
|
||||||
|
|
Loading…
Reference in New Issue