mirror of https://github.com/hashicorp/consul
lib/ttlcache: extract package from agent/cache
parent
c4122edd22
commit
bbb816aa8a
|
@ -26,6 +26,8 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
|
@ -87,7 +89,7 @@ type Cache struct {
|
|||
// internal storage format so changing this should be possible safely.
|
||||
entriesLock sync.RWMutex
|
||||
entries map[string]cacheEntry
|
||||
entriesExpiryHeap *ExpiryHeap
|
||||
entriesExpiryHeap *ttlcache.ExpiryHeap
|
||||
|
||||
// stopped is used as an atomic flag to signal that the Cache has been
|
||||
// discarded so background fetches and expiry processing should stop.
|
||||
|
@ -169,7 +171,7 @@ func New(options Options) *Cache {
|
|||
c := &Cache{
|
||||
types: make(map[string]typeEntry),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: NewExpiryHeap(),
|
||||
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
|
||||
stopCh: make(chan struct{}),
|
||||
options: options,
|
||||
rateLimitContext: ctx,
|
||||
|
@ -400,7 +402,7 @@ RETRY_GET:
|
|||
|
||||
// Touch the expiration and fix the heap.
|
||||
c.entriesLock.Lock()
|
||||
c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL)
|
||||
c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
|
||||
c.entriesLock.Unlock()
|
||||
|
||||
// We purposely do not return an error here since the cache only works with
|
||||
|
@ -681,7 +683,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
|||
// If this is a new entry (not in the heap yet), then setup the
|
||||
// initial expiry information and insert. If we're already in
|
||||
// the heap we do nothing since we're reusing the same entry.
|
||||
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
|
||||
if newEntry.Expiry == nil || newEntry.Expiry.Index() == -1 {
|
||||
newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL)
|
||||
}
|
||||
|
||||
|
@ -762,7 +764,7 @@ func (c *Cache) runExpiryLoop() {
|
|||
|
||||
// Entry expired! Remove it.
|
||||
delete(c.entries, entry.Key)
|
||||
c.entriesExpiryHeap.Remove(entry.HeapIndex)
|
||||
c.entriesExpiryHeap.Remove(entry.Index())
|
||||
|
||||
// Set some metrics
|
||||
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
|
||||
|
@ -803,7 +805,6 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
|
|||
Index: res.Index,
|
||||
FetchedAt: time.Now(),
|
||||
Waiter: make(chan struct{}),
|
||||
Expiry: &CacheEntryExpiry{Key: key},
|
||||
FetchRateLimiter: rate.NewLimiter(
|
||||
c.options.EntryFetchRate,
|
||||
c.options.EntryFetchMaxBurst,
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
|
@ -1405,3 +1406,27 @@ OUT:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) {
|
||||
c := &Cache{
|
||||
stopCh: make(chan struct{}),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
|
||||
}
|
||||
chStart := make(chan struct{})
|
||||
chDone := make(chan struct{})
|
||||
go func() {
|
||||
close(chStart)
|
||||
c.runExpiryLoop()
|
||||
close(chDone)
|
||||
}()
|
||||
|
||||
<-chStart
|
||||
close(c.stopCh)
|
||||
|
||||
select {
|
||||
case <-chDone:
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("expected loop to exit when stopped")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
)
|
||||
|
||||
// cacheEntry stores a single cache entry.
|
||||
|
@ -31,7 +33,7 @@ type cacheEntry struct {
|
|||
// Expiry contains information about the expiration of this
|
||||
// entry. This is a pointer as its shared as a value in the
|
||||
// ExpiryHeap as well.
|
||||
Expiry *CacheEntryExpiry
|
||||
Expiry *ttlcache.Entry
|
||||
|
||||
// FetchedAt stores the time the cache entry was retrieved for determining
|
||||
// it's age later.
|
||||
|
|
|
@ -1,15 +1,24 @@
|
|||
package cache
|
||||
package ttlcache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CacheEntryExpiry contains the expiration time for a cache entry.
|
||||
type CacheEntryExpiry struct {
|
||||
Key string // Key in the cache map
|
||||
Expires time.Time // Time when entry expires (monotonic clock)
|
||||
HeapIndex int // Index in the heap
|
||||
// Entry in the ExpiryHeap, tracks the index and expiry time of an item in a
|
||||
// ttl cache.
|
||||
type Entry struct {
|
||||
// TODO: can Key be unexported?
|
||||
Key string
|
||||
expiry time.Time
|
||||
heapIndex int
|
||||
}
|
||||
|
||||
func (c *Entry) Index() int {
|
||||
if c == nil {
|
||||
return -1
|
||||
}
|
||||
return c.heapIndex
|
||||
}
|
||||
|
||||
// ExpiryHeap is a container/heap.Interface implementation that expires entries
|
||||
|
@ -18,7 +27,7 @@ type CacheEntryExpiry struct {
|
|||
// All operations on the heap and read/write of the heap contents require
|
||||
// the proper entriesLock to be held on Cache.
|
||||
type ExpiryHeap struct {
|
||||
entries []*CacheEntryExpiry
|
||||
entries []*Entry
|
||||
|
||||
// NotifyCh is sent a value whenever the 0 index value of the heap
|
||||
// changes. This can be used to detect when the earliest value
|
||||
|
@ -26,9 +35,7 @@ type ExpiryHeap struct {
|
|||
NotifyCh chan struct{}
|
||||
}
|
||||
|
||||
// Initialize the heap. The buffer of 1 is really important because
|
||||
// its possible for the expiry loop to trigger the heap to update
|
||||
// itself and it'd block forever otherwise.
|
||||
// NewExpiryHeap creates and returns a new ExpiryHeap.
|
||||
func NewExpiryHeap() *ExpiryHeap {
|
||||
h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)}
|
||||
heap.Init((*entryHeap)(h))
|
||||
|
@ -38,17 +45,17 @@ func NewExpiryHeap() *ExpiryHeap {
|
|||
// Add an entry to the heap.
|
||||
//
|
||||
// Must be synchronized by the caller.
|
||||
func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry {
|
||||
entry := &CacheEntryExpiry{
|
||||
Key: key,
|
||||
Expires: time.Now().Add(expiry),
|
||||
func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry {
|
||||
entry := &Entry{
|
||||
Key: key,
|
||||
expiry: time.Now().Add(expiry),
|
||||
// Set the initial heap index to the last index. If the entry is swapped it
|
||||
// will have the correct index set, and if it remains at the end the last
|
||||
// index will be correct.
|
||||
HeapIndex: len(h.entries),
|
||||
heapIndex: len(h.entries),
|
||||
}
|
||||
heap.Push((*entryHeap)(h), entry)
|
||||
if entry.HeapIndex == 0 {
|
||||
if entry.heapIndex == 0 {
|
||||
h.notify()
|
||||
}
|
||||
return entry
|
||||
|
@ -60,16 +67,18 @@ func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry {
|
|||
// Must be synchronized by the caller.
|
||||
func (h *ExpiryHeap) Update(idx int, expiry time.Duration) {
|
||||
entry := h.entries[idx]
|
||||
entry.Expires = time.Now().Add(expiry)
|
||||
entry.expiry = time.Now().Add(expiry)
|
||||
heap.Fix((*entryHeap)(h), idx)
|
||||
|
||||
// If the previous index and current index are both zero then Fix did not
|
||||
// swap the entry, and notify must be called here.
|
||||
if idx == 0 || entry.HeapIndex == 0 {
|
||||
if idx == 0 || entry.heapIndex == 0 {
|
||||
h.notify()
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the entry at idx from the heap.
|
||||
//
|
||||
// Must be synchronized by the caller.
|
||||
func (h *ExpiryHeap) Remove(idx int) {
|
||||
entry := h.entries[idx]
|
||||
|
@ -77,9 +86,9 @@ func (h *ExpiryHeap) Remove(idx int) {
|
|||
|
||||
// A goroutine which is fetching a new value will have a reference to this
|
||||
// entry. When it re-acquires the lock it needs to be informed that
|
||||
// the entry was expired while it was fetching. Setting HeapIndex to -1
|
||||
// the entry was expired while it was fetching. Setting heapIndex to -1
|
||||
// indicates that the entry is no longer in the heap, and must be re-added.
|
||||
entry.HeapIndex = -1
|
||||
entry.heapIndex = -1
|
||||
|
||||
if idx == 0 {
|
||||
h.notify()
|
||||
|
@ -92,20 +101,20 @@ func (h *entryHeap) Len() int { return len(h.entries) }
|
|||
|
||||
func (h *entryHeap) Swap(i, j int) {
|
||||
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
|
||||
h.entries[i].HeapIndex = i
|
||||
h.entries[j].HeapIndex = j
|
||||
h.entries[i].heapIndex = i
|
||||
h.entries[j].heapIndex = j
|
||||
}
|
||||
|
||||
func (h *entryHeap) Less(i, j int) bool {
|
||||
// The usage of Before here is important (despite being obvious):
|
||||
// this function uses the monotonic time that should be available
|
||||
// on the time.Time value so the heap is immune to wall clock changes.
|
||||
return h.entries[i].Expires.Before(h.entries[j].Expires)
|
||||
return h.entries[i].expiry.Before(h.entries[j].expiry)
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
func (h *entryHeap) Push(x interface{}) {
|
||||
h.entries = append(h.entries, x.(*CacheEntryExpiry))
|
||||
h.entries = append(h.entries, x.(*Entry))
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
|
@ -128,6 +137,8 @@ func (h *ExpiryHeap) notify() {
|
|||
}
|
||||
}
|
||||
|
||||
// Next returns a Timer that waits until the first entry in the heap expires.
|
||||
//
|
||||
// Must be synchronized by the caller.
|
||||
func (h *ExpiryHeap) Next() Timer {
|
||||
if len(h.entries) == 0 {
|
||||
|
@ -135,14 +146,14 @@ func (h *ExpiryHeap) Next() Timer {
|
|||
}
|
||||
entry := h.entries[0]
|
||||
return Timer{
|
||||
timer: time.NewTimer(time.Until(entry.Expires)),
|
||||
timer: time.NewTimer(time.Until(entry.expiry)),
|
||||
Entry: entry,
|
||||
}
|
||||
}
|
||||
|
||||
type Timer struct {
|
||||
timer *time.Timer
|
||||
Entry *CacheEntryExpiry
|
||||
Entry *Entry
|
||||
}
|
||||
|
||||
func (t *Timer) Wait() <-chan time.Time {
|
|
@ -1,4 +1,4 @@
|
|||
package cache
|
||||
package ttlcache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
|
@ -13,52 +13,52 @@ var _ heap.Interface = (*entryHeap)(nil)
|
|||
func TestExpiryHeap(t *testing.T) {
|
||||
h := NewExpiryHeap()
|
||||
ch := h.NotifyCh
|
||||
var entry, entry2, entry3 *CacheEntryExpiry
|
||||
var entry, entry2, entry3 *Entry
|
||||
|
||||
// Init, shouldn't trigger anything
|
||||
testNoMessage(t, ch)
|
||||
|
||||
runStep(t, "add an entry", func(t *testing.T) {
|
||||
entry = h.Add("foo", 100*time.Millisecond)
|
||||
assert.Equal(t, 0, entry.HeapIndex)
|
||||
assert.Equal(t, 0, entry.heapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
})
|
||||
|
||||
runStep(t, "add a second entry in front", func(t *testing.T) {
|
||||
entry2 = h.Add("bar", 50*time.Millisecond)
|
||||
assert.Equal(t, 0, entry2.HeapIndex)
|
||||
assert.Equal(t, 1, entry.HeapIndex)
|
||||
assert.Equal(t, 0, entry2.heapIndex)
|
||||
assert.Equal(t, 1, entry.heapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
})
|
||||
|
||||
runStep(t, "add a third entry at the end", func(t *testing.T) {
|
||||
entry3 = h.Add("baz", 1000*time.Millisecond)
|
||||
assert.Equal(t, 2, entry3.HeapIndex)
|
||||
assert.Equal(t, 2, entry3.heapIndex)
|
||||
testNoMessage(t, ch) // no notify cause index 0 stayed the same
|
||||
})
|
||||
|
||||
runStep(t, "remove the first entry", func(t *testing.T) {
|
||||
h.Remove(0)
|
||||
assert.Equal(t, 0, entry.HeapIndex)
|
||||
assert.Equal(t, 1, entry3.HeapIndex)
|
||||
assert.Equal(t, 0, entry.heapIndex)
|
||||
assert.Equal(t, 1, entry3.heapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch)
|
||||
})
|
||||
|
||||
runStep(t, "update entry3 to expire first", func(t *testing.T) {
|
||||
h.Update(entry3.HeapIndex, 10*time.Millisecond)
|
||||
assert.Equal(t, 1, entry.HeapIndex)
|
||||
assert.Equal(t, 0, entry3.HeapIndex)
|
||||
h.Update(entry3.heapIndex, 10*time.Millisecond)
|
||||
assert.Equal(t, 1, entry.heapIndex)
|
||||
assert.Equal(t, 0, entry3.heapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch)
|
||||
})
|
||||
|
||||
runStep(t, "0th element change triggers a notify", func(t *testing.T) {
|
||||
h.Update(entry3.HeapIndex, 20)
|
||||
assert.Equal(t, 1, entry.HeapIndex) // no move
|
||||
assert.Equal(t, 0, entry3.HeapIndex)
|
||||
h.Update(entry3.heapIndex, 20)
|
||||
assert.Equal(t, 1, entry.heapIndex) // no move
|
||||
assert.Equal(t, 0, entry3.heapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // one message
|
||||
})
|
||||
|
@ -89,27 +89,3 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpiryLoop_ExitsWhenStopped(t *testing.T) {
|
||||
c := &Cache{
|
||||
stopCh: make(chan struct{}),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: NewExpiryHeap(),
|
||||
}
|
||||
chStart := make(chan struct{})
|
||||
chDone := make(chan struct{})
|
||||
go func() {
|
||||
close(chStart)
|
||||
c.runExpiryLoop()
|
||||
close(chDone)
|
||||
}()
|
||||
|
||||
<-chStart
|
||||
close(c.stopCh)
|
||||
|
||||
select {
|
||||
case <-chDone:
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("expected loop to exit when stopped")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue