Merge pull request #21856 from bprashanth/ttl_race

Lock across item expiration in the ttl store.
pull/6/head
Brian Grant 2016-02-24 15:06:25 -08:00
commit bea349a80d
8 changed files with 88 additions and 39 deletions

View File

@ -17,11 +17,11 @@ limitations under the License.
package cache
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
)
// ExpirationCache implements the store interface
@ -29,12 +29,20 @@ import (
// a. The key is computed based off the original item/keyFunc
// b. The value inserted under that key is the timestamped item
// 2. Expiration happens lazily on read based on the expiration policy
// a. No item can be inserted into the store while we're expiring
// *any* item in the cache.
// 3. Time-stamps are stripped off unexpired entries before return
// Note that the ExpirationCache is inherently slower than a normal
// threadSafeStore because it takes a write lock everytime it checks if
// an item has expired.
type ExpirationCache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
clock util.Clock
expirationPolicy ExpirationPolicy
// expirationLock is a write lock used to guarantee that we don't clobber
// newly inserted objects because of a stale expiration timestamp comparison
expirationLock sync.Mutex
}
// ExpirationPolicy dictates when an object expires. Currently only abstracted out
@ -68,7 +76,6 @@ type timestampedEntry struct {
// getTimestampedEntry returnes the timestampedEntry stored under the given key.
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
item, _ := c.cacheStorage.Get(key)
// TODO: Check the cast instead
if tsEntry, ok := item.(*timestampedEntry); ok {
return tsEntry, true
}
@ -76,24 +83,20 @@ func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bo
}
// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
// already expired. It kicks-off a go routine to delete expired objects from
// the store and sets exists=false.
// already expired. It holds a write lock across deletion.
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
// Prevent all inserts from the time we deem an item as "expired" to when we
// delete it, so an un-expired item doesn't sneak in under the same key, just
// before the Delete.
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
timestampedItem, exists := c.getTimestampedEntry(key)
if !exists {
return nil, false
}
if c.expirationPolicy.IsExpired(timestampedItem) {
glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
// Since expiration happens lazily on read, don't hold up
// the reader trying to acquire a write lock for the delete.
// The next reader will retry the delete even if this one
// fails; as long as we only return un-expired entries a
// reader doesn't need to wait for the result of the delete.
go func() {
defer runtime.HandleCrash()
c.cacheStorage.Delete(key)
}()
c.cacheStorage.Delete(key)
return nil, false
}
return timestampedItem.obj, true
@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string {
// Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error {
// Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
items := map[string]interface{}{}
ts := c.clock.Now()
for _, item := range list {

View File

@ -28,6 +28,7 @@ type fakeThreadSafeMap struct {
func (c *fakeThreadSafeMap) Delete(key string) {
if c.deletedKeys != nil {
c.ThreadSafeStore.Delete(key)
c.deletedKeys <- key
}
}

View File

@ -28,7 +28,7 @@ import (
func TestTTLExpirationBasic(t *testing.T) {
testObj := testStoreObject{id: "foo", val: "bar"}
deleteChan := make(chan string)
deleteChan := make(chan string, 1)
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan,
&FakeExpirationPolicy{
@ -62,6 +62,59 @@ func TestTTLExpirationBasic(t *testing.T) {
close(deleteChan)
}
func TestReAddExpiredItem(t *testing.T) {
deleteChan := make(chan string, 1)
exp := &FakeExpirationPolicy{
NeverExpire: sets.NewString(),
RetrieveKeyFunc: func(obj interface{}) (string, error) {
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
},
}
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan, exp, util.RealClock{})
testKey := "foo"
testObj := testStoreObject{id: testKey, val: "bar"}
err := ttlStore.Add(testObj)
if err != nil {
t.Errorf("Unable to add obj %#v", testObj)
}
// This get will expire the item.
item, exists, err := ttlStore.Get(testObj)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if exists || item != nil {
t.Errorf("Got unexpected item %#v", item)
}
key, _ := testStoreKeyFunc(testObj)
differentValue := "different_bar"
err = ttlStore.Add(
testStoreObject{id: testKey, val: differentValue})
if err != nil {
t.Errorf("Failed to add second value")
}
select {
case delKey := <-deleteChan:
if delKey != key {
t.Errorf("Unexpected delete for key %s", key)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Unexpected timeout waiting on delete")
}
exp.NeverExpire = sets.NewString(testKey)
item, exists, err = ttlStore.GetByKey(testKey)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if !exists || item == nil || item.(testStoreObject).val != differentValue {
t.Errorf("Got unexpected item %#v", item)
}
close(deleteChan)
}
func TestTTLList(t *testing.T) {
testObjs := []testStoreObject{
{id: "foo", val: "bar"},
@ -69,7 +122,7 @@ func TestTTLList(t *testing.T) {
{id: "foo2", val: "bar2"},
}
expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id)
deleteChan := make(chan string)
deleteChan := make(chan string, len(testObjs))
defer close(deleteChan)
ttlStore := NewFakeExpirationStore(

View File

@ -35,21 +35,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)
const (
CreatedByAnnotation = "kubernetes.io/created-by"
// If a watch drops a delete event for a pod, it'll take this long
// before a dormant controller waiting for those packets is woken up anyway. It is
// specifically targeted at the case where some problem prevents an update
// of expectations, without it the controller could stay asleep forever. This should
// be set based on the expected latency of watch events.
//
// Currently a controller can service (create *and* observe the watch events for said
// creation) about 10-20 pods a second, so it takes about 1 min to service
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
ExpectationsTimeout = 3 * time.Minute
)
const CreatedByAnnotation = "kubernetes.io/created-by"
var (
KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
@ -221,7 +207,7 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
// NewControllerExpectations returns a store for ControlleeExpectations.
func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}
// PodControlInterface is an interface that knows how to add or delete pods

View File

@ -334,12 +334,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}

View File

@ -224,12 +224,12 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}

View File

@ -351,12 +351,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a replica set recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before replica set recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}

View File

@ -348,12 +348,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}