package submatview
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
)
// Store of Materializers. Store implements an interface similar to
// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests
// as long as the requests are identical.
// Store is used in place of agent/cache.Cache because with the streaming
// backend there is no longer any need to run a background goroutine to refresh
// stored values.
type Store struct {
logger hclog . Logger
lock sync . RWMutex
byKey map [ string ] entry
// expiryHeap tracks entries with 0 remaining requests. Entries are ordered
// by most recent expiry first.
expiryHeap * ttlcache . ExpiryHeap
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated. It is a field on the struct
// so that it can be patched in tests without needing a global lock.
idleTTL time . Duration
}
type entry struct {
materializer * Materializer
expiry * ttlcache . Entry
stop func ( )
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
}
// NewStore creates and returns a Store that is ready for use. The caller must
// call Store.Run (likely in a separate goroutine) to start the expiration loop.
func NewStore ( logger hclog . Logger ) * Store {
return & Store {
logger : logger ,
byKey : make ( map [ string ] entry ) ,
expiryHeap : ttlcache . NewExpiryHeap ( ) ,
idleTTL : 20 * time . Minute ,
}
}
// Run the expiration loop until the context is cancelled.
func ( s * Store ) Run ( ctx context . Context ) {
for {
s . lock . RLock ( )
timer := s . expiryHeap . Next ( )
s . lock . RUnlock ( )
select {
case <- ctx . Done ( ) :
timer . Stop ( )
return
// the first item in the heap has changed, restart the timer with the
// new TTL.
case <- s . expiryHeap . NotifyCh :
timer . Stop ( )
continue
// the TTL for the first item has been reached, attempt an expiration.
case <- timer . Wait ( ) :
s . lock . Lock ( )
he := timer . Entry
s . expiryHeap . Remove ( he . Index ( ) )
e := s . byKey [ he . Key ( ) ]
// Only stop the materializer if there are no active requests.
if e . requests == 0 {
e . stop ( )
delete ( s . byKey , he . Key ( ) )
}
s . lock . Unlock ( )
}
}
}
// Request is used to request data from the Store.
// Note that cache.Request is required, but some of the fields cache.RequestInfo
// fields are ignored (ex: MaxAge, and MustRevalidate).
type Request interface {
cache . Request
// NewMaterializer will be called if there is no active materializer to fulfil
// the request. It should return a Materializer appropriate for streaming
// data to fulfil this request.
NewMaterializer ( ) ( * Materializer , error )
// Type should return a string which uniquely identifies this type of request.
// The returned value is used as the prefix of the key used to index
// entries in the Store.
Type ( ) string
}
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func ( s * Store ) Get ( ctx context . Context , req Request ) ( Result , error ) {
info := req . CacheInfo ( )
key , materializer , err := s . readEntry ( req )
if err != nil {
return Result { } , err
}
defer s . releaseEntry ( key )
if info . Timeout > 0 {
var cancel context . CancelFunc
ctx , cancel = context . WithTimeout ( ctx , info . Timeout )
defer cancel ( )
}
result , err := materializer . getFromView ( ctx , info . MinIndex )
// context.DeadlineExceeded is translated to nil to match the timeout
// behaviour of agent/cache.Cache.Get.
if err == nil || errors . Is ( err , context . DeadlineExceeded ) {
return result , nil
}
return result , err
}
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func ( s * Store ) Notify (
ctx context . Context ,
req Request ,
correlationID string ,
updateCh chan <- cache . UpdateEvent ,
) error {
info := req . CacheInfo ( )
key , materializer , err := s . readEntry ( req )
if err != nil {
return err
}
go func ( ) {
defer s . releaseEntry ( key )
index := info . MinIndex
for {
result , err := materializer . getFromView ( ctx , index )
switch {
case ctx . Err ( ) != nil :
return
case err != nil :
s . logger . Warn ( "handling error in Store.Notify" ,
"error" , err ,
"request-type" , req . Type ( ) ,
"index" , index )
continue
}
index = result . Index
u := cache . UpdateEvent {
CorrelationID : correlationID ,
Result : result . Value ,
Meta : cache . ResultMeta { Index : result . Index , Hit : result . Cached } ,
}
select {
case updateCh <- u :
case <- ctx . Done ( ) :
return
}
}
} ( )
return nil
}
// readEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func ( s * Store ) readEntry ( req Request ) ( string , * Materializer , error ) {
info := req . CacheInfo ( )
key := makeEntryKey ( req . Type ( ) , info )
s . lock . Lock ( )
defer s . lock . Unlock ( )
e , ok := s . byKey [ key ]
if ok {
e . requests ++
s . byKey [ key ] = e
return key , e . materializer , nil
}
mat , err := req . NewMaterializer ( )
if err != nil {
return "" , nil , err
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
go mat . Run ( ctx )
e = entry {
materializer : mat ,
stop : cancel ,
requests : 1 ,
}
s . byKey [ key ] = e
return key , e . materializer , nil
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to readEntry.
func ( s * Store ) releaseEntry ( key string ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
e := s . byKey [ key ]
e . requests --
s . byKey [ key ] = e
if e . requests > 0 {
return
}
if e . expiry . Index ( ) == ttlcache . NotIndexed {
e . expiry = s . expiryHeap . Add ( key , s . idleTTL )
s . byKey [ key ] = e
return
}
s . expiryHeap . Update ( e . expiry . Index ( ) , s . idleTTL )
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey ( typ string , r cache . RequestInfo ) string {
return fmt . Sprintf ( "%s/%s/%s/%s" , typ , r . Datacenter , r . Token , r . Key )
}