Wait for clean old RSs statuses in the middle of Recreate rollouts

pull/6/head
Michail Kargakis 2017-04-02 17:59:30 +02:00
parent 74c23bdf68
commit 97fed0aff4
7 changed files with 103 additions and 26 deletions

View File

@ -71,6 +71,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/client-go/testing",

View File

@ -26,6 +26,7 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -537,10 +538,8 @@ func (dc *DeploymentController) getPodMapForDeployment(d *extensions.Deployment,
podMap[rs.UID] = &v1.PodList{}
}
for _, pod := range pods {
// Ignore inactive Pods since that's what ReplicaSet does.
if !controller.IsPodActive(pod) {
continue
}
// Do not ignore inactive Pods because Recreate Deployments need to verify that no
// Pods from older versions are running before spinning up new Pods.
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
continue
@ -614,6 +613,10 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err

View File

@ -560,18 +560,21 @@ func TestGetPodMapForReplicaSets(t *testing.T) {
for _, podList := range podMap {
podCount += len(podList.Items)
}
if got, want := podCount, 2; got != want {
if got, want := podCount, 3; got != want {
t.Errorf("podCount = %v, want %v", got, want)
}
if got, want := len(podMap), 2; got != want {
t.Errorf("len(podMap) = %v, want %v", got, want)
}
if got, want := len(podMap[rs1.UID].Items), 1; got != want {
if got, want := len(podMap[rs1.UID].Items), 2; got != want {
t.Errorf("len(podMap[rs1]) = %v, want %v", got, want)
}
if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want {
t.Errorf("podMap[rs1] = [%v], want [%v]", got, want)
expect := map[string]struct{}{"rs1-pod": {}, "pod4": {}}
for _, pod := range podMap[rs1.UID].Items {
if _, ok := expect[pod.Name]; !ok {
t.Errorf("unexpected pod name for rs1: %s", pod.Name)
}
}
if got, want := len(podMap[rs2.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs2]) = %v, want %v", got, want)

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/deployment/util"
)
// rolloutRecreate implements the logic for recreating a replica set.
@ -43,18 +44,12 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
return dc.syncRolloutStatus(allRSs, newRS, d)
}
newStatus := calculateStatus(allRSs, newRS, d)
// Do not process a deployment when it has old pods running.
if newStatus.UpdatedReplicas == 0 {
for _, podList := range podMap {
if len(podList.Items) > 0 {
if oldPodsRunning(newRS, oldRSs, podMap) {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
}
}
// If we need to create a new RS, create it now.
// TODO: Create a new RS without re-listing all RSs.
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
@ -64,14 +59,9 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
}
// scale up new replica set.
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d)
if err != nil {
if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus.
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// Sync deployment status.
return dc.syncRolloutStatus(allRSs, newRS, d)
@ -98,6 +88,23 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
return scaled, nil
}
// oldPodsRunning returns whether there are old pods running or any of the old ReplicaSets thinks that it runs pods.
func oldPodsRunning(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) bool {
if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 {
return true
}
for rsUID, podList := range podMap {
// If the pods belong to the new ReplicaSet, ignore.
if newRS != nil && newRS.UID == rsUID {
continue
}
if len(podList.Items) > 0 {
return true
}
}
return false
}
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)

View File

@ -21,8 +21,10 @@ import (
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
@ -82,3 +84,62 @@ func TestScaleDownOldReplicaSets(t *testing.T) {
}
}
}
func TestOldPodsRunning(t *testing.T) {
tests := []struct {
name string
newRS *extensions.ReplicaSet
oldRSs []*extensions.ReplicaSet
podMap map[types.UID]*v1.PodList
expected bool
}{
{
name: "no old RSs",
expected: false,
},
{
name: "old RSs with running pods",
oldRSs: []*extensions.ReplicaSet{rsWithUID("some-uid"), rsWithUID("other-uid")},
podMap: podMapWithUIDs([]string{"some-uid", "other-uid"}),
expected: true,
},
{
name: "old RSs without pods but with non-zero status replicas",
oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 1, nil)},
expected: true,
},
{
name: "old RSs without pods or non-zero status replicas",
oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 0, nil)},
expected: false,
},
}
for _, test := range tests {
if expected, got := test.expected, oldPodsRunning(test.newRS, test.oldRSs, test.podMap); expected != got {
t.Errorf("%s: expected %t, got %t", test.name, expected, got)
}
}
}
func rsWithUID(uid string) *extensions.ReplicaSet {
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
rs := newReplicaSet(d, fmt.Sprintf("foo-%s", uid), 0)
rs.UID = types.UID(uid)
return rs
}
func podMapWithUIDs(uids []string) map[types.UID]*v1.PodList {
podMap := make(map[types.UID]*v1.PodList)
for _, uid := range uids {
podMap[types.UID(uid)] = &v1.PodList{
Items: []v1.Pod{
{ /* supposedly a pod */ },
{ /* supposedly another pod pod */ },
},
}
}
return podMap
}

View File

@ -135,7 +135,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.D
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, error) {
syncedRSList := []*extensions.ReplicaSet{}
var syncedRSList []*extensions.ReplicaSet
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
@ -515,7 +515,6 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
glog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name)
var errList []error
// TODO: This should be parallelized.
for i := int32(0); i < diff; i++ {
rs := cleanableRSes[i]
// Avoid delete replica set with non-zero replica counts

View File

@ -570,7 +570,7 @@ func rsListFromClient(c clientset.Interface) rsListFunc {
if err != nil {
return nil, err
}
ret := []*extensions.ReplicaSet{}
var ret []*extensions.ReplicaSet
for i := range rsList.Items {
ret = append(ret, &rsList.Items[i])
}
@ -827,9 +827,12 @@ func WaitForPodsHashPopulated(c extensionslisters.ReplicaSetLister, desiredGener
}
// LabelPodsWithHash labels all pods in the given podList with the new hash label.
// The returned bool value can be used to tell if all pods are actually labeled.
func LabelPodsWithHash(podList *v1.PodList, c clientset.Interface, podLister corelisters.PodLister, namespace, name, hash string) error {
for _, pod := range podList.Items {
// Ignore inactive Pods.
if !controller.IsPodActive(&pod) {
continue
}
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
_, err := UpdatePodWithRetries(c.Core().Pods(namespace), podLister, pod.Namespace, pod.Name,