You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/submatview/store.go

321 lines
8.6 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package submatview
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
)
// Store of Materializers. Store implements an interface similar to
// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests
// as long as the requests are identical.
// Store is used in place of agent/cache.Cache because with the streaming
// backend there is no longer any need to run a background goroutine to refresh
// stored values.
type Store struct {
logger hclog.Logger
lock sync.RWMutex
byKey map[string]entry
// expiryHeap tracks entries with 0 remaining requests. Entries are ordered
// by most recent expiry first.
expiryHeap *ttlcache.ExpiryHeap
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated. It is a field on the struct
// so that it can be patched in tests without needing a global lock.
idleTTL time.Duration
}
// A Materializer maintains a materialized view of a subscription on an event stream.
type Materializer interface {
Run(ctx context.Context)
Query(ctx context.Context, minIndex uint64) (Result, error)
}
type entry struct {
materializer Materializer
expiry *ttlcache.Entry
stop func()
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
// evicting is used to mark an entry that will be evicted when the current in-
// flight requests finish.
evicting bool
}
// NewStore creates and returns a Store that is ready for use. The caller must
// call Store.Run (likely in a separate goroutine) to start the expiration loop.
func NewStore(logger hclog.Logger) *Store {
return &Store{
logger: logger,
byKey: make(map[string]entry),
expiryHeap: ttlcache.NewExpiryHeap(),
idleTTL: 20 * time.Minute,
}
}
// Run the expiration loop until the context is cancelled.
func (s *Store) Run(ctx context.Context) {
for {
s.lock.RLock()
timer := s.expiryHeap.Next()
s.lock.RUnlock()
select {
case <-ctx.Done():
timer.Stop()
return
// the first item in the heap has changed, restart the timer with the
// new TTL.
case <-s.expiryHeap.NotifyCh:
timer.Stop()
continue
// the TTL for the first item has been reached, attempt an expiration.
case <-timer.Wait():
s.lock.Lock()
he := timer.Entry
s.expiryHeap.Remove(he.Index())
e := s.byKey[he.Key()]
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
s.logger.Trace("evicting item from store", "key", he.Key())
e.stop()
delete(s.byKey, he.Key())
}
s.lock.Unlock()
}
}
}
// Request is used to request data from the Store.
// Note that cache.Request is required, but some of the fields cache.RequestInfo
// fields are ignored (ex: MaxAge, and MustRevalidate).
type Request interface {
cache.Request
// NewMaterializer will be called if there is no active materializer to fulfill
// the request. It should return a Materializer appropriate for streaming
// data to fulfil this request.
NewMaterializer() (Materializer, error)
// Type should return a string which uniquely identifies this type of request.
// The returned value is used as the prefix of the key used to index
// entries in the Store.
Type() string
}
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return Result{}, err
}
defer s.releaseEntry(key)
if info.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, info.Timeout)
defer cancel()
}
result, err := materializer.Query(ctx, info.MinIndex)
// context.DeadlineExceeded is translated to nil to match the timeout
// behaviour of agent/cache.Cache.Get.
if err == nil || errors.Is(err, context.DeadlineExceeded) {
return result, nil
}
return result, err
}
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func (s *Store) Notify(
ctx context.Context,
req Request,
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
return s.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
select {
case updateCh <- event:
case <-ctx.Done():
return
}
})
}
// NotifyCallback subscribes to updates of the entry identified by req in the
// same way as Notify, but accepts a callback function instead of a channel.
func (s *Store) NotifyCallback(
ctx context.Context,
req Request,
correlationID string,
cb cache.Callback,
) error {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return err
}
go func() {
defer s.releaseEntry(key)
index := info.MinIndex
for {
result, err := materializer.Query(ctx, index)
switch {
case ctx.Err() != nil:
return
case err != nil:
s.logger.Warn("handling error in Store.Notify",
"error", err,
"request-type", req.Type(),
"index", index)
}
index = result.Index
cb(ctx, cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Err: err,
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
})
}
}()
return nil
}
// readEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func (s *Store) readEntry(req Request) (string, Materializer, error) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock()
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
if e.evicting {
return "", nil, errors.New("item is marked for eviction")
}
e.requests++
s.byKey[key] = e
return key, e.materializer, nil
}
mat, err := req.NewMaterializer()
if err != nil {
return "", nil, err
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
mat.Run(ctx)
// Materializers run until they either reach their TTL and are evicted (which
// cancels the given context) or encounter an irrecoverable error.
//
// If the context hasn't been canceled, we know it's the error case so we
// trigger an immediate eviction.
if ctx.Err() == nil {
s.evictNow(key)
}
}()
e = entry{
materializer: mat,
stop: cancel,
requests: 1,
}
s.byKey[key] = e
return key, e.materializer, nil
}
// evictNow causes the item with the given key to be evicted immediately.
//
// If there are requests in-flight, the item is marked for eviction such that
// once the requests have been served releaseEntry will move it to the top of
// the expiry heap. If there are no requests in-flight, evictNow will move the
// item to the top of the expiry heap itself.
//
// In either case, the entry's evicting flag prevents it from being served by
// readEntry (and thereby gaining new in-flight requests).
func (s *Store) evictNow(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.evicting = true
s.byKey[key] = e
if e.requests == 0 {
s.expireNowLocked(key)
}
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to readEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.evicting {
s.expireNowLocked(key)
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
}
// expireNowLocked moves the item with the given key to the top of the expiry
// heap, causing it to be picked up by the expiry loop and evicted immediately.
func (s *Store) expireNowLocked(key string) {
e := s.byKey[key]
if idx := e.expiry.Index(); idx != ttlcache.NotIndexed {
s.expiryHeap.Remove(idx)
}
e.expiry = s.expiryHeap.Add(key, time.Duration(0))
s.byKey[key] = e
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
}