diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 8ed0491f9a..5a8acb98e9 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -106,7 +106,7 @@ type ReplicaSetController struct { lookupCache *controller.MatchingCache // Controllers that need to be synced - queue *workqueue.Type + queue workqueue.RateLimitingInterface // garbageCollectorEnabled denotes if the garbage collector is enabled. RC // manager behaves differently if GC is enabled. @@ -138,7 +138,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac }, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), - queue: workqueue.NewNamed("replicaset"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"), garbageCollectorEnabled: garbageCollectorEnabled, } @@ -202,17 +202,20 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) // Run begins watching and syncing. func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + defer rsc.queue.ShutDown() + go rsc.rsController.Run(stopCh) go rsc.podController.Run(stopCh) + for i := 0; i < workers; i++ { go wait.Until(rsc.worker, time.Second, stopCh) } + if rsc.internalPodInformer != nil { go rsc.internalPodInformer.Run(stopCh) } <-stopCh glog.Infof("Shutting down ReplicaSet Controller") - rsc.queue.ShutDown() } // getPodReplicaSet returns the replica set managing the given pod. @@ -224,7 +227,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl rs, ok := obj.(*extensions.ReplicaSet) if !ok { // This should not happen - glog.Errorf("lookup cache does not return a ReplicaSet object") + utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicaSet object")) return nil } if cached && rsc.isCacheValid(pod, rs) { @@ -247,7 +250,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl // More than two items in this list indicates user error. If two replicasets // overlap, sort by creation timestamp, subsort by name, then pick // the first. - glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels) + utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)) sort.Sort(overlappingReplicaSets(rss)) } @@ -335,7 +338,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { } rsKey, err := controller.KeyFunc(rs) if err != nil { - glog.Errorf("Couldn't get key for replica set %#v: %v", rs, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for replica set %#v: %v", rs, err)) return } if pod.DeletionTimestamp != nil { @@ -401,12 +404,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v", obj) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %#v", obj) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) return } } @@ -414,7 +417,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { if rs := rsc.getPodReplicaSet(pod); rs != nil { rsKey, err := controller.KeyFunc(rs) if err != nil { - glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) return } rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) @@ -426,7 +429,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } @@ -442,32 +445,43 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (rsc *ReplicaSetController) worker() { - for { - func() { - key, quit := rsc.queue.Get() - if quit { - return - } - defer rsc.queue.Done(key) - err := rsc.syncHandler(key.(string)) - if err != nil { - glog.Errorf("Error syncing ReplicaSet: %v", err) - } - }() + for rsc.processNextWorkItem() { } } +func (rsc *ReplicaSetController) processNextWorkItem() bool { + key, quit := rsc.queue.Get() + if quit { + return false + } + defer rsc.queue.Done(key) + + err := rsc.syncHandler(key.(string)) + if err == nil { + rsc.queue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err)) + rsc.queue.AddRateLimited(key) + + return true +} + // manageReplicas checks and updates replicas for the given ReplicaSet. // Does NOT modify . -func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) { +// It will requeue the replica set in case of an error while creating/deleting pods. +func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) error { diff := len(filteredPods) - int(rs.Spec.Replicas) rsKey, err := controller.KeyFunc(rs) if err != nil { - glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err) - return + utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) + return nil } + var errCh chan error if diff < 0 { diff *= -1 + errCh = make(chan error, diff) if diff > rsc.burstReplicas { diff = rsc.burstReplicas } @@ -502,7 +516,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.CreationObserved(rsKey) - utilruntime.HandleError(err) + errCh <- err } }() } @@ -511,6 +525,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext if diff > rsc.burstReplicas { diff = rsc.burstReplicas } + errCh = make(chan error, diff) glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff) // No need to sort pods if we are about to delete all of them if rs.Spec.Replicas != 0 { @@ -540,12 +555,22 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext podKey := controller.PodKey(filteredPods[ix]) glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name) rsc.expectations.DeletionObserved(rsKey, podKey) - utilruntime.HandleError(err) + errCh <- err } }(i) } wg.Wait() } + + select { + case err := <-errCh: + // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit. + if err != nil { + return err + } + default: + } + return nil } // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled, @@ -567,13 +592,11 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { obj, exists, err := rsc.rsStore.Indexer.GetByKey(key) if !exists { - glog.Infof("ReplicaSet has been deleted %v", key) + glog.V(4).Infof("ReplicaSet has been deleted %v", key) rsc.expectations.DeleteExpectations(key) return nil } if err != nil { - glog.Infof("Unable to retrieve ReplicaSet %v from store: %v", key, err) - rsc.queue.Add(key) return err } rs := *obj.(*extensions.ReplicaSet) @@ -582,16 +605,16 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { // in and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the ReplicaSet sync is just deferred till the next // relist. - rsKey, err := controller.KeyFunc(&rs) if err != nil { - glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err) - return err + utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) + // Explicitly return nil to avoid re-enqueue bad key + return nil } - rsNeedsSync := rsc.expectations.SatisfiedExpectations(rsKey) + rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { - glog.Errorf("Error converting pod selector to selector: %v", err) - return err + utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err)) + return nil } // NOTE: filteredPods are pointing to objects from cache - if you need to @@ -603,8 +626,6 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { // anymore but has the stale controller ref. pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) if err != nil { - glog.Errorf("Error getting pods for rs %q: %v", key, err) - rsc.queue.Add(key) return err } cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) @@ -635,21 +656,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { // push the RS into work queue again. We need to try to free the // pods again otherwise they will stuck with the stale // controllerRef. - rsc.queue.Add(key) return aggregate } } else { pods, err := rsc.podStore.Pods(rs.Namespace).List(selector) if err != nil { - glog.Errorf("Error getting pods for rs %q: %v", key, err) - rsc.queue.Add(key) return err } filteredPods = controller.FilterActivePods(pods) } + var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { - rsc.manageReplicas(filteredPods, &rs) + manageReplicasErr = rsc.manageReplicas(filteredPods, &rs) } // Count the number of pods that have labels matching the labels of the pod @@ -683,9 +702,8 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { availableReplicasCount, ); err != nil { // Multiple things could lead to this update failing. Requeuing the replica set ensures - // we retry with some fairness. - glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err) - rsc.enqueueReplicaSet(&rs) + // Returning an error causes a requeue without forcing a hotloop + return err } - return nil + return manageReplicasErr } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 59de1edd39..ac082db668 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" @@ -640,22 +641,10 @@ func TestControllerUpdateRequeue(t *testing.T) { fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl - manager.syncReplicaSet(getKey(rs, t)) - - ch := make(chan interface{}) - go func() { - item, _ := manager.queue.Get() - ch <- item - }() - select { - case key := <-ch: - expectedKey := getKey(rs, t) - if key != expectedKey { - t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key) - } - case <-time.After(wait.ForeverTestTimeout): - manager.queue.ShutDown() - t.Errorf("Expected to find a ReplicaSet in the queue, found none.") + // an error from the sync function will be requeued, check to make sure we returned an error + err := manager.syncReplicaSet(getKey(rs, t)) + if err == nil { + t.Errorf("missing error for requeue") } // 1 Update and 1 GET, both of which fail fakeHandler.ValidateRequestCount(t, 2) @@ -1088,8 +1077,8 @@ func TestDeletionTimestamp(t *testing.T) { // setupManagerWithGCEnabled creates a RS manager with a fakePodControl // and with garbageCollectorEnabled set to true -func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) +func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { + c := fakeclientset.NewSimpleClientset(objs...) fakePodControl = &controller.FakePodControl{} manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.garbageCollectorEnabled = true @@ -1099,9 +1088,9 @@ func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl } func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) var trueVar = true otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} @@ -1118,9 +1107,9 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { } func TestPatchPodWithOtherOwnerRef(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) // add to podStore one more matching pod that doesn't have a controller // ref, but has an owner ref pointing to other object. Expect a patch to @@ -1139,9 +1128,9 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) { } func TestPatchPodWithCorrectOwnerRef(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) // add to podStore a matching pod that has an ownerRef pointing to the rs, // but ownerRef.Controller is false. Expect a patch to take control it. @@ -1159,9 +1148,9 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) { } func TestPatchPodFails(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) // add to podStore two matching pods. Expect two patches to take control // them. @@ -1171,17 +1160,17 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and create new ones. fakePodControl.Err = fmt.Errorf("Fake Error") err := manager.syncReplicaSet(getKey(rs, t)) - if err != nil { - t.Fatal(err) + if err == nil || err.Error() != "Fake Error" { + t.Errorf("expected Fake Error, got %+v", err) } // 2 patches to take control of pod1 and pod2 (both fail), 2 creates. validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) } func TestPatchExtraPodsThenDelete(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) // add to podStore three matching pods. Expect three patches to take control // them, and later delete one of them. @@ -1197,9 +1186,9 @@ func TestPatchExtraPodsThenDelete(t *testing.T) { } func TestUpdateLabelsRemoveControllerRef(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) manager.rsStore.Indexer.Add(rs) // put one pod in the podStore pod := newPod("pod", rs, api.PodRunning, nil) @@ -1236,9 +1225,9 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { } func TestUpdateSelectorControllerRef(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) // put 2 pods in the podStore newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") // update the RS so that its selector no longer matches the pods @@ -1270,9 +1259,9 @@ func TestUpdateSelectorControllerRef(t *testing.T) { // RS controller shouldn't adopt or create more pods if the rc is about to be // deleted. func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { - manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) + manager, fakePodControl := setupManagerWithGCEnabled(rs) now := unversioned.Now() rs.DeletionTimestamp = &now manager.rsStore.Indexer.Add(rs)