mirror of https://github.com/k3s-io/k3s
Deployment: Use ControllerRef to list controlled objects.
Although Deployment already applied its ControllerRef to adopt matching ReplicaSets, it still used label selectors to list the objects it controls. That meant it didn't truly follow the rules of ControllerRef, so it could still fight with other controller types. With this change, the special handling for overlapping Deployments should no longer be necessary, since each Deployment only sees objects that it owns (via ControllerRef).
parent e82834e4d8
commit 92d75cbb23
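For illustration, here is a minimal, self-contained sketch of the distinction the commit message describes. The types below are hypothetical local stand-ins, not the real metav1/extensions API: matching by label selector can claim ReplicaSets owned by another controller, while checking the ControllerRef (the OwnerReference with Controller=true) gives each Deployment an unambiguous set of objects it owns.

```go
package main

import "fmt"

// ownerRef mirrors just the OwnerReference fields that matter here.
type ownerRef struct {
	UID        string
	Controller bool
}

type replicaSet struct {
	Name   string
	Labels map[string]string
	Owners []ownerRef
}

type deployment struct {
	Name     string
	UID      string
	Selector map[string]string // match-labels style selector
}

// selectorMatches is the old behaviour: list controlled objects by label selector.
func selectorMatches(d deployment, rs replicaSet) bool {
	for k, v := range d.Selector {
		if rs.Labels[k] != v {
			return false
		}
	}
	return true
}

// controlledBy is the new behaviour: list controlled objects by ControllerRef (owner UID).
func controlledBy(d deployment, rs replicaSet) bool {
	for _, ref := range rs.Owners {
		if ref.Controller && ref.UID == d.UID {
			return true
		}
	}
	return false
}

func main() {
	foo := deployment{Name: "foo", UID: "uid-foo", Selector: map[string]string{"app": "web"}}
	bar := deployment{Name: "bar", UID: "uid-bar", Selector: map[string]string{"app": "web"}}

	// Both selectors match this ReplicaSet's labels, but only foo owns it.
	rs := replicaSet{
		Name:   "web-rs",
		Labels: map[string]string{"app": "web"},
		Owners: []ownerRef{{UID: "uid-foo", Controller: true}},
	}

	fmt.Println(selectorMatches(foo, rs), selectorMatches(bar, rs)) // true true: both would claim it
	fmt.Println(controlledBy(foo, rs), controlledBy(bar, rs))       // true false: ownership is unambiguous
}
```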
@@ -37,7 +37,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

@@ -31,8 +31,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -58,9 +57,8 @@ const (
maxRetries = 5
)

func getDeploymentKind() schema.GroupVersionKind {
return extensions.SchemeGroupVersion.WithKind("Deployment")
}
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = extensions.SchemeGroupVersion.WithKind("Deployment")

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
@@ -174,28 +172,6 @@ func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
curD := cur.(*extensions.Deployment)
glog.V(4).Infof("Updating deployment %s", oldD.Name)
dc.enqueueDeployment(curD)
// If the selector of the current deployment just changed, we need to requeue any old
// overlapping deployments. If the new selector steps on another deployment, the current
// deployment will get denied during the resync loop.
if !reflect.DeepEqual(curD.Spec.Selector, oldD.Spec.Selector) {
deployments, err := dc.dLister.Deployments(curD.Namespace).List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", curD.Namespace, err))
return
}
// Trigger cleanup of any old overlapping deployments; we don't care about any error
// returned here.
for i := range deployments {
otherD := deployments[i]

oldOverlaps, oldErr := util.OverlapsWith(oldD, otherD)
curOverlaps, curErr := util.OverlapsWith(curD, otherD)
// Enqueue otherD so it gets cleaned up
if oldErr == nil && curErr == nil && oldOverlaps && !curOverlaps {
dc.enqueueDeployment(otherD)
}
}
}
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
@@ -214,22 +190,6 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
}
glog.V(4).Infof("Deleting deployment %s", d.Name)
dc.enqueueDeployment(d)
deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err))
return
}
// Trigger cleanup of any old overlapping deployments; we don't care about any error
// returned here.
for i := range deployments {
otherD := deployments[i]

overlaps, err := util.OverlapsWith(d, otherD)
// Enqueue otherD so it gets cleaned up
if err == nil && overlaps {
dc.enqueueDeployment(otherD)
}
}
}

// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
@@ -336,8 +296,20 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
}
glog.V(4).Infof("Pod %s deleted.", pod.Name)
if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
podList, err := dc.listPods(d)
if err == nil && len(podList.Items) == 0 {
// Sync if this Deployment now has no more Pods.
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return
}
podMap, err := dc.getPodMapForReplicaSets(d.Namespace, rsList)
if err != nil {
return
}
numPods := 0
for _, podList := range podMap {
numPods += len(podList.Items)
}
if numPods == 0 {
dc.enqueueDeployment(d)
}
}
@@ -447,23 +419,52 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
dc.queue.Forget(key)
}

// claimReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets
// and adopts them if their labels match the Deployment but are missing the reference.
// It also removes the controllerRef for ReplicaSets, whose labels no longer matches
// the deployment.
func (dc *DeploymentController) claimReplicaSets(deployment *extensions.Deployment) error {
rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything())
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
// ControllerRef by adopting and orphaning.
// It returns the list of ReplicaSets that this Deployment should manage.
func (dc *DeploymentController) getReplicaSetsForDeployment(d *extensions.Deployment) ([]*extensions.ReplicaSet, error) {
// List all ReplicaSets to find those we own but that no longer match our
// selector. They will be orphaned by ClaimReplicaSets().
rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
if err != nil {
return err
return nil, err
}
deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
}
cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind)
return cm.ClaimReplicaSets(rsList)
}

deploymentSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
// getPodMapForReplicaSets scans the list of all Pods and returns a map from
// RS UID to Pods controlled by that RS, based on the Pod's ControllerRef.
func (dc *DeploymentController) getPodMapForReplicaSets(namespace string, rsList []*extensions.ReplicaSet) (map[types.UID]*v1.PodList, error) {
// List all Pods.
pods, err := dc.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err)
return nil, err
}
cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment, deploymentSelector, getDeploymentKind())
_, err = cm.ClaimReplicaSets(rsList)
return err
// Group Pods by their controller (if it's in rsList).
podMap := make(map[types.UID]*v1.PodList, len(rsList))
for _, rs := range rsList {
podMap[rs.UID] = &v1.PodList{}
}
for _, pod := range pods {
// Ignore inactive Pods since that's what ReplicaSet does.
if !controller.IsPodActive(pod) {
continue
}
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
continue
}
// Only append if we care about this UID.
if podList, ok := podMap[controllerRef.UID]; ok {
podList.Items = append(podList.Items, *pod)
}
}
return podMap, nil
}

// syncDeployment will sync the deployment with the given key.
@@ -506,25 +507,21 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return nil
}

deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
return err
}

// Handle overlapping deployments by deterministically avoid syncing deployments that fight over ReplicaSets.
overlaps, err := dc.handleOverlap(d, deployments)
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// This is expensive, so do it once and pass it along to subroutines.
podMap, err := dc.getPodMapForReplicaSets(d.Namespace, rsList)
if err != nil {
if overlaps {
// Emit an event and return a nil error for overlapping deployments so we won't resync them again.
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectorOverlap", err.Error())
return nil
}
// For any other failure, we should retry the deployment.
return err
}

if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d)
return dc.syncStatusOnly(d, rsList, podMap)
}

// Why run the cleanup policy only when there is no rollback request?
@@ -536,7 +533,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// (and chances are higher that they will work again as opposed to others that didn't) for candidates to
// automatically roll back to (#23211) and the cleanup policy should help.
if d.Spec.RollbackTo == nil {
_, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
_, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
@@ -546,11 +543,6 @@ func (dc *DeploymentController) syncDeployment(key string) error {
dc.cleanupDeployment(oldRSs, d)
}

err = dc.claimReplicaSets(d)
if err != nil {
return err
}

// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
@@ -558,7 +550,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return err
}

_, err = dc.hasFailed(d)
_, err = dc.hasFailed(d, rsList, podMap)
if err != nil {
return err
}
@@ -567,152 +559,29 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// See https://github.com/kubernetes/kubernetes/issues/23211.

if d.Spec.Paused {
return dc.sync(d)
return dc.sync(d, rsList, podMap)
}

// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if d.Spec.RollbackTo != nil {
return dc.rollback(d)
return dc.rollback(d, rsList, podMap)
}

scalingEvent, err := dc.isScalingEvent(d)
scalingEvent, err := dc.isScalingEvent(d, rsList, podMap)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d)
return dc.sync(d, rsList, podMap)
}

switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(d)
return dc.rolloutRecreate(d, rsList, podMap)
case extensions.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(d)
return dc.rolloutRolling(d, rsList, podMap)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

// handleOverlap will avoid syncing the newer overlapping ones (only sync the oldest one). New/old is
// determined by when the deployment's selector is last updated.
func (dc *DeploymentController) handleOverlap(d *extensions.Deployment, deployments []*extensions.Deployment) (bool, error) {
overlapping := false
var errs []error

for i := range deployments {
otherD := deployments[i]

if d.Name == otherD.Name {
continue
}

// Error is already checked during validation
foundOverlaps, _ := util.OverlapsWith(d, otherD)

// If the otherD deployment overlaps with the current we need to identify which one
// holds the set longer and mark the other as overlapping. Requeue the overlapping
// deployments if this one has been marked deleted, we only update its status as long
// as it is not actually deleted.
if foundOverlaps && d.DeletionTimestamp == nil {
overlapping = true
// Look at the overlapping annotation in both deployments. If one of them has it and points
// to the other one then we don't need to compare their timestamps.
otherOverlapsWith := otherD.Annotations[util.OverlapAnnotation]
currentOverlapsWith := d.Annotations[util.OverlapAnnotation]
// The other deployment is already marked as overlapping with the current one.
if otherOverlapsWith == d.Name {
var err error
if d, err = dc.clearDeploymentOverlap(d, otherD.Name); err != nil {
errs = append(errs, err)
}
continue
}

otherCopy, err := util.DeploymentDeepCopy(otherD)
if err != nil {
return false, err
}

// Skip syncing this one if older overlapping one is found.
if currentOverlapsWith == otherCopy.Name || util.SelectorUpdatedBefore(otherCopy, d) {
if _, err = dc.markDeploymentOverlap(d, otherCopy.Name); err != nil {
return false, err
}
if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
return false, err
}
return true, fmt.Errorf("deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, otherCopy.Namespace, otherCopy.Name)
}

// TODO: We need to support annotations in deployments that overlap with multiple other
// deployments.
if _, err = dc.markDeploymentOverlap(otherCopy, d.Name); err != nil {
errs = append(errs, err)
}
// This is going to get some deployments into update hotlooping if we remove the overlapping
// annotation unconditionally.
//
// Scenario:
// --> Deployment foo with label selector A=A is created.
// --> Deployment bar with label selector A=A,B=B is created. Marked as overlapping since it
// overlaps with foo.
// --> Deployment baz with label selector B=B is created. Marked as overlapping, since it
// overlaps with bar, bar overlapping annotation is cleaned up. Next sync loop marks bar
// as overlapping and it gets in an update hotloop.
if d, err = dc.clearDeploymentOverlap(d, otherCopy.Name); err != nil {
errs = append(errs, err)
}
continue
}

// If the otherD deployment does not overlap with the current deployment *anymore*
// we need to cleanup otherD from the overlapping annotation so it can be synced by
// the deployment controller.
dName, hasOverlappingAnnotation := otherD.Annotations[util.OverlapAnnotation]
if hasOverlappingAnnotation && dName == d.Name {
otherCopy, err := util.DeploymentDeepCopy(otherD)
if err != nil {
return false, err
}
if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
errs = append(errs, err)
}
}
}

if !overlapping {
var err error
if d, err = dc.clearDeploymentOverlap(d, ""); err != nil {
errs = append(errs, err)
}
}

return false, utilerrors.NewAggregate(errs)
}

func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) {
if deployment.Annotations[util.OverlapAnnotation] == withDeployment && deployment.Status.ObservedGeneration >= deployment.Generation {
return deployment, nil
}
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
// Update observedGeneration for overlapping deployments so that their deletion won't be blocked.
deployment.Status.ObservedGeneration = deployment.Generation
deployment.Annotations[util.OverlapAnnotation] = withDeployment
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}

func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment, otherName string) (*extensions.Deployment, error) {
overlapsWith := deployment.Annotations[util.OverlapAnnotation]
if len(overlapsWith) == 0 {
return deployment, nil
}
// This is not the deployment found in the annotation - do not remove the annotation.
if len(otherName) > 0 && otherName != overlapsWith {
return deployment, nil
}
delete(deployment.Annotations, util.OverlapAnnotation)
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}

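As a side note, the podMap built by getPodMapForReplicaSets above can be pictured with the simplified, hypothetical sketch below (local stand-in types rather than the real v1.PodList and types.UID): Pods are grouped under the UID of the ReplicaSet named in their ControllerRef, and Pods that are inactive, orphaned, or owned by an unrelated controller are skipped.

```go
package main

import "fmt"

// pod is a local stand-in for v1.Pod; OwnerUID stands in for the
// ControllerRef's UID (empty means the Pod has no controller).
type pod struct {
	Name     string
	Active   bool
	OwnerUID string
}

// groupPodsByOwner mirrors the shape of getPodMapForReplicaSets: only UIDs in
// rsUIDs get an entry, so Pods owned by unrelated controllers are dropped.
func groupPodsByOwner(rsUIDs []string, pods []pod) map[string][]pod {
	podMap := make(map[string][]pod, len(rsUIDs))
	for _, uid := range rsUIDs {
		podMap[uid] = []pod{}
	}
	for _, p := range pods {
		if !p.Active || p.OwnerUID == "" {
			continue // ignore inactive and orphaned Pods
		}
		if _, ok := podMap[p.OwnerUID]; ok {
			podMap[p.OwnerUID] = append(podMap[p.OwnerUID], p)
		}
	}
	return podMap
}

func main() {
	pods := []pod{
		{Name: "rs1-pod", Active: true, OwnerUID: "rs1"},
		{Name: "rs2-pod", Active: true, OwnerUID: "rs2"},
		{Name: "orphan", Active: true},                     // no ControllerRef: skipped
		{Name: "failed", Active: false, OwnerUID: "rs1"},   // inactive: skipped
		{Name: "other", Active: true, OwnerUID: "someone"}, // owned by an unrelated controller: skipped
	}
	m := groupPodsByOwner([]string{"rs1", "rs2"}, pods)
	fmt.Println(len(m["rs1"]), len(m["rs2"])) // 1 1
}
```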
@@ -18,7 +18,6 @@ package deployment

import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -108,13 +107,13 @@ func newDeployment(name string, replicas int, revisionHistoryLimit *int32, maxSu
}

func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet {
control := true
return &extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: uuid.NewUUID(),
Namespace: metav1.NamespaceDefault,
Labels: d.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{{APIVersion: getDeploymentKind().GroupVersion().Version, Kind: getDeploymentKind().Kind, Name: d.Name, UID: d.UID, Controller: &control}},
OwnerReferences: []metav1.OwnerReference{*newControllerRef(d)},
},
Spec: extensions.ReplicaSetSpec{
Selector: d.Spec.Selector,
@@ -311,195 +310,6 @@ func TestReentrantRollback(t *testing.T) {
f.run(getKey(d, t))
}

// TestOverlappingDeployment ensures that an overlapping deployment will not be synced by
// the controller.
func TestOverlappingDeployment(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}

foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later

f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)

f.expectUpdateDeploymentStatusAction(bar)
f.run(getKey(bar, t))

for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}
if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" {
t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations)
}
}
}

// TestSyncOverlappedDeployment ensures that from two overlapping deployments, the older
// one will be synced and the newer will be marked as overlapping. Note that in reality it's
// not always the older deployment that is the one that works vs the rest but the one which
// has the selector unchanged for longer time.
func TestSyncOverlappedDeployment(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}

foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later

f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)

f.expectUpdateDeploymentStatusAction(bar)
f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1))
f.expectUpdateDeploymentStatusAction(foo)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))

for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}
if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" {
t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations)
}
}
}

// TestSelectorUpdate ensures that from two overlapping deployments, the one that is working won't
// be marked as overlapping if its selector is updated but still overlaps with the other one.
func TestSelectorUpdate(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}
selectorUpdated := metav1.Time{Time: later.Add(time.Minute)}

foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
foo.Annotations = map[string]string{util.SelectorUpdateAnnotation: selectorUpdated.Format(time.RFC3339)}
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later
bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"}

f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)

f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1))
f.expectUpdateDeploymentStatusAction(foo)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))

for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}

if d.Name == "foo" && len(d.Annotations[util.OverlapAnnotation]) > 0 {
t.Errorf("deployment %q should not have the overlapping annotation", d.Name)
}
if d.Name == "bar" && len(d.Annotations[util.OverlapAnnotation]) == 0 {
t.Errorf("deployment %q should have the overlapping annotation", d.Name)
}
}
}

// TestDeletedDeploymentShouldCleanupOverlaps ensures that the deletion of a deployment
// will cleanup any deployments that overlap with it.
func TestDeletedDeploymentShouldCleanupOverlaps(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
earlier := metav1.Time{Time: now.Add(-time.Minute)}
later := metav1.Time{Time: now.Add(time.Minute)}

foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = earlier
foo.DeletionTimestamp = &now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"})
bar.CreationTimestamp = later
bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"}

f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)

f.expectUpdateDeploymentStatusAction(bar)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))

for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d := action.GetObject().(*extensions.Deployment)
if d.Name != "bar" {
continue
}

if len(d.Annotations[util.OverlapAnnotation]) > 0 {
t.Errorf("annotations weren't cleaned up for the overlapping deployment: %v", d.Annotations)
}
}
}

// TestDeletedDeploymentShouldNotCleanupOtherOverlaps ensures that the deletion of
// a deployment will not cleanup deployments that overlap with another deployment.
func TestDeletedDeploymentShouldNotCleanupOtherOverlaps(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
earlier := metav1.Time{Time: now.Add(-time.Minute)}
later := metav1.Time{Time: now.Add(time.Minute)}

foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = earlier
foo.DeletionTimestamp = &now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"bla": "bla"})
bar.CreationTimestamp = later
// Notice this deployment is overlapping with another deployment
bar.Annotations = map[string]string{util.OverlapAnnotation: "baz"}

f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)

f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))

for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d := action.GetObject().(*extensions.Deployment)
if d.Name != "bar" {
continue
}

if len(d.Annotations[util.OverlapAnnotation]) == 0 {
t.Errorf("overlapping annotation should not be cleaned up for bar: %v", d.Annotations)
}
}
}

// TestPodDeletionEnqueuesRecreateDeployment ensures that the deletion of a pod
// will requeue a Recreate deployment iff there is no other pod returned from the
// client.
@@ -562,6 +372,179 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) {
}
}

func TestGetReplicaSetsForDeployment(t *testing.T) {
f := newFixture(t)

// Two Deployments with same labels.
d1 := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"})

// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d1, "rs1", 1)
rs2 := newReplicaSet(d2, "rs2", 1)

f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, d1, d2, rs1, rs2)

// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

rsList, err := c.getReplicaSetsForDeployment(d1)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames := []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rs1.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name)
}

rsList, err = c.getReplicaSetsForDeployment(d2)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames = []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rs2.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs2.Name)
}
}

func TestGetReplicaSetsForDeploymentAdopt(t *testing.T) {
f := newFixture(t)

d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})

// RS with matching labels, but orphaned. Should be adopted and returned.
rs := newReplicaSet(d, "rs", 1)
rs.OwnerReferences = nil

f.dLister = append(f.dLister, d)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d, rs)

// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

rsList, err := c.getReplicaSetsForDeployment(d)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames := []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rs.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs.Name)
}
}

func TestGetReplicaSetsForDeploymentRelease(t *testing.T) {
f := newFixture(t)

d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})

// RS with matching ControllerRef, but wrong labels. Should be released.
rs := newReplicaSet(d, "rs", 1)
rs.Labels = map[string]string{"foo": "notbar"}

f.dLister = append(f.dLister, d)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d, rs)

// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

rsList, err := c.getReplicaSetsForDeployment(d)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames := []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 0 {
t.Errorf("getReplicaSetsForDeployment() = %v, want []", rsNames)
}
}

func TestGetPodMapForReplicaSets(t *testing.T) {
f := newFixture(t)

d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})

// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d, "rs1", 1)
rs2 := newReplicaSet(d, "rs2", 1)

// Add a Pod for each ReplicaSet.
pod1 := generatePodFromRS(rs1)
pod2 := generatePodFromRS(rs2)
// Add a Pod that has matching labels, but no ControllerRef.
pod3 := generatePodFromRS(rs1)
pod3.Name = "pod3"
pod3.OwnerReferences = nil
// Add a Pod that has matching labels and ControllerRef, but is inactive.
pod4 := generatePodFromRS(rs1)
pod4.Name = "pod4"
pod4.Status.Phase = v1.PodFailed

f.dLister = append(f.dLister, d)
f.rsLister = append(f.rsLister, rs1, rs2)
f.podLister = append(f.podLister, pod1, pod2, pod3, pod4)
f.objects = append(f.objects, d, rs1, rs2, pod1, pod2, pod3, pod4)

// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

podMap, err := c.getPodMapForReplicaSets(d.Namespace, f.rsLister)
if err != nil {
t.Fatalf("getPodMapForReplicaSets() error: %v", err)
}
podCount := 0
for _, podList := range podMap {
podCount += len(podList.Items)
}
if got, want := podCount, 2; got != want {
t.Errorf("podCount = %v, want %v", got, want)
}

if got, want := len(podMap), 2; got != want {
t.Errorf("len(podMap) = %v, want %v", got, want)
}
if got, want := len(podMap[rs1.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs1]) = %v, want %v", got, want)
}
if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want {
t.Errorf("podMap[rs1] = [%v], want [%v]", got, want)
}
if got, want := len(podMap[rs2.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs2]) = %v, want %v", got, want)
}
if got, want := podMap[rs2.UID].Items[0].Name, "rs2-pod"; got != want {
t.Errorf("podMap[rs2] = [%v], want [%v]", got, want)
}
}

// generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template
func generatePodFromRS(rs *extensions.ReplicaSet) *v1.Pod {
trueVar := true

@@ -23,6 +23,7 @@ import (

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller/deployment/util"
@@ -33,12 +34,12 @@ import (
// and when new pods scale up or old pods scale down. Progress is not estimated for paused
// deployments or when users don't really care about it ie. progressDeadlineSeconds is not
// specified.
func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) {
func (dc *DeploymentController) hasFailed(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused {
return false, nil
}

newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return false, err
}

@@ -17,14 +17,16 @@ limitations under the License.
package deployment

import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)

// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
@@ -32,31 +34,29 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

// scale down old replica sets
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, deployment)
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

newStatus := calculateStatus(allRSs, newRS, deployment)
newStatus := calculateStatus(allRSs, newRS, d)
// Do not process a deployment when it has old pods running.
if newStatus.UpdatedReplicas == 0 {
podList, err := dc.listPods(deployment)
if err != nil {
return err
}
if len(podList.Items) > 0 {
return dc.syncRolloutStatus(allRSs, newRS, deployment)
for _, podList := range podMap {
if len(podList.Items) > 0 {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
}
}

// If we need to create a new RS, create it now
// TODO: Create a new RS without re-listing all RSs.
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true)
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}
@@ -64,17 +64,17 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
}

// scale up new replica set
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"

@@ -21,14 +21,15 @@ import (

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)

// rollback the deployment to the specified revision. In any case cleanup the rollback spec.
func (dc *DeploymentController) rollback(d *extensions.Deployment) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, true)
func (dc *DeploymentController) rollback(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}

@@ -21,42 +21,44 @@ import (
"sort"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/integer"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)

// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
func (dc *DeploymentController) rolloutRolling(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)

// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}

func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {

@@ -25,7 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
@@ -36,31 +36,31 @@ import (
)

// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
func (dc *DeploymentController) syncStatusOnly(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}

allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncDeploymentStatus(allRSs, newRS, d)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
func (dc *DeploymentController) sync(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
if err := dc.scale(deployment, newRS, oldRSs); err != nil {
if err := dc.scale(d, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}

allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncDeploymentStatus(allRSs, newRS, d)
}

// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
@@ -98,25 +98,31 @@ func (dc *DeploymentController) checkPausedConditions(d *extensions.Deployment)
}

// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForReplicaSets(rsList).
// These are passed around to avoid repeating expensive API calls.
//
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
//
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(d, rsList, podMap)
if err != nil {
return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
}
_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
_, allOldRSs, err := deploymentutil.FindOldReplicaSets(d, rsList, podList)
if err != nil {
return nil, nil, err
}

// Get new replica set with the updated revision number
newRS, err := dc.getNewReplicaSet(deployment, rsList, allOldRSs, createIfNotExisted)
newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted)
if err != nil {
return nil, nil, err
}
@@ -124,33 +130,28 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext
return newRS, allOldRSs, nil
}

// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, *v1.PodList, error) {
rsList, err := deploymentutil.ListReplicaSets(deployment,
func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
return dc.rsLister.ReplicaSets(namespace).List(parsed)
})
if err != nil {
return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
}
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment
// targets, with pod-template-hash information synced.
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForReplicaSets(rsList).
// These are passed around to avoid repeating expensive API calls.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, *v1.PodList, error) {
syncedRSList := []*extensions.ReplicaSet{}
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
// that aren't constrained by the pod-template-hash.
syncedRS, err := dc.addHashKeyToRSAndPods(rs)
syncedRS, err := dc.addHashKeyToRSAndPods(rs, podMap[rs.UID])
if err != nil {
return nil, nil, err
}
syncedRSList = append(syncedRSList, syncedRS)
}
syncedPodList, err := dc.listPods(deployment)
if err != nil {
return nil, nil, err
// Put all Pods from podMap into one list.
syncedPodList := &v1.PodList{}
for _, podList := range podMap {
syncedPodList.Items = append(syncedPodList.Items, podList.Items...)
}
return syncedRSList, syncedPodList, nil
}
@@ -159,7 +160,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet, podList *v1.PodList) (*extensions.ReplicaSet, error) {
// If the rs already has the new hash label in its selector, it's done syncing
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return rs, nil
@@ -189,24 +190,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
}

// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := metav1.LabelSelectorAsSelector(updatedRS.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
}
options := metav1.ListOptions{LabelSelector: selector.String()}
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
pods, err := dc.podLister.Pods(updatedRS.Namespace).List(parsed)
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", rs.Namespace, options, err)
}
podList := v1.PodList{Items: make([]v1.Pod, 0, len(pods))}
for i := range pods {
podList.Items = append(podList.Items, *pods[i])
}
if err := deploymentutil.LabelPodsWithHash(&podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil {
if err := deploymentutil.LabelPodsWithHash(podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
}
@@ -242,22 +226,6 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
return updatedRS, nil
}

func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*v1.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
pods, err := dc.podLister.Pods(namespace).List(parsed)
result := v1.PodList{Items: make([]v1.Pod, 0, len(pods))}
for i := range pods {
result.Items = append(result.Items, *pods[i])
}
return &result, err
})
}

// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's).
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
@@ -329,25 +297,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
newRS := extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
// Make the name deterministic, to ensure idempotence
Name: deployment.Name + "-" + podTemplateSpecHash,
Namespace: namespace,
Name: deployment.Name + "-" + podTemplateSpecHash,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(deployment)},
},
Spec: extensions.ReplicaSetSpec{
Replicas: func(i int32) *int32 { return &i }(0),
Replicas: new(int32),
MinReadySeconds: deployment.Spec.MinReadySeconds,
Selector: newRSSelector,
Template: newRSTemplate,
},
}
var trueVar = true
controllerRef := &metav1.OwnerReference{
APIVersion: getDeploymentKind().GroupVersion().String(),
Kind: getDeploymentKind().Kind,
Name: deployment.Name,
UID: deployment.UID,
Controller: &trueVar,
}
newRS.OwnerReferences = append(newRS.OwnerReferences, *controllerRef)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
if err != nil {
@@ -632,8 +592,12 @@ func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaS

// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForReplicaSets(rsList).
// These are passed around to avoid repeating expensive API calls.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return false, err
}
@@ -649,3 +613,15 @@ func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool,
}
return false, nil
}

// newControllerRef returns a ControllerRef pointing to the deployment.
func newControllerRef(d *extensions.Deployment) *metav1.OwnerReference {
isController := true
return &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: d.Name,
UID: d.UID,
Controller: &isController,
}
}