mirror of https://github.com/hashicorp/consul
cache: extract cache eviction heap
Start creating an interface that doesn't require using heap and hides more of the entry internals.pull/8998/head
parent
6513faf14c
commit
2cdc90e01b
|
@ -406,8 +406,7 @@ RETRY_GET:
|
|||
|
||||
// Touch the expiration and fix the heap.
|
||||
c.entriesLock.Lock()
|
||||
entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL)
|
||||
c.entriesExpiryHeap.Fix(entry.Expiry)
|
||||
c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL)
|
||||
c.entriesLock.Unlock()
|
||||
|
||||
// We purposely do not return an error here since the cache only works with
|
||||
|
@ -689,9 +688,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
|||
// initial expiry information and insert. If we're already in
|
||||
// the heap we do nothing since we're reusing the same entry.
|
||||
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
|
||||
newEntry.Expiry = &cacheEntryExpiry{Key: key}
|
||||
newEntry.Expiry.Update(tEntry.Opts.LastGetTTL)
|
||||
heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
|
||||
newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL)
|
||||
}
|
||||
|
||||
c.entries[key] = newEntry
|
||||
|
|
|
@ -1000,6 +1000,9 @@ func (t *testPartitionType) RegisterOptions() RegisterOptions {
|
|||
// Test that background refreshing reports correct Age in failure and happy
|
||||
// states.
|
||||
func TestCacheGet_refreshAge(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for -short run")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
@ -46,118 +45,3 @@ type cacheEntry struct {
|
|||
// FetchRateLimiter limits the rate at which fetch is called for this entry.
|
||||
FetchRateLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
// cacheEntryExpiry contains the expiration information for a cache
|
||||
// entry. Any modifications to this struct should be done only while
|
||||
// the Cache entriesLock is held.
|
||||
type cacheEntryExpiry struct {
|
||||
Key string // Key in the cache map
|
||||
Expires time.Time // Time when entry expires (monotonic clock)
|
||||
HeapIndex int // Index in the heap
|
||||
}
|
||||
|
||||
// Update the expiry to d time from now.
|
||||
func (e *cacheEntryExpiry) Update(d time.Duration) {
|
||||
e.Expires = time.Now().Add(d)
|
||||
}
|
||||
|
||||
// expiryHeap is a heap implementation that stores information about
|
||||
// when entries expire. Implements container/heap.Interface.
|
||||
//
|
||||
// All operations on the heap and read/write of the heap contents require
|
||||
// the proper entriesLock to be held on Cache.
|
||||
type expiryHeap struct {
|
||||
Entries []*cacheEntryExpiry
|
||||
|
||||
// NotifyCh is sent a value whenever the 0 index value of the heap
|
||||
// changes. This can be used to detect when the earliest value
|
||||
// changes.
|
||||
//
|
||||
// There is a single edge case where the heap will not automatically
|
||||
// send a notification: if heap.Fix is called manually and the index
|
||||
// changed is 0 and the change doesn't result in any moves (stays at index
|
||||
// 0), then we won't detect the change. To work around this, please
|
||||
// always call the expiryHeap.Fix method instead.
|
||||
NotifyCh chan struct{}
|
||||
}
|
||||
|
||||
// Identical to heap.Fix for this heap instance but will properly handle
|
||||
// the edge case where idx == 0 and no heap modification is necessary,
|
||||
// and still notify the NotifyCh.
|
||||
//
|
||||
// This is important for cache expiry since the expiry time may have been
|
||||
// extended and if we don't send a message to the NotifyCh then we'll never
|
||||
// reset the timer and the entry will be evicted early.
|
||||
func (h *expiryHeap) Fix(entry *cacheEntryExpiry) {
|
||||
idx := entry.HeapIndex
|
||||
heap.Fix(h, idx)
|
||||
|
||||
// This is the edge case we handle: if the prev (idx) and current (HeapIndex)
|
||||
// is zero, it means the head-of-line didn't change while the value
|
||||
// changed. Notify to reset our expiry worker.
|
||||
if idx == 0 && entry.HeapIndex == 0 {
|
||||
h.notify()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Len() int { return len(h.Entries) }
|
||||
|
||||
func (h *expiryHeap) Swap(i, j int) {
|
||||
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
|
||||
h.Entries[i].HeapIndex = i
|
||||
h.Entries[j].HeapIndex = j
|
||||
|
||||
// If we're moving the 0 index, update the channel since we need
|
||||
// to re-update the timer we're waiting on for the soonest expiring
|
||||
// value.
|
||||
if i == 0 || j == 0 {
|
||||
h.notify()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Less(i, j int) bool {
|
||||
// The usage of Before here is important (despite being obvious):
|
||||
// this function uses the monotonic time that should be available
|
||||
// on the time.Time value so the heap is immune to wall clock changes.
|
||||
return h.Entries[i].Expires.Before(h.Entries[j].Expires)
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
func (h *expiryHeap) Push(x interface{}) {
|
||||
entry := x.(*cacheEntryExpiry)
|
||||
|
||||
// Set initial heap index, if we're going to the end then Swap
|
||||
// won't be called so we need to initialize
|
||||
entry.HeapIndex = len(h.Entries)
|
||||
|
||||
// For the first entry, we need to trigger a channel send because
|
||||
// Swap won't be called; nothing to swap! We can call it right away
|
||||
// because all heap operations are within a lock.
|
||||
if len(h.Entries) == 0 {
|
||||
h.notify()
|
||||
}
|
||||
|
||||
h.Entries = append(h.Entries, entry)
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
func (h *expiryHeap) Pop() interface{} {
|
||||
old := h.Entries
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
h.Entries = old[0 : n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
func (h *expiryHeap) notify() {
|
||||
select {
|
||||
case h.NotifyCh <- struct{}{}:
|
||||
// Good
|
||||
|
||||
default:
|
||||
// If the send would've blocked, we just ignore it. The reason this
|
||||
// is safe is because NotifyCh should always be a buffered channel.
|
||||
// If this blocks, it means that there is a pending message anyways
|
||||
// so the receiver will restart regardless.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExpiryHeap_impl(t *testing.T) {
|
||||
var _ heap.Interface = new(expiryHeap)
|
||||
}
|
||||
|
||||
func TestExpiryHeap(t *testing.T) {
|
||||
require := require.New(t)
|
||||
now := time.Now()
|
||||
ch := make(chan struct{}, 10) // buffered to prevent blocking in tests
|
||||
h := &expiryHeap{NotifyCh: ch}
|
||||
|
||||
// Init, shouldn't trigger anything
|
||||
heap.Init(h)
|
||||
testNoMessage(t, ch)
|
||||
|
||||
// Push an initial value, expect one message
|
||||
entry := &cacheEntryExpiry{Key: "foo", HeapIndex: -1, Expires: now.Add(100)}
|
||||
heap.Push(h, entry)
|
||||
require.Equal(0, entry.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
|
||||
// Push another that goes earlier than entry
|
||||
entry2 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(50)}
|
||||
heap.Push(h, entry2)
|
||||
require.Equal(0, entry2.HeapIndex)
|
||||
require.Equal(1, entry.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
|
||||
// Push another that goes at the end
|
||||
entry3 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(1000)}
|
||||
heap.Push(h, entry3)
|
||||
require.Equal(2, entry3.HeapIndex)
|
||||
testNoMessage(t, ch) // no notify cause index 0 stayed the same
|
||||
|
||||
// Remove the first entry (not Pop, since we don't use Pop, but that works too)
|
||||
remove := h.Entries[0]
|
||||
heap.Remove(h, remove.HeapIndex)
|
||||
require.Equal(0, entry.HeapIndex)
|
||||
require.Equal(1, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testMessage(t, ch) // we have two because two swaps happen
|
||||
testNoMessage(t, ch)
|
||||
|
||||
// Let's change entry 3 to be early, and fix it
|
||||
entry3.Expires = now.Add(10)
|
||||
h.Fix(entry3)
|
||||
require.Equal(1, entry.HeapIndex)
|
||||
require.Equal(0, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch)
|
||||
|
||||
// Let's change entry 3 again, this is an edge case where if the 0th
|
||||
// element changed, we didn't trigger the channel. Our Fix func should.
|
||||
entry.Expires = now.Add(20)
|
||||
h.Fix(entry3)
|
||||
require.Equal(1, entry.HeapIndex) // no move
|
||||
require.Equal(0, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // one message
|
||||
}
|
||||
|
||||
func testNoMessage(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("should not have a message")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func testMessage(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatal("should have a message")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
// cacheEntryExpiry contains the expiration time for a cache entry.
|
||||
type cacheEntryExpiry struct {
|
||||
Key string // Key in the cache map
|
||||
Expires time.Time // Time when entry expires (monotonic clock)
|
||||
HeapIndex int // Index in the heap
|
||||
}
|
||||
|
||||
// TODO: use or remove
|
||||
func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry {
|
||||
return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
|
||||
}
|
||||
|
||||
// expiryHeap is a container/heap.Interface implementation that expires entries
|
||||
// in the cache when their expiration time is reached.
|
||||
//
|
||||
// All operations on the heap and read/write of the heap contents require
|
||||
// the proper entriesLock to be held on Cache.
|
||||
type expiryHeap struct {
|
||||
Entries []*cacheEntryExpiry
|
||||
|
||||
// NotifyCh is sent a value whenever the 0 index value of the heap
|
||||
// changes. This can be used to detect when the earliest value
|
||||
// changes.
|
||||
//
|
||||
// There is a single edge case where the heap will not automatically
|
||||
// send a notification: if heap.Fix is called manually and the index
|
||||
// changed is 0 and the change doesn't result in any moves (stays at index
|
||||
// 0), then we won't detect the change. To work around this, please
|
||||
// always call the expiryHeap.Fix method instead.
|
||||
NotifyCh chan struct{}
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry {
|
||||
entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
|
||||
heap.Push(h, entry)
|
||||
return entry
|
||||
}
|
||||
|
||||
// Update the entry that is currently at idx with the new expiry time. The heap
|
||||
// will be rebalanced after the entry is updated.
|
||||
//
|
||||
// Must be synchronized by the caller.
|
||||
func (h *expiryHeap) Update(idx int, expiry time.Duration) {
|
||||
entry := h.Entries[idx]
|
||||
entry.Expires = time.Now().Add(expiry)
|
||||
heap.Fix(h, idx)
|
||||
|
||||
// If the previous index and current index are both zero then Fix did not
|
||||
// swap the entry, and notify must be called here.
|
||||
if idx == 0 && entry.HeapIndex == 0 {
|
||||
h.notify()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Len() int { return len(h.Entries) }
|
||||
|
||||
func (h *expiryHeap) Swap(i, j int) {
|
||||
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
|
||||
h.Entries[i].HeapIndex = i
|
||||
h.Entries[j].HeapIndex = j
|
||||
|
||||
// If we're moving the 0 index, update the channel since we need
|
||||
// to re-update the timer we're waiting on for the soonest expiring
|
||||
// value.
|
||||
if i == 0 || j == 0 {
|
||||
h.notify()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Less(i, j int) bool {
|
||||
// The usage of Before here is important (despite being obvious):
|
||||
// this function uses the monotonic time that should be available
|
||||
// on the time.Time value so the heap is immune to wall clock changes.
|
||||
return h.Entries[i].Expires.Before(h.Entries[j].Expires)
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
func (h *expiryHeap) Push(x interface{}) {
|
||||
entry := x.(*cacheEntryExpiry)
|
||||
|
||||
// Set the initial heap index to the last index. If the entry is swapped it
|
||||
// will have the correct set, and if it remains at the end the last index will
|
||||
// be correct.
|
||||
entry.HeapIndex = len(h.Entries)
|
||||
|
||||
// For the first entry, we need to trigger a channel send because
|
||||
// Swap won't be called; nothing to swap! We can call it right away
|
||||
// because all heap operations are within a lock.
|
||||
if len(h.Entries) == 0 {
|
||||
h.notify()
|
||||
}
|
||||
|
||||
h.Entries = append(h.Entries, entry)
|
||||
}
|
||||
|
||||
// heap.Interface, this isn't expected to be called directly.
|
||||
func (h *expiryHeap) Pop() interface{} {
|
||||
n := len(h.Entries)
|
||||
entries := h.Entries
|
||||
last := entries[n-1]
|
||||
h.Entries = entries[0 : n-1]
|
||||
return last
|
||||
}
|
||||
|
||||
func (h *expiryHeap) notify() {
|
||||
select {
|
||||
case h.NotifyCh <- struct{}{}:
|
||||
// Good
|
||||
|
||||
default:
|
||||
// If the send would've blocked, we just ignore it. The reason this
|
||||
// is safe is because NotifyCh should always be a buffered channel.
|
||||
// If this blocks, it means that there is a pending message anyways
|
||||
// so the receiver will restart regardless.
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var _ heap.Interface = new(expiryHeap)
|
||||
|
||||
func TestExpiryHeap(t *testing.T) {
|
||||
require := require.New(t)
|
||||
ch := make(chan struct{}, 10) // buffered to prevent blocking in tests
|
||||
h := &expiryHeap{NotifyCh: ch}
|
||||
var entry, entry2, entry3 *cacheEntryExpiry
|
||||
|
||||
// Init, shouldn't trigger anything
|
||||
heap.Init(h)
|
||||
testNoMessage(t, ch)
|
||||
|
||||
runStep(t, "add an entry", func(t *testing.T) {
|
||||
entry = h.Add("foo", 100*time.Millisecond)
|
||||
require.Equal(0, entry.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
})
|
||||
|
||||
runStep(t, "add a second entry in front", func(t *testing.T) {
|
||||
entry2 = h.Add("bar", 50*time.Millisecond)
|
||||
require.Equal(0, entry2.HeapIndex)
|
||||
require.Equal(1, entry.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // exactly one asserted above
|
||||
})
|
||||
|
||||
runStep(t, "add a third entry at the end", func(t *testing.T) {
|
||||
entry3 = h.Add("baz", 1000*time.Millisecond)
|
||||
require.Equal(2, entry3.HeapIndex)
|
||||
testNoMessage(t, ch) // no notify cause index 0 stayed the same
|
||||
})
|
||||
|
||||
runStep(t, "remove the first entry", func(t *testing.T) {
|
||||
remove := h.Entries[0]
|
||||
heap.Remove(h, remove.HeapIndex)
|
||||
require.Equal(0, entry.HeapIndex)
|
||||
require.Equal(1, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testMessage(t, ch) // we have two because two swaps happen
|
||||
testNoMessage(t, ch)
|
||||
})
|
||||
|
||||
runStep(t, "update entry3 to expire first", func(t *testing.T) {
|
||||
h.Update(entry3.HeapIndex, 10*time.Millisecond)
|
||||
assert.Equal(t, 1, entry.HeapIndex)
|
||||
assert.Equal(t, 0, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch)
|
||||
})
|
||||
|
||||
runStep(t, "0th element change triggers a notify", func(t *testing.T) {
|
||||
h.Update(entry3.HeapIndex, 20)
|
||||
require.Equal(1, entry.HeapIndex) // no move
|
||||
require.Equal(0, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testNoMessage(t, ch) // one message
|
||||
})
|
||||
}
|
||||
|
||||
func testNoMessage(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("should not have a message")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func testMessage(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatal("should have a message")
|
||||
}
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
if !t.Run(name, fn) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue