mirror of https://github.com/k3s-io/k3s
fix replica set hot loop
Change-Id: I5176eb9350324de8e7b2035686c4e2c2abc5ef3d
parent 46c009952d
commit a76a743ecd
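
The controller previously kept its keys in a plain workqueue.Type and, whenever a sync failed, re-added the key immediately (rsc.queue.Add / rsc.enqueueReplicaSet), so a persistently failing ReplicaSet could spin the workers in a hot loop. This commit switches the field to workqueue.RateLimitingInterface, makes syncReplicaSet and manageReplicas return their errors, and lets processNextWorkItem decide what to do with the result: Forget the key on success, AddRateLimited on failure so retries back off. Below is a minimal standalone sketch of that worker pattern; the package path (k8s.io/client-go/util/workqueue) and the toy syncHandler are illustrative assumptions, not part of this commit, which uses the workqueue package vendored under k8s.io/kubernetes.

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        // Rate-limited queue: a key that keeps failing is re-added with
        // exponential backoff instead of immediately (the hot loop).
        queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset")
        defer queue.ShutDown()
        queue.Add("default/my-rs")

        // Toy stand-in for rsc.syncReplicaSet: fails once, then succeeds.
        failed := false
        syncHandler := func(key string) error {
            if !failed {
                failed = true
                return fmt.Errorf("transient failure syncing %q", key)
            }
            return nil
        }

        processNextWorkItem := func() bool {
            key, quit := queue.Get()
            if quit {
                return false
            }
            defer queue.Done(key)

            if err := syncHandler(key.(string)); err == nil {
                queue.Forget(key) // drop the key's rate-limit history on success
                return true
            }
            // Retry later, subject to backoff.
            queue.AddRateLimited(key)
            return true
        }

        // Drain two items: the failed attempt and the rate-limited retry.
        for i := 0; i < 2; i++ {
            processNextWorkItem()
        }
    }
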
@@ -106,7 +106,7 @@ type ReplicaSetController struct {
 	lookupCache *controller.MatchingCache
 
 	// Controllers that need to be synced
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 
 	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
 	// manager behaves differently if GC is enabled.
@@ -138,7 +138,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
 		},
 		burstReplicas: burstReplicas,
 		expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
-		queue: workqueue.NewNamed("replicaset"),
+		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
 		garbageCollectorEnabled: garbageCollectorEnabled,
 	}
 
@@ -202,17 +202,20 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder)
 // Run begins watching and syncing.
 func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer rsc.queue.ShutDown()
 
 	go rsc.rsController.Run(stopCh)
 	go rsc.podController.Run(stopCh)
 
 	for i := 0; i < workers; i++ {
 		go wait.Until(rsc.worker, time.Second, stopCh)
 	}
 
 	if rsc.internalPodInformer != nil {
 		go rsc.internalPodInformer.Run(stopCh)
 	}
 
 	<-stopCh
 	glog.Infof("Shutting down ReplicaSet Controller")
-	rsc.queue.ShutDown()
 }
 
 // getPodReplicaSet returns the replica set managing the given pod.
@@ -224,7 +227,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
 		rs, ok := obj.(*extensions.ReplicaSet)
 		if !ok {
 			// This should not happen
-			glog.Errorf("lookup cache does not return a ReplicaSet object")
+			utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicaSet object"))
 			return nil
 		}
 		if cached && rsc.isCacheValid(pod, rs) {
@@ -247,7 +250,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
 		// More than two items in this list indicates user error. If two replicasets
 		// overlap, sort by creation timestamp, subsort by name, then pick
 		// the first.
-		glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
+		utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
 		sort.Sort(overlappingReplicaSets(rss))
 	}
 
@@ -335,7 +338,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
 	}
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
-		glog.Errorf("Couldn't get key for replica set %#v: %v", rs, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for replica set %#v: %v", rs, err))
 		return
 	}
 	if pod.DeletionTimestamp != nil {
@@ -401,12 +404,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*api.Pod)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
 			return
 		}
 	}
@@ -414,7 +417,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
 	if rs := rsc.getPodReplicaSet(pod); rs != nil {
 		rsKey, err := controller.KeyFunc(rs)
 		if err != nil {
-			glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
 			return
 		}
 		rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
@@ -426,7 +429,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
 func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -442,32 +445,43 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rsc *ReplicaSetController) worker() {
-	for {
-		func() {
+	for rsc.processNextWorkItem() {
+	}
+}
+
+func (rsc *ReplicaSetController) processNextWorkItem() bool {
 	key, quit := rsc.queue.Get()
 	if quit {
-		return
+		return false
 	}
 	defer rsc.queue.Done(key)
 
 	err := rsc.syncHandler(key.(string))
-	if err != nil {
-		glog.Errorf("Error syncing ReplicaSet: %v", err)
+	if err == nil {
+		rsc.queue.Forget(key)
+		return true
 	}
-		}()
-	}
+
+	utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
+	rsc.queue.AddRateLimited(key)
+
+	return true
 }
 
 // manageReplicas checks and updates replicas for the given ReplicaSet.
 // Does NOT modify <filteredPods>.
-func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) {
+// It will requeue the replica set in case of an error while creating/deleting pods.
+func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) error {
 	diff := len(filteredPods) - int(rs.Spec.Replicas)
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
-		glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
-		return
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
+		return nil
 	}
+	var errCh chan error
 	if diff < 0 {
 		diff *= -1
+		errCh = make(chan error, diff)
 		if diff > rsc.burstReplicas {
 			diff = rsc.burstReplicas
 		}
@@ -502,7 +516,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
 					// Decrement the expected number of creates because the informer won't observe this pod
 					glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
 					rsc.expectations.CreationObserved(rsKey)
-					utilruntime.HandleError(err)
+					errCh <- err
 				}
 			}()
 		}
@@ -511,6 +525,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
 		if diff > rsc.burstReplicas {
 			diff = rsc.burstReplicas
 		}
+		errCh = make(chan error, diff)
 		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff)
 		// No need to sort pods if we are about to delete all of them
 		if rs.Spec.Replicas != 0 {
@@ -540,12 +555,22 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
 					podKey := controller.PodKey(filteredPods[ix])
 					glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
 					rsc.expectations.DeletionObserved(rsKey, podKey)
-					utilruntime.HandleError(err)
+					errCh <- err
 				}
 			}(i)
 		}
 		wg.Wait()
 	}
 
+	select {
+	case err := <-errCh:
+		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
+		if err != nil {
+			return err
+		}
+	default:
+	}
+	return nil
 }
 
 // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
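
manageReplicas now reports pod create/delete failures instead of swallowing them: each worker goroutine writes its error into a channel buffered to the number of workers, and after wg.Wait() a non-blocking select surfaces the first one so syncReplicaSet can propagate it and the rate-limited queue can requeue with backoff. A self-contained sketch of that collection pattern follows; createPod is an illustrative stand-in for rsc.podControl, not the real API.

    package main

    import (
        "fmt"
        "sync"
    )

    // createPod is an illustrative stand-in for rsc.podControl.CreatePods.
    func createPod(i int) error {
        if i == 2 {
            return fmt.Errorf("create %d failed", i)
        }
        return nil
    }

    // manage mirrors the manageReplicas error handling: a channel buffered to
    // the number of workers so no goroutine blocks, and only the first error
    // is returned to the caller.
    func manage(diff int) error {
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for i := 0; i < diff; i++ {
            go func(ix int) {
                defer wg.Done()
                if err := createPod(ix); err != nil {
                    errCh <- err
                }
            }(i)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // Errors were already reported individually; returning the first
            // one makes the caller requeue the replica set.
            return err
        default:
        }
        return nil
    }

    func main() {
        fmt.Println(manage(4)) // prints: create 2 failed
    }
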
@@ -567,13 +592,11 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 
 	obj, exists, err := rsc.rsStore.Indexer.GetByKey(key)
 	if !exists {
-		glog.Infof("ReplicaSet has been deleted %v", key)
+		glog.V(4).Infof("ReplicaSet has been deleted %v", key)
 		rsc.expectations.DeleteExpectations(key)
 		return nil
 	}
 	if err != nil {
-		glog.Infof("Unable to retrieve ReplicaSet %v from store: %v", key, err)
-		rsc.queue.Add(key)
 		return err
 	}
 	rs := *obj.(*extensions.ReplicaSet)
@@ -582,16 +605,16 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 	// in and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the ReplicaSet sync is just deferred till the next
 	// relist.
-	rsKey, err := controller.KeyFunc(&rs)
 	if err != nil {
-		glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
-		return err
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
+		// Explicitly return nil to avoid re-enqueue bad key
+		return nil
 	}
-	rsNeedsSync := rsc.expectations.SatisfiedExpectations(rsKey)
+	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
 	selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
 	if err != nil {
-		glog.Errorf("Error converting pod selector to selector: %v", err)
-		return err
+		utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
+		return nil
 	}
 
 	// NOTE: filteredPods are pointing to objects from cache - if you need to
@@ -603,8 +626,6 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 		// anymore but has the stale controller ref.
 		pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
 		if err != nil {
-			glog.Errorf("Error getting pods for rs %q: %v", key, err)
-			rsc.queue.Add(key)
 			return err
 		}
 		cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind())
@@ -635,21 +656,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 			// push the RS into work queue again. We need to try to free the
 			// pods again otherwise they will stuck with the stale
 			// controllerRef.
-			rsc.queue.Add(key)
 			return aggregate
 		}
 	} else {
 		pods, err := rsc.podStore.Pods(rs.Namespace).List(selector)
 		if err != nil {
-			glog.Errorf("Error getting pods for rs %q: %v", key, err)
-			rsc.queue.Add(key)
 			return err
 		}
 		filteredPods = controller.FilterActivePods(pods)
 	}
 
+	var manageReplicasErr error
 	if rsNeedsSync && rs.DeletionTimestamp == nil {
-		rsc.manageReplicas(filteredPods, &rs)
+		manageReplicasErr = rsc.manageReplicas(filteredPods, &rs)
 	}
 
 	// Count the number of pods that have labels matching the labels of the pod
@@ -683,9 +702,8 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 		availableReplicasCount,
 	); err != nil {
 		// Multiple things could lead to this update failing. Requeuing the replica set ensures
-		// we retry with some fairness.
-		glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err)
-		rsc.enqueueReplicaSet(&rs)
+		// Returning an error causes a requeue without forcing a hotloop
+		return err
 	}
-	return nil
+	return manageReplicasErr
 }

@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/client/testing/core"
 	"k8s.io/kubernetes/pkg/controller"
@@ -640,22 +641,10 @@ func TestControllerUpdateRequeue(t *testing.T) {
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 
-	manager.syncReplicaSet(getKey(rs, t))
-
-	ch := make(chan interface{})
-	go func() {
-		item, _ := manager.queue.Get()
-		ch <- item
-	}()
-	select {
-	case key := <-ch:
-		expectedKey := getKey(rs, t)
-		if key != expectedKey {
-			t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key)
-		}
-	case <-time.After(wait.ForeverTestTimeout):
-		manager.queue.ShutDown()
-		t.Errorf("Expected to find a ReplicaSet in the queue, found none.")
+	// an error from the sync function will be requeued, check to make sure we returned an error
+	err := manager.syncReplicaSet(getKey(rs, t))
+	if err == nil {
+		t.Errorf("missing error for requeue")
 	}
 	// 1 Update and 1 GET, both of which fail
 	fakeHandler.ValidateRequestCount(t, 2)
@@ -1088,8 +1077,8 @@ func TestDeletionTimestamp(t *testing.T) {
 
 // setupManagerWithGCEnabled creates a RS manager with a fakePodControl
 // and with garbageCollectorEnabled set to true
-func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
-	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
+func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
+	c := fakeclientset.NewSimpleClientset(objs...)
 	fakePodControl = &controller.FakePodControl{}
 	manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.garbageCollectorEnabled = true
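
setupManagerWithGCEnabled now takes the objects each test needs and backs the manager with a fake clientset seeded through NewSimpleClientset, instead of a real client pointed at an empty host. A rough sketch of the same seeding idea using the public client-go fake package (an assumption for illustration; the test itself uses the vendored internalclientset fake shown in the import hunk above):

    package main

    import (
        "context"
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        // Objects passed to NewSimpleClientset pre-populate the fake's object
        // tracker, the same way setupManagerWithGCEnabled(rs) seeds the ReplicaSet.
        pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-0", Namespace: "default"}}
        client := fake.NewSimpleClientset(pod)

        got, err := client.CoreV1().Pods("default").Get(context.TODO(), "pod-0", metav1.GetOptions{})
        if err != nil {
            panic(err)
        }
        fmt.Println("found:", got.Name)
    }
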
@@ -1099,9 +1088,9 @@ func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl
 }
 
 func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	var trueVar = true
 	otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
@@ -1118,9 +1107,9 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
 }
 
 func TestPatchPodWithOtherOwnerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	// add to podStore one more matching pod that doesn't have a controller
 	// ref, but has an owner ref pointing to other object. Expect a patch to
@@ -1139,9 +1128,9 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
 }
 
 func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	// add to podStore a matching pod that has an ownerRef pointing to the rs,
 	// but ownerRef.Controller is false. Expect a patch to take control it.
@@ -1159,9 +1148,9 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
 }
 
 func TestPatchPodFails(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	// add to podStore two matching pods. Expect two patches to take control
 	// them.
@@ -1171,17 +1160,17 @@ func TestPatchPodFails(t *testing.T) {
 	// control of the pods and create new ones.
 	fakePodControl.Err = fmt.Errorf("Fake Error")
 	err := manager.syncReplicaSet(getKey(rs, t))
-	if err != nil {
-		t.Fatal(err)
+	if err == nil || err.Error() != "Fake Error" {
+		t.Errorf("expected Fake Error, got %+v", err)
 	}
 	// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
 	validateSyncReplicaSet(t, fakePodControl, 2, 0, 2)
 }
 
 func TestPatchExtraPodsThenDelete(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	// add to podStore three matching pods. Expect three patches to take control
 	// them, and later delete one of them.
@@ -1197,9 +1186,9 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
 }
 
 func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	manager.rsStore.Indexer.Add(rs)
 	// put one pod in the podStore
 	pod := newPod("pod", rs, api.PodRunning, nil)
@@ -1236,9 +1225,9 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
 }
 
 func TestUpdateSelectorControllerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	// put 2 pods in the podStore
 	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
 	// update the RS so that its selector no longer matches the pods
@@ -1270,9 +1259,9 @@ func TestUpdateSelectorControllerRef(t *testing.T) {
 // RS controller shouldn't adopt or create more pods if the rc is about to be
 // deleted.
 func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
+	manager, fakePodControl := setupManagerWithGCEnabled(rs)
 	now := unversioned.Now()
 	rs.DeletionTimestamp = &now
 	manager.rsStore.Indexer.Add(rs)