agent/cache: partition by DC/ACL token

pull/4275/head
Mitchell Hashimoto 2018-04-10 16:05:34 +01:00
parent e3c1162881
commit c329b4cb34
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 83 additions and 17 deletions

48
agent/cache/cache.go vendored
View File

@ -20,15 +20,23 @@ import (
//go:generate mockery -all -inpkg //go:generate mockery -all -inpkg
// TODO: DC-aware, ACL-aware
// Cache is an agent-local cache of Consul data. // Cache is an agent-local cache of Consul data.
type Cache struct { type Cache struct {
entriesLock sync.RWMutex // types stores the list of data types that the cache knows how to service.
entries map[string]cacheEntry // These can be dynamically registered with RegisterType.
typesLock sync.RWMutex typesLock sync.RWMutex
types map[string]typeEntry types map[string]typeEntry
// entries contains the actual cache data.
//
// NOTE(mitchellh): The entry map key is currently a string in the format
// of "<DC>/<ACL token>/<Request key>" in order to properly partition
// requests to different datacenters and ACL tokens. This format has some
// big drawbacks: we can't evict by datacenter, ACL token, etc. For an
// initial implementation this works and the tests are agnostic to the
// internal storage format so changing this should be possible safely.
entriesLock sync.RWMutex
entries map[string]cacheEntry
} }
// cacheEntry stores a single cache entry. // cacheEntry stores a single cache entry.
@ -116,10 +124,13 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
return c.fetchDirect(t, r) return c.fetchDirect(t, r)
} }
// Get the actual key for our entry
key := c.entryKey(&info)
RETRY_GET: RETRY_GET:
// Get the current value // Get the current value
c.entriesLock.RLock() c.entriesLock.RLock()
entry, ok := c.entries[info.Key] entry, ok := c.entries[key]
c.entriesLock.RUnlock() c.entriesLock.RUnlock()
// If we have a current value and the index is greater than the // If we have a current value and the index is greater than the
@ -134,7 +145,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data. // value we have is too old. We need to wait for new data.
waiter, err := c.fetch(t, r) waiter, err := c.fetch(t, key, r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -144,7 +155,13 @@ RETRY_GET:
goto RETRY_GET goto RETRY_GET
} }
func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // entryKey returns the key for the entry in the cache. See the note
// about the entry key format in the structure docs for Cache.
// entryKey builds the internal cache-entry key for a request. The
// "<DC>/<ACL token>/<request key>" layout partitions otherwise-identical
// requests by datacenter and ACL token (see the note on Cache.entries
// about the drawbacks of this flat string format).
func (c *Cache) entryKey(r *RequestInfo) string {
	return r.Datacenter + "/" + r.Token + "/" + r.Key
}
func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
// Get the type that we're fetching // Get the type that we're fetching
c.typesLock.RLock() c.typesLock.RLock()
tEntry, ok := c.types[t] tEntry, ok := c.types[t]
@ -153,12 +170,9 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
return nil, fmt.Errorf("unknown type in cache: %s", t) return nil, fmt.Errorf("unknown type in cache: %s", t)
} }
// Grab the cache information while we're outside the lock.
info := r.CacheInfo()
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
entry, ok := c.entries[info.Key] entry, ok := c.entries[key]
// If we already have an entry and it is actively fetching, then return // If we already have an entry and it is actively fetching, then return
// the currently active waiter. // the currently active waiter.
@ -176,7 +190,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// identical calls to fetch will return the same waiter rather than // identical calls to fetch will return the same waiter rather than
// perform multiple fetches. // perform multiple fetches.
entry.Fetching = true entry.Fetching = true
c.entries[info.Key] = entry c.entries[key] = entry
// The actual Fetch must be performed in a goroutine. // The actual Fetch must be performed in a goroutine.
go func() { go func() {
@ -197,7 +211,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// Insert // Insert
c.entriesLock.Lock() c.entriesLock.Lock()
c.entries[info.Key] = newEntry c.entries[key] = newEntry
c.entriesLock.Unlock() c.entriesLock.Unlock()
// Trigger the waiter // Trigger the waiter
@ -206,7 +220,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// If refresh is enabled, run the refresh in due time. The refresh // If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine. // below might block, but saves us from spawning another goroutine.
if tEntry.Opts != nil && tEntry.Opts.Refresh { if tEntry.Opts != nil && tEntry.Opts.Refresh {
c.refresh(tEntry.Opts, t, r) c.refresh(tEntry.Opts, t, key, r)
} }
}() }()
@ -235,7 +249,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
return result.Value, nil return result.Value, nil
} }
func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled // Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh { if !opts.Refresh {
return return
@ -247,5 +261,5 @@ func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) {
} }
// Trigger // Trigger
c.fetch(t, r) c.fetch(t, key, r)
} }

View File

@ -1,6 +1,7 @@
package cache package cache
import ( import (
"fmt"
"sort" "sort"
"sync" "sync"
"testing" "testing"
@ -231,3 +232,54 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
TestCacheGetChResult(t, resultCh, 12) TestCacheGetChResult(t, resultCh, 12)
} }
// Test that Get partitions the caches based on DC so two equivalent requests
// to different datacenters are automatically cached even if their keys are
// the same.
func TestCacheGet_partitionDC(t *testing.T) {
	t.Parallel()

	c := TestCache(t)
	c.RegisterType("t", &testPartitionType{}, nil)

	// Issue two gets that differ only in their datacenter.
	ch1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
		Datacenter: "dc1", Key: "hello"}))
	ch2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
		Datacenter: "dc9", Key: "hello"}))

	// Each get must observe its own DC's value, proving the entries
	// were cached separately rather than shared by request key alone.
	TestCacheGetChResult(t, ch1, "dc1")
	TestCacheGetChResult(t, ch2, "dc9")
}
// Test that Get partitions the caches based on token so two equivalent requests
// with different ACL tokens do not return the same result.
func TestCacheGet_partitionToken(t *testing.T) {
	t.Parallel()

	c := TestCache(t)
	c.RegisterType("t", &testPartitionType{}, nil)

	// Issue two gets that differ only in their ACL token (one empty).
	ch1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
		Token: "", Key: "hello"}))
	ch2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
		Token: "foo", Key: "hello"}))

	// Each get must observe its own token's value, proving the entries
	// were cached separately rather than shared by request key alone.
	TestCacheGetChResult(t, ch1, "")
	TestCacheGetChResult(t, ch2, "foo")
}
// testPartitionType is a Type implementation whose fetched value encodes
// the request's datacenter and ACL token, letting the partitioning tests
// observe which cache partition served a given request.
type testPartitionType struct{}

// Fetch returns the request's "<DC><token>" concatenation as the value.
func (t *testPartitionType) Fetch(opts FetchOptions, r Request) (FetchResult, error) {
	info := r.CacheInfo()
	value := info.Datacenter + info.Token
	return FetchResult{Value: value}, nil
}