controller: save older revisions for Deployment's replica sets

pull/6/head
Michail Kargakis 2016-10-06 17:02:51 +02:00
parent db1985716f
commit 89eaa918be
3 changed files with 80 additions and 45 deletions

View File

@ -2134,6 +2134,8 @@ __EOF__
kubectl-with-retry rollout resume deployment nginx "${kube_flags[@]}" kubectl-with-retry rollout resume deployment nginx "${kube_flags[@]}"
# The resumed deployment can now be rolled back # The resumed deployment can now be rolled back
kubectl rollout undo deployment nginx "${kube_flags[@]}" kubectl rollout undo deployment nginx "${kube_flags[@]}"
# Check that the new replica set (nginx-618515232) has all old revisions stored in an annotation
kubectl get rs nginx-618515232 -o yaml | grep "deployment.kubernetes.io/revision-history: 1,3"
# Clean up # Clean up
kubectl delete deployment nginx "${kube_flags[@]}" kubectl delete deployment nginx "${kube_flags[@]}"

View File

@ -82,23 +82,12 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext
return nil, nil, err return nil, nil, err
} }
// Calculate the max revision number among all old RSes
maxOldV := deploymentutil.MaxRevision(allOldRSs)
// Get new replica set with the updated revision number // Get new replica set with the updated revision number
newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted) newRS, err := dc.getNewReplicaSet(deployment, rsList, allOldRSs, createIfNotExisted)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// Sync deployment's revision number with new replica set
if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
}
}
return newRS, allOldRSs, nil return newRS, allOldRSs, nil
} }
@ -247,14 +236,22 @@ func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*ap
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods. // Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
// Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList) existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
if err != nil { if err != nil {
return nil, err return nil, err
} else if existingNewRS != nil { }
// Calculate the max revision number among all old RSes
maxOldRevision := deploymentutil.MaxRevision(oldRSs)
// Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
// Latest replica set exists. We need to sync its annotations (includes copying all but
// annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
// and maxReplicas) and also update the revision annotation in the deployment with the
// latest revision.
if existingNewRS != nil {
objCopy, err := api.Scheme.Copy(existingNewRS) objCopy, err := api.Scheme.Copy(existingNewRS)
if err != nil { if err != nil {
return nil, err return nil, err
@ -263,7 +260,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
// Set existing new replica set's annotation // Set existing new replica set's annotation
if deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true) { if deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(rsCopy) if rsCopy, err = dc.client.Extensions().ReplicaSets(rsCopy.Namespace).Update(rsCopy); err != nil {
return nil, err
}
}
updateConditions := deploymentutil.SetDeploymentRevision(deployment, newRevision)
if updateConditions {
if deployment, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment); err != nil {
return nil, err
}
} }
return rsCopy, nil return rsCopy, nil
} }
@ -273,7 +280,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
} }
// new ReplicaSet does not exist, create one. // new ReplicaSet does not exist, create one.
namespace := deployment.ObjectMeta.Namespace namespace := deployment.Namespace
podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template) podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment) newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
// Add podTemplateHash label to selector. // Add podTemplateHash label to selector.
@ -309,19 +316,9 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
} }
return createdRS, dc.updateDeploymentRevision(deployment, newRevision) deploymentutil.SetDeploymentRevision(deployment, newRevision)
} _, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
return createdRS, err
func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error {
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
_, err := dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
return err
}
return nil
} }
// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size // scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
@ -441,7 +438,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea. // NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
rs.Spec.Replicas = newScale rs.Spec.Replicas = newScale
deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment))
rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) rs, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
if err == nil { if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} }

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"sort" "sort"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -43,6 +44,8 @@ import (
const ( const (
// RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence
RevisionAnnotation = "deployment.kubernetes.io/revision" RevisionAnnotation = "deployment.kubernetes.io/revision"
// RevisionHistoryAnnotation maintains the history of all old revisions that a replica set has served for a deployment.
RevisionHistoryAnnotation = "deployment.kubernetes.io/revision-history"
// DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation
// in its replica sets. Helps in separating scaling events from the rollout process and for // in its replica sets. Helps in separating scaling events from the rollout process and for
// determining if the new replica set for a deployment is really saturated. // determining if the new replica set for a deployment is really saturated.
@ -59,13 +62,30 @@ const (
// RollbackDone is the done rollback event reason // RollbackDone is the done rollback event reason
RollbackDone = "DeploymentRollback" RollbackDone = "DeploymentRollback"
// OverlapAnnotation marks deployments with overlapping selector with other deployments // OverlapAnnotation marks deployments with overlapping selector with other deployments
// TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210 // TODO: Delete this annotation when we gracefully handle overlapping selectors.
// See https://github.com/kubernetes/kubernetes/issues/2210
OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with" OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with"
// SelectorUpdateAnnotation marks the last time deployment selector update // SelectorUpdateAnnotation marks the last time deployment selector update
// TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210 // TODO: Delete this annotation when we gracefully handle overlapping selectors.
// See https://github.com/kubernetes/kubernetes/issues/2210
SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at" SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at"
) )
// SetDeploymentRevision updates the revision for a deployment.
func SetDeploymentRevision(deployment *extensions.Deployment, revision string) bool {
updated := false
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
if deployment.Annotations[RevisionAnnotation] != revision {
deployment.Annotations[RevisionAnnotation] = revision
updated = true
}
return updated
}
// MaxRevision finds the highest revision in the replica sets // MaxRevision finds the highest revision in the replica sets
func MaxRevision(allRSs []*extensions.ReplicaSet) int64 { func MaxRevision(allRSs []*extensions.ReplicaSet) int64 {
max := int64(0) max := int64(0)
@ -97,6 +117,15 @@ func LastRevision(allRSs []*extensions.ReplicaSet) int64 {
return secMax return secMax
} }
// Revision returns the revision number of the input replica set
func Revision(rs *extensions.ReplicaSet) (int64, error) {
v, ok := rs.Annotations[RevisionAnnotation]
if !ok {
return 0, nil
}
return strconv.ParseInt(v, 10, 64)
}
// SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if replica set's annotation is changed. // copying required deployment annotations to it; it returns true if replica set's annotation is changed.
func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
@ -106,14 +135,29 @@ func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *exten
if newRS.Annotations == nil { if newRS.Annotations == nil {
newRS.Annotations = make(map[string]string) newRS.Annotations = make(map[string]string)
} }
oldRevision, ok := newRS.Annotations[RevisionAnnotation]
// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
// of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
// newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision. // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
if newRS.Annotations[RevisionAnnotation] < newRevision { if oldRevision < newRevision {
newRS.Annotations[RevisionAnnotation] = newRevision newRS.Annotations[RevisionAnnotation] = newRevision
annotationChanged = true annotationChanged = true
glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
} }
// If a revision annotation already existed and this replica set was updated with a new revision
// then that means we are rolling back to this replica set. We need to preserve the old revisions
// for historical information.
if ok && annotationChanged {
revisionHistoryAnnotation := newRS.Annotations[RevisionHistoryAnnotation]
oldRevisions := strings.Split(revisionHistoryAnnotation, ",")
if len(oldRevisions[0]) == 0 {
newRS.Annotations[RevisionHistoryAnnotation] = oldRevision
} else {
oldRevisions = append(oldRevisions, oldRevision)
newRS.Annotations[RevisionHistoryAnnotation] = strings.Join(oldRevisions, ",")
}
}
// If the new replica set is about to be created, we need to add replica annotations to it.
if !exists && SetReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+MaxSurge(*deployment)) { if !exists && SetReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+MaxSurge(*deployment)) {
annotationChanged = true annotationChanged = true
} }
@ -123,6 +167,7 @@ func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *exten
var annotationsToSkip = map[string]bool{ var annotationsToSkip = map[string]bool{
annotations.LastAppliedConfigAnnotation: true, annotations.LastAppliedConfigAnnotation: true,
RevisionAnnotation: true, RevisionAnnotation: true,
RevisionHistoryAnnotation: true,
DesiredReplicasAnnotation: true, DesiredReplicasAnnotation: true,
MaxReplicasAnnotation: true, MaxReplicasAnnotation: true,
OverlapAnnotation: true, OverlapAnnotation: true,
@ -694,15 +739,6 @@ func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList
return allRSPods, nil return allRSPods, nil
} }
// Revision returns the revision number of the input replica set
func Revision(rs *extensions.ReplicaSet) (int64, error) {
v, ok := rs.Annotations[RevisionAnnotation]
if !ok {
return 0, nil
}
return strconv.ParseInt(v, 10, 64)
}
// IsRollingUpdate returns true if the strategy type is a rolling update. // IsRollingUpdate returns true if the strategy type is a rolling update.
func IsRollingUpdate(deployment *extensions.Deployment) bool { func IsRollingUpdate(deployment *extensions.Deployment) bool {
return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType