Merge pull request #9006 from GoogleCloudPlatform/revert-8927-revert-8822-fifo_rc

Revert "Revert "Wake up rcs when pods get DeletionFinalStateUnknown t…
pull/6/head
Rohit Jnagal 2015-05-29 11:11:38 -07:00
commit 38c1fe112f
6 changed files with 122 additions and 36 deletions

View File

@ -22,6 +22,8 @@ import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// NewDeltaFIFO returns a Store which can be used process changes to items.
@ -76,6 +78,13 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys K
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the puspose of figuring out which
// items have been deleted when Replace() is called. If the given KeyLister
// also satisfies the KeyGetter interface, the deleted objet will be
// included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
@ -334,7 +343,21 @@ func (f *DeltaFIFO) Replace(list []interface{}) error {
continue
}
}
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k}); err != nil {
var deletedObj interface{}
if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
var exists bool
var err error
deletedObj, exists, err = keyGetter.GetByKey(k)
if err != nil || !exists {
deletedObj = nil
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else {
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
}
}
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
@ -346,12 +369,9 @@ type KeyLister interface {
ListKeys() []string
}
// KeyListerFunc adapts a raw function to be a KeyLister.
type KeyListerFunc func() []string
// ListKeys just calls kl.
func (kl KeyListerFunc) ListKeys() []string {
return kl()
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
// DeltaCompressor is an algorithm that removes redundant changes.
@ -427,8 +447,10 @@ func copyDeltas(d Deltas) Deltas {
}
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was was missed.
// In this case we don't know the final "resting" state of the object.
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}

View File

@ -27,6 +27,24 @@ func testPop(f *DeltaFIFO) testFifoObject {
return f.Pop().(Deltas).Newest().Object.(testFifoObject)
}
// keyLookupFunc adapts a raw function to be a KeyLookup.
type keyLookupFunc func() []string
// ListKeys just calls kl.
func (kl keyLookupFunc) ListKeys() []string {
return kl()
}
// GetByKey returns the key if it exists in the list returned by kl.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
for _, v := range kl() {
if v == key {
return key, true, nil
}
}
return nil, false, nil
}
func TestDeltaFIFO_basic(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
const amount = 500
@ -174,7 +192,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
KeyListerFunc(func() []string {
keyLookupFunc(func() []string {
return []string{"foo", "bar", "baz"}
}),
)
@ -184,7 +202,9 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "bar"}}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
}
for _, expected := range expectedList {
@ -259,9 +279,9 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
key string
}{
{obj: testFifoObject{name: "A"}, key: "A"},
{obj: DeletedFinalStateUnknown{Key: "B"}, key: "B"},
{obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"},
{obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"},
{obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D"}}}, key: "D"},
{obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"},
}
for _, item := range table {

View File

@ -366,6 +366,16 @@ func (s *serviceCache) ListKeys() []string {
return keys
}
// GetByKey returns the value stored in the serviceMap under the given key
func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if v, ok := s.serviceMap[key]; ok {
return v, true, nil
}
return nil, false, nil
}
// ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about.
func (s *serviceCache) allServices() []*cachedService {

View File

@ -237,26 +237,28 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.DeletionObserved(rc)
rm.enqueueController(rc)
}
return
}
pod, ok := obj.(*api.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone key. Since we don't
// know which rc to wake up/update expectations, we rely on the ttl on the
// expectation expiring. The rc syncs via the 30s periodic resync and notices
// fewer pods than its replica count.
podKey, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new rc will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, ExpectationsTimeout)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, ExpectationsTimeout)
return
}
}
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.DeletionObserved(rc)
rm.enqueueController(rc)
}
// A periodic relist might not have a pod that the store has, in such cases we are sent a tombstone key.
// We don't know which controllers to sync, so just let the controller relist handle this.
glog.Infof("Pod %q was deleted but we don't have a record of its final state so it could take up to %v before a controller recreates a replica.", podKey, ExpectationsTimeout)
}
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.

View File

@ -252,6 +252,38 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 0, 1)
}
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podControl = &fakePodControl
received := make(chan string)
manager.syncHandler = func(key string) error {
received <- key
return nil
}
// The DeletedFinalStateUnknown object should cause the rc manager to insert
// the controller matching the selectors of the deleted pod into the work queue.
controllerSpec := newReplicationController(1)
manager.controllerStore.Store.Add(controllerSpec)
pods := newPodList(nil, 1, api.PodRunning, controllerSpec)
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
go manager.worker()
expected := getKey(controllerSpec, t)
select {
case key := <-received:
if key != expected {
t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
}
}
func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)

View File

@ -673,7 +673,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
current := 0
same := 0
By(fmt.Sprintf("Creating replication controller %s", name))
By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: name,
@ -703,7 +703,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
if err != nil {
return fmt.Errorf("Error creating replication controller: %v", err)
}
Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
@ -714,7 +714,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
current = len(pods.Items)
failCount := 5
for same < failCount && current < replicas {
Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current {
same = 0
} else if last == current {
@ -738,9 +738,9 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
}
Logf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas)
Logf("%v Controller %s in ns %s: Found %d pods out of %d", time.Now(), name, ns, current, replicas)
By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures))
By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
same = 0
last = 0
failCount = 10