You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/submatview/store.go

223 lines
5.1 KiB

package submatview
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
)
type Store struct {
logger hclog.Logger
lock sync.RWMutex
byKey map[string]entry
expiryHeap *ttlcache.ExpiryHeap
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated. It is a field on the struct
// so that it can be patched in tests without need a lock.
idleTTL time.Duration
}
type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
}
func NewStore(logger hclog.Logger) *Store {
return &Store{
logger: logger,
byKey: make(map[string]entry),
expiryHeap: ttlcache.NewExpiryHeap(),
idleTTL: 20 * time.Minute,
}
}
// Run the expiration loop until the context is cancelled.
func (s *Store) Run(ctx context.Context) {
for {
s.lock.RLock()
timer := s.expiryHeap.Next()
s.lock.RUnlock()
select {
case <-ctx.Done():
timer.Stop()
return
case <-s.expiryHeap.NotifyCh:
timer.Stop()
continue
case <-timer.Wait():
s.lock.Lock()
he := timer.Entry
s.expiryHeap.Remove(he.Index())
e := s.byKey[he.Key()]
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
e.stop()
delete(s.byKey, he.Key())
}
s.lock.Unlock()
}
}
}
// TODO: godoc
type Request interface {
cache.Request
NewMaterializer() (*Materializer, error)
Type() string
}
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
info := req.CacheInfo()
key, e, err := s.getEntry(req)
if err != nil {
return Result{}, err
}
defer s.releaseEntry(key)
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
defer cancel()
result, err := e.materializer.getFromView(ctx, info.MinIndex)
// context.DeadlineExceeded is translated to nil to match the behaviour of
// agent/cache.Cache.Get.
if err == nil || errors.Is(err, context.DeadlineExceeded) {
return result, nil
}
return result, err
}
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func (s *Store) Notify(
ctx context.Context,
req Request,
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
info := req.CacheInfo()
key, e, err := s.getEntry(req)
if err != nil {
return err
}
go func() {
defer s.releaseEntry(key)
index := info.MinIndex
for {
result, err := e.materializer.getFromView(ctx, index)
switch {
case ctx.Err() != nil:
return
case err != nil:
// TODO: cache.Notify sends errors on updateCh, should this do the same?
// It seems like only fetch errors would ever get sent along and eventually
// logged, so sending may not provide any benefit here.
s.logger.Warn("handling error in Store.Notify",
"error", err,
"request-type", req.Type(),
"index", index)
continue
}
index = result.Index
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
Err: err,
}
select {
case updateCh <- u:
case <-ctx.Done():
return
}
}
}()
return nil
}
// getEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func (s *Store) getEntry(req Request) (string, entry, error) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock()
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
e.requests++
s.byKey[key] = e
return key, e, nil
}
mat, err := req.NewMaterializer()
if err != nil {
return "", e, err
}
ctx, cancel := context.WithCancel(context.Background())
go mat.Run(ctx)
e = entry{
materializer: mat,
stop: cancel,
requests: 1,
}
s.byKey[key] = e
return key, e, nil
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to getEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
}