From 1cfb0f1922c164d6566d515b9d5e5742faa6aade Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 3 Apr 2018 20:46:07 -0700 Subject: [PATCH] agent/cache: initial kind-of working cache --- agent/cache/cache.go | 239 ++++++++++++++++++++++++++++++++++++ agent/cache/cache_test.go | 200 ++++++++++++++++++++++++++++++ agent/cache/mock_RPC.go | 23 ++++ agent/cache/mock_Request.go | 37 ++++++ agent/cache/mock_Type.go | 30 +++++ agent/cache/request.go | 17 +++ agent/cache/testing.go | 84 +++++++++++++ agent/cache/type.go | 68 ++++++++++ 8 files changed, 698 insertions(+) create mode 100644 agent/cache/cache.go create mode 100644 agent/cache/cache_test.go create mode 100644 agent/cache/mock_RPC.go create mode 100644 agent/cache/mock_Request.go create mode 100644 agent/cache/mock_Type.go create mode 100644 agent/cache/request.go create mode 100644 agent/cache/testing.go create mode 100644 agent/cache/type.go diff --git a/agent/cache/cache.go b/agent/cache/cache.go new file mode 100644 index 0000000000..d0172cdc0a --- /dev/null +++ b/agent/cache/cache.go @@ -0,0 +1,239 @@ +// Package cache provides caching features for data from a Consul server. +// +// While this is similar in some ways to the "agent/ae" package, a key +// difference is that with anti-entropy, the agent is the authoritative +// source so it resolves differences the server may have. With caching (this +// package), the server is the authoritative source and we do our best to +// balance performance and correctness, depending on the type of data being +// requested. +// +// Currently, the cache package supports only continuous, blocking query +// caching. This means that the cache update is edge-triggered by Consul +// server blocking queries. +package cache + +import ( + "fmt" + "sync" + "time" +) + +//go:generate mockery -all -inpkg + +// Pre-written options for type registration. These should not be modified. +var ( + // RegisterOptsPeriodic performs a periodic refresh of data fetched + // by the registered type. + RegisterOptsPeriodic = &RegisterOptions{ + Refresh: true, + RefreshTimer: 30 * time.Second, + RefreshTimeout: 5 * time.Minute, + } +) + +// TODO: DC-aware + +// RPC is an interface that an RPC client must implement. +type RPC interface { + RPC(method string, args interface{}, reply interface{}) error +} + +// Cache is a agent-local cache of Consul data. +type Cache struct { + // rpcClient is the RPC-client. + rpcClient RPC + + entriesLock sync.RWMutex + entries map[string]cacheEntry + + typesLock sync.RWMutex + types map[string]typeEntry +} + +type cacheEntry struct { + // Fields pertaining to the actual value + Value interface{} + Error error + Index uint64 + + // Metadata that is used for internal accounting + Valid bool + Fetching bool + Waiter chan struct{} +} + +// typeEntry is a single type that is registered with a Cache. +type typeEntry struct { + Type Type + Opts *RegisterOptions +} + +// New creates a new cache with the given RPC client and reasonable defaults. +// Further settings can be tweaked on the returned value. +func New(rpc RPC) *Cache { + return &Cache{ + rpcClient: rpc, + entries: make(map[string]cacheEntry), + types: make(map[string]typeEntry), + } +} + +// RegisterOptions are options that can be associated with a type being +// registered for the cache. This changes the behavior of the cache for +// this type. +type RegisterOptions struct { + // Refresh configures whether the data is actively refreshed or if + // the data is only refreshed on an explicit Get. The default (false) + // is to only request data on explicit Get. + Refresh bool + + // RefreshTimer is the time between attempting to refresh data. + // If this is zero, then data is refreshed immediately when a fetch + // is returned. + // + // RefreshTimeout determines the maximum query time for a refresh + // operation. This is specified as part of the query options and is + // expected to be implemented by the Type itself. + // + // Using these values, various "refresh" mechanisms can be implemented: + // + // * With a high timer duration and a low timeout, a timer-based + // refresh can be set that minimizes load on the Consul servers. + // + // * With a low timer and high timeout duration, a blocking-query-based + // refresh can be set so that changes in server data are recognized + // within the cache very quickly. + // + RefreshTimer time.Duration + RefreshTimeout time.Duration +} + +// RegisterType registers a cacheable type. +func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { + c.typesLock.Lock() + defer c.typesLock.Unlock() + c.types[n] = typeEntry{Type: typ, Opts: opts} +} + +// Get loads the data for the given type and request. If data satisfying the +// minimum index is present in the cache, it is returned immediately. Otherwise, +// this will block until the data is available or the request timeout is +// reached. +// +// Multiple Get calls for the same Request (matching CacheKey value) will +// block on a single network request. +func (c *Cache) Get(t string, r Request) (interface{}, error) { + key := r.CacheKey() + idx := r.CacheMinIndex() + +RETRY_GET: + // Get the current value + c.entriesLock.RLock() + entry, ok := c.entries[key] + c.entriesLock.RUnlock() + + // If we have a current value and the index is greater than the + // currently stored index then we return that right away. If the + // index is zero and we have something in the cache we accept whatever + // we have. + if ok && entry.Valid && (idx == 0 || idx < entry.Index) { + return entry.Value, nil + } + + // At this point, we know we either don't have a value at all or the + // value we have is too old. We need to wait for new data. + waiter, err := c.fetch(t, r) + if err != nil { + return nil, err + } + + // Wait on our waiter and then retry the cache load + <-waiter + goto RETRY_GET +} + +func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { + // Get the type that we're fetching + c.typesLock.RLock() + tEntry, ok := c.types[t] + c.typesLock.RUnlock() + if !ok { + return nil, fmt.Errorf("unknown type in cache: %s", t) + } + + // The cache key is used multiple times and might be dynamically + // constructed so let's just store it once here. + key := r.CacheKey() + + c.entriesLock.Lock() + defer c.entriesLock.Unlock() + entry, ok := c.entries[key] + + // If we already have an entry and it is actively fetching, then return + // the currently active waiter. + if ok && entry.Fetching { + return entry.Waiter, nil + } + + // If we don't have an entry, then create it. The entry must be marked + // as invalid so that it isn't returned as a valid value for a zero index. + if !ok { + entry = cacheEntry{Valid: false, Waiter: make(chan struct{})} + } + + // Set that we're fetching to true, which makes it so that future + // identical calls to fetch will return the same waiter rather than + // perform multiple fetches. + entry.Fetching = true + c.entries[key] = entry + + // The actual Fetch must be performed in a goroutine. + go func() { + // Start building the new entry by blocking on the fetch. + var newEntry cacheEntry + result, err := tEntry.Type.Fetch(FetchOptions{ + RPC: c.rpcClient, + MinIndex: entry.Index, + }, r) + newEntry.Value = result.Value + newEntry.Index = result.Index + newEntry.Error = err + + // This is a valid entry with a result + newEntry.Valid = true + + // Create a new waiter that will be used for the next fetch. + newEntry.Waiter = make(chan struct{}) + + // Insert + c.entriesLock.Lock() + c.entries[key] = newEntry + c.entriesLock.Unlock() + + // Trigger the waiter + close(entry.Waiter) + + // If refresh is enabled, run the refresh in due time. The refresh + // below might block, but saves us from spawning another goroutine. + if tEntry.Opts != nil && tEntry.Opts.Refresh { + c.refresh(tEntry.Opts, t, r) + } + }() + + return entry.Waiter, nil +} + +func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { + // Sanity-check, we should not schedule anything that has refresh disabled + if !opts.Refresh { + return + } + + // If we have a timer, wait for it + if opts.RefreshTimer > 0 { + time.Sleep(opts.RefreshTimer) + } + + // Trigger + c.fetch(t, r) +} diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go new file mode 100644 index 0000000000..d82ded1954 --- /dev/null +++ b/agent/cache/cache_test.go @@ -0,0 +1,200 @@ +package cache + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// Test a basic Get with no indexes (and therefore no blocking queries). +func TestCacheGet_noIndex(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + typ.Static(FetchResult{Value: 42}, nil).Times(1) + + // Get, should fetch + req := TestRequest(t, "hello", 0) + result, err := c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Get, should not fetch since we already have a satisfying value + result, err = c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + +// Test that Get blocks on the initial value +func TestCacheGet_blockingInitSameKey(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + triggerCh := make(chan time.Time) + typ.Static(FetchResult{Value: 42}, nil).WaitUntil(triggerCh).Times(1) + + // Perform multiple gets + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + + // They should block + select { + case <-getCh1: + t.Fatal("should block (ch1)") + case <-getCh2: + t.Fatal("should block (ch2)") + case <-time.After(50 * time.Millisecond): + } + + // Trigger it + close(triggerCh) + + // Should return + TestCacheGetChResult(t, getCh1, 42) + TestCacheGetChResult(t, getCh2, 42) +} + +// Test that Get with different cache keys both block on initial value +// but that the fetches were both properly called. +func TestCacheGet_blockingInitDiffKeys(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Keep track of the keys + var keysLock sync.Mutex + var keys []string + + // Configure the type + triggerCh := make(chan time.Time) + typ.Static(FetchResult{Value: 42}, nil). + WaitUntil(triggerCh). + Times(2). + Run(func(args mock.Arguments) { + keysLock.Lock() + defer keysLock.Unlock() + keys = append(keys, args.Get(1).(Request).CacheKey()) + }) + + // Perform multiple gets + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "goodbye", 0)) + + // They should block + select { + case <-getCh1: + t.Fatal("should block (ch1)") + case <-getCh2: + t.Fatal("should block (ch2)") + case <-time.After(50 * time.Millisecond): + } + + // Trigger it + close(triggerCh) + + // Should return both! + TestCacheGetChResult(t, getCh1, 42) + TestCacheGetChResult(t, getCh2, 42) + + // Verify proper keys + sort.Strings(keys) + require.Equal([]string{"goodbye", "hello"}, keys) +} + +// Test a get with an index set will wait until an index that is higher +// is set in the cache. +func TestCacheGet_blockingIndex(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + triggerCh := make(chan time.Time) + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once() + typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh) + + // Fetch should block + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 5)) + + // Should block + select { + case <-resultCh: + t.Fatal("should block") + case <-time.After(50 * time.Millisecond): + } + + // Wait a bit + close(triggerCh) + + // Should return + TestCacheGetChResult(t, resultCh, 42) +} + +// Test that a type registered with a periodic refresh will perform +// that refresh after the timer is up. +func TestCacheGet_periodicRefresh(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: true, + RefreshTimer: 100 * time.Millisecond, + RefreshTimeout: 5 * time.Minute, + }) + + // This is a bit weird, but we do this to ensure that the final + // call to the Fetch (if it happens, depends on timing) just blocks. + triggerCh := make(chan time.Time) + defer close(triggerCh) + + // Configure the type + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once() + typ.Static(FetchResult{Value: 12, Index: 5}, nil).WaitUntil(triggerCh) + + // Fetch should block + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + TestCacheGetChResult(t, resultCh, 1) + + // Fetch again almost immediately should return old result + time.Sleep(5 * time.Millisecond) + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + TestCacheGetChResult(t, resultCh, 1) + + // Wait for the timer + time.Sleep(200 * time.Millisecond) + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + TestCacheGetChResult(t, resultCh, 12) +} diff --git a/agent/cache/mock_RPC.go b/agent/cache/mock_RPC.go new file mode 100644 index 0000000000..a1100d2a7e --- /dev/null +++ b/agent/cache/mock_RPC.go @@ -0,0 +1,23 @@ +// Code generated by mockery v1.0.0 +package cache + +import mock "github.com/stretchr/testify/mock" + +// MockRPC is an autogenerated mock type for the RPC type +type MockRPC struct { + mock.Mock +} + +// RPC provides a mock function with given fields: method, args, reply +func (_m *MockRPC) RPC(method string, args interface{}, reply interface{}) error { + ret := _m.Called(method, args, reply) + + var r0 error + if rf, ok := ret.Get(0).(func(string, interface{}, interface{}) error); ok { + r0 = rf(method, args, reply) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/agent/cache/mock_Request.go b/agent/cache/mock_Request.go new file mode 100644 index 0000000000..1579121825 --- /dev/null +++ b/agent/cache/mock_Request.go @@ -0,0 +1,37 @@ +// Code generated by mockery v1.0.0 +package cache + +import mock "github.com/stretchr/testify/mock" + +// MockRequest is an autogenerated mock type for the Request type +type MockRequest struct { + mock.Mock +} + +// CacheKey provides a mock function with given fields: +func (_m *MockRequest) CacheKey() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// CacheMinIndex provides a mock function with given fields: +func (_m *MockRequest) CacheMinIndex() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} diff --git a/agent/cache/mock_Type.go b/agent/cache/mock_Type.go new file mode 100644 index 0000000000..110fc57876 --- /dev/null +++ b/agent/cache/mock_Type.go @@ -0,0 +1,30 @@ +// Code generated by mockery v1.0.0 +package cache + +import mock "github.com/stretchr/testify/mock" + +// MockType is an autogenerated mock type for the Type type +type MockType struct { + mock.Mock +} + +// Fetch provides a mock function with given fields: _a0, _a1 +func (_m *MockType) Fetch(_a0 FetchOptions, _a1 Request) (FetchResult, error) { + ret := _m.Called(_a0, _a1) + + var r0 FetchResult + if rf, ok := ret.Get(0).(func(FetchOptions, Request) FetchResult); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(FetchResult) + } + + var r1 error + if rf, ok := ret.Get(1).(func(FetchOptions, Request) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/agent/cache/request.go b/agent/cache/request.go new file mode 100644 index 0000000000..c75c8ad847 --- /dev/null +++ b/agent/cache/request.go @@ -0,0 +1,17 @@ +package cache + +// Request is a cache-able request. +// +// This interface is typically implemented by request structures in +// the agent/structs package. +type Request interface { + // CacheKey is a unique cache key for this request. This key should + // absolutely uniquely identify this request, since any conflicting + // cache keys could result in invalid data being returned from the cache. + CacheKey() string + + // CacheMinIndex is the minimum index being queried. This is used to + // determine if we already have data satisfying the query or if we need + // to block until new data is available. + CacheMinIndex() uint64 +} diff --git a/agent/cache/testing.go b/agent/cache/testing.go new file mode 100644 index 0000000000..7bf2bf8911 --- /dev/null +++ b/agent/cache/testing.go @@ -0,0 +1,84 @@ +package cache + +import ( + "reflect" + "time" + + "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/mock" +) + +// TestCache returns a Cache instance configuring for testing. +func TestCache(t testing.T) *Cache { + // Simple but lets us do some fine-tuning later if we want to. + return New(TestRPC(t)) +} + +// TestCacheGetCh returns a channel that returns the result of the Get call. +// This is useful for testing timing and concurrency with Get calls. Any +// error will be logged, so the result value should always be asserted. +func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} { + resultCh := make(chan interface{}) + go func() { + result, err := c.Get(typ, r) + if err != nil { + t.Logf("Error: %s", err) + close(resultCh) + return + } + + resultCh <- result + }() + + return resultCh +} + +// TestCacheGetChResult tests that the result from TestCacheGetCh matches +// within a reasonable period of time (it expects it to be "immediate" but +// waits some milliseconds). +func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface{}) { + t.Helper() + + select { + case result := <-ch: + if !reflect.DeepEqual(result, expected) { + t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected) + } + case <-time.After(50 * time.Millisecond): + } +} + +// TestRequest returns a Request that returns the given cache key and index. +// The Reset method can be called to reset it for custom usage. +func TestRequest(t testing.T, key string, index uint64) *MockRequest { + req := &MockRequest{} + req.On("CacheKey").Return(key) + req.On("CacheMinIndex").Return(index) + return req +} + +// TestRPC returns a mock implementation of the RPC interface. +func TestRPC(t testing.T) *MockRPC { + // This function is relatively useless but this allows us to perhaps + // perform some initialization later. + return &MockRPC{} +} + +// TestType returns a MockType that can be used to setup expectations +// on data fetching. +func TestType(t testing.T) *MockType { + typ := &MockType{} + return typ +} + +// A bit weird, but we add methods to the auto-generated structs here so that +// they don't get clobbered. The helper methods are conveniences. + +// Static sets a static value to return for a call to Fetch. +func (m *MockType) Static(r FetchResult, err error) *mock.Call { + return m.Mock.On("Fetch", mock.Anything, mock.Anything).Return(r, err) +} + +func (m *MockRequest) Reset() { + m.Mock = mock.Mock{} +} diff --git a/agent/cache/type.go b/agent/cache/type.go new file mode 100644 index 0000000000..fbb65761fc --- /dev/null +++ b/agent/cache/type.go @@ -0,0 +1,68 @@ +package cache + +import ( + "time" +) + +// Type implement the logic to fetch certain types of data. +type Type interface { + // Fetch fetches a single unique item. + // + // The FetchOptions contain the index and timeouts for blocking queries. + // The CacheMinIndex value on the Request itself should NOT be used + // as the blocking index since a request may be reused multiple times + // as part of Refresh behavior. + // + // The return value is a FetchResult which contains information about + // the fetch. + Fetch(FetchOptions, Request) (FetchResult, error) +} + +// FetchOptions are various settable options when a Fetch is called. +type FetchOptions struct { + // RPC is the RPC client to communicate to a Consul server. + RPC RPC + + // MinIndex is the minimum index to be used for blocking queries. + // If blocking queries aren't supported for data being returned, + // this value can be ignored. + MinIndex uint64 + + // Timeout is the maximum time for the query. This must be implemented + // in the Fetch itself. + Timeout time.Duration +} + +// FetchResult is the result of a Type Fetch operation and contains the +// data along with metadata gathered from that operation. +type FetchResult struct { + // Value is the result of the fetch. + Value interface{} + + // Index is the corresponding index value for this data. + Index uint64 +} + +/* +type TypeCARoot struct{} + +func (c *TypeCARoot) Fetch(delegate RPC, idx uint64, req Request) (interface{}, uint64, error) { + // The request should be a DCSpecificRequest. + reqReal, ok := req.(*structs.DCSpecificRequest) + if !ok { + return nil, 0, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = idx + + // Fetch + var reply structs.IndexedCARoots + if err := delegate.RPC("ConnectCA.Roots", reqReal, &reply); err != nil { + return nil, 0, err + } + + return &reply, reply.QueryMeta.Index, nil +} +*/