agent/cache: blank cache key means to always fetch

pull/4275/head
Mitchell Hashimoto 2018-04-08 14:30:14 +01:00
parent 1cfb0f1922
commit 975be337a9
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
6 changed files with 122 additions and 56 deletions

70
agent/cache/cache.go vendored
View File

@ -20,29 +20,10 @@ import (
//go:generate mockery -all -inpkg //go:generate mockery -all -inpkg
// Pre-written options for type registration. These should not be modified. // TODO: DC-aware, ACL-aware
var (
// RegisterOptsPeriodic performs a periodic refresh of data fetched
// by the registered type.
RegisterOptsPeriodic = &RegisterOptions{
Refresh: true,
RefreshTimer: 30 * time.Second,
RefreshTimeout: 5 * time.Minute,
}
)
// TODO: DC-aware
// RPC is an interface that an RPC client must implement.
type RPC interface {
RPC(method string, args interface{}, reply interface{}) error
}
// Cache is a agent-local cache of Consul data. // Cache is a agent-local cache of Consul data.
type Cache struct { type Cache struct {
// rpcClient is the RPC-client.
rpcClient RPC
entriesLock sync.RWMutex entriesLock sync.RWMutex
entries map[string]cacheEntry entries map[string]cacheEntry
@ -50,6 +31,7 @@ type Cache struct {
types map[string]typeEntry types map[string]typeEntry
} }
// cacheEntry stores a single cache entry.
type cacheEntry struct { type cacheEntry struct {
// Fields pertaining to the actual value // Fields pertaining to the actual value
Value interface{} Value interface{}
@ -68,13 +50,17 @@ type typeEntry struct {
Opts *RegisterOptions Opts *RegisterOptions
} }
// Options are options for the Cache.
type Options struct {
// Nothing currently, reserved.
}
// New creates a new cache with the given RPC client and reasonable defaults. // New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value. // Further settings can be tweaked on the returned value.
func New(rpc RPC) *Cache { func New(*Options) *Cache {
return &Cache{ return &Cache{
rpcClient: rpc, entries: make(map[string]cacheEntry),
entries: make(map[string]cacheEntry), types: make(map[string]typeEntry),
types: make(map[string]typeEntry),
} }
} }
@ -124,7 +110,11 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
// block on a single network request. // block on a single network request.
func (c *Cache) Get(t string, r Request) (interface{}, error) { func (c *Cache) Get(t string, r Request) (interface{}, error) {
key := r.CacheKey() key := r.CacheKey()
idx := r.CacheMinIndex() if key == "" {
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(t, r)
}
RETRY_GET: RETRY_GET:
// Get the current value // Get the current value
@ -136,8 +126,11 @@ RETRY_GET:
// currently stored index then we return that right away. If the // currently stored index then we return that right away. If the
// index is zero and we have something in the cache we accept whatever // index is zero and we have something in the cache we accept whatever
// we have. // we have.
if ok && entry.Valid && (idx == 0 || idx < entry.Index) { if ok && entry.Valid {
return entry.Value, nil idx := r.CacheMinIndex()
if idx == 0 || idx < entry.Index {
return entry.Value, nil
}
} }
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
@ -192,7 +185,6 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// Start building the new entry by blocking on the fetch. // Start building the new entry by blocking on the fetch.
var newEntry cacheEntry var newEntry cacheEntry
result, err := tEntry.Type.Fetch(FetchOptions{ result, err := tEntry.Type.Fetch(FetchOptions{
RPC: c.rpcClient,
MinIndex: entry.Index, MinIndex: entry.Index,
}, r) }, r)
newEntry.Value = result.Value newEntry.Value = result.Value
@ -223,6 +215,28 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
return entry.Waiter, nil return entry.Waiter, nil
} }
// fetchDirect fetches the given request with no caching.
func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown type in cache: %s", t)
}
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: r.CacheMinIndex(),
}, r)
if err != nil {
return nil, err
}
// Return the result and ignore the rest
return result.Value, nil
}
func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled // Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh { if !opts.Refresh {

View File

@ -41,6 +41,38 @@ func TestCacheGet_noIndex(t *testing.T) {
typ.AssertExpectations(t) typ.AssertExpectations(t)
} }
// Test a Get with a request that returns a blank cache key. This should
// force a backend request and skip the cache entirely.
func TestCacheGet_blankCacheKey(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
// Get, should fetch
req := TestRequest(t, "", 0)
result, err := c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Get, should not fetch since we already have a satisfying value
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that Get blocks on the initial value // Test that Get blocks on the initial value
func TestCacheGet_blockingInitSameKey(t *testing.T) { func TestCacheGet_blockingInitSameKey(t *testing.T) {
t.Parallel() t.Parallel()

8
agent/cache/rpc.go vendored Normal file
View File

@ -0,0 +1,8 @@
package cache
// RPC is an interface that an RPC client must implement. This is a helper
// interface that is implemented by the agent delegate so that Type
// implementations can request RPC access.
type RPC interface {
RPC(method string, args interface{}, reply interface{}) error
}

View File

@ -11,7 +11,7 @@ import (
// TestCache returns a Cache instance configuring for testing. // TestCache returns a Cache instance configuring for testing.
func TestCache(t testing.T) *Cache { func TestCache(t testing.T) *Cache {
// Simple but lets us do some fine-tuning later if we want to. // Simple but lets us do some fine-tuning later if we want to.
return New(TestRPC(t)) return New(nil)
} }
// TestCacheGetCh returns a channel that returns the result of the Get call. // TestCacheGetCh returns a channel that returns the result of the Get call.

27
agent/cache/type.go vendored
View File

@ -20,9 +20,6 @@ type Type interface {
// FetchOptions are various settable options when a Fetch is called. // FetchOptions are various settable options when a Fetch is called.
type FetchOptions struct { type FetchOptions struct {
// RPC is the RPC client to communicate to a Consul server.
RPC RPC
// MinIndex is the minimum index to be used for blocking queries. // MinIndex is the minimum index to be used for blocking queries.
// If blocking queries aren't supported for data being returned, // If blocking queries aren't supported for data being returned,
// this value can be ignored. // this value can be ignored.
@ -42,27 +39,3 @@ type FetchResult struct {
// Index is the corresponding index value for this data. // Index is the corresponding index value for this data.
Index uint64 Index uint64
} }
/*
type TypeCARoot struct{}
func (c *TypeCARoot) Fetch(delegate RPC, idx uint64, req Request) (interface{}, uint64, error) {
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest)
if !ok {
return nil, 0, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = idx
// Fetch
var reply structs.IndexedCARoots
if err := delegate.RPC("ConnectCA.Roots", reqReal, &reply); err != nil {
return nil, 0, err
}
return &reply, reply.QueryMeta.Index, nil
}
*/

39
agent/cache/type_connect_ca.go vendored Normal file
View File

@ -0,0 +1,39 @@
package cache
/*
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
)
// TypeCARoot supports fetching the Connect CA roots.
type TypeCARoot struct {
RPC RPC
}
func (c *TypeCARoot) Fetch(opts FetchOptions, req Request) (FetchResult, error) {
var result FetchResult
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Fetch
var reply structs.IndexedCARoots
if err := c.RPC.RPC("ConnectCA.Roots", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
*/