Merge pull request #22828 from janetkuo/scale-down-flake

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-03-10 20:40:23 -08:00
commit a5369a2c1e
3 changed files with 26 additions and 9 deletions

View File

@@ -886,29 +886,32 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
if err != nil {
return false, nil
}
glog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)
// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
allRSs = append(oldRSs, newRS)
scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
if err != nil {
return false, nil
}
glog.V(4).Infof("Scaled down old RSes by %d", scaledDownCount)
totalScaledDown := cleanupCount + scaledDownCount
return totalScaledDown > 0, nil
}
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, maxCleanupCount int) (int, error) {
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, maxCleanupCount int) ([]*extensions.ReplicaSet, int, error) {
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
be deleted first and won't increase unavailability.
totalScaledDown := 0
for _, targetRS := range oldRSs {
for i, targetRS := range oldRSs {
if totalScaledDown >= maxCleanupCount {
break
}
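
The heart of the fix is visible in this hunk: cleanupUnhealthyReplicas now hands the (possibly scaled-down) old replica sets back to its caller, and reconcileOldReplicaSets rebuilds allRSs from that updated slice before doing any rolling-update math, so later scale-down decisions use the post-cleanup replica counts instead of stale objects. Below is a minimal, self-contained sketch of that pattern; the replicaSet struct and the cleanup helper are invented for illustration and are not the controller's types.

package main

import "fmt"

type replicaSet struct {
	name     string
	replicas int
}

// cleanup zeroes out any replica set marked unhealthy and returns the updated
// slice together with how many replicas were removed, mirroring the new
// ([]*extensions.ReplicaSet, int, error) shape of cleanupUnhealthyReplicas.
func cleanup(old []*replicaSet, unhealthy map[string]bool) ([]*replicaSet, int) {
	total := 0
	for i, rs := range old {
		if unhealthy[rs.name] {
			total += rs.replicas
			// Propagate the updated object back into the slice, as the fix
			// does with oldRSs[i] = updatedOldRS.
			old[i] = &replicaSet{name: rs.name, replicas: 0}
		}
	}
	return old, total
}

func main() {
	oldRSs := []*replicaSet{{name: "rs-a", replicas: 3}, {name: "rs-b", replicas: 2}}
	newRS := &replicaSet{name: "rs-new", replicas: 5}

	oldRSs, cleaned := cleanup(oldRSs, map[string]bool{"rs-a": true})

	// Rebuild the combined list from the *updated* old sets before any further
	// scale-down decisions, mirroring `allRSs = append(oldRSs, newRS)` above.
	allRSs := append(append([]*replicaSet{}, oldRSs...), newRS)

	fmt.Println(cleaned, len(allRSs), allRSs[0].replicas) // 3 3 0
}
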
@@ -918,7 +921,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
}
readyPodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{targetRS}, 0)
if err != nil {
return totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
}
if targetRS.Spec.Replicas == readyPodCount {
// no unhealthy replicas found, no scaling required.
@@ -927,13 +930,17 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount)
newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if newReplicasCount > targetRS.Spec.Replicas {
return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
}
_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
return nil, totalScaledDown, err
}
totalScaledDown += scaledDownCount
oldRSs[i] = updatedOldRS
}
return totalScaledDown, nil
return oldRSs, totalScaledDown, nil
}
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
@@ -973,6 +980,9 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
// Scale down.
scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown)
newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
if newReplicasCount > targetRS.Spec.Replicas {
return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
}
_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
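
Both functions also gain a defensive guard: a "scale down" request whose target replica count is larger than the current count can only come from an arithmetic bug upstream (for example a negative scale-down count computed from stale data), so the controller now fails fast instead of silently scaling up. Here is a small hedged sketch of that invariant as a standalone helper; the name checkScaleDown is hypothetical, the real code inlines the check as shown above.

package main

import "fmt"

// checkScaleDown rejects a "scale down" whose target is higher than the
// current replica count; failing loudly here is safer than quietly scaling up.
func checkScaleDown(namespace, name string, current, desired int) error {
	if desired > current {
		return fmt.Errorf("invalid request to scale down %s/%s %d -> %d", namespace, name, current, desired)
	}
	return nil
}

func main() {
	// A negative scaledDownCount would make desired (7) exceed current (5).
	if err := checkScaleDown("default", "rs-old", 5, 7); err != nil {
		fmt.Println(err) // invalid request to scale down default/rs-old 5 -> 7
	}
}
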

View File

@@ -471,7 +471,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, test.maxCleanupCount)
_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, test.maxCleanupCount)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -522,6 +522,13 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
oldReplicas: 0,
scaleExpected: false,
},
{
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(2),
readyPods: 1,
oldReplicas: 10,
scaleExpected: false,
},
}
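
The new table entry above guards against the flake this PR addresses: with 10 desired replicas, maxUnavailable of 2 and only 1 ready pod, the deployment is already below its availability floor, so no old replicas may be scaled down. A rough sketch of the arithmetic, assuming the rolling-update rule is "never let available pods drop below replicas - maxUnavailable" (constants taken from the test case):

package main

import "fmt"

func main() {
	deploymentReplicas := 10
	maxUnavailable := 2 // intstr.FromInt(2) in the test
	readyPods := 1

	// The deployment may never drop below this many available pods.
	minAvailable := deploymentReplicas - maxUnavailable // 8

	// With only 1 ready pod we are already below the floor, so nothing may be
	// scaled down; hence scaleExpected: false for the new entry.
	roomToScaleDown := readyPods - minAvailable // -7
	fmt.Println(minAvailable, roomToScaleDown, roomToScaleDown > 0) // 8 -7 false
}
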
for i, test := range tests {

View File

@@ -182,7 +182,7 @@ func stopDeployment(c *clientset.Clientset, oldC client.Interface, ns, deploymen
_, err = c.Extensions().Deployments(ns).Get(deployment.Name)
Expect(err).To(HaveOccurred())
Expect(errors.IsNotFound(err)).To(BeTrue())
Logf("ensuring deployment %s rcs were deleted", deploymentName)
Logf("ensuring deployment %s RSes were deleted", deploymentName)
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
options := api.ListOptions{LabelSelector: selector}
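
The hunk cuts off before the actual assertion; presumably the test goes on to list replica sets with that selector and require an empty result, roughly like the lines below. The exact client call c.Extensions().ReplicaSets(ns).List(options) and the HaveLen matcher are assumptions based on the surrounding code, not copied from the file.

rss, err := c.Extensions().ReplicaSets(ns).List(options) // assumed call, using the same extensions client as above
Expect(err).NotTo(HaveOccurred())
Expect(rss.Items).To(HaveLen(0)) // no RSes should survive deletion of the deployment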