mirror of https://github.com/k3s-io/k3s
Updated client-go expiration cache to take in expiration policies
parent
2cc5100933
commit
e6174e0ab5
|
@ -48,7 +48,7 @@ type ExpirationCache struct {
|
||||||
// ExpirationPolicy dictates when an object expires. Currently only abstracted out
|
// ExpirationPolicy dictates when an object expires. Currently only abstracted out
|
||||||
// so unittests don't rely on the system clock.
|
// so unittests don't rely on the system clock.
|
||||||
type ExpirationPolicy interface {
|
type ExpirationPolicy interface {
|
||||||
IsExpired(obj *timestampedEntry) bool
|
IsExpired(obj *TimestampedEntry) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// TTLPolicy implements a ttl based ExpirationPolicy.
|
// TTLPolicy implements a ttl based ExpirationPolicy.
|
||||||
|
@ -63,26 +63,29 @@ type TTLPolicy struct {
|
||||||
|
|
||||||
// IsExpired returns true if the given object is older than the ttl, or it can't
|
// IsExpired returns true if the given object is older than the ttl, or it can't
|
||||||
// determine its age.
|
// determine its age.
|
||||||
func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
|
func (p *TTLPolicy) IsExpired(obj *TimestampedEntry) bool {
|
||||||
return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
|
return p.Ttl > 0 && p.Clock.Since(obj.Timestamp) > p.Ttl
|
||||||
}
|
}
|
||||||
|
|
||||||
// timestampedEntry is the only type allowed in a ExpirationCache.
|
// TimestampedEntry is the only type allowed in a ExpirationCache.
|
||||||
type timestampedEntry struct {
|
// Keep in mind that it is not safe to share timestamps between computers.
|
||||||
obj interface{}
|
// Behavior may be inconsistent if you get a timestamp from the API Server and
|
||||||
timestamp time.Time
|
// use it on the client machine as part of your ExpirationCache.
|
||||||
|
type TimestampedEntry struct {
|
||||||
|
Obj interface{}
|
||||||
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTimestampedEntry returns the timestampedEntry stored under the given key.
|
// getTimestampedEntry returns the TimestampedEntry stored under the given key.
|
||||||
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
|
func (c *ExpirationCache) getTimestampedEntry(key string) (*TimestampedEntry, bool) {
|
||||||
item, _ := c.cacheStorage.Get(key)
|
item, _ := c.cacheStorage.Get(key)
|
||||||
if tsEntry, ok := item.(*timestampedEntry); ok {
|
if tsEntry, ok := item.(*TimestampedEntry); ok {
|
||||||
return tsEntry, true
|
return tsEntry, true
|
||||||
}
|
}
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
|
// getOrExpire retrieves the object from the TimestampedEntry if and only if it hasn't
|
||||||
// already expired. It holds a write lock across deletion.
|
// already expired. It holds a write lock across deletion.
|
||||||
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
|
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
|
||||||
// Prevent all inserts from the time we deem an item as "expired" to when we
|
// Prevent all inserts from the time we deem an item as "expired" to when we
|
||||||
|
@ -95,11 +98,11 @@ func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
if c.expirationPolicy.IsExpired(timestampedItem) {
|
if c.expirationPolicy.IsExpired(timestampedItem) {
|
||||||
klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
|
klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.Obj)
|
||||||
c.cacheStorage.Delete(key)
|
c.cacheStorage.Delete(key)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
return timestampedItem.obj, true
|
return timestampedItem.Obj, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetByKey returns the item stored under the key, or sets exists=false.
|
// GetByKey returns the item stored under the key, or sets exists=false.
|
||||||
|
@ -126,7 +129,7 @@ func (c *ExpirationCache) List() []interface{} {
|
||||||
|
|
||||||
list := make([]interface{}, 0, len(items))
|
list := make([]interface{}, 0, len(items))
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
obj := item.(*timestampedEntry).obj
|
obj := item.(*TimestampedEntry).Obj
|
||||||
if key, err := c.keyFunc(obj); err != nil {
|
if key, err := c.keyFunc(obj); err != nil {
|
||||||
list = append(list, obj)
|
list = append(list, obj)
|
||||||
} else if obj, exists := c.getOrExpire(key); exists {
|
} else if obj, exists := c.getOrExpire(key); exists {
|
||||||
|
@ -151,7 +154,7 @@ func (c *ExpirationCache) Add(obj interface{}) error {
|
||||||
c.expirationLock.Lock()
|
c.expirationLock.Lock()
|
||||||
defer c.expirationLock.Unlock()
|
defer c.expirationLock.Unlock()
|
||||||
|
|
||||||
c.cacheStorage.Add(key, ×tampedEntry{obj, c.clock.Now()})
|
c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now()})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,7 +187,7 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return KeyError{item, err}
|
return KeyError{item, err}
|
||||||
}
|
}
|
||||||
items[key] = ×tampedEntry{item, ts}
|
items[key] = &TimestampedEntry{item, ts}
|
||||||
}
|
}
|
||||||
c.expirationLock.Lock()
|
c.expirationLock.Lock()
|
||||||
defer c.expirationLock.Unlock()
|
defer c.expirationLock.Unlock()
|
||||||
|
@ -199,10 +202,15 @@ func (c *ExpirationCache) Resync() error {
|
||||||
|
|
||||||
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
|
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
|
||||||
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
|
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
|
||||||
|
return NewExpirationStore(keyFunc, &TTLPolicy{ttl, clock.RealClock{}})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewExpirationStore creates and returns a ExpirationCache for a given policy
|
||||||
|
func NewExpirationStore(keyFunc KeyFunc, expirationPolicy ExpirationPolicy) Store {
|
||||||
return &ExpirationCache{
|
return &ExpirationCache{
|
||||||
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
|
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
|
||||||
keyFunc: keyFunc,
|
keyFunc: keyFunc,
|
||||||
clock: clock.RealClock{},
|
clock: clock.RealClock{},
|
||||||
expirationPolicy: &TTLPolicy{ttl, clock.RealClock{}},
|
expirationPolicy: expirationPolicy,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ type FakeExpirationPolicy struct {
|
||||||
RetrieveKeyFunc KeyFunc
|
RetrieveKeyFunc KeyFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
|
func (p *FakeExpirationPolicy) IsExpired(obj *TimestampedEntry) bool {
|
||||||
key, _ := p.RetrieveKeyFunc(obj)
|
key, _ := p.RetrieveKeyFunc(obj)
|
||||||
return !p.NeverExpire.Has(key)
|
return !p.NeverExpire.Has(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func TestTTLExpirationBasic(t *testing.T) {
|
||||||
&FakeExpirationPolicy{
|
&FakeExpirationPolicy{
|
||||||
NeverExpire: sets.NewString(),
|
NeverExpire: sets.NewString(),
|
||||||
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
||||||
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
|
return obj.(*TimestampedEntry).Obj.(testStoreObject).id, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
clock.RealClock{},
|
clock.RealClock{},
|
||||||
|
@ -67,7 +67,7 @@ func TestReAddExpiredItem(t *testing.T) {
|
||||||
exp := &FakeExpirationPolicy{
|
exp := &FakeExpirationPolicy{
|
||||||
NeverExpire: sets.NewString(),
|
NeverExpire: sets.NewString(),
|
||||||
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
||||||
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
|
return obj.(*TimestampedEntry).Obj.(testStoreObject).id, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
ttlStore := NewFakeExpirationStore(
|
ttlStore := NewFakeExpirationStore(
|
||||||
|
@ -130,7 +130,7 @@ func TestTTLList(t *testing.T) {
|
||||||
&FakeExpirationPolicy{
|
&FakeExpirationPolicy{
|
||||||
NeverExpire: sets.NewString(testObjs[1].id),
|
NeverExpire: sets.NewString(testObjs[1].id),
|
||||||
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
||||||
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
|
return obj.(*TimestampedEntry).Obj.(testStoreObject).id, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
clock.RealClock{},
|
clock.RealClock{},
|
||||||
|
@ -168,15 +168,15 @@ func TestTTLPolicy(t *testing.T) {
|
||||||
expiredTime := fakeTime.Add(-(ttl + 1))
|
expiredTime := fakeTime.Add(-(ttl + 1))
|
||||||
|
|
||||||
policy := TTLPolicy{ttl, clock.NewFakeClock(fakeTime)}
|
policy := TTLPolicy{ttl, clock.NewFakeClock(fakeTime)}
|
||||||
fakeTimestampedEntry := ×tampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
|
fakeTimestampedEntry := &TimestampedEntry{Obj: struct{}{}, Timestamp: exactlyOnTTL}
|
||||||
if policy.IsExpired(fakeTimestampedEntry) {
|
if policy.IsExpired(fakeTimestampedEntry) {
|
||||||
t.Errorf("TTL cache should not expire entries exactly on ttl")
|
t.Errorf("TTL cache should not expire entries exactly on ttl")
|
||||||
}
|
}
|
||||||
fakeTimestampedEntry.timestamp = fakeTime
|
fakeTimestampedEntry.Timestamp = fakeTime
|
||||||
if policy.IsExpired(fakeTimestampedEntry) {
|
if policy.IsExpired(fakeTimestampedEntry) {
|
||||||
t.Errorf("TTL Cache should not expire entries before ttl")
|
t.Errorf("TTL Cache should not expire entries before ttl")
|
||||||
}
|
}
|
||||||
fakeTimestampedEntry.timestamp = expiredTime
|
fakeTimestampedEntry.Timestamp = expiredTime
|
||||||
if !policy.IsExpired(fakeTimestampedEntry) {
|
if !policy.IsExpired(fakeTimestampedEntry) {
|
||||||
t.Errorf("TTL Cache should expire entries older than ttl")
|
t.Errorf("TTL Cache should expire entries older than ttl")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue