clean up unhealthy replicas first for old rcs when reconcileOldRCs

pull/6/head
mqliang 2016-01-30 15:09:26 +08:00
parent 0952dcd349
commit 86aea1d59c
2 changed files with 405 additions and 33 deletions

View File

@@ -806,20 +806,11 @@ func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationControll
func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment, expectationsCheck bool) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs)
if oldPodsCount == 0 {
// Can't scale down further
return false, nil
}
// Check the expectations of deployment before reconciling
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
@@ -828,20 +819,144 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
glog.V(4).Infof("Pod expectations not met yet before reconciling old RCs\n")
return false, nil
}
minReadySeconds := deployment.Spec.MinReadySeconds
allPodsCount := deploymentutil.GetReplicaCountForRCs(allRCs)
newRCAvailablePodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, []*api.ReplicationController{newRC}, minReadySeconds)
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
maxUnavailable, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable)
if err != nil {
return false, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
}
if isPercent {
maxUnavailable = util.GetValueFromPercent(maxUnavailable, deployment.Spec.Replicas)
}
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old rcs have unhealthy replicas; we can safely scale down those unhealthy replicas since that won't further
// increase unavailability.
// * The new rc has scaled up and its replicas have become ready; then we can scale down old rcs in a further step.
//
// maxScaledDown := allPodsCount - minAvailable - newRCPodsUnavailable
// We take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
// the newRC, so that the unavailable pods from the newRC would not make us scale down old RCs in a further step (that will
// increase unavailability).
//
// Concrete example:
//
// * 10 replicas
// * 2 maxUnavailable (absolute number, not percent)
// * 3 maxSurge (absolute number, not percent)
//
// case 1:
// * Deployment is updated, newRC is created with 3 replicas, oldRC is scaled down to 8, and newRC is scaled up to 5.
// * The new RC pods crashloop and never become available.
// * allPodsCount is 13. minAvailable is 8. newRCPodsUnavailable is 5.
// * A node fails and causes one of the oldRC pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRC won't be scaled down.
// * The user notices the crashloop and does kubectl rollout undo to rollback.
// * newRCPodsUnavailable is 1, since we rolled back to the good RC, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
// * The total number of pods will then be 9 and the newRC can be scaled up to 10.
//
// case 2:
// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
// * The new RC created must start with 0 replicas because allPodsCount is already at 13.
// * However, newRCPodsUnavailable would also be 0, so the 2 old RCs could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new RC to be scaled up by 5.
minAvailable := deployment.Spec.Replicas - maxUnavailable
newRCUnavailablePodCount := newRC.Spec.Replicas - newRCAvailablePodCount
maxScaledDown := allPodsCount - minAvailable - newRCUnavailablePodCount
if maxScaledDown <= 0 {
return false, nil
}
// Clean up unhealthy replicas first; otherwise unhealthy replicas will block the deployment
// and cause a timeout. See https://github.com/kubernetes/kubernetes/issues/16737
cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRCs, deployment, maxScaledDown)
if err != nil {
return false, nil
}
// Scale down old rcs; we need to check maxUnavailable to ensure we can scale down
scaledDownCount, err := dc.scaleDownOldRCsForRollingUpdate(allRCs, oldRCs, deployment)
if err != nil {
return false, nil
}
totalScaledDown := cleanupCount + scaledDownCount
if expectationsCheck {
dc.podExpectations.ExpectDeletions(dKey, totalScaledDown)
}
return totalScaledDown > 0, nil
}
// cleanupUnhealthyReplicas will scale down old rcs with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRCs []*api.ReplicationController, deployment extensions.Deployment, maxCleanupCount int) (int, error) {
sort.Sort(controller.ControllersByCreationTimestamp(oldRCs))
// Safely scale down all old rcs with unhealthy replicas. ReplicationController/ReplicaSet will sort the pods in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// be deleted first and won't increase unavailability.
totalScaledDown := 0
for _, targetRC := range oldRCs {
if totalScaledDown >= maxCleanupCount {
break
}
if targetRC.Spec.Replicas == 0 {
// cannot scale down this RC.
continue
}
readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, []*api.ReplicationController{targetRC}, 0)
if err != nil {
return totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
}
if targetRC.Spec.Replicas == readyPodCount {
// no unhealthy replicas found, no scaling required.
continue
}
scaledDownCount := int(math.Min(float64(maxCleanupCount-totalScaledDown), float64(targetRC.Spec.Replicas-readyPodCount)))
newReplicasCount := targetRC.Spec.Replicas - scaledDownCount
_, err = dc.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
}
totalScaledDown += scaledDownCount
}
return totalScaledDown, nil
}
// scaleDownOldRCsForRollingUpdate scales down old rcs when deployment strategy is "RollingUpdate".
// We need to check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldRCsForRollingUpdate(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, deployment extensions.Deployment) (int, error) {
maxUnavailable, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable)
if err != nil {
return 0, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
}
if isPercent {
maxUnavailable = util.GetValueFromPercent(maxUnavailable, deployment.Spec.Replicas)
}
// Check if we can scale down.
minAvailable := deployment.Spec.Replicas - maxUnavailable
minReadySeconds := deployment.Spec.MinReadySeconds
// Find the number of ready pods.
readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, allRCs, minReadySeconds)
if err != nil {
return 0, fmt.Errorf("could not find available pods: %v", err)
}
if readyPodCount <= minAvailable {
// Cannot scale down.
return 0, nil
}
sort.Sort(controller.ControllersByCreationTimestamp(oldRCs))
totalScaledDown := 0
totalScaleDownCount := readyPodCount - minAvailable
for _, targetRC := range oldRCs {
if totalScaledDown >= totalScaleDownCount {
// No further scaling required.
break
}
@@ -850,24 +965,17 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
continue
}
// Scale down.
scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount-totalScaledDown)))
newReplicasCount := targetRC.Spec.Replicas - scaleDownCount
_, err = dc.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
}
totalScaledDown += scaleDownCount
}
return totalScaledDown, nil
}
// scaleDownOldRCsForRecreate scales down old rcs when deployment strategy is "Recreate"
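As a quick aid for following the arithmetic in the maxScaledDown comment above, here is a minimal standalone sketch; it is illustrative only, and the maxScaledDown helper below is just a stand-in for the inline expression allPodsCount - minAvailable - newRCUnavailablePodCount, evaluated with the comment's concrete example of 10 replicas, maxUnavailable=2 and maxSurge=3:

package main

import "fmt"

// maxScaledDown mirrors the bound used above: total pods, minus the minimum
// that must stay available, minus the new RC's unavailable pods.
func maxScaledDown(allPodsCount, minAvailable, newRCUnavailable int) int {
	return allPodsCount - minAvailable - newRCUnavailable
}

func main() {
	// Values from the comment's concrete example: 10 replicas, maxUnavailable=2, maxSurge=3.
	minAvailable := 10 - 2 // 8

	// Case 1: newRC scaled up to 5, but all 5 of its pods are crashlooping.
	fmt.Println(maxScaledDown(13, minAvailable, 5)) // 0 -> the old RC must not be scaled down

	// After "kubectl rollout undo": the good RC is the new RC again, only 1 pod unavailable.
	fmt.Println(maxScaledDown(13, minAvailable, 1)) // 4 -> 4 crashlooping pods can be scaled down

	// Case 2 ("roll over"): the brand-new RC starts at 0 replicas, so 0 unavailable.
	fmt.Println(maxScaledDown(13, minAvailable, 0)) // 5 -> old RCs can drop by 5
}

Case 1 evaluating to 0 is what keeps the old RC from being scaled down while the new RC crashloops; the rollback and roll-over variants free up 4 and 5 pods respectively.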

View File

@@ -123,6 +123,270 @@ func TestDeploymentController_reconcileNewRC(t *testing.T) {
}
func TestDeploymentController_reconcileOldRCs(t *testing.T) {
tests := []struct {
deploymentReplicas int
maxUnavailable intstr.IntOrString
oldReplicas int
newReplicas int
readyPodsFromOldRC int
readyPodsFromNewRC int
scaleExpected bool
expectedOldReplicas int
}{
{
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(0),
oldReplicas: 10,
newReplicas: 0,
readyPodsFromOldRC: 10,
readyPodsFromNewRC: 0,
scaleExpected: false,
},
{
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(2),
oldReplicas: 10,
newReplicas: 0,
readyPodsFromOldRC: 10,
readyPodsFromNewRC: 0,
scaleExpected: true,
expectedOldReplicas: 8,
},
{ // expect unhealthy replicas from old rcs to be cleaned up
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(2),
oldReplicas: 10,
newReplicas: 0,
readyPodsFromOldRC: 8,
readyPodsFromNewRC: 0,
scaleExpected: true,
expectedOldReplicas: 8,
},
{ // expect 1 unhealthy replica from old rcs to be cleaned up, and 1 ready pod to be scaled down
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(2),
oldReplicas: 10,
newReplicas: 0,
readyPodsFromOldRC: 9,
readyPodsFromNewRC: 0,
scaleExpected: true,
expectedOldReplicas: 8,
},
{ // the unavailable pods from the newRC would not make us scale down old RCs in a further step
deploymentReplicas: 10,
maxUnavailable: intstr.FromInt(2),
oldReplicas: 8,
newReplicas: 2,
readyPodsFromOldRC: 8,
readyPodsFromNewRC: 0,
scaleExpected: false,
},
}
for i, test := range tests {
t.Logf("executing scenario %d", i)
newSelector := map[string]string{"foo": "new"}
oldSelector := map[string]string{"foo": "old"}
newRc := rc("foo-new", test.newReplicas, newSelector)
oldRc := rc("foo-old", test.oldReplicas, oldSelector)
oldRCs := []*api.ReplicationController{oldRc}
allRCs := []*api.ReplicationController{oldRc, newRc}
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable)
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
case core.ListAction:
podList := &api.PodList{}
for podIndex := 0; podIndex < test.readyPodsFromOldRC; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-oldReadyPod-%d", oldRc.Name, podIndex),
Labels: oldSelector,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
},
},
},
})
}
for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRC; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRc.Name, podIndex),
Labels: oldSelector,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionFalse,
},
},
},
})
}
for podIndex := 0; podIndex < test.readyPodsFromNewRC; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-newReadyPod-%d", oldRc.Name, podIndex),
Labels: newSelector,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
},
},
},
})
}
for podIndex := 0; podIndex < test.newReplicas-test.readyPodsFromNewRC; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-newUnhealthyPod-%d", newRc.Name, podIndex),
Labels: newSelector,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionFalse,
},
},
},
})
}
return true, podList, nil
}
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileOldRCs(allRCs, oldRCs, newRc, deployment, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
if !test.scaleExpected && scaled {
t.Errorf("unexpected scaling: %v", fakeClientset.Actions())
}
if test.scaleExpected && !scaled {
t.Errorf("expected scaling to occur")
continue
}
continue
}
}
func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
tests := []struct {
oldReplicas int
readyPods int
unHealthyPods int
maxCleanupCount int
cleanupCountExpected int
}{
{
oldReplicas: 10,
readyPods: 8,
unHealthyPods: 2,
maxCleanupCount: 1,
cleanupCountExpected: 1,
},
{
oldReplicas: 10,
readyPods: 8,
unHealthyPods: 2,
maxCleanupCount: 3,
cleanupCountExpected: 2,
},
{
oldReplicas: 10,
readyPods: 8,
unHealthyPods: 2,
maxCleanupCount: 0,
cleanupCountExpected: 0,
},
{
oldReplicas: 10,
readyPods: 10,
unHealthyPods: 0,
maxCleanupCount: 3,
cleanupCountExpected: 0,
},
}
for i, test := range tests {
t.Logf("executing scenario %d", i)
oldRc := rc("foo-v2", test.oldReplicas, nil)
oldRCs := []*api.ReplicationController{oldRc}
deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2))
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
case core.ListAction:
podList := &api.PodList{}
for podIndex := 0; podIndex < test.readyPods; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-readyPod-%d", oldRc.Name, podIndex),
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
},
},
},
})
}
for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ {
podList.Items = append(podList.Items, api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRc.Name, podIndex),
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionFalse,
},
},
},
})
}
return true, podList, nil
}
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRCs, deployment, test.maxCleanupCount)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
if cleanupCount != test.cleanupCountExpected {
t.Errorf("expected %v unhealthy replicas been cleaned up, got %v", test.cleanupCountExpected, cleanupCount)
continue
}
}
}
func TestDeploymentController_scaleDownOldRCsForRollingUpdate(t *testing.T) {
tests := []struct {
deploymentReplicas int
maxUnavailable intstr.IntOrString
@@ -196,18 +460,18 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) {
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.scaleDownOldRCsForRollingUpdate(allRcs, oldRcs, deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
if !test.scaleExpected {
if scaled != 0 {
t.Errorf("unexpected scaling: %v", fakeClientset.Actions())
}
continue
}
if test.scaleExpected && scaled == 0 {
t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions())
continue
}
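To trace how the two-step scale-down combines in the reconcileOldRCs test table above (the case with 9 ready old pods, where 1 unhealthy replica is cleaned up and then 1 ready pod is scaled down), the following rough standalone sketch walks through the same arithmetic; the min helper and variable names are only for illustration and are not part of the controller code:

package main

import "fmt"

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Scenario from the test table: 10 desired replicas, maxUnavailable=2,
	// old RC at 10 with 9 ready pods, new RC at 0.
	deploymentReplicas, maxUnavailable := 10, 2
	oldReplicas, readyOld := 10, 9
	newReplicas, readyNew := 0, 0

	allPods := oldReplicas + newReplicas
	minAvailable := deploymentReplicas - maxUnavailable
	newUnavailable := newReplicas - readyNew
	maxScaledDown := allPods - minAvailable - newUnavailable // 2

	// Step 1: clean up unhealthy old replicas first (bounded by maxScaledDown).
	cleanup := min(maxScaledDown, oldReplicas-readyOld) // 1
	oldReplicas -= cleanup                              // 9

	// Step 2: rolling-update scale-down, bounded by ready pods above minAvailable.
	readyPods := readyOld + readyNew                      // 9
	scaleDown := min(readyPods-minAvailable, oldReplicas) // 1
	oldReplicas -= scaleDown                              // 8

	fmt.Println(oldReplicas) // 8 == expectedOldReplicas
}

Cleanup is bounded by maxScaledDown while the rolling-update step is bounded by readyPodCount - minAvailable, which is why the expected old replica count ends at 8.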