mirror of https://github.com/hashicorp/consul
Merge pull request #8998 from hashicorp/dnephin/lib-ttlcache
lib/ttlcache: extract a new package from agent/cachepull/8974/head
commit
74ac34e358
|
@ -15,7 +15,6 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -28,6 +27,7 @@ import (
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/consul/lib/ttlcache"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate mockery -all -inpkg
|
//go:generate mockery -all -inpkg
|
||||||
|
@ -88,7 +88,7 @@ type Cache struct {
|
||||||
// internal storage format so changing this should be possible safely.
|
// internal storage format so changing this should be possible safely.
|
||||||
entriesLock sync.RWMutex
|
entriesLock sync.RWMutex
|
||||||
entries map[string]cacheEntry
|
entries map[string]cacheEntry
|
||||||
entriesExpiryHeap *expiryHeap
|
entriesExpiryHeap *ttlcache.ExpiryHeap
|
||||||
|
|
||||||
// stopped is used as an atomic flag to signal that the Cache has been
|
// stopped is used as an atomic flag to signal that the Cache has been
|
||||||
// discarded so background fetches and expiry processing should stop.
|
// discarded so background fetches and expiry processing should stop.
|
||||||
|
@ -166,16 +166,11 @@ func applyDefaultValuesOnOptions(options Options) Options {
|
||||||
// Further settings can be tweaked on the returned value.
|
// Further settings can be tweaked on the returned value.
|
||||||
func New(options Options) *Cache {
|
func New(options Options) *Cache {
|
||||||
options = applyDefaultValuesOnOptions(options)
|
options = applyDefaultValuesOnOptions(options)
|
||||||
// Initialize the heap. The buffer of 1 is really important because
|
|
||||||
// its possible for the expiry loop to trigger the heap to update
|
|
||||||
// itself and it'd block forever otherwise.
|
|
||||||
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
|
|
||||||
heap.Init(h)
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c := &Cache{
|
c := &Cache{
|
||||||
types: make(map[string]typeEntry),
|
types: make(map[string]typeEntry),
|
||||||
entries: make(map[string]cacheEntry),
|
entries: make(map[string]cacheEntry),
|
||||||
entriesExpiryHeap: h,
|
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
options: options,
|
options: options,
|
||||||
rateLimitContext: ctx,
|
rateLimitContext: ctx,
|
||||||
|
@ -406,8 +401,7 @@ RETRY_GET:
|
||||||
|
|
||||||
// Touch the expiration and fix the heap.
|
// Touch the expiration and fix the heap.
|
||||||
c.entriesLock.Lock()
|
c.entriesLock.Lock()
|
||||||
entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL)
|
c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
|
||||||
c.entriesExpiryHeap.Fix(entry.Expiry)
|
|
||||||
c.entriesLock.Unlock()
|
c.entriesLock.Unlock()
|
||||||
|
|
||||||
// We purposely do not return an error here since the cache only works with
|
// We purposely do not return an error here since the cache only works with
|
||||||
|
@ -688,10 +682,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||||
// If this is a new entry (not in the heap yet), then setup the
|
// If this is a new entry (not in the heap yet), then setup the
|
||||||
// initial expiry information and insert. If we're already in
|
// initial expiry information and insert. If we're already in
|
||||||
// the heap we do nothing since we're reusing the same entry.
|
// the heap we do nothing since we're reusing the same entry.
|
||||||
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
|
if newEntry.Expiry == nil || newEntry.Expiry.Index() == ttlcache.NotIndexed {
|
||||||
newEntry.Expiry = &cacheEntryExpiry{Key: key}
|
newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL)
|
||||||
newEntry.Expiry.Update(tEntry.Opts.LastGetTTL)
|
|
||||||
heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.entries[key] = newEntry
|
c.entries[key] = newEntry
|
||||||
|
@ -748,47 +740,30 @@ func backOffWait(failures uint) time.Duration {
|
||||||
// runExpiryLoop is a blocking function that watches the expiration
|
// runExpiryLoop is a blocking function that watches the expiration
|
||||||
// heap and invalidates entries that have expired.
|
// heap and invalidates entries that have expired.
|
||||||
func (c *Cache) runExpiryLoop() {
|
func (c *Cache) runExpiryLoop() {
|
||||||
var expiryTimer *time.Timer
|
|
||||||
for {
|
for {
|
||||||
// If we have a previous timer, stop it.
|
|
||||||
if expiryTimer != nil {
|
|
||||||
expiryTimer.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the entry expiring soonest
|
|
||||||
var entry *cacheEntryExpiry
|
|
||||||
var expiryCh <-chan time.Time
|
|
||||||
c.entriesLock.RLock()
|
c.entriesLock.RLock()
|
||||||
if len(c.entriesExpiryHeap.Entries) > 0 {
|
timer := c.entriesExpiryHeap.Next()
|
||||||
entry = c.entriesExpiryHeap.Entries[0]
|
|
||||||
expiryTimer = time.NewTimer(time.Until(entry.Expires))
|
|
||||||
expiryCh = expiryTimer.C
|
|
||||||
}
|
|
||||||
c.entriesLock.RUnlock()
|
c.entriesLock.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
|
timer.Stop()
|
||||||
return
|
return
|
||||||
case <-c.entriesExpiryHeap.NotifyCh:
|
case <-c.entriesExpiryHeap.NotifyCh:
|
||||||
// Entries changed, so the heap may have changed. Restart loop.
|
timer.Stop()
|
||||||
|
continue
|
||||||
|
|
||||||
case <-expiryCh:
|
case <-timer.Wait():
|
||||||
c.entriesLock.Lock()
|
c.entriesLock.Lock()
|
||||||
|
|
||||||
// Perform cleanup operations on the entry's state, if applicable.
|
entry := timer.Entry
|
||||||
state := c.entries[entry.Key].State
|
if closer, ok := c.entries[entry.Key()].State.(io.Closer); ok {
|
||||||
if closer, ok := state.(io.Closer); ok {
|
|
||||||
closer.Close()
|
closer.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry expired! Remove it.
|
// Entry expired! Remove it.
|
||||||
delete(c.entries, entry.Key)
|
delete(c.entries, entry.Key())
|
||||||
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
|
c.entriesExpiryHeap.Remove(entry.Index())
|
||||||
|
|
||||||
// This is subtle but important: if we race and simultaneously
|
|
||||||
// evict and fetch a new value, then we set this to -1 to
|
|
||||||
// have it treated as a new value so that the TTL is extended.
|
|
||||||
entry.HeapIndex = -1
|
|
||||||
|
|
||||||
// Set some metrics
|
// Set some metrics
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
|
||||||
|
@ -829,7 +804,6 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
|
||||||
Index: res.Index,
|
Index: res.Index,
|
||||||
FetchedAt: time.Now(),
|
FetchedAt: time.Now(),
|
||||||
Waiter: make(chan struct{}),
|
Waiter: make(chan struct{}),
|
||||||
Expiry: &cacheEntryExpiry{Key: key},
|
|
||||||
FetchRateLimiter: rate.NewLimiter(
|
FetchRateLimiter: rate.NewLimiter(
|
||||||
c.options.EntryFetchRate,
|
c.options.EntryFetchRate,
|
||||||
c.options.EntryFetchMaxBurst,
|
c.options.EntryFetchMaxBurst,
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/ttlcache"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1000,6 +1001,9 @@ func (t *testPartitionType) RegisterOptions() RegisterOptions {
|
||||||
// Test that background refreshing reports correct Age in failure and happy
|
// Test that background refreshing reports correct Age in failure and happy
|
||||||
// states.
|
// states.
|
||||||
func TestCacheGet_refreshAge(t *testing.T) {
|
func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for -short run")
|
||||||
|
}
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
@ -1402,3 +1406,73 @@ OUT:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) {
|
||||||
|
c := &Cache{
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
entries: make(map[string]cacheEntry),
|
||||||
|
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
|
||||||
|
}
|
||||||
|
chStart := make(chan struct{})
|
||||||
|
chDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
close(chStart)
|
||||||
|
c.runExpiryLoop()
|
||||||
|
close(chDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-chStart
|
||||||
|
close(c.stopCh)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-chDone:
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatalf("expected loop to exit when stopped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCache_Prepopulate(t *testing.T) {
|
||||||
|
typ := &fakeType{index: 5}
|
||||||
|
c := New(Options{})
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
|
c.Prepopulate("t", FetchResult{Value: 17, Index: 1}, "dc1", "token", "v1")
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
req := fakeRequest{
|
||||||
|
info: RequestInfo{
|
||||||
|
Key: "v1",
|
||||||
|
Token: "token",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
MinIndex: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
result, _, err := c.Get(ctx, "t", req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 17, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeType struct {
|
||||||
|
index uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) {
|
||||||
|
idx := atomic.LoadUint64(&f.index)
|
||||||
|
return FetchResult{Value: int(idx * 2), Index: idx}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f fakeType) RegisterOptions() RegisterOptions {
|
||||||
|
return RegisterOptions{Refresh: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Type = (*fakeType)(nil)
|
||||||
|
|
||||||
|
type fakeRequest struct {
|
||||||
|
info RequestInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f fakeRequest) CacheInfo() RequestInfo {
|
||||||
|
return f.info
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Request = (*fakeRequest)(nil)
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/ttlcache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// cacheEntry stores a single cache entry.
|
// cacheEntry stores a single cache entry.
|
||||||
|
@ -31,8 +32,8 @@ type cacheEntry struct {
|
||||||
|
|
||||||
// Expiry contains information about the expiration of this
|
// Expiry contains information about the expiration of this
|
||||||
// entry. This is a pointer as its shared as a value in the
|
// entry. This is a pointer as its shared as a value in the
|
||||||
// expiryHeap as well.
|
// ExpiryHeap as well.
|
||||||
Expiry *cacheEntryExpiry
|
Expiry *ttlcache.Entry
|
||||||
|
|
||||||
// FetchedAt stores the time the cache entry was retrieved for determining
|
// FetchedAt stores the time the cache entry was retrieved for determining
|
||||||
// it's age later.
|
// it's age later.
|
||||||
|
@ -46,118 +47,3 @@ type cacheEntry struct {
|
||||||
// FetchRateLimiter limits the rate at which fetch is called for this entry.
|
// FetchRateLimiter limits the rate at which fetch is called for this entry.
|
||||||
FetchRateLimiter *rate.Limiter
|
FetchRateLimiter *rate.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacheEntryExpiry contains the expiration information for a cache
|
|
||||||
// entry. Any modifications to this struct should be done only while
|
|
||||||
// the Cache entriesLock is held.
|
|
||||||
type cacheEntryExpiry struct {
|
|
||||||
Key string // Key in the cache map
|
|
||||||
Expires time.Time // Time when entry expires (monotonic clock)
|
|
||||||
HeapIndex int // Index in the heap
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the expiry to d time from now.
|
|
||||||
func (e *cacheEntryExpiry) Update(d time.Duration) {
|
|
||||||
e.Expires = time.Now().Add(d)
|
|
||||||
}
|
|
||||||
|
|
||||||
// expiryHeap is a heap implementation that stores information about
|
|
||||||
// when entries expire. Implements container/heap.Interface.
|
|
||||||
//
|
|
||||||
// All operations on the heap and read/write of the heap contents require
|
|
||||||
// the proper entriesLock to be held on Cache.
|
|
||||||
type expiryHeap struct {
|
|
||||||
Entries []*cacheEntryExpiry
|
|
||||||
|
|
||||||
// NotifyCh is sent a value whenever the 0 index value of the heap
|
|
||||||
// changes. This can be used to detect when the earliest value
|
|
||||||
// changes.
|
|
||||||
//
|
|
||||||
// There is a single edge case where the heap will not automatically
|
|
||||||
// send a notification: if heap.Fix is called manually and the index
|
|
||||||
// changed is 0 and the change doesn't result in any moves (stays at index
|
|
||||||
// 0), then we won't detect the change. To work around this, please
|
|
||||||
// always call the expiryHeap.Fix method instead.
|
|
||||||
NotifyCh chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Identical to heap.Fix for this heap instance but will properly handle
|
|
||||||
// the edge case where idx == 0 and no heap modification is necessary,
|
|
||||||
// and still notify the NotifyCh.
|
|
||||||
//
|
|
||||||
// This is important for cache expiry since the expiry time may have been
|
|
||||||
// extended and if we don't send a message to the NotifyCh then we'll never
|
|
||||||
// reset the timer and the entry will be evicted early.
|
|
||||||
func (h *expiryHeap) Fix(entry *cacheEntryExpiry) {
|
|
||||||
idx := entry.HeapIndex
|
|
||||||
heap.Fix(h, idx)
|
|
||||||
|
|
||||||
// This is the edge case we handle: if the prev (idx) and current (HeapIndex)
|
|
||||||
// is zero, it means the head-of-line didn't change while the value
|
|
||||||
// changed. Notify to reset our expiry worker.
|
|
||||||
if idx == 0 && entry.HeapIndex == 0 {
|
|
||||||
h.notify()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *expiryHeap) Len() int { return len(h.Entries) }
|
|
||||||
|
|
||||||
func (h *expiryHeap) Swap(i, j int) {
|
|
||||||
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
|
|
||||||
h.Entries[i].HeapIndex = i
|
|
||||||
h.Entries[j].HeapIndex = j
|
|
||||||
|
|
||||||
// If we're moving the 0 index, update the channel since we need
|
|
||||||
// to re-update the timer we're waiting on for the soonest expiring
|
|
||||||
// value.
|
|
||||||
if i == 0 || j == 0 {
|
|
||||||
h.notify()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *expiryHeap) Less(i, j int) bool {
|
|
||||||
// The usage of Before here is important (despite being obvious):
|
|
||||||
// this function uses the monotonic time that should be available
|
|
||||||
// on the time.Time value so the heap is immune to wall clock changes.
|
|
||||||
return h.Entries[i].Expires.Before(h.Entries[j].Expires)
|
|
||||||
}
|
|
||||||
|
|
||||||
// heap.Interface, this isn't expected to be called directly.
|
|
||||||
func (h *expiryHeap) Push(x interface{}) {
|
|
||||||
entry := x.(*cacheEntryExpiry)
|
|
||||||
|
|
||||||
// Set initial heap index, if we're going to the end then Swap
|
|
||||||
// won't be called so we need to initialize
|
|
||||||
entry.HeapIndex = len(h.Entries)
|
|
||||||
|
|
||||||
// For the first entry, we need to trigger a channel send because
|
|
||||||
// Swap won't be called; nothing to swap! We can call it right away
|
|
||||||
// because all heap operations are within a lock.
|
|
||||||
if len(h.Entries) == 0 {
|
|
||||||
h.notify()
|
|
||||||
}
|
|
||||||
|
|
||||||
h.Entries = append(h.Entries, entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
// heap.Interface, this isn't expected to be called directly.
|
|
||||||
func (h *expiryHeap) Pop() interface{} {
|
|
||||||
old := h.Entries
|
|
||||||
n := len(old)
|
|
||||||
x := old[n-1]
|
|
||||||
h.Entries = old[0 : n-1]
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *expiryHeap) notify() {
|
|
||||||
select {
|
|
||||||
case h.NotifyCh <- struct{}{}:
|
|
||||||
// Good
|
|
||||||
|
|
||||||
default:
|
|
||||||
// If the send would've blocked, we just ignore it. The reason this
|
|
||||||
// is safe is because NotifyCh should always be a buffered channel.
|
|
||||||
// If this blocks, it means that there is a pending message anyways
|
|
||||||
// so the receiver will restart regardless.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
package cache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"container/heap"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestExpiryHeap_impl(t *testing.T) {
|
|
||||||
var _ heap.Interface = new(expiryHeap)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestExpiryHeap(t *testing.T) {
|
|
||||||
require := require.New(t)
|
|
||||||
now := time.Now()
|
|
||||||
ch := make(chan struct{}, 10) // buffered to prevent blocking in tests
|
|
||||||
h := &expiryHeap{NotifyCh: ch}
|
|
||||||
|
|
||||||
// Init, shouldn't trigger anything
|
|
||||||
heap.Init(h)
|
|
||||||
testNoMessage(t, ch)
|
|
||||||
|
|
||||||
// Push an initial value, expect one message
|
|
||||||
entry := &cacheEntryExpiry{Key: "foo", HeapIndex: -1, Expires: now.Add(100)}
|
|
||||||
heap.Push(h, entry)
|
|
||||||
require.Equal(0, entry.HeapIndex)
|
|
||||||
testMessage(t, ch)
|
|
||||||
testNoMessage(t, ch) // exactly one asserted above
|
|
||||||
|
|
||||||
// Push another that goes earlier than entry
|
|
||||||
entry2 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(50)}
|
|
||||||
heap.Push(h, entry2)
|
|
||||||
require.Equal(0, entry2.HeapIndex)
|
|
||||||
require.Equal(1, entry.HeapIndex)
|
|
||||||
testMessage(t, ch)
|
|
||||||
testNoMessage(t, ch) // exactly one asserted above
|
|
||||||
|
|
||||||
// Push another that goes at the end
|
|
||||||
entry3 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(1000)}
|
|
||||||
heap.Push(h, entry3)
|
|
||||||
require.Equal(2, entry3.HeapIndex)
|
|
||||||
testNoMessage(t, ch) // no notify cause index 0 stayed the same
|
|
||||||
|
|
||||||
// Remove the first entry (not Pop, since we don't use Pop, but that works too)
|
|
||||||
remove := h.Entries[0]
|
|
||||||
heap.Remove(h, remove.HeapIndex)
|
|
||||||
require.Equal(0, entry.HeapIndex)
|
|
||||||
require.Equal(1, entry3.HeapIndex)
|
|
||||||
testMessage(t, ch)
|
|
||||||
testMessage(t, ch) // we have two because two swaps happen
|
|
||||||
testNoMessage(t, ch)
|
|
||||||
|
|
||||||
// Let's change entry 3 to be early, and fix it
|
|
||||||
entry3.Expires = now.Add(10)
|
|
||||||
h.Fix(entry3)
|
|
||||||
require.Equal(1, entry.HeapIndex)
|
|
||||||
require.Equal(0, entry3.HeapIndex)
|
|
||||||
testMessage(t, ch)
|
|
||||||
testNoMessage(t, ch)
|
|
||||||
|
|
||||||
// Let's change entry 3 again, this is an edge case where if the 0th
|
|
||||||
// element changed, we didn't trigger the channel. Our Fix func should.
|
|
||||||
entry.Expires = now.Add(20)
|
|
||||||
h.Fix(entry3)
|
|
||||||
require.Equal(1, entry.HeapIndex) // no move
|
|
||||||
require.Equal(0, entry3.HeapIndex)
|
|
||||||
testMessage(t, ch)
|
|
||||||
testNoMessage(t, ch) // one message
|
|
||||||
}
|
|
||||||
|
|
||||||
func testNoMessage(t *testing.T, ch <-chan struct{}) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
t.Fatal("should not have a message")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testMessage(t *testing.T, ch <-chan struct{}) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
default:
|
|
||||||
t.Fatal("should have a message")
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,195 @@
|
||||||
|
/*
|
||||||
|
Package ttlcache provides an ExpiryHeap that can be used by a cache to track the
|
||||||
|
expiration time of its entries. When an expiry is reached the Timer will fire
|
||||||
|
and the entry can be removed.
|
||||||
|
*/
|
||||||
|
package ttlcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Entry in the ExpiryHeap, tracks the index and expiry time of an item in a
|
||||||
|
// ttl cache.
|
||||||
|
type Entry struct {
|
||||||
|
key string
|
||||||
|
expiry time.Time
|
||||||
|
heapIndex int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotIndexed indicates that the entry does not exist in the heap. Either because
|
||||||
|
// it is nil, or because it was removed.
|
||||||
|
const NotIndexed = -1
|
||||||
|
|
||||||
|
// Index returns the index of this entry within the heap.
|
||||||
|
func (e *Entry) Index() int {
|
||||||
|
if e == nil {
|
||||||
|
return NotIndexed
|
||||||
|
}
|
||||||
|
return e.heapIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key returns the key for the entry in the heap.
|
||||||
|
func (e *Entry) Key() string {
|
||||||
|
return e.key
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExpiryHeap is a heap that is ordered by the expiry time of entries. It may
|
||||||
|
// be used by a cache or storage to expiry items after a TTL.
|
||||||
|
//
|
||||||
|
// ExpiryHeap expects the caller to synchronize calls to most of its methods. This
|
||||||
|
// is necessary because the cache needs to ensure that updates to both its
|
||||||
|
// storage and the ExpiryHeap are synchronized.
|
||||||
|
type ExpiryHeap struct {
|
||||||
|
entries []*Entry
|
||||||
|
|
||||||
|
// NotifyCh is sent a value whenever the 0 index value of the heap
|
||||||
|
// changes. This can be used to detect when the earliest value
|
||||||
|
// changes.
|
||||||
|
NotifyCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewExpiryHeap creates and returns a new ExpiryHeap.
|
||||||
|
func NewExpiryHeap() *ExpiryHeap {
|
||||||
|
h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)}
|
||||||
|
heap.Init((*entryHeap)(h))
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add an entry to the heap.
|
||||||
|
//
|
||||||
|
// Must be synchronized by the caller.
|
||||||
|
func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry {
|
||||||
|
entry := &Entry{
|
||||||
|
key: key,
|
||||||
|
expiry: time.Now().Add(expiry),
|
||||||
|
// Set the initial heap index to the last index. If the entry is swapped it
|
||||||
|
// will have the correct index set, and if it remains at the end the last
|
||||||
|
// index will be correct.
|
||||||
|
heapIndex: len(h.entries),
|
||||||
|
}
|
||||||
|
heap.Push((*entryHeap)(h), entry)
|
||||||
|
if entry.heapIndex == 0 {
|
||||||
|
h.notify()
|
||||||
|
}
|
||||||
|
return entry
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the entry that is currently at idx with the new expiry time. The heap
|
||||||
|
// will be rebalanced after the entry is updated.
|
||||||
|
//
|
||||||
|
// Must be synchronized by the caller.
|
||||||
|
func (h *ExpiryHeap) Update(idx int, expiry time.Duration) {
|
||||||
|
if idx == NotIndexed {
|
||||||
|
// the previous entry did not have a valid index, its not in the heap
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry := h.entries[idx]
|
||||||
|
entry.expiry = time.Now().Add(expiry)
|
||||||
|
heap.Fix((*entryHeap)(h), idx)
|
||||||
|
|
||||||
|
// If the previous index and current index are both zero then Fix did not
|
||||||
|
// swap the entry, and notify must be called here.
|
||||||
|
if idx == 0 || entry.heapIndex == 0 {
|
||||||
|
h.notify()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the entry at idx from the heap.
|
||||||
|
//
|
||||||
|
// Must be synchronized by the caller.
|
||||||
|
func (h *ExpiryHeap) Remove(idx int) {
|
||||||
|
entry := h.entries[idx]
|
||||||
|
heap.Remove((*entryHeap)(h), idx)
|
||||||
|
|
||||||
|
// A goroutine which is fetching a new value will have a reference to this
|
||||||
|
// entry. When it re-acquires the lock it needs to be informed that
|
||||||
|
// the entry was expired while it was fetching. Setting heapIndex to -1
|
||||||
|
// indicates that the entry is no longer in the heap, and must be re-added.
|
||||||
|
entry.heapIndex = NotIndexed
|
||||||
|
|
||||||
|
if idx == 0 {
|
||||||
|
h.notify()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type entryHeap ExpiryHeap
|
||||||
|
|
||||||
|
func (h *entryHeap) Len() int { return len(h.entries) }
|
||||||
|
|
||||||
|
func (h *entryHeap) Swap(i, j int) {
|
||||||
|
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
|
||||||
|
h.entries[i].heapIndex = i
|
||||||
|
h.entries[j].heapIndex = j
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *entryHeap) Less(i, j int) bool {
|
||||||
|
// The usage of Before here is important (despite being obvious):
|
||||||
|
// this function uses the monotonic time that should be available
|
||||||
|
// on the time.Time value so the heap is immune to wall clock changes.
|
||||||
|
return h.entries[i].expiry.Before(h.entries[j].expiry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// heap.Interface, this isn't expected to be called directly.
|
||||||
|
func (h *entryHeap) Push(x interface{}) {
|
||||||
|
h.entries = append(h.entries, x.(*Entry))
|
||||||
|
}
|
||||||
|
|
||||||
|
// heap.Interface, this isn't expected to be called directly.
|
||||||
|
func (h *entryHeap) Pop() interface{} {
|
||||||
|
n := len(h.entries)
|
||||||
|
entries := h.entries
|
||||||
|
last := entries[n-1]
|
||||||
|
h.entries = entries[0 : n-1]
|
||||||
|
return last
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify the timer that the head value has changed, so the expiry time has
|
||||||
|
// also likely changed.
|
||||||
|
func (h *ExpiryHeap) notify() {
|
||||||
|
// Send to channel without blocking. Skips sending if there is already
|
||||||
|
// an item in the buffered channel.
|
||||||
|
select {
|
||||||
|
case h.NotifyCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next returns a Timer that waits until the first entry in the heap expires.
|
||||||
|
//
|
||||||
|
// Must be synchronized by the caller.
|
||||||
|
func (h *ExpiryHeap) Next() Timer {
|
||||||
|
if len(h.entries) == 0 {
|
||||||
|
return Timer{}
|
||||||
|
}
|
||||||
|
entry := h.entries[0]
|
||||||
|
return Timer{
|
||||||
|
timer: time.NewTimer(time.Until(entry.expiry)),
|
||||||
|
Entry: entry,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timer provides a channel to block on. When the Wait channel receives an
|
||||||
|
// item the Timer.Entry has expired. The caller is expected to call
|
||||||
|
// ExpiryHeap.Remove with the Entry.Index().
|
||||||
|
//
|
||||||
|
// The caller is responsible for calling Stop to stop the timer if the timer has
|
||||||
|
// not fired.
|
||||||
|
type Timer struct {
|
||||||
|
timer *time.Timer
|
||||||
|
Entry *Entry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Wait() <-chan time.Time {
|
||||||
|
if t.timer == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return t.timer.C
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Timer) Stop() {
|
||||||
|
if t.timer != nil {
|
||||||
|
t.timer.Stop()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
package ttlcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ heap.Interface = (*entryHeap)(nil)
|
||||||
|
|
||||||
|
func TestExpiryHeap(t *testing.T) {
|
||||||
|
h := NewExpiryHeap()
|
||||||
|
ch := h.NotifyCh
|
||||||
|
var entry, entry2, entry3 *Entry
|
||||||
|
|
||||||
|
// Init, shouldn't trigger anything
|
||||||
|
testNoMessage(t, ch)
|
||||||
|
|
||||||
|
runStep(t, "add an entry", func(t *testing.T) {
|
||||||
|
entry = h.Add("foo", 100*time.Millisecond)
|
||||||
|
assert.Equal(t, 0, entry.heapIndex)
|
||||||
|
testMessage(t, ch)
|
||||||
|
testNoMessage(t, ch) // exactly one asserted above
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "add a second entry in front", func(t *testing.T) {
|
||||||
|
entry2 = h.Add("bar", 50*time.Millisecond)
|
||||||
|
assert.Equal(t, 0, entry2.heapIndex)
|
||||||
|
assert.Equal(t, 1, entry.heapIndex)
|
||||||
|
testMessage(t, ch)
|
||||||
|
testNoMessage(t, ch) // exactly one asserted above
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "add a third entry at the end", func(t *testing.T) {
|
||||||
|
entry3 = h.Add("baz", 1000*time.Millisecond)
|
||||||
|
assert.Equal(t, 2, entry3.heapIndex)
|
||||||
|
testNoMessage(t, ch) // no notify cause index 0 stayed the same
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "remove the first entry", func(t *testing.T) {
|
||||||
|
h.Remove(0)
|
||||||
|
assert.Equal(t, 0, entry.heapIndex)
|
||||||
|
assert.Equal(t, 1, entry3.heapIndex)
|
||||||
|
testMessage(t, ch)
|
||||||
|
testNoMessage(t, ch)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "update entry3 to expire first", func(t *testing.T) {
|
||||||
|
h.Update(entry3.heapIndex, 10*time.Millisecond)
|
||||||
|
assert.Equal(t, 1, entry.heapIndex)
|
||||||
|
assert.Equal(t, 0, entry3.heapIndex)
|
||||||
|
testMessage(t, ch)
|
||||||
|
testNoMessage(t, ch)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "0th element change triggers a notify", func(t *testing.T) {
|
||||||
|
h.Update(entry3.heapIndex, 20)
|
||||||
|
assert.Equal(t, 1, entry.heapIndex) // no move
|
||||||
|
assert.Equal(t, 0, entry3.heapIndex)
|
||||||
|
testMessage(t, ch)
|
||||||
|
testNoMessage(t, ch) // one message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testNoMessage(t *testing.T, ch <-chan struct{}) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatal("should not have a message")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testMessage(t *testing.T, ch <-chan struct{}) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
t.Fatal("should have a message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||||
|
if !t.Run(name, fn) {
|
||||||
|
t.FailNow()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue