From c329b4cb341793ba16b043dbf69a205c8e9a3bd9 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Apr 2018 16:05:34 +0100 Subject: [PATCH] agent/cache: partition by DC/ACL token --- agent/cache/cache.go | 48 +++++++++++++++++++++++------------- agent/cache/cache_test.go | 52 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 17 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 04323a4c58..c512476d56 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -20,15 +20,23 @@ import ( //go:generate mockery -all -inpkg -// TODO: DC-aware, ACL-aware - // Cache is a agent-local cache of Consul data. type Cache struct { - entriesLock sync.RWMutex - entries map[string]cacheEntry - + // types stores the list of data types that the cache knows how to service. + // These can be dynamically registered with RegisterType. typesLock sync.RWMutex types map[string]typeEntry + + // entries contains the actual cache data. + // + // NOTE(mitchellh): The entry map key is currently a string in the format + // of "//" in order to properly partition + // requests to different datacenters and ACL tokens. This format has some + // big drawbacks: we can't evict by datacenter, ACL token, etc. For an + // initial implementaiton this works and the tests are agnostic to the + // internal storage format so changing this should be possible safely. + entriesLock sync.RWMutex + entries map[string]cacheEntry } // cacheEntry stores a single cache entry. @@ -116,10 +124,13 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) { return c.fetchDirect(t, r) } + // Get the actual key for our entry + key := c.entryKey(&info) + RETRY_GET: // Get the current value c.entriesLock.RLock() - entry, ok := c.entries[info.Key] + entry, ok := c.entries[key] c.entriesLock.RUnlock() // If we have a current value and the index is greater than the @@ -134,7 +145,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiter, err := c.fetch(t, r) + waiter, err := c.fetch(t, key, r) if err != nil { return nil, err } @@ -144,7 +155,13 @@ RETRY_GET: goto RETRY_GET } -func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { +// entryKey returns the key for the entry in the cache. See the note +// about the entry key format in the structure docs for Cache. +func (c *Cache) entryKey(r *RequestInfo) string { + return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key) +} + +func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { // Get the type that we're fetching c.typesLock.RLock() tEntry, ok := c.types[t] @@ -153,12 +170,9 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { return nil, fmt.Errorf("unknown type in cache: %s", t) } - // Grab the cache information while we're outside the lock. - info := r.CacheInfo() - c.entriesLock.Lock() defer c.entriesLock.Unlock() - entry, ok := c.entries[info.Key] + entry, ok := c.entries[key] // If we already have an entry and it is actively fetching, then return // the currently active waiter. @@ -176,7 +190,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // identical calls to fetch will return the same waiter rather than // perform multiple fetches. entry.Fetching = true - c.entries[info.Key] = entry + c.entries[key] = entry // The actual Fetch must be performed in a goroutine. go func() { @@ -197,7 +211,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // Insert c.entriesLock.Lock() - c.entries[info.Key] = newEntry + c.entries[key] = newEntry c.entriesLock.Unlock() // Trigger the waiter @@ -206,7 +220,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. if tEntry.Opts != nil && tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, t, r) + c.refresh(tEntry.Opts, t, key, r) } }() @@ -235,7 +249,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { return result.Value, nil } -func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { +func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh { return @@ -247,5 +261,5 @@ func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { } // Trigger - c.fetch(t, r) + c.fetch(t, key, r) } diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 1bfed590c1..1e75490a03 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1,6 +1,7 @@ package cache import ( + "fmt" "sort" "sync" "testing" @@ -231,3 +232,54 @@ func TestCacheGet_periodicRefresh(t *testing.T) { resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) TestCacheGetChResult(t, resultCh, 12) } + +// Test that Get partitions the caches based on DC so two equivalent requests +// to different datacenters are automatically cached even if their keys are +// the same. +func TestCacheGet_partitionDC(t *testing.T) { + t.Parallel() + + c := TestCache(t) + c.RegisterType("t", &testPartitionType{}, nil) + + // Perform multiple gets + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Datacenter: "dc1", Key: "hello"})) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Datacenter: "dc9", Key: "hello"})) + + // Should return both! + TestCacheGetChResult(t, getCh1, "dc1") + TestCacheGetChResult(t, getCh2, "dc9") +} + +// Test that Get partitions the caches based on token so two equivalent requests +// with different ACL tokens do not return the same result. +func TestCacheGet_partitionToken(t *testing.T) { + t.Parallel() + + c := TestCache(t) + c.RegisterType("t", &testPartitionType{}, nil) + + // Perform multiple gets + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Token: "", Key: "hello"})) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Token: "foo", Key: "hello"})) + + // Should return both! + TestCacheGetChResult(t, getCh1, "") + TestCacheGetChResult(t, getCh2, "foo") +} + +// testPartitionType implements Type for testing that simply returns a value +// comprised of the request DC and ACL token, used for testing cache +// partitioning. +type testPartitionType struct{} + +func (t *testPartitionType) Fetch(opts FetchOptions, r Request) (FetchResult, error) { + info := r.CacheInfo() + return FetchResult{ + Value: fmt.Sprintf("%s%s", info.Datacenter, info.Token), + }, nil +}