Merge pull request #7596 from hashicorp/dnephin/agent-cache-type-entry

agent/cache: move typeEntry lookup to the edge
pull/7639/head
Daniel Nephin 2020-04-13 12:24:07 -04:00 committed by GitHub
commit 1f25bf88b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 56 deletions

77
agent/cache/cache.go vendored
View File

@ -83,6 +83,8 @@ type Cache struct {
// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
// Name that was used to register the Type
Name string
Type Type
Opts *RegisterOptions
}
@ -193,7 +195,7 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
c.typesLock.Lock()
defer c.typesLock.Unlock()
c.types[n] = typeEntry{Type: typ, Opts: opts}
c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts}
}
// Get loads the data for the given type and request. If data satisfying the
@ -211,7 +213,15 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
// Shouldn't happen given that we successfully fetched this at least
// once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex)
}
// getEntryLocked retrieves a cache entry and checks if it is ready to be
@ -258,18 +268,17 @@ func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duratio
// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
info := r.CacheInfo()
if info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(t, r, minIndex)
return c.fetchDirect(tEntry, r, minIndex)
}
// Get the actual key for our entry
key := c.entryKey(t, &info)
key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key)
// First time through
first := true
@ -278,16 +287,6 @@ func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{},
var timeoutCh <-chan time.Time
RETRY_GET:
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
// Shouldn't happen given that we successfully fetched this at least
// once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
// Get the current value
c.entriesLock.RLock()
_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
@ -296,7 +295,7 @@ RETRY_GET:
if cacheHit {
meta := ResultMeta{Index: entry.Index}
if first {
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1)
meta.Hit = true
}
@ -351,11 +350,11 @@ RETRY_GET:
// We increment two different counters for cache misses depending on
// whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index.
missKey := "miss_block"
if minIndex == 0 {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
} else {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
missKey = "miss_new"
}
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1)
}
// Set our timeout channel if we must
@ -365,7 +364,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first)
waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first)
if err != nil {
return nil, ResultMeta{Index: entry.Index}, err
}
@ -384,12 +383,6 @@ RETRY_GET:
}
}
// entryKey returns the key for the entry in the cache. See the note
// about the entry key format in the structure docs for Cache.
func (c *Cache) entryKey(t string, r *RequestInfo) string {
return makeEntryKey(t, r.Datacenter, r.Token, r.Key)
}
func makeEntryKey(t, dc, token, key string) string {
return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
}
@ -402,15 +395,7 @@ func makeEntryKey(t, dc, token, key string) string {
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown type in cache: %s", t)
}
func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
info := r.CacheInfo()
// We acquire a write lock because we may have to set Fetching to true.
@ -543,7 +528,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// Error handling
if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1)
if result.Index > 0 {
// Reset the attempts counter so we don't have any backoff
@ -572,7 +557,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
}
} else {
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1)
// Increment attempt counter
attempt++
@ -613,7 +598,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts.Refresh {
c.refresh(tEntry.Opts, attempt, t, key, r)
c.refresh(tEntry.Opts, attempt, tEntry, key, r)
}
}()
@ -623,15 +608,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: minIndex,
@ -661,7 +638,7 @@ func backOffWait(failures uint) time.Duration {
// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) {
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh {
return
@ -684,7 +661,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(t, key, r, false, attempt, 0, true, true)
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
}
// runExpiryLoop is a blocking function that watches the expiration

12
agent/cache/watch.go vendored
View File

@ -61,19 +61,19 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request,
return fmt.Errorf("unknown type in cache: %s", t)
}
if tEntry.Type.SupportsBlocking() {
go c.notifyBlockingQuery(ctx, t, r, correlationID, ch)
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
} else {
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge)
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
}
return nil
}
func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
@ -86,7 +86,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
}
// Blocking request
res, meta, err := c.getWithIndex(t, r, index)
res, meta, err := c.getWithIndex(tEntry, r, index)
// Check context hasn't been canceled
if ctx.Err() != nil {
@ -132,7 +132,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
}
}
func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
index := uint64(0)
failures := uint(0)
@ -145,7 +145,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, cor
}
// Make the request
res, meta, err := c.getWithIndex(t, r, index)
res, meta, err := c.getWithIndex(tEntry, r, index)
// Check context hasn't been canceled
if ctx.Err() != nil {