agent/cache: initial kind-of working cache

pull/4275/head
Mitchell Hashimoto 2018-04-03 20:46:07 -07:00
parent 33418afd3c
commit 1cfb0f1922
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
8 changed files with 698 additions and 0 deletions

239
agent/cache/cache.go vendored Normal file
View File

@ -0,0 +1,239 @@
// Package cache provides caching features for data from a Consul server.
//
// While this is similar in some ways to the "agent/ae" package, a key
// difference is that with anti-entropy, the agent is the authoritative
// source so it resolves differences the server may have. With caching (this
// package), the server is the authoritative source and we do our best to
// balance performance and correctness, depending on the type of data being
// requested.
//
// Currently, the cache package supports only continuous, blocking query
// caching. This means that the cache update is edge-triggered by Consul
// server blocking queries.
package cache
import (
"fmt"
"sync"
"time"
)
//go:generate mockery -all -inpkg
// Pre-written options for type registration. These should not be modified.
var (
// RegisterOptsPeriodic performs a periodic refresh of data fetched
// by the registered type.
RegisterOptsPeriodic = &RegisterOptions{
Refresh: true,
RefreshTimer: 30 * time.Second,
RefreshTimeout: 5 * time.Minute,
}
)
// TODO: DC-aware
// RPC is an interface that an RPC client must implement.
type RPC interface {
RPC(method string, args interface{}, reply interface{}) error
}
// Cache is a agent-local cache of Consul data.
type Cache struct {
// rpcClient is the RPC-client.
rpcClient RPC
entriesLock sync.RWMutex
entries map[string]cacheEntry
typesLock sync.RWMutex
types map[string]typeEntry
}
type cacheEntry struct {
// Fields pertaining to the actual value
Value interface{}
Error error
Index uint64
// Metadata that is used for internal accounting
Valid bool
Fetching bool
Waiter chan struct{}
}
// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
Type Type
Opts *RegisterOptions
}
// New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value.
func New(rpc RPC) *Cache {
return &Cache{
rpcClient: rpc,
entries: make(map[string]cacheEntry),
types: make(map[string]typeEntry),
}
}
// RegisterOptions are options that can be associated with a type being
// registered for the cache. This changes the behavior of the cache for
// this type.
type RegisterOptions struct {
// Refresh configures whether the data is actively refreshed or if
// the data is only refreshed on an explicit Get. The default (false)
// is to only request data on explicit Get.
Refresh bool
// RefreshTimer is the time between attempting to refresh data.
// If this is zero, then data is refreshed immediately when a fetch
// is returned.
//
// RefreshTimeout determines the maximum query time for a refresh
// operation. This is specified as part of the query options and is
// expected to be implemented by the Type itself.
//
// Using these values, various "refresh" mechanisms can be implemented:
//
// * With a high timer duration and a low timeout, a timer-based
// refresh can be set that minimizes load on the Consul servers.
//
// * With a low timer and high timeout duration, a blocking-query-based
// refresh can be set so that changes in server data are recognized
// within the cache very quickly.
//
RefreshTimer time.Duration
RefreshTimeout time.Duration
}
// RegisterType registers a cacheable type.
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
c.typesLock.Lock()
defer c.typesLock.Unlock()
c.types[n] = typeEntry{Type: typ, Opts: opts}
}
// Get loads the data for the given type and request. If data satisfying the
// minimum index is present in the cache, it is returned immediately. Otherwise,
// this will block until the data is available or the request timeout is
// reached.
//
// Multiple Get calls for the same Request (matching CacheKey value) will
// block on a single network request.
func (c *Cache) Get(t string, r Request) (interface{}, error) {
key := r.CacheKey()
idx := r.CacheMinIndex()
RETRY_GET:
// Get the current value
c.entriesLock.RLock()
entry, ok := c.entries[key]
c.entriesLock.RUnlock()
// If we have a current value and the index is greater than the
// currently stored index then we return that right away. If the
// index is zero and we have something in the cache we accept whatever
// we have.
if ok && entry.Valid && (idx == 0 || idx < entry.Index) {
return entry.Value, nil
}
// At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data.
waiter, err := c.fetch(t, r)
if err != nil {
return nil, err
}
// Wait on our waiter and then retry the cache load
<-waiter
goto RETRY_GET
}
func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown type in cache: %s", t)
}
// The cache key is used multiple times and might be dynamically
// constructed so let's just store it once here.
key := r.CacheKey()
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
// If we already have an entry and it is actively fetching, then return
// the currently active waiter.
if ok && entry.Fetching {
return entry.Waiter, nil
}
// If we don't have an entry, then create it. The entry must be marked
// as invalid so that it isn't returned as a valid value for a zero index.
if !ok {
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
}
// Set that we're fetching to true, which makes it so that future
// identical calls to fetch will return the same waiter rather than
// perform multiple fetches.
entry.Fetching = true
c.entries[key] = entry
// The actual Fetch must be performed in a goroutine.
go func() {
// Start building the new entry by blocking on the fetch.
var newEntry cacheEntry
result, err := tEntry.Type.Fetch(FetchOptions{
RPC: c.rpcClient,
MinIndex: entry.Index,
}, r)
newEntry.Value = result.Value
newEntry.Index = result.Index
newEntry.Error = err
// This is a valid entry with a result
newEntry.Valid = true
// Create a new waiter that will be used for the next fetch.
newEntry.Waiter = make(chan struct{})
// Insert
c.entriesLock.Lock()
c.entries[key] = newEntry
c.entriesLock.Unlock()
// Trigger the waiter
close(entry.Waiter)
// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts != nil && tEntry.Opts.Refresh {
c.refresh(tEntry.Opts, t, r)
}
}()
return entry.Waiter, nil
}
func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh {
return
}
// If we have a timer, wait for it
if opts.RefreshTimer > 0 {
time.Sleep(opts.RefreshTimer)
}
// Trigger
c.fetch(t, r)
}

200
agent/cache/cache_test.go vendored Normal file
View File

@ -0,0 +1,200 @@
package cache
import (
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// Test a basic Get with no indexes (and therefore no blocking queries).
func TestCacheGet_noIndex(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(1)
// Get, should fetch
req := TestRequest(t, "hello", 0)
result, err := c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Get, should not fetch since we already have a satisfying value
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that Get blocks on the initial value
func TestCacheGet_blockingInitSameKey(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
triggerCh := make(chan time.Time)
typ.Static(FetchResult{Value: 42}, nil).WaitUntil(triggerCh).Times(1)
// Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
// They should block
select {
case <-getCh1:
t.Fatal("should block (ch1)")
case <-getCh2:
t.Fatal("should block (ch2)")
case <-time.After(50 * time.Millisecond):
}
// Trigger it
close(triggerCh)
// Should return
TestCacheGetChResult(t, getCh1, 42)
TestCacheGetChResult(t, getCh2, 42)
}
// Test that Get with different cache keys both block on initial value
// but that the fetches were both properly called.
func TestCacheGet_blockingInitDiffKeys(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Keep track of the keys
var keysLock sync.Mutex
var keys []string
// Configure the type
triggerCh := make(chan time.Time)
typ.Static(FetchResult{Value: 42}, nil).
WaitUntil(triggerCh).
Times(2).
Run(func(args mock.Arguments) {
keysLock.Lock()
defer keysLock.Unlock()
keys = append(keys, args.Get(1).(Request).CacheKey())
})
// Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "goodbye", 0))
// They should block
select {
case <-getCh1:
t.Fatal("should block (ch1)")
case <-getCh2:
t.Fatal("should block (ch2)")
case <-time.After(50 * time.Millisecond):
}
// Trigger it
close(triggerCh)
// Should return both!
TestCacheGetChResult(t, getCh1, 42)
TestCacheGetChResult(t, getCh2, 42)
// Verify proper keys
sort.Strings(keys)
require.Equal([]string{"goodbye", "hello"}, keys)
}
// Test a get with an index set will wait until an index that is higher
// is set in the cache.
func TestCacheGet_blockingIndex(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
triggerCh := make(chan time.Time)
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once()
typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh)
// Fetch should block
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 5))
// Should block
select {
case <-resultCh:
t.Fatal("should block")
case <-time.After(50 * time.Millisecond):
}
// Wait a bit
close(triggerCh)
// Should return
TestCacheGetChResult(t, resultCh, 42)
}
// Test that a type registered with a periodic refresh will perform
// that refresh after the timer is up.
func TestCacheGet_periodicRefresh(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: true,
RefreshTimer: 100 * time.Millisecond,
RefreshTimeout: 5 * time.Minute,
})
// This is a bit weird, but we do this to ensure that the final
// call to the Fetch (if it happens, depends on timing) just blocks.
triggerCh := make(chan time.Time)
defer close(triggerCh)
// Configure the type
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once()
typ.Static(FetchResult{Value: 12, Index: 5}, nil).WaitUntil(triggerCh)
// Fetch should block
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
TestCacheGetChResult(t, resultCh, 1)
// Fetch again almost immediately should return old result
time.Sleep(5 * time.Millisecond)
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
TestCacheGetChResult(t, resultCh, 1)
// Wait for the timer
time.Sleep(200 * time.Millisecond)
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0))
TestCacheGetChResult(t, resultCh, 12)
}

23
agent/cache/mock_RPC.go vendored Normal file
View File

@ -0,0 +1,23 @@
// Code generated by mockery v1.0.0
package cache
import mock "github.com/stretchr/testify/mock"
// MockRPC is an autogenerated mock type for the RPC type
type MockRPC struct {
mock.Mock
}
// RPC provides a mock function with given fields: method, args, reply
func (_m *MockRPC) RPC(method string, args interface{}, reply interface{}) error {
ret := _m.Called(method, args, reply)
var r0 error
if rf, ok := ret.Get(0).(func(string, interface{}, interface{}) error); ok {
r0 = rf(method, args, reply)
} else {
r0 = ret.Error(0)
}
return r0
}

37
agent/cache/mock_Request.go vendored Normal file
View File

@ -0,0 +1,37 @@
// Code generated by mockery v1.0.0
package cache
import mock "github.com/stretchr/testify/mock"
// MockRequest is an autogenerated mock type for the Request type
type MockRequest struct {
mock.Mock
}
// CacheKey provides a mock function with given fields:
func (_m *MockRequest) CacheKey() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// CacheMinIndex provides a mock function with given fields:
func (_m *MockRequest) CacheMinIndex() uint64 {
ret := _m.Called()
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
}
return r0
}

30
agent/cache/mock_Type.go vendored Normal file
View File

@ -0,0 +1,30 @@
// Code generated by mockery v1.0.0
package cache
import mock "github.com/stretchr/testify/mock"
// MockType is an autogenerated mock type for the Type type
type MockType struct {
mock.Mock
}
// Fetch provides a mock function with given fields: _a0, _a1
func (_m *MockType) Fetch(_a0 FetchOptions, _a1 Request) (FetchResult, error) {
ret := _m.Called(_a0, _a1)
var r0 FetchResult
if rf, ok := ret.Get(0).(func(FetchOptions, Request) FetchResult); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(FetchResult)
}
var r1 error
if rf, ok := ret.Get(1).(func(FetchOptions, Request) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

17
agent/cache/request.go vendored Normal file
View File

@ -0,0 +1,17 @@
package cache
// Request is a cache-able request.
//
// This interface is typically implemented by request structures in
// the agent/structs package.
type Request interface {
// CacheKey is a unique cache key for this request. This key should
// absolutely uniquely identify this request, since any conflicting
// cache keys could result in invalid data being returned from the cache.
CacheKey() string
// CacheMinIndex is the minimum index being queried. This is used to
// determine if we already have data satisfying the query or if we need
// to block until new data is available.
CacheMinIndex() uint64
}

84
agent/cache/testing.go vendored Normal file
View File

@ -0,0 +1,84 @@
package cache
import (
"reflect"
"time"
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/mock"
)
// TestCache returns a Cache instance configuring for testing.
func TestCache(t testing.T) *Cache {
// Simple but lets us do some fine-tuning later if we want to.
return New(TestRPC(t))
}
// TestCacheGetCh returns a channel that returns the result of the Get call.
// This is useful for testing timing and concurrency with Get calls. Any
// error will be logged, so the result value should always be asserted.
func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} {
resultCh := make(chan interface{})
go func() {
result, err := c.Get(typ, r)
if err != nil {
t.Logf("Error: %s", err)
close(resultCh)
return
}
resultCh <- result
}()
return resultCh
}
// TestCacheGetChResult tests that the result from TestCacheGetCh matches
// within a reasonable period of time (it expects it to be "immediate" but
// waits some milliseconds).
func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface{}) {
t.Helper()
select {
case result := <-ch:
if !reflect.DeepEqual(result, expected) {
t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected)
}
case <-time.After(50 * time.Millisecond):
}
}
// TestRequest returns a Request that returns the given cache key and index.
// The Reset method can be called to reset it for custom usage.
func TestRequest(t testing.T, key string, index uint64) *MockRequest {
req := &MockRequest{}
req.On("CacheKey").Return(key)
req.On("CacheMinIndex").Return(index)
return req
}
// TestRPC returns a mock implementation of the RPC interface.
func TestRPC(t testing.T) *MockRPC {
// This function is relatively useless but this allows us to perhaps
// perform some initialization later.
return &MockRPC{}
}
// TestType returns a MockType that can be used to setup expectations
// on data fetching.
func TestType(t testing.T) *MockType {
typ := &MockType{}
return typ
}
// A bit weird, but we add methods to the auto-generated structs here so that
// they don't get clobbered. The helper methods are conveniences.
// Static sets a static value to return for a call to Fetch.
func (m *MockType) Static(r FetchResult, err error) *mock.Call {
return m.Mock.On("Fetch", mock.Anything, mock.Anything).Return(r, err)
}
func (m *MockRequest) Reset() {
m.Mock = mock.Mock{}
}

68
agent/cache/type.go vendored Normal file
View File

@ -0,0 +1,68 @@
package cache
import (
"time"
)
// Type implement the logic to fetch certain types of data.
type Type interface {
// Fetch fetches a single unique item.
//
// The FetchOptions contain the index and timeouts for blocking queries.
// The CacheMinIndex value on the Request itself should NOT be used
// as the blocking index since a request may be reused multiple times
// as part of Refresh behavior.
//
// The return value is a FetchResult which contains information about
// the fetch.
Fetch(FetchOptions, Request) (FetchResult, error)
}
// FetchOptions are various settable options when a Fetch is called.
type FetchOptions struct {
// RPC is the RPC client to communicate to a Consul server.
RPC RPC
// MinIndex is the minimum index to be used for blocking queries.
// If blocking queries aren't supported for data being returned,
// this value can be ignored.
MinIndex uint64
// Timeout is the maximum time for the query. This must be implemented
// in the Fetch itself.
Timeout time.Duration
}
// FetchResult is the result of a Type Fetch operation and contains the
// data along with metadata gathered from that operation.
type FetchResult struct {
// Value is the result of the fetch.
Value interface{}
// Index is the corresponding index value for this data.
Index uint64
}
/*
type TypeCARoot struct{}
func (c *TypeCARoot) Fetch(delegate RPC, idx uint64, req Request) (interface{}, uint64, error) {
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest)
if !ok {
return nil, 0, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = idx
// Fetch
var reply structs.IndexedCARoots
if err := delegate.RPC("ConnectCA.Roots", reqReal, &reply); err != nil {
return nil, 0, err
}
return &reply, reply.QueryMeta.Index, nil
}
*/