Merge pull request #49429 from enisoc/dedup-rc-rs

Automatic merge from submit-queue (batch tested with PRs 54773, 52523, 47497, 55356, 49429). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Deduplicate RC/RS controller code.

The code was already 99% similar between RC and RS. This is a wild idea to try to deduplicate the two controllers in a type-safe manner without adding tons of boilerplate, and without using code generation.

They are still separate resources and separate worker pools. This is a refactor that isn't intended to change any behavior.

```release-note
ReplicationController now shares its underlying controller implementation with ReplicaSet to reduce the maintenance burden going forward. However, they are still separate resources and there should be no externally visible effects from this change.
```

ref #49429
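
To make the idea concrete before the diff, here is a minimal, self-contained sketch of the pattern with toy types (not the real Kubernetes API and not this PR's exact code): a single controller implementation is parameterized by the Kind it manages and talks to its backing store through a narrow interface, and the second resource is supported by an adapter that converts objects on the way in and out.

```go
package main

import "fmt"

// Toy stand-ins for the real API types.
type ReplicaSet struct {
	Name     string
	Replicas int
}

type ReplicationController struct {
	Name     string
	Replicas int
}

// Store is the narrow view of object storage the shared controller needs.
type Store interface {
	Get(name string) (*ReplicaSet, error)
}

// rsStore serves ReplicaSets natively.
type rsStore struct{ items map[string]*ReplicaSet }

func (s rsStore) Get(name string) (*ReplicaSet, error) { return s.items[name], nil }

// rcStore adapts ReplicationControllers by converting them on the way out,
// as if RC were just an older API version of RS.
type rcStore struct{ items map[string]*ReplicationController }

func (s rcStore) Get(name string) (*ReplicaSet, error) {
	rc := s.items[name]
	return &ReplicaSet{Name: rc.Name, Replicas: rc.Replicas}, nil
}

// Controller is written once; Kind is used only for logging and ownership,
// never to branch on behavior.
type Controller struct {
	Kind  string
	store Store
}

func (c Controller) Sync(name string) {
	obj, _ := c.store.Get(name)
	fmt.Printf("syncing %s %q, want %d replicas\n", c.Kind, obj.Name, obj.Replicas)
}

func main() {
	rs := Controller{Kind: "ReplicaSet", store: rsStore{items: map[string]*ReplicaSet{"web": {Name: "web", Replicas: 3}}}}
	rc := Controller{Kind: "ReplicationController", store: rcStore{items: map[string]*ReplicationController{"db": {Name: "db", Replicas: 1}}}}
	rs.Sync("web")
	rc.Sync("db")
}
```

In the real change, the shared implementation is `pkg/controller/replicaset` (with `NewBaseController` as the injectable constructor) and `pkg/controller/replication` provides informer, clientset, and pod-control adapters plus the RC/RS conversions, as the diff below shows.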
Kubernetes Submit Queue 2017-11-08 22:12:03 -08:00 committed by GitHub
commit 3e315aa0f8
14 changed files with 1315 additions and 2728 deletions


@ -24,6 +24,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",


@ -1,7 +1,9 @@
approvers:
- caesarxuchao
- lavalamp
- enisoc
reviewers:
- caesarxuchao
- lavalamp
- tnozicka
- enisoc


@ -14,7 +14,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// If you make changes to this file, you should also make the corresponding change in ReplicationController.
// ### ATTENTION ###
//
// This code implements both ReplicaSet and ReplicationController.
//
// For RC, the objects are converted on the way in and out (see ../replication/),
// as if ReplicationController were just an older API version of ReplicaSet.
// However, RC and RS still have separate storage and separate instantiations
// of the ReplicaSetController object.
//
// Use rsc.Kind in log messages rather than hard-coding "ReplicaSet".
package replicaset
@ -22,6 +31,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
@ -32,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -59,12 +70,14 @@ const (
statusUpdateRetries = 1
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
// GroupVersionKind indicates the controller type.
// Different instances of this struct may handle different GVKs.
// For example, this struct can be used (with adapters) to handle ReplicationController.
schema.GroupVersionKind
kubeClient clientset.Interface
podControl controller.PodControlInterface
@ -95,22 +108,35 @@ type ReplicaSetController struct {
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
rsc := &ReplicaSetController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
v1beta1.SchemeGroupVersion.WithKind("ReplicaSet"),
"replicaset_controller",
"replicaset",
controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
)
}
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
kubeClient: kubeClient,
podControl: podControl,
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -153,10 +179,11 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
glog.Infof("Starting replica set controller")
defer glog.Infof("Shutting down replica set Controller")
controllerName := strings.ToLower(rsc.Kind)
glog.Infof("Starting %v controller", controllerName)
defer glog.Infof("Shutting down %v controller", controllerName)
if !controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}
@ -176,7 +203,7 @@ func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.Re
if len(rss) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
}
return rss
}
@ -187,7 +214,7 @@ func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.Re
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *extensions.ReplicaSet {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
if controllerRef.Kind != rsc.Kind {
return nil
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
@ -220,7 +247,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
// that bad as ReplicaSets that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
glog.V(4).Infof("Replica set %v updated. Desired pod count change: %d->%d", curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
glog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
}
rsc.enqueueReplicaSet(cur)
}
@ -319,7 +346,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replica set controller).
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds)
glog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
@ -434,7 +461,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
if diff < 0 {
@ -448,7 +475,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
@ -460,8 +487,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
APIVersion: rsc.GroupVersion().String(),
Kind: rsc.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
@ -485,7 +512,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %v/%v", skippedPods, rs.Namespace, rs.Name)
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(rsKey)
@ -496,7 +523,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
// Choose which Pods to delete, preferring those in earlier phases of startup.
podsToDelete := getPodsToDelete(filteredPods, diff)
@ -518,7 +545,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(targetPod)
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey, podKey)
errCh <- err
}
@ -543,9 +570,10 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
glog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -554,7 +582,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("ReplicaSet has been deleted %v", key)
glog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
rsc.expectations.DeleteExpectations(key)
return nil
}
@ -623,11 +651,11 @@ func (rsc *ReplicaSetController) claimPods(rs *extensions.ReplicaSet, selector l
return nil, err
}
if fresh.UID != rs.UID {
return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID)
return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc)
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
return cm.ClaimPods(filteredPods)
}
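
The manageReplicas hunks above create pods through `slowStartBatch`, which is only referenced, not shown, in this diff; the same batching logic appears written out inline in the old ReplicationManager code further down in this PR. Per the surrounding comments, it runs the create calls in exponentially growing batches and stops at the first batch that hits errors, so a failure mode like an exhausted quota does not spam the API server. A minimal self-contained sketch of that behavior (using a local `min` in place of the repo's integer utility; not the exact upstream code):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// slowStartBatch calls fn up to count times, in batches that start at
// initialBatchSize and double after every fully successful batch. It returns
// the number of successful calls and the first error seen; once a batch
// fails, the remaining calls are never attempted.
func slowStartBatch(count, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	var calls int32
	successes, err := slowStartBatch(10, 1, func() error {
		if atomic.AddInt32(&calls, 1) > 3 {
			return errors.New("quota exceeded")
		}
		return nil
	})
	fmt.Println(successes, err) // 3 quota exceeded
}
```

In the example, the batches of 1 and 2 succeed, the batch of 4 fails, and the remaining calls are skipped; the caller then decrements its creation expectations for every pod that was not created, as the slow-start failure hunk above shows.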


@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// If you make changes to this file, you should also make the corresponding change in ReplicationController.
package replicaset
import (


@ -55,7 +55,7 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext
var getErr, updateErr error
var updatedRS *extensions.ReplicaSet
for i, rs := 0, rs; ; i++ {
glog.V(4).Infof(fmt.Sprintf("Updating status for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
glog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +


@ -9,63 +9,47 @@ load(
go_library(
name = "go_default_library",
srcs = [
"conversion.go",
"doc.go",
"replication_controller.go",
"replication_controller_utils.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/replication",
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/controller/replicaset:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/apps/v1beta2:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["replication_controller_test.go"],
srcs = ["replication_controller_utils_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/replication",
library = ":go_default_library",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/securitycontext:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
)
filegroup(


@ -1,7 +1,9 @@
approvers:
- caesarxuchao
- lavalamp
- enisoc
reviewers:
- caesarxuchao
- lavalamp
- tnozicka
- enisoc


@ -0,0 +1,372 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains adapters that convert between RC and RS,
// as if ReplicationController were an older API version of ReplicaSet.
// It allows ReplicaSetController to directly replace the old ReplicationManager,
// which was previously a manually-maintained copy-paste of RSC.
package replication
import (
"errors"
"fmt"
"time"
"k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
appsv1beta2 "k8s.io/client-go/kubernetes/typed/apps/v1beta2"
v1client "k8s.io/client-go/kubernetes/typed/core/v1"
extensionsv1beta1client "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
v1listers "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsinternalv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)
// informerAdapter implements ReplicaSetInformer by wrapping ReplicationControllerInformer
// and converting objects.
type informerAdapter struct {
rcInformer coreinformers.ReplicationControllerInformer
}
func (i informerAdapter) Informer() cache.SharedIndexInformer {
return conversionInformer{i.rcInformer.Informer()}
}
func (i informerAdapter) Lister() extensionslisters.ReplicaSetLister {
return conversionLister{i.rcInformer.Lister()}
}
type conversionInformer struct {
cache.SharedIndexInformer
}
func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) {
i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler})
}
func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod)
}
type conversionLister struct {
rcLister v1listers.ReplicationControllerLister
}
func (l conversionLister) List(selector labels.Selector) ([]*extensionsv1beta1.ReplicaSet, error) {
rcList, err := l.rcLister.List(selector)
if err != nil {
return nil, err
}
return convertSlice(rcList)
}
func (l conversionLister) ReplicaSets(namespace string) extensionslisters.ReplicaSetNamespaceLister {
return conversionNamespaceLister{l.rcLister.ReplicationControllers(namespace)}
}
func (l conversionLister) GetPodReplicaSets(pod *v1.Pod) ([]*extensionsv1beta1.ReplicaSet, error) {
rcList, err := l.rcLister.GetPodControllers(pod)
if err != nil {
return nil, err
}
return convertSlice(rcList)
}
type conversionNamespaceLister struct {
rcLister v1listers.ReplicationControllerNamespaceLister
}
func (l conversionNamespaceLister) List(selector labels.Selector) ([]*extensionsv1beta1.ReplicaSet, error) {
rcList, err := l.rcLister.List(selector)
if err != nil {
return nil, err
}
return convertSlice(rcList)
}
func (l conversionNamespaceLister) Get(name string) (*extensionsv1beta1.ReplicaSet, error) {
rc, err := l.rcLister.Get(name)
if err != nil {
return nil, err
}
return convertRCtoRS(rc, nil)
}
type conversionEventHandler struct {
handler cache.ResourceEventHandler
}
func (h conversionEventHandler) OnAdd(obj interface{}) {
rs, err := convertRCtoRS(obj.(*v1.ReplicationController), nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnAdd event: can't convert object %#v to RS: %v", obj, err))
return
}
h.handler.OnAdd(rs)
}
func (h conversionEventHandler) OnUpdate(oldObj, newObj interface{}) {
oldRS, err := convertRCtoRS(oldObj.(*v1.ReplicationController), nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnUpdate event: can't convert old object %#v to RS: %v", oldObj, err))
return
}
newRS, err := convertRCtoRS(newObj.(*v1.ReplicationController), nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnUpdate event: can't convert new object %#v to RS: %v", newObj, err))
return
}
h.handler.OnUpdate(oldRS, newRS)
}
func (h conversionEventHandler) OnDelete(obj interface{}) {
rc, ok := obj.(*v1.ReplicationController)
if !ok {
// Convert the Obj inside DeletedFinalStateUnknown.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: couldn't get object from tombstone %+v", obj))
return
}
rc, ok = tombstone.Obj.(*v1.ReplicationController)
if !ok {
utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: tombstone contained object that is not a RC %#v", obj))
return
}
rs, err := convertRCtoRS(rc, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: can't convert object %#v to RS: %v", obj, err))
return
}
h.handler.OnDelete(cache.DeletedFinalStateUnknown{Key: tombstone.Key, Obj: rs})
return
}
// It's a regular RC object.
rs, err := convertRCtoRS(rc, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: can't convert object %#v to RS: %v", obj, err))
return
}
h.handler.OnDelete(rs)
}
type clientsetAdapter struct {
clientset.Interface
}
func (c clientsetAdapter) ExtensionsV1beta1() extensionsv1beta1client.ExtensionsV1beta1Interface {
return conversionExtensionsClient{c.Interface, c.Interface.ExtensionsV1beta1()}
}
func (c clientsetAdapter) Extensions() extensionsv1beta1client.ExtensionsV1beta1Interface {
return conversionExtensionsClient{c.Interface, c.Interface.Extensions()}
}
func (c clientsetAdapter) AppsV1beta2() appsv1beta2.AppsV1beta2Interface {
return conversionAppsV1beta2Client{c.Interface, c.Interface.AppsV1beta2()}
}
func (c clientsetAdapter) AppsV1() appsv1.AppsV1Interface {
return conversionAppsV1Client{c.Interface, c.Interface.AppsV1()}
}
func (c clientsetAdapter) Apps() appsv1.AppsV1Interface {
return conversionAppsV1Client{c.Interface, c.Interface.AppsV1()}
}
type conversionAppsV1beta2Client struct {
clientset clientset.Interface
appsv1beta2.AppsV1beta2Interface
}
func (c conversionAppsV1beta2Client) ReplicaSets(namespace string) appsv1beta2.ReplicaSetInterface {
// TODO(enisoc): This will force RC integration tests to fail if anyone tries to update
// ReplicaSetController to use apps/v1beta2 without updating this conversion adapter.
// Please change conversionClient to use the new RS version instead of extensions/v1beta1,
// and then return a conversionClient here.
panic("need to update RC/RS conversionClient for apps/v1beta2")
}
type conversionAppsV1Client struct {
clientset clientset.Interface
appsv1.AppsV1Interface
}
func (c conversionAppsV1Client) ReplicaSets(namespace string) appsv1.ReplicaSetInterface {
// TODO(enisoc): This will force RC integration tests to fail if anyone tries to update
// ReplicaSetController to use apps/v1 without updating this conversion adapter.
// Please change conversionClient to use the new RS version instead of extensions/v1beta1,
// and then return a conversionClient here.
panic("need to update RC/RS conversionClient for apps/v1")
}
type conversionExtensionsClient struct {
clientset clientset.Interface
extensionsv1beta1client.ExtensionsV1beta1Interface
}
func (c conversionExtensionsClient) ReplicaSets(namespace string) extensionsv1beta1client.ReplicaSetInterface {
return conversionClient{c.clientset.CoreV1().ReplicationControllers(namespace)}
}
type conversionClient struct {
v1client.ReplicationControllerInterface
}
func (c conversionClient) Create(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) {
return convertCall(c.ReplicationControllerInterface.Create, rs)
}
func (c conversionClient) Update(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) {
return convertCall(c.ReplicationControllerInterface.Update, rs)
}
func (c conversionClient) UpdateStatus(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) {
return convertCall(c.ReplicationControllerInterface.UpdateStatus, rs)
}
func (c conversionClient) Get(name string, options metav1.GetOptions) (*extensionsv1beta1.ReplicaSet, error) {
rc, err := c.ReplicationControllerInterface.Get(name, options)
if err != nil {
return nil, err
}
return convertRCtoRS(rc, nil)
}
func (c conversionClient) List(opts metav1.ListOptions) (*extensionsv1beta1.ReplicaSetList, error) {
rcList, err := c.ReplicationControllerInterface.List(opts)
if err != nil {
return nil, err
}
return convertList(rcList)
}
func (c conversionClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
// This is not used by RSC because we wrap the shared informer instead.
return nil, errors.New("Watch() is not implemented for conversionClient")
}
func (c conversionClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *extensionsv1beta1.ReplicaSet, err error) {
// This is not used by RSC.
return nil, errors.New("Patch() is not implemented for conversionClient")
}
func convertSlice(rcList []*v1.ReplicationController) ([]*extensionsv1beta1.ReplicaSet, error) {
rsList := make([]*extensionsv1beta1.ReplicaSet, 0, len(rcList))
for _, rc := range rcList {
rs, err := convertRCtoRS(rc, nil)
if err != nil {
return nil, err
}
rsList = append(rsList, rs)
}
return rsList, nil
}
func convertList(rcList *v1.ReplicationControllerList) (*extensionsv1beta1.ReplicaSetList, error) {
rsList := &extensionsv1beta1.ReplicaSetList{Items: make([]extensionsv1beta1.ReplicaSet, len(rcList.Items))}
for i := range rcList.Items {
rc := &rcList.Items[i]
_, err := convertRCtoRS(rc, &rsList.Items[i])
if err != nil {
return nil, err
}
}
return rsList, nil
}
func convertCall(fn func(*v1.ReplicationController) (*v1.ReplicationController, error), rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) {
rc, err := convertRStoRC(rs)
if err != nil {
return nil, err
}
result, err := fn(rc)
if err != nil {
return nil, err
}
return convertRCtoRS(result, nil)
}
func convertRCtoRS(rc *v1.ReplicationController, out *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) {
var rsInternal extensions.ReplicaSet
if err := apiv1.Convert_v1_ReplicationController_to_extensions_ReplicaSet(rc, &rsInternal, nil); err != nil {
return nil, fmt.Errorf("can't convert ReplicationController %v/%v to ReplicaSet: %v", rc.Namespace, rc.Name, err)
}
if out == nil {
out = new(extensionsv1beta1.ReplicaSet)
}
if err := extensionsinternalv1beta1.Convert_extensions_ReplicaSet_To_v1beta1_ReplicaSet(&rsInternal, out, nil); err != nil {
return nil, fmt.Errorf("can't convert ReplicaSet (converted from ReplicationController %v/%v) from internal to extensions/v1beta1: %v", rc.Namespace, rc.Name, err)
}
return out, nil
}
func convertRStoRC(rs *extensionsv1beta1.ReplicaSet) (*v1.ReplicationController, error) {
var rsInternal extensions.ReplicaSet
if err := extensionsinternalv1beta1.Convert_v1beta1_ReplicaSet_To_extensions_ReplicaSet(rs, &rsInternal, nil); err != nil {
return nil, fmt.Errorf("can't convert ReplicaSet (converting to ReplicationController %v/%v) from extensions/v1beta1 to internal: %v", rs.Namespace, rs.Name, err)
}
var rc v1.ReplicationController
if err := apiv1.Convert_extensions_ReplicaSet_to_v1_ReplicationController(&rsInternal, &rc, nil); err != nil {
return nil, fmt.Errorf("can't convert ReplicaSet to ReplicationController %v/%v: %v", rs.Namespace, rs.Name, err)
}
return &rc, nil
}
type podControlAdapter struct {
controller.PodControlInterface
}
func (pc podControlAdapter) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
// This is not used by RSC.
return errors.New("CreatePods() is not implemented for podControlAdapter")
}
func (pc podControlAdapter) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
// This is not used by RSC.
return errors.New("CreatePodsOnNode() is not implemented for podControlAdapter")
}
func (pc podControlAdapter) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
rc, err := convertRStoRC(object.(*extensionsv1beta1.ReplicaSet))
if err != nil {
return err
}
return pc.PodControlInterface.CreatePodsWithControllerRef(namespace, template, rc, controllerRef)
}
func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error {
rc, err := convertRStoRC(object.(*extensionsv1beta1.ReplicaSet))
if err != nil {
return err
}
return pc.PodControlInterface.DeletePod(namespace, podID, rc)
}


@ -14,653 +14,54 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// If you make changes to this file, you should also make the corresponding change in ReplicaSet.
// ### ATTENTION ###
//
// ReplicationManager is now just a wrapper around ReplicaSetController,
// with a conversion layer that effectively treats ReplicationController
// as if it were an older API version of ReplicaSet.
//
// However, RC and RS still have separate storage and separate instantiations
// of the ReplicaSetController object.
package replication
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utiltrace "k8s.io/apiserver/pkg/util/trace"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/controller/replicaset"
)
const (
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// The number of times we retry updating a replication controller's status.
statusUpdateRetries = 1
BurstReplicas = replicaset.BurstReplicas
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1.SchemeGroupVersion.WithKind("ReplicationController")
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// NOTE: using this name to distinguish this type from API object "ReplicationController"; will
// not fix it right now. Refer to #41459 for more detail.
// It is actually just a wrapper around ReplicaSetController.
type ReplicationManager struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
// An rc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncReplicationController for testing.
syncHandler func(rcKey string) error
// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations
rcLister corelisters.ReplicationControllerLister
rcListerSynced cache.InformerSynced
podLister corelisters.PodLister
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
// Controllers that need to be synced
queue workqueue.RateLimitingInterface
replicaset.ReplicaSetController
}
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
rm := &ReplicationManager{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
}
rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
})
rm.rcLister = rcInformer.Lister()
rm.rcListerSynced = rcInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podLister = podInformer.Lister()
rm.podListerSynced = podInformer.Informer().HasSynced
rm.syncHandler = rm.syncReplicationController
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rm.queue.ShutDown()
glog.Infof("Starting RC controller")
defer glog.Infof("Shutting down RC controller")
if !controller.WaitForCacheSync("RC", stopCh, rm.podListerSynced, rm.rcListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
}
// getPodControllers returns a list of ReplicationControllers matching the given pod.
func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController {
rcs, err := rm.rcLister.GetPodControllers(pod)
if err != nil {
return nil
}
if len(rcs) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicationController is selecting pods with labels: %+v", pod.Labels))
}
return rcs
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (rm *ReplicationManager) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *v1.ReplicationController {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
rc, err := rm.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if rc.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return rc
}
// callback when RC is updated
func (rm *ReplicationManager) updateRC(old, cur interface{}) {
oldRC := old.(*v1.ReplicationController)
curRC := cur.(*v1.ReplicationController)
// You might imagine that we only really need to enqueue the
// controller when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
// resync can requeue any controllers that don't yet have pods
// but whose last attempts at creating a pod have failed (since
// we don't block on creation of pods) instead of those
// controllers stalling indefinitely. Enqueueing every time
// does result in some spurious syncs (like when Status.Replica
// is updated and the watch notification from it retriggers
// this function), but in general extra resyncs shouldn't be
// that bad as rcs that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
if *(oldRC.Spec.Replicas) != *(curRC.Spec.Replicas) {
glog.V(4).Infof("Replication controller %v updated. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas))
}
rm.enqueueController(cur)
}
// When a pod is created, enqueue the ReplicationController that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
rm.deletePod(pod)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
rc := rm.resolveControllerRef(pod.Namespace, controllerRef)
if rc == nil {
return
}
rsKey, err := controller.KeyFunc(rc)
if err != nil {
return
}
glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rm.expectations.CreationObserved(rsKey)
rm.enqueueController(rc)
return
}
// Otherwise, it's an orphan. Get a list of all matching ReplicationControllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
rcs := rm.getPodControllers(pod)
if len(rcs) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
for _, rc := range rcs {
rm.enqueueController(rc)
return &ReplicationManager{
*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
v1.SchemeGroupVersion.WithKind("ReplicationController"),
"replication_controller",
"replicationmanager",
podControlAdapter{controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
}},
),
}
}
// When a pod is updated, figure out what ReplicationController/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new ReplicationController. old and cur must be *v1.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rc never initiates a phase change, and so is never asleep waiting for the same.
rm.deletePod(curPod)
if labelChanged {
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
rm.deletePod(oldPod)
}
return
}
curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if rc := rm.resolveControllerRef(oldPod.Namespace, oldControllerRef); rc != nil {
rm.enqueueController(rc)
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
rc := rm.resolveControllerRef(curPod.Namespace, curControllerRef)
if rc == nil {
return
}
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
rm.enqueueController(rc)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning ReplicationController thus
// having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after the it is requeued because
// a Pod transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the ReplicationController controller).
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rc.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", rc.Name, rc.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
rm.enqueueControllerAfter(rc, (time.Duration(rc.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
rcs := rm.getPodControllers(curPod)
if len(rcs) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, rc := range rcs {
rm.enqueueController(rc)
}
}
}
// When a pod is deleted, enqueue the ReplicationController that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new ReplicationController will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
return
}
}
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
rc := rm.resolveControllerRef(pod.Namespace, controllerRef)
if rc == nil {
return
}
rsKey, err := controller.KeyFunc(rc)
if err != nil {
return
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
rm.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rm.enqueueController(rc)
}
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rm.queue.Add(key)
}
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rm.queue.AddAfter(key, after)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
for rm.processNextWorkItem() {
}
glog.Infof("replication controller worker shutting down")
}
func (rm *ReplicationManager) processNextWorkItem() bool {
key, quit := rm.queue.Get()
if quit {
return false
}
defer rm.queue.Done(key)
err := rm.syncHandler(key.(string))
if err == nil {
rm.queue.Forget(key)
return true
}
rm.queue.AddRateLimited(key)
utilruntime.HandleError(err)
return true
}
// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
diff := len(filteredPods) - int(*(rc.Spec.Replicas))
rcKey, err := controller.KeyFunc(rc)
if err != nil {
return err
}
if diff == 0 {
return nil
}
if diff < 0 {
diff *= -1
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
errCh := make(chan error, diff)
rm.expectations.ExpectCreations(rcKey, diff)
var wg sync.WaitGroup
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) {
errorCount := len(errCh)
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rc.Name,
UID: rc.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
errCh <- err
utilruntime.HandleError(err)
}
}()
}
wg.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for controller %q/%q", skippedPods, rc.Namespace, rc.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rm.expectations.CreationObserved(rcKey)
}
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
diff -= batchSize
}
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
}
default:
}
return nil
}
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
// No need to sort pods if we are about to delete all of them
if *(rc.Spec.Replicas) != 0 {
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rc change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
// We use pod namespace/name as a UID to wait for deletions, so if the
// labels on a pod/rc change in a way that the pod gets orphaned, the
// rc will only wake up after the expectation has expired.
errCh := make(chan error, diff)
rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
var wg sync.WaitGroup
wg.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wg.Done()
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey, podKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
wg.Wait()
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
}
default:
}
return nil
}
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
trace := utiltrace.New("syncReplicationController: " + key)
defer trace.LogIfLong(250 * time.Millisecond)
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
rc, err := rm.rcLister.ReplicationControllers(namespace).Get(name)
if errors.IsNotFound(err) {
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
trace.Step("ReplicationController restored")
rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
trace.Step("Expectations restored")
// list all pods to include the pods that don't match the rc's selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
allPods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
return err
}
// Ignore inactive pods.
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := rm.kubeClient.CoreV1().ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != rc.UID {
return nil, fmt.Errorf("original ReplicationController %v/%v is gone: got uid %v, wanted %v", rc.Namespace, rc.Name, fresh.UID, rc.UID)
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind, canAdoptFunc)
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy them first.
filteredPods, err = cm.ClaimPods(filteredPods)
if err != nil {
return err
}
var manageReplicasErr error
if rcNeedsSync && rc.DeletionTimestamp == nil {
manageReplicasErr = rm.manageReplicas(filteredPods, rc)
}
trace.Step("manageReplicas done")
rc = rc.DeepCopy()
newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
// Always update the status as pods come up or die.
updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.CoreV1().ReplicationControllers(rc.Namespace), *rc, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
return err
}
// Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 &&
updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) &&
updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) {
rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
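The MinReadySeconds requeue at the end of the sync exists because availability is time-based: a pod only counts as available once it has been Ready for at least MinReadySeconds, and that transition happens without any further API event, so the controller schedules one last sync after the window elapses. A rough, hedged sketch of the check that `podutil.IsPodAvailable` performs (illustrative only, not the actual helper):

```go
package main

import (
	"fmt"
	"time"
)

// isAvailable mirrors the idea behind podutil.IsPodAvailable: a pod is
// available once it has been Ready for at least minReadySeconds.
// This is an illustrative sketch, not the real function.
func isAvailable(ready bool, lastTransition time.Time, minReadySeconds int32, now time.Time) bool {
	if !ready {
		return false
	}
	if minReadySeconds == 0 {
		return true
	}
	return lastTransition.Add(time.Duration(minReadySeconds) * time.Second).Before(now)
}

func main() {
	readySince := time.Now().Add(-30 * time.Second)
	fmt.Println(isAvailable(true, readySince, 10, time.Now()))   // true: ready for 30s > 10s
	fmt.Println(isAvailable(true, readySince, 3600, time.Now())) // false: needs another sync later
}
```

Without the delayed requeue, the second case would only flip to available on the next periodic resync.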

File diff suppressed because it is too large

View File

@ -19,123 +19,10 @@ limitations under the License.
package replication
import (
"fmt"
"reflect"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)
// updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (*v1.ReplicationController, error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if rc.Status.Replicas == newStatus.Replicas &&
rc.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
rc.Status.ReadyReplicas == newStatus.ReadyReplicas &&
rc.Status.AvailableReplicas == newStatus.AvailableReplicas &&
rc.Generation == rc.Status.ObservedGeneration &&
reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) {
return &rc, nil
}
// Save the generation number we acted on, otherwise we might wrongfully indicate
// that we've seen a spec update when we retry.
// TODO: This can clobber an update if we allow multiple agents to write to the
// same status.
newStatus.ObservedGeneration = rc.Generation
var getErr, updateErr error
var updatedRC *v1.ReplicationController
for i, rc := 0, &rc; ; i++ {
glog.V(4).Infof(fmt.Sprintf("Updating status for rc: %s/%s, ", rc.Namespace, rc.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, *(rc.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", rc.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration))
rc.Status = newStatus
updatedRC, updateErr = c.UpdateStatus(rc)
if updateErr == nil {
return updatedRC, nil
}
// Stop retrying if we exceed statusUpdateRetries - the replicationController will be requeued with a rate limit.
if i >= statusUpdateRetries {
break
}
// Update the controller with the latest resource version for the next poll
if rc, getErr = c.Get(rc.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
return nil, getErr
}
}
return nil, updateErr
}
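The loop above retries the status PUT a bounded number of times, refreshing the object on each attempt. As a point of comparison, the integration tests added later in this PR express similar conflict handling with client-go's `retry.RetryOnConflict`; a hedged sketch of that idiom follows (the helper name is hypothetical, and note it only retries on conflict errors, whereas the loop above retries any update failure):

```go
package example

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/retry"
)

// updateStatusWithRetry is a hypothetical helper showing the RetryOnConflict
// idiom; it is not the code used by the controller above.
func updateStatusWithRetry(c v1core.ReplicationControllerInterface, name string, newStatus v1.ReplicationControllerStatus) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		rc, err := c.Get(name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// Record the spec generation we acted on, as the controller does.
		newStatus.ObservedGeneration = rc.Generation
		rc.Status = newStatus
		_, err = c.UpdateStatus(rc)
		return err
	})
}
```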
// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
type OverlappingControllers []*v1.ReplicationController
func (o OverlappingControllers) Len() int { return len(o) }
func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o OverlappingControllers) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
func calculateStatus(rc *v1.ReplicationController, filteredPods []*v1.Pod, manageReplicasErr error) v1.ReplicationControllerStatus {
newStatus := rc.Status
// Count the number of pods that have labels matching the labels of the pod
// template of the replication controller; the matching pods may have more
// labels than are in the template. Because the labels of the podTemplateSpec
// are a superset of the replication controller's selector, any matching pods
// must already be part of filteredPods.
fullyLabeledReplicasCount := 0
readyReplicasCount := 0
availableReplicasCount := 0
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()
for _, pod := range filteredPods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
fullyLabeledReplicasCount++
}
if podutil.IsPodReady(pod) {
readyReplicasCount++
if podutil.IsPodAvailable(pod, rc.Spec.MinReadySeconds, metav1.Now()) {
availableReplicasCount++
}
}
}
failureCond := GetCondition(rc.Status, v1.ReplicationControllerReplicaFailure)
if manageReplicasErr != nil && failureCond == nil {
var reason string
if diff := len(filteredPods) - int(*(rc.Spec.Replicas)); diff < 0 {
reason = "FailedCreate"
} else if diff > 0 {
reason = "FailedDelete"
}
cond := NewReplicationControllerCondition(v1.ReplicationControllerReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
SetCondition(&newStatus, cond)
} else if manageReplicasErr == nil && failureCond != nil {
RemoveCondition(&newStatus, v1.ReplicationControllerReplicaFailure)
}
newStatus.Replicas = int32(len(filteredPods))
newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
newStatus.ReadyReplicas = int32(readyReplicasCount)
newStatus.AvailableReplicas = int32(availableReplicasCount)
return newStatus
}
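The fully-labeled count above matches pods against the entire pod template label set rather than just the selector; since the template labels are a superset of the selector, every such pod is already in filteredPods. A small hedged example of that matching with the apimachinery helpers used above (the label values are made up):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Hypothetical values: the RC selects on "foo", while its pod template
	// additionally carries "tier".
	templateSelector := labels.Set{"foo": "bar", "tier": "frontend"}.AsSelectorPreValidated()

	selectedOnly := labels.Set{"foo": "bar"}                     // counted in Replicas
	fullyLabeled := labels.Set{"foo": "bar", "tier": "frontend"} // counted in FullyLabeledReplicas as well

	fmt.Println(templateSelector.Matches(selectedOnly)) // false: missing "tier"
	fmt.Println(templateSelector.Matches(fullyLabeled)) // true
}
```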
// NewReplicationControllerCondition creates a new replication controller condition.
func NewReplicationControllerCondition(condType v1.ReplicationControllerConditionType, status v1.ConditionStatus, reason, msg string) v1.ReplicationControllerCondition {
return v1.ReplicationControllerCondition{

View File

@ -0,0 +1,184 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
)
var (
imagePullBackOff v1.ReplicationControllerConditionType = "ImagePullBackOff"
condImagePullBackOff = func() v1.ReplicationControllerCondition {
return v1.ReplicationControllerCondition{
Type: imagePullBackOff,
Status: v1.ConditionTrue,
Reason: "NonExistentImage",
}
}
condReplicaFailure = func() v1.ReplicationControllerCondition {
return v1.ReplicationControllerCondition{
Type: v1.ReplicationControllerReplicaFailure,
Status: v1.ConditionTrue,
Reason: "OtherFailure",
}
}
condReplicaFailure2 = func() v1.ReplicationControllerCondition {
return v1.ReplicationControllerCondition{
Type: v1.ReplicationControllerReplicaFailure,
Status: v1.ConditionTrue,
Reason: "AnotherFailure",
}
}
status = func() *v1.ReplicationControllerStatus {
return &v1.ReplicationControllerStatus{
Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()},
}
}
)
func TestGetCondition(t *testing.T) {
exampleStatus := status()
tests := []struct {
name string
status v1.ReplicationControllerStatus
condType v1.ReplicationControllerConditionType
condStatus v1.ConditionStatus
condReason string
expected bool
}{
{
name: "condition exists",
status: *exampleStatus,
condType: v1.ReplicationControllerReplicaFailure,
expected: true,
},
{
name: "condition does not exist",
status: *exampleStatus,
condType: imagePullBackOff,
expected: false,
},
}
for _, test := range tests {
cond := GetCondition(test.status, test.condType)
exists := cond != nil
if exists != test.expected {
t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists)
}
}
}
func TestSetCondition(t *testing.T) {
tests := []struct {
name string
status *v1.ReplicationControllerStatus
cond v1.ReplicationControllerCondition
expectedStatus *v1.ReplicationControllerStatus
}{
{
name: "set for the first time",
status: &v1.ReplicationControllerStatus{},
cond: condReplicaFailure(),
expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}},
},
{
name: "simple set",
status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff()}},
cond: condReplicaFailure(),
expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff(), condReplicaFailure()}},
},
{
name: "overwrite",
status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}},
cond: condReplicaFailure2(),
expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure2()}},
},
}
for _, test := range tests {
SetCondition(test.status, test.cond)
if !reflect.DeepEqual(test.status, test.expectedStatus) {
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
}
}
}
func TestRemoveCondition(t *testing.T) {
tests := []struct {
name string
status *v1.ReplicationControllerStatus
condType v1.ReplicationControllerConditionType
expectedStatus *v1.ReplicationControllerStatus
}{
{
name: "remove from empty status",
status: &v1.ReplicationControllerStatus{},
condType: v1.ReplicationControllerReplicaFailure,
expectedStatus: &v1.ReplicationControllerStatus{},
},
{
name: "simple remove",
status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}},
condType: v1.ReplicationControllerReplicaFailure,
expectedStatus: &v1.ReplicationControllerStatus{},
},
{
name: "doesn't remove anything",
status: status(),
condType: imagePullBackOff,
expectedStatus: status(),
},
}
for _, test := range tests {
RemoveCondition(test.status, test.condType)
if !reflect.DeepEqual(test.status, test.expectedStatus) {
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
}
}
}

View File

@ -15,16 +15,21 @@ go_test(
importpath = "k8s.io/kubernetes/test/integration/replicationcontroller",
tags = ["integration"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller/replication:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)

View File

@ -24,24 +24,29 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/test/integration/framework"
)
const (
pollInterval = 100 * time.Millisecond
pollTimeout = 60 * time.Second
interval = 100 * time.Millisecond
timeout = 60 * time.Second
)
func testLabels() map[string]string {
return map[string]string{"name": "test"}
func labelMap() map[string]string {
return map[string]string{"foo": "bar"}
}
func newRC(name, namespace string, replicas int) *v1.ReplicationController {
@ -56,11 +61,11 @@ func newRC(name, namespace string, replicas int) *v1.ReplicationController {
Name: name,
},
Spec: v1.ReplicationControllerSpec{
Selector: testLabels(),
Selector: labelMap(),
Replicas: &replicasCopy,
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: testLabels(),
Labels: labelMap(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@ -84,7 +89,7 @@ func newMatchingPod(podName, namespace string) *v1.Pod {
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: testLabels(),
Labels: labelMap(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@ -126,7 +131,7 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
return ret, nil
}
func rmSetup(t *testing.T, stopCh chan struct{}) (*httptest.Server, framework.CloseFunc, *replication.ReplicationManager, informers.SharedInformerFactory, clientset.Interface) {
func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *replication.ReplicationManager, informers.SharedInformerFactory, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s, closeFn := framework.RunAMaster(masterConfig)
@ -136,23 +141,259 @@ func rmSetup(t *testing.T, stopCh chan struct{}) (*httptest.Server, framework.Cl
t.Fatalf("Error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "rc-informers")), resyncPeriod)
informers := informers.NewSharedInformerFactory(clientSet, resyncPeriod)
rm := replication.NewReplicationManager(informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientSet, replication.BurstReplicas)
informers.Start(stopCh)
rm := replication.NewReplicationManager(
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),
replication.BurstReplicas,
)
if err != nil {
t.Fatalf("Failed to create replication controller")
}
return s, closeFn, rm, informers, clientSet
}
func rmSimpleSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s, closeFn := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
return s, closeFn, clientSet
}
// Run RC controller and informers
func runControllerAndInformers(t *testing.T, rm *replication.ReplicationManager, informers informers.SharedInformerFactory, podNum int) chan struct{} {
stopCh := make(chan struct{})
informers.Start(stopCh)
waitToObservePods(t, informers.Core().V1().Pods().Informer(), podNum)
go rm.Run(5, stopCh)
return stopCh
}
// wait for the podInformer to observe the pods. Call this function before
// running the RC manager to prevent the rc manager from creating new pods
// running the RC controller to prevent the rc manager from creating new pods
// rather than adopting the existing ones.
func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int) {
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
objects := podInformer.GetIndexer().List()
return len(objects) == podNum, nil
}); err != nil {
t.Fatal(err)
t.Fatalf("Error encountered when waiting for podInformer to observe the pods: %v", err)
}
}
func createRCsPods(t *testing.T, clientSet clientset.Interface, rcs []*v1.ReplicationController, pods []*v1.Pod) ([]*v1.ReplicationController, []*v1.Pod) {
var createdRCs []*v1.ReplicationController
var createdPods []*v1.Pod
for _, rc := range rcs {
createdRC, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(rc)
if err != nil {
t.Fatalf("Failed to create replication controller %s: %v", rc.Name, err)
}
createdRCs = append(createdRCs, createdRC)
}
for _, pod := range pods {
createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
createdPods = append(createdPods, createdPod)
}
return createdRCs, createdPods
}
// Verify .Status.Replicas is equal to .Spec.Replicas
func waitRCStable(t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController) {
rcClient := clientSet.CoreV1().ReplicationControllers(rc.Namespace)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return newRC.Status.Replicas == *rc.Spec.Replicas, nil
}); err != nil {
t.Fatalf("Failed to verify .Status.Replicas is equal to .Spec.Replicas for rc %s: %v", rc.Name, err)
}
}
// Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
func scaleRC(t *testing.T, c clientset.Interface, rc *v1.ReplicationController, replicas int32) {
rcClient := c.CoreV1().ReplicationControllers(rc.Namespace)
rc = updateRC(t, rcClient, rc.Name, func(rc *v1.ReplicationController) {
*rc.Spec.Replicas = replicas
})
waitRCStable(t, c, rc)
}
func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
var pod *v1.Pod
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newPod, err := podClient.Get(podName, metav1.GetOptions{})
if err != nil {
return err
}
updateFunc(newPod)
pod, err = podClient.Update(newPod)
return err
}); err != nil {
t.Fatalf("Failed to update pod %s: %v", podName, err)
}
return pod
}
func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, pod *v1.Pod, updateStatusFunc func(*v1.Pod)) {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
return err
}
updateStatusFunc(newPod)
_, err = podClient.UpdateStatus(newPod)
return err
}); err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
}
func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
podSelector := labels.Set(labelMap).AsSelector()
options := metav1.ListOptions{LabelSelector: podSelector.String()}
pods, err := podClient.List(options)
if err != nil {
t.Fatalf("Failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
}
return pods
}
func updateRC(t *testing.T, rcClient typedv1.ReplicationControllerInterface, rcName string, updateFunc func(*v1.ReplicationController)) *v1.ReplicationController {
var rc *v1.ReplicationController
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newRC, err := rcClient.Get(rcName, metav1.GetOptions{})
if err != nil {
return err
}
updateFunc(newRC)
rc, err = rcClient.Update(newRC)
return err
}); err != nil {
t.Fatalf("Failed to update rc %s: %v", rcName, err)
}
return rc
}
// Verify ControllerRef of a RC pod that has incorrect attributes is automatically patched by the RC
func testPodControllerRefPatch(t *testing.T, c clientset.Interface, pod *v1.Pod, ownerReference *metav1.OwnerReference, rc *v1.ReplicationController, expectedOwnerReferenceNum int) {
ns := rc.Namespace
podClient := c.CoreV1().Pods(ns)
updatePod(t, podClient, pod.Name, func(pod *v1.Pod) {
pod.OwnerReferences = []metav1.OwnerReference{*ownerReference}
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return metav1.GetControllerOf(newPod) != nil, nil
}); err != nil {
t.Fatalf("Failed to verify ControllerRef for the pod %s is not nil: %v", pod.Name, err)
}
newPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to obtain pod %s: %v", pod.Name, err)
}
controllerRef := metav1.GetControllerOf(newPod)
if controllerRef.UID != rc.UID {
t.Fatalf("RC owner of the pod %s has a different UID: Expected %v, got %v", newPod.Name, rc.UID, controllerRef.UID)
}
ownerReferenceNum := len(newPod.GetOwnerReferences())
if ownerReferenceNum != expectedOwnerReferenceNum {
t.Fatalf("Unexpected number of owner references for pod %s: Expected %d, got %d", newPod.Name, expectedOwnerReferenceNum, ownerReferenceNum)
}
}
func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1.PodList, conditionStatus v1.ConditionStatus, lastTransitionTime time.Time) {
replicas := int32(len(pods.Items))
var readyPods int32
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
readyPods = 0
for i := range pods.Items {
pod := &pods.Items[i]
if podutil.IsPodReady(pod) {
readyPods++
continue
}
pod.Status.Phase = v1.PodRunning
_, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady)
if condition != nil {
condition.Status = conditionStatus
condition.LastTransitionTime = metav1.Time{Time: lastTransitionTime}
} else {
condition = &v1.PodCondition{
Type: v1.PodReady,
Status: conditionStatus,
LastTransitionTime: metav1.Time{Time: lastTransitionTime},
}
pod.Status.Conditions = append(pod.Status.Conditions, *condition)
}
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
// When status fails to be updated, we continue to next pod
continue
}
readyPods++
}
return readyPods >= replicas, nil
})
if err != nil {
t.Fatalf("failed to mark all ReplicationController pods to ready: %v", err)
}
}
func testScalingUsingScaleSubresource(t *testing.T, c clientset.Interface, rc *v1.ReplicationController, replicas int32) {
ns := rc.Namespace
rcClient := c.CoreV1().ReplicationControllers(ns)
newRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to obtain rc %s: %v", rc.Name, err)
}
kind := "ReplicationController"
scaleClient := c.ExtensionsV1beta1().Scales(ns)
scale, err := scaleClient.Get(kind, rc.Name)
if err != nil {
t.Fatalf("Failed to obtain scale subresource for rc %s: %v", rc.Name, err)
}
if scale.Spec.Replicas != *newRC.Spec.Replicas {
t.Fatalf("Scale subresource for rc %s does not match .Spec.Replicas: expected %d, got %d", rc.Name, *newRC.Spec.Replicas, scale.Spec.Replicas)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
scale, err := scaleClient.Get(kind, rc.Name)
if err != nil {
return err
}
scale.Spec.Replicas = replicas
_, err = scaleClient.Update(kind, scale)
return err
}); err != nil {
t.Fatalf("Failed to set .Spec.Replicas of scale subresource for rc %s: %v", rc.Name, err)
}
newRC, err = rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to obtain rc %s: %v", rc.Name, err)
}
if *newRC.Spec.Replicas != replicas {
t.Fatalf(".Spec.Replicas of rc %s does not match its scale subresource: expected %d, got %d", rc.Name, replicas, *newRC.Spec.Replicas)
}
}
@ -207,239 +448,440 @@ func TestAdoption(t *testing.T) {
},
}
for i, tc := range testCases {
stopCh := make(chan struct{})
s, closeFn, rm, informers, clientSet := rmSetup(t, stopCh)
defer closeFn()
ns := framework.CreateTestingNamespace(fmt.Sprintf("adoption-%d", i), s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
func() {
s, closeFn, rm, informers, clientSet := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace(fmt.Sprintf("rc-adoption-%d", i), s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
podClient := clientSet.CoreV1().Pods(ns.Name)
const rcName = "rc"
rc, err := rcClient.Create(newRC(rcName, ns.Name, 1))
if err != nil {
t.Fatalf("Failed to create replication controller: %v", err)
}
podName := fmt.Sprintf("pod%d", i)
pod := newMatchingPod(podName, ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rc)
_, err = podClient.Create(pod)
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
informers.Start(stopCh)
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1)
go rm.Run(5, stopCh)
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
updatedPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
podClient := clientSet.CoreV1().Pods(ns.Name)
const rcName = "rc"
rc, err := rcClient.Create(newRC(rcName, ns.Name, 1))
if err != nil {
return false, err
t.Fatalf("Failed to create replication controllers: %v", err)
}
if e, a := tc.expectedOwnerReferences(rc), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) {
return true, nil
} else {
t.Logf("ownerReferences don't match, expect %v, got %v", e, a)
return false, nil
podName := fmt.Sprintf("pod%d", i)
pod := newMatchingPod(podName, ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rc)
_, err = podClient.Create(pod)
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
}); err != nil {
t.Fatalf("test %q failed: %v", tc.name, err)
}
close(stopCh)
stopCh := runControllerAndInformers(t, rm, informers, 1)
defer close(stopCh)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
updatedPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if e, a := tc.expectedOwnerReferences(rc), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) {
return true, nil
} else {
t.Logf("ownerReferences don't match, expect %v, got %v", e, a)
return false, nil
}
}); err != nil {
t.Fatalf("test %q failed: %v", tc.name, err)
}
}()
}
}
func createRCsPods(t *testing.T, clientSet clientset.Interface, rcs []*v1.ReplicationController, pods []*v1.Pod, ns string) {
rcClient := clientSet.CoreV1().ReplicationControllers(ns)
podClient := clientSet.CoreV1().Pods(ns)
for _, rc := range rcs {
if _, err := rcClient.Create(rc); err != nil {
t.Fatalf("Failed to create replication controller %s: %v", rc.Name, err)
}
}
for _, pod := range pods {
if _, err := podClient.Create(pod); err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
}
}
func TestSpecReplicasChange(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
func waitRCStable(t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController, ns string) {
rcClient := clientSet.CoreV1().ReplicationControllers(ns)
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
updatedRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
rc := newRC("rc", ns.Name, 2)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
// Update .Spec.Replicas and verify .Status.Replicas is changed accordingly
scaleRC(t, c, rc, 3)
scaleRC(t, c, rc, 0)
scaleRC(t, c, rc, 2)
// Add a template annotation change to test that the RC's status updates
// even without a .Spec.Replicas change
rcClient := c.CoreV1().ReplicationControllers(ns.Name)
var oldGeneration int64
newRC := updateRC(t, rcClient, rc.Name, func(rc *v1.ReplicationController) {
oldGeneration = rc.Generation
rc.Spec.Template.Annotations = map[string]string{"test": "annotation"}
})
savedGeneration := newRC.Generation
if savedGeneration == oldGeneration {
t.Fatalf("Failed to verify .Generation has incremented for rc %s", rc.Name)
}
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return updatedRC.Status.Replicas == *rc.Spec.Replicas, nil
return newRC.Status.ObservedGeneration >= savedGeneration, nil
}); err != nil {
t.Fatal(err)
t.Fatalf("Failed to verify .Status.ObservedGeneration has incremented for rc %s: %v", rc.Name, err)
}
}
func TestUpdateSelectorToAdopt(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
// matches pod1 only; change the selector to match pod2 as well. Verify
// there is only one pod left.
stopCh := make(chan struct{})
s, closeFn, rm, _, clientSet := rmSetup(t, stopCh)
func TestDeletingAndFailedPods(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("update-selector-to-adopt", s, t)
ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 1)
// let rc's selector only match pod1
rc.Spec.Selector["uniqueKey"] = "1"
rc.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change the rc's selector to match both pods
patch := `{"spec":{"selector":{"uniqueKey":null}}}`
rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
rc, err := rcClient.Patch(rc.Name, types.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replication controller: %v", err)
}
t.Logf("patched rc = %#v", rc)
// wait for the rc to select both pods and delete one of them
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}
func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
// matches pod1 and pod2; change the selector to match only pod1. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
stopCh := make(chan struct{})
s, closeFn, rm, informers, clientSet := rmSetup(t, stopCh)
defer closeFn()
ns := framework.CreateTestingNamespace("update-selector-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 2)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change the rc's selector and template labels to match only pod1
patch := `{"spec":{"selector":{"uniqueKey":"1"},"template":{"metadata":{"labels":{"uniqueKey":"1"}}}}}`
rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
rc, err := rcClient.Patch(rc.Name, types.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replication controller: %v", err)
// Verify RC creates 2 pods
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 2 {
t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
}
t.Logf("patched rc = %#v", rc)
// wait for the rc to create one more pod
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
// Set first pod as deleting pod
// Set finalizers for the pod to simulate pending deletion status
deletingPod := &pods.Items[0]
updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
pod.Finalizers = []string{"fake.example.com/blockDeletion"}
})
if err := c.CoreV1().Pods(ns.Name).Delete(deletingPod.Name, &metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error deleting pod %s: %v", deletingPod.Name, err)
}
// Set second pod as failed pod
failedPod := &pods.Items[1]
updatePodStatus(t, podClient, failedPod, func(pod *v1.Pod) {
pod.Status.Phase = v1.PodFailed
})
// Poll until 2 new pods have been created to replace the deleting and failed pods
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pods = getPods(t, podClient, labelMap())
return len(pods.Items) == 4, nil
}); err != nil {
t.Fatal(err)
t.Fatalf("Failed to verify 2 new pods have been created (expected 4 pods): %v", err)
}
podClient := clientSet.CoreV1().Pods(ns.Name)
pod2, err = podClient.Get(pod2.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
// Verify deleting and failed pods are among the four pods
foundDeletingPod := false
foundFailedPod := false
for _, pod := range pods.Items {
if pod.UID == deletingPod.UID {
foundDeletingPod = true
}
if pod.UID == failedPod.UID {
foundFailedPod = true
}
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
// Verify deleting pod exists
if !foundDeletingPod {
t.Fatalf("expected deleting pod %s exists, but it is not found", deletingPod.Name)
}
// Verify failed pod exists
if !foundFailedPod {
t.Fatalf("expected failed pod %s exists, but it is not found", failedPod.Name)
}
close(stopCh)
}
func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
// matches pod1 and pod2; change pod2's labels to non-matching. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
stopCh := make(chan struct{})
s, closeFn, rm, _, clientSet := rmSetup(t, stopCh)
func TestOverlappingRCs(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("update-label-to-remove-controllerref", s, t)
ns := framework.CreateTestingNamespace("test-overlapping-rcs", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod2 := newMatchingPod("pod2", ns.Name)
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// Create 2 RCs with identical selectors
for i := 0; i < 2; i++ {
// One RC has 1 replica, and another has 2 replicas
rc := newRC(fmt.Sprintf("rc-%d", i+1), ns.Name, i+1)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
waitRCStable(t, c, rcs[0])
}
// change pod2's labels so that it no longer matches the rc's selector
patch := `{"metadata":{"labels":{"name":null}}}`
podClient := clientSet.CoreV1().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, types.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
// Expect 3 total Pods to be created
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 3 {
t.Errorf("len(pods) = %d, want 3", len(pods.Items))
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rc to create one more pod
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
}); err != nil {
t.Fatal(err)
// Expect both RCs to have .status.replicas = .spec.replicas
for i := 0; i < 2; i++ {
newRC, err := c.CoreV1().ReplicationControllers(ns.Name).Get(fmt.Sprintf("rc-%d", i+1), metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to obtain rc rc-%d: %v", i+1, err)
}
if newRC.Status.Replicas != *newRC.Spec.Replicas {
t.Fatalf(".Status.Replicas %d is not equal to .Spec.Replicas %d", newRC.Status.Replicas, *newRC.Spec.Replicas)
}
}
pod2, err = podClient.Get(pod2.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
}
close(stopCh)
}
func TestUpdateLabelToBeAdopted(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
// matches pod1 only; change pod2's labels to be matching. Verify the RC
// controller adopts pod2 and deletes one of them, so there is only 1 pod
// left.
stopCh := make(chan struct{})
s, closeFn, rm, _, clientSet := rmSetup(t, stopCh)
func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("update-label-to-be-adopted", s, t)
ns := framework.CreateTestingNamespace("test-pod-orphaning-and-adoption-when-labels-change", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
rc := newRC("rc", ns.Name, 1)
// let rc's selector only match pod1
rc.Spec.Selector["uniqueKey"] = "1"
rc.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change pod2's labels to match the rc's selector
patch := `{"metadata":{"labels":{"uniqueKey":"1"}}}`
podClient := clientSet.CoreV1().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, types.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
// Orphaning: RC should remove the OwnerReference from a pod when the pod's labels change so they no longer match its selector
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 1 {
t.Fatalf("len(pods) = %d, want 1", len(pods.Items))
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rc to select both pods and delete one of them
if err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
pod := &pods.Items[0]
// Start by verifying ControllerRef for the pod is not nil
if metav1.GetControllerOf(pod) == nil {
t.Fatalf("ControllerRef of pod %s is nil", pod.Name)
}
newLabelMap := map[string]string{"new-foo": "new-bar"}
updatePod(t, podClient, pod.Name, func(pod *v1.Pod) {
pod.Labels = newLabelMap
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
pod = newPod
return metav1.GetControllerOf(newPod) == nil, nil
}); err != nil {
t.Fatal(err)
t.Fatalf("Failed to verify ControllerRef for the pod %s is nil: %v", pod.Name, err)
}
// Adoption: RC should add a ControllerRef to a pod when the pod's labels change to match its selector
updatePod(t, podClient, pod.Name, func(pod *v1.Pod) {
pod.Labels = labelMap()
})
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newPod, err := podClient.Get(pod.Name, metav1.GetOptions{})
if err != nil {
// If the pod is not found, it means the RC picks the pod for deletion (it is extra)
// Verify there is only one pod in namespace and it has ControllerRef to the RC
if errors.IsNotFound(err) {
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 1 {
return false, fmt.Errorf("Expected 1 pod in current namespace, got %d", len(pods.Items))
}
// Set the pod accordingly
pod = &pods.Items[0]
return true, nil
}
return false, err
}
// Always update the pod so that we can save a GET call to API server later
pod = newPod
// If the pod is found, verify the pod has a ControllerRef
return metav1.GetControllerOf(newPod) != nil, nil
}); err != nil {
t.Fatalf("Failed to verify ControllerRef for pod %s is not nil: %v", pod.Name, err)
}
// Verify the pod has a ControllerRef to the RC
// Do nothing if the pod is nil (i.e., has been picked for deletion)
if pod != nil {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef.UID != rc.UID {
t.Fatalf("RC owner of the pod %s has a different UID: Expected %v, got %v", pod.Name, rc.UID, controllerRef.UID)
}
}
}
func TestGeneralPodAdoption(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-general-pod-adoption", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
rc := newRC("rc", ns.Name, 1)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 1 {
t.Fatalf("len(pods) = %d, want 1", len(pods.Items))
}
pod := &pods.Items[0]
var falseVar = false
// When the only OwnerReference of the pod points to another type of API object such as statefulset
// with Controller=false, the RC should add a second OwnerReference (ControllerRef) pointing to itself
// with Controller=true
ownerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "apps/v1beta1", Kind: "StatefulSet", Name: rc.Name, Controller: &falseVar}
testPodControllerRefPatch(t, c, pod, &ownerReference, rc, 2)
// When the only OwnerReference of the pod points to the RC, but Controller=false
ownerReference = metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &falseVar}
testPodControllerRefPatch(t, c, pod, &ownerReference, rc, 1)
}
func TestReadyAndAvailableReplicas(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-ready-and-available-replicas", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
rc := newRC("rc", ns.Name, 3)
rc.Spec.MinReadySeconds = 3600
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
// First verify no pod is available
if rc.Status.AvailableReplicas != 0 {
t.Fatalf("Unexpected .Status.AvailableReplicas: Expected 0, saw %d", rc.Status.AvailableReplicas)
}
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 3 {
t.Fatalf("len(pods) = %d, want 3", len(pods.Items))
}
// Separate 3 pods into their own list
firstPodList := &v1.PodList{Items: pods.Items[:1]}
secondPodList := &v1.PodList{Items: pods.Items[1:2]}
thirdPodList := &v1.PodList{Items: pods.Items[2:]}
// First pod: Running, but not Ready
// by setting the Ready condition to false with LastTransitionTime set to now
setPodsReadyCondition(t, c, firstPodList, v1.ConditionFalse, time.Now())
// Second pod: Running and Ready, but not Available
// by setting LastTransitionTime to now
setPodsReadyCondition(t, c, secondPodList, v1.ConditionTrue, time.Now())
// Third pod: Running, Ready, and Available
// by setting LastTransitionTime to more than 3600 seconds ago
setPodsReadyCondition(t, c, thirdPodList, v1.ConditionTrue, time.Now().Add(-120*time.Minute))
rcClient := c.CoreV1().ReplicationControllers(ns.Name)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// Verify 3 pods exist, 2 pods are Ready, and 1 pod is Available
return newRC.Status.Replicas == 3 && newRC.Status.ReadyReplicas == 2 && newRC.Status.AvailableReplicas == 1, nil
}); err != nil {
t.Fatalf("Failed to verify number of Replicas, ReadyReplicas and AvailableReplicas of rc %s to be as expected: %v", rc.Name, err)
}
}
func TestRCScaleSubresource(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-rc-scale-subresource", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
rc := newRC("rc", ns.Name, 1)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
// Use scale subresource to scale up .Spec.Replicas to 3
testScalingUsingScaleSubresource(t, c, rc, 3)
// Use the scale subresource to scale down .Spec.Replicas to 0
testScalingUsingScaleSubresource(t, c, rc, 0)
}
func TestExtraPodsAdoptionAndDeletion(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-extra-pods-adoption-and-deletion", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
// Create 3 pods, RC should adopt only 2 of them
podList := []*v1.Pod{}
for i := 0; i < 3; i++ {
pod := newMatchingPod(fmt.Sprintf("pod-%d", i+1), ns.Name)
pod.Labels = labelMap()
podList = append(podList, pod)
}
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, podList)
rc = rcs[0]
stopCh := runControllerAndInformers(t, rm, informers, 3)
defer close(stopCh)
waitRCStable(t, c, rc)
// Verify the extra pod is deleted eventually by determining whether the number of
// all pods within the namespace matches .spec.replicas of the RC (2 in this case)
podClient := c.CoreV1().Pods(ns.Name)
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
// All pods have labelMap as their labels
pods := getPods(t, podClient, labelMap())
return int32(len(pods.Items)) == *rc.Spec.Replicas, nil
}); err != nil {
t.Fatalf("Failed to verify number of all pods within current namespace matches .spec.replicas of rc %s: %v", rc.Name, err)
}
}
func TestFullyLabeledReplicas(t *testing.T) {
s, closeFn, rm, informers, c := rmSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-fully-labeled-replicas", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := runControllerAndInformers(t, rm, informers, 0)
defer close(stopCh)
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
rc := newRC("rc", ns.Name, 2)
rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{})
rc = rcs[0]
waitRCStable(t, c, rc)
// Change RC's template labels to have extra labels, but not its selector
rcClient := c.CoreV1().ReplicationControllers(ns.Name)
updateRC(t, rcClient, rc.Name, func(rc *v1.ReplicationController) {
rc.Spec.Template.Labels = extraLabelMap
})
// Set one of the pods to have extra labels
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap())
if len(pods.Items) != 2 {
t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
}
fullyLabeledPod := &pods.Items[0]
updatePod(t, podClient, fullyLabeledPod.Name, func(pod *v1.Pod) {
pod.Labels = extraLabelMap
})
// Verify only one pod is fully labeled
if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
newRC, err := rcClient.Get(rc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return (newRC.Status.Replicas == 2 && newRC.Status.FullyLabeledReplicas == 1), nil
}); err != nil {
t.Fatalf("Failed to verify only one pod is fully labeled: %v", err)
}
close(stopCh)
}