Lock across item expiration in the ttl store.

pull/6/head
Prashanth Balasubramanian 2016-02-23 18:44:59 -08:00
parent 2e3053a204
commit 45eb835833
3 changed files with 78 additions and 15 deletions

View File

@ -17,11 +17,11 @@ limitations under the License.
package cache package cache
import ( import (
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
) )
// ExpirationCache implements the store interface // ExpirationCache implements the store interface
@ -29,12 +29,20 @@ import (
// a. The key is computed based off the original item/keyFunc // a. The key is computed based off the original item/keyFunc
// b. The value inserted under that key is the timestamped item // b. The value inserted under that key is the timestamped item
// 2. Expiration happens lazily on read based on the expiration policy // 2. Expiration happens lazily on read based on the expiration policy
// a. No item can be inserted into the store while we're expiring
// *any* item in the cache.
// 3. Time-stamps are stripped off unexpired entries before return // 3. Time-stamps are stripped off unexpired entries before return
// Note that the ExpirationCache is inherently slower than a normal
// threadSafeStore because it takes a write lock everytime it checks if
// an item has expired.
type ExpirationCache struct { type ExpirationCache struct {
cacheStorage ThreadSafeStore cacheStorage ThreadSafeStore
keyFunc KeyFunc keyFunc KeyFunc
clock util.Clock clock util.Clock
expirationPolicy ExpirationPolicy expirationPolicy ExpirationPolicy
// expirationLock is a write lock used to guarantee that we don't clobber
// newly inserted objects because of a stale expiration timestamp comparison
expirationLock sync.Mutex
} }
// ExpirationPolicy dictates when an object expires. Currently only abstracted out // ExpirationPolicy dictates when an object expires. Currently only abstracted out
@ -68,7 +76,6 @@ type timestampedEntry struct {
// getTimestampedEntry returnes the timestampedEntry stored under the given key. // getTimestampedEntry returnes the timestampedEntry stored under the given key.
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) { func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
item, _ := c.cacheStorage.Get(key) item, _ := c.cacheStorage.Get(key)
// TODO: Check the cast instead
if tsEntry, ok := item.(*timestampedEntry); ok { if tsEntry, ok := item.(*timestampedEntry); ok {
return tsEntry, true return tsEntry, true
} }
@ -76,24 +83,20 @@ func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bo
} }
// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't // getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
// already expired. It kicks-off a go routine to delete expired objects from // already expired. It holds a write lock across deletion.
// the store and sets exists=false.
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
// Prevent all inserts from the time we deem an item as "expired" to when we
// delete it, so an un-expired item doesn't sneak in under the same key, just
// before the Delete.
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
timestampedItem, exists := c.getTimestampedEntry(key) timestampedItem, exists := c.getTimestampedEntry(key)
if !exists { if !exists {
return nil, false return nil, false
} }
if c.expirationPolicy.IsExpired(timestampedItem) { if c.expirationPolicy.IsExpired(timestampedItem) {
glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj) glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
// Since expiration happens lazily on read, don't hold up c.cacheStorage.Delete(key)
// the reader trying to acquire a write lock for the delete.
// The next reader will retry the delete even if this one
// fails; as long as we only return un-expired entries a
// reader doesn't need to wait for the result of the delete.
go func() {
defer runtime.HandleCrash()
c.cacheStorage.Delete(key)
}()
return nil, false return nil, false
} }
return timestampedItem.obj, true return timestampedItem.obj, true
@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string {
// Add timestamps an item and inserts it into the cache, overwriting entries // Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key. // that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error { func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj) key, err := c.keyFunc(obj)
if err != nil { if err != nil {
return KeyError{obj, err} return KeyError{obj, err}
@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error {
// Delete removes an item from the cache. // Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error { func (c *ExpirationCache) Delete(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj) key, err := c.keyFunc(obj)
if err != nil { if err != nil {
return KeyError{obj, err} return KeyError{obj, err}
@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// before attempting the replace operation. The replace operation will // before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`. // delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
items := map[string]interface{}{} items := map[string]interface{}{}
ts := c.clock.Now() ts := c.clock.Now()
for _, item := range list { for _, item := range list {

View File

@ -28,6 +28,7 @@ type fakeThreadSafeMap struct {
func (c *fakeThreadSafeMap) Delete(key string) { func (c *fakeThreadSafeMap) Delete(key string) {
if c.deletedKeys != nil { if c.deletedKeys != nil {
c.ThreadSafeStore.Delete(key)
c.deletedKeys <- key c.deletedKeys <- key
} }
} }

View File

@ -28,7 +28,7 @@ import (
func TestTTLExpirationBasic(t *testing.T) { func TestTTLExpirationBasic(t *testing.T) {
testObj := testStoreObject{id: "foo", val: "bar"} testObj := testStoreObject{id: "foo", val: "bar"}
deleteChan := make(chan string) deleteChan := make(chan string, 1)
ttlStore := NewFakeExpirationStore( ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan, testStoreKeyFunc, deleteChan,
&FakeExpirationPolicy{ &FakeExpirationPolicy{
@ -62,6 +62,59 @@ func TestTTLExpirationBasic(t *testing.T) {
close(deleteChan) close(deleteChan)
} }
func TestReAddExpiredItem(t *testing.T) {
deleteChan := make(chan string, 1)
exp := &FakeExpirationPolicy{
NeverExpire: sets.NewString(),
RetrieveKeyFunc: func(obj interface{}) (string, error) {
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
},
}
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan, exp, util.RealClock{})
testKey := "foo"
testObj := testStoreObject{id: testKey, val: "bar"}
err := ttlStore.Add(testObj)
if err != nil {
t.Errorf("Unable to add obj %#v", testObj)
}
// This get will expire the item.
item, exists, err := ttlStore.Get(testObj)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if exists || item != nil {
t.Errorf("Got unexpected item %#v", item)
}
key, _ := testStoreKeyFunc(testObj)
differentValue := "different_bar"
err = ttlStore.Add(
testStoreObject{id: testKey, val: differentValue})
if err != nil {
t.Errorf("Failed to add second value")
}
select {
case delKey := <-deleteChan:
if delKey != key {
t.Errorf("Unexpected delete for key %s", key)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Unexpected timeout waiting on delete")
}
exp.NeverExpire = sets.NewString(testKey)
item, exists, err = ttlStore.GetByKey(testKey)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if !exists || item == nil || item.(testStoreObject).val != differentValue {
t.Errorf("Got unexpected item %#v", item)
}
close(deleteChan)
}
func TestTTLList(t *testing.T) { func TestTTLList(t *testing.T) {
testObjs := []testStoreObject{ testObjs := []testStoreObject{
{id: "foo", val: "bar"}, {id: "foo", val: "bar"},
@ -69,7 +122,7 @@ func TestTTLList(t *testing.T) {
{id: "foo2", val: "bar2"}, {id: "foo2", val: "bar2"},
} }
expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id) expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id)
deleteChan := make(chan string) deleteChan := make(chan string, len(testObjs))
defer close(deleteChan) defer close(deleteChan)
ttlStore := NewFakeExpirationStore( ttlStore := NewFakeExpirationStore(