Merge pull request #40330 from janetkuo/kill-failed-daemon-pods

Automatic merge from submit-queue

DaemonSet controller actively kills failed pods (to recreate them)

Ref #36482, @erictune @yujuhong @mikedanese @kargakis @lukaszo @piosz @kubernetes/sig-apps-bugs 

This also helps with DaemonSet updates.

```release-note
```
Kubernetes Submit Queue 2017-01-27 13:47:09 -08:00 committed by GitHub
commit 62c802203b
3 changed files with 150 additions and 107 deletions


@@ -461,26 +461,42 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
}
var nodesNeedingDaemonPods, podsToDelete []string
var failedPodsObserved int
for _, node := range nodeList.Items {
_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(&node, ds)
if err != nil {
continue
}
daemonPods, isRunning := nodeToDaemonPods[node.Name]
daemonPods, exists := nodeToDaemonPods[node.Name]
switch {
case shouldSchedule && !isRunning:
case shouldSchedule && !exists:
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
case shouldContinueRunning && len(daemonPods) > 1:
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
// Sort the daemon pods by creation time, so the the oldest is preserved.
sort.Sort(podByCreationTimestamp(daemonPods))
for i := 1; i < len(daemonPods); i++ {
podsToDelete = append(podsToDelete, daemonPods[i].Name)
case shouldContinueRunning:
// If a daemon pod failed, delete it.
// If there are no daemon pods left on this node, they will be created in the next sync loop.
var daemonPodsRunning []*v1.Pod
for i := range daemonPods {
pod := daemonPods[i]
if pod.Status.Phase == v1.PodFailed {
glog.V(2).Infof("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, node.Name, pod.Name)
podsToDelete = append(podsToDelete, pod.Name)
failedPodsObserved++
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
case !shouldContinueRunning && isRunning:
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
// Sort the daemon pods by creation time, so the oldest is preserved.
if len(daemonPodsRunning) > 1 {
sort.Sort(podByCreationTimestamp(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
}
case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
for i := range daemonPods {
podsToDelete = append(podsToDelete, daemonPods[i].Name)
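For readers skimming the new `shouldContinueRunning` branch above, here is a minimal, self-contained sketch of the selection rule it implements: failed pods are always deleted, and of the remaining running pods only the oldest survives. `podInfo` and `podsToKill` are hypothetical stand-ins for illustration, not controller types.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// podInfo is a hypothetical stand-in for *v1.Pod with only the fields
// the selection rule needs.
type podInfo struct {
	name    string
	failed  bool
	created time.Time
}

// podsToKill mirrors the shouldContinueRunning branch: failed pods are
// always deleted; excess running pods beyond the oldest are deleted too.
func podsToKill(pods []podInfo) []string {
	var toDelete []string
	var running []podInfo
	for _, p := range pods {
		if p.failed {
			toDelete = append(toDelete, p.name)
		} else {
			running = append(running, p)
		}
	}
	if len(running) > 1 {
		// Sort by creation time so the oldest pod is the one preserved.
		sort.Slice(running, func(i, j int) bool {
			return running[i].created.Before(running[j].created)
		})
		for _, p := range running[1:] {
			toDelete = append(toDelete, p.name)
		}
	}
	return toDelete
}

func main() {
	now := time.Now()
	fmt.Println(podsToKill([]podInfo{
		{name: "pod-a", failed: true, created: now},                  // failed: deleted
		{name: "pod-b", failed: false, created: now.Add(-time.Hour)}, // oldest running: kept
		{name: "pod-c", failed: false, created: now},                 // excess running: deleted
	}))
	// Output: [pod-a pod-c]
}
```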
@@ -547,6 +563,10 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
for err := range errCh {
errors = append(errors, err)
}
// Surface an error when failed daemon pods were deleted, so the sync is retried through the rate limiter and a kill-recreate hot loop is avoided.
if failedPodsObserved > 0 {
errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name))
}
return utilerrors.NewAggregate(errors)
}
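The error returned above leans on the standard controller requeue machinery: a sync error makes the worker put the key back on a rate-limited workqueue, so retries back off exponentially instead of spinning. A minimal sketch of that pattern using client-go's workqueue package; `processNextWorkItem` and `syncHandler` here are illustrative, not the DaemonSet controller's exact wiring:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem shows the usual controller worker pattern: a sync
// error (such as "deleted N failed pods of DaemonSet ...") causes the key
// to be requeued with exponential backoff, preventing a hot loop.
func processNextWorkItem(queue workqueue.RateLimitingInterface, syncHandler func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := syncHandler(key.(string)); err != nil {
		fmt.Printf("requeuing %v after error: %v\n", key, err)
		queue.AddRateLimited(key) // retry later, with backoff
		return true
	}
	queue.Forget(key) // success: reset the backoff counter for this key
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/my-daemonset")
	processNextWorkItem(queue, func(key string) error {
		return fmt.Errorf("deleted 1 failed pods of DaemonSet %s", key)
	})
}
```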
@@ -774,7 +794,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
predicates.ErrTaintsTolerationsNotMatch:
return false, false, false, fmt.Errorf("unexpected reason: GeneralPredicates should not return reason %s", reason.GetReason())
default:
glog.V(4).Infof("unknownd predicate failure reason: %s", reason.GetReason())
glog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
emitEvent = true
}


@@ -138,6 +138,14 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
}
}
func addFailedPods(podStore cache.Store, nodeName string, label map[string]string, number int) {
for i := 0; i < number; i++ {
pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
podStore.Add(pod)
}
}
func newTestController(initialObjects ...runtime.Object) (*DaemonSetsController, *controller.FakePodControl, *fake.Clientset) {
clientset := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(clientset, nil, controller.NoResyncPeriodFunc())
@@ -653,6 +661,31 @@ func TestObservedGeneration(t *testing.T) {
}
}
// DaemonSet controller should kill all failed pods and create at most 1 pod on every node.
func TestDaemonKillFailedPods(t *testing.T) {
tests := []struct {
numFailedPods, numNormalPods, expectedCreates, expectedDeletes int
test string
}{
{numFailedPods: 0, numNormalPods: 1, expectedCreates: 0, expectedDeletes: 0, test: "normal (do nothing)"},
{numFailedPods: 0, numNormalPods: 0, expectedCreates: 1, expectedDeletes: 0, test: "no pods (create 1)"},
{numFailedPods: 1, numNormalPods: 0, expectedCreates: 0, expectedDeletes: 1, test: "1 failed pod (kill 1), 0 normal pods (create 0; will create in the next sync)"},
{numFailedPods: 1, numNormalPods: 3, expectedCreates: 0, expectedDeletes: 3, test: "1 failed pod (kill 1), 3 normal pods (kill 2)"},
{numFailedPods: 2, numNormalPods: 1, expectedCreates: 0, expectedDeletes: 2, test: "2 failed pods (kill 2), 1 normal pod"},
}
for _, test := range tests {
t.Logf("test case: %s\n", test.test)
manager, podControl, _ := newTestController()
addNodes(manager.nodeStore.Store, 0, 1, nil)
addFailedPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, test.numFailedPods)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, test.numNormalPods)
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes)
}
}
func TestNodeShouldRunDaemonPod(t *testing.T) {
cases := []struct {
podsOnNode []*v1.Pod


@@ -32,6 +32,7 @@ import (
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/test/e2e/framework"
@@ -59,6 +60,20 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
var f *framework.Framework
AfterEach(func() {
// Clean up
daemonsets, err := f.ClientSet.Extensions().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "unable to dump DaemonSets")
if daemonsets != nil && len(daemonsets.Items) > 0 {
for _, ds := range daemonsets.Items {
By(fmt.Sprintf("Deleting DaemonSet %q with reaper", ds.Name))
dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset)
Expect(err).NotTo(HaveOccurred())
err = dsReaper.Stop(f.Namespace.Name, ds.Name, 0, nil)
Expect(err).NotTo(HaveOccurred())
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds.Spec.Template.Labels))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to be reaped")
}
}
if daemonsets, err := f.ClientSet.Extensions().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{}); err == nil {
framework.Logf("daemonset: %s", runtime.EncodeOrDie(api.Codecs.LegacyCodec(api.Registry.EnabledVersions()...), daemonsets))
} else {
@@ -69,7 +84,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
} else {
framework.Logf("unable to dump pods: %v", err)
}
err := clearDaemonSetNodeLabels(f.ClientSet)
err = clearDaemonSetNodeLabels(f.ClientSet)
Expect(err).NotTo(HaveOccurred())
})
@@ -92,38 +107,9 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
It("should run and stop simple daemon", func() {
label := map[string]string{daemonsetNameLabel: dsName}
framework.Logf("Creating simple daemon set %s", dsName)
_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: dsName,
},
Spec: extensions.DaemonSetSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: label,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: dsName,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: 9376}},
},
},
},
},
},
})
By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
_, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.Logf("Check that reaper kills all daemon pods for %s", dsName)
dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset)
Expect(err).NotTo(HaveOccurred())
err = dsReaper.Stop(ns, dsName, 0, nil)
Expect(err).NotTo(HaveOccurred())
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, label))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to be reaped")
}()
By("Check that daemon pods launch on every node of the cluster.")
Expect(err).NotTo(HaveOccurred())
@@ -133,48 +119,21 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
Expect(err).NotTo(HaveOccurred())
By("Stop a daemon pod, check that the daemon pod is revived.")
podClient := c.Core().Pods(ns)
selector := labels.Set(label).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := podClient.List(options)
Expect(err).NotTo(HaveOccurred())
Expect(len(podList.Items)).To(BeNumerically(">", 0))
podList := listDaemonPods(c, ns, label)
pod := podList.Items[0]
err = podClient.Delete(pod.Name, nil)
err = c.Core().Pods(ns).Delete(pod.Name, nil)
Expect(err).NotTo(HaveOccurred())
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")
})
It("should run and stop complex daemon", func() {
complexLabel := map[string]string{daemonsetNameLabel: dsName}
nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
framework.Logf("Creating daemon with a node selector %s", dsName)
_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: dsName,
},
Spec: extensions.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: complexLabel},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: complexLabel,
},
Spec: v1.PodSpec{
NodeSelector: nodeSelector,
Containers: []v1.Container{
{
Name: dsName,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: 9376}},
},
},
},
},
},
})
framework.Logf("Creating daemon %q with a node selector", dsName)
ds := newDaemonSet(dsName, image, complexLabel)
ds.Spec.Template.Spec.NodeSelector = nodeSelector
_, err := c.Extensions().DaemonSets(ns).Create(ds)
Expect(err).NotTo(HaveOccurred())
By("Initially, daemon pods should not be running on any nodes.")
@@ -198,17 +157,14 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))).
NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")
By("We should now be able to delete the daemon set.")
Expect(c.Extensions().DaemonSets(ns).Delete(dsName, nil)).NotTo(HaveOccurred())
})
It("should run and stop complex daemon with node affinity", func() {
complexLabel := map[string]string{daemonsetNameLabel: dsName}
nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
framework.Logf("Creating daemon with a node affinity %s", dsName)
affinity := &v1.Affinity{
framework.Logf("Creating daemon %q with a node affinity", dsName)
ds := newDaemonSet(dsName, image, complexLabel)
ds.Spec.Template.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
@@ -225,29 +181,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
},
},
}
_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: dsName,
},
Spec: extensions.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: complexLabel},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: complexLabel,
},
Spec: v1.PodSpec{
Affinity: affinity,
Containers: []v1.Container{
{
Name: dsName,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: 9376}},
},
},
},
},
},
})
_, err := c.Extensions().DaemonSets(ns).Create(ds)
Expect(err).NotTo(HaveOccurred())
By("Initially, daemon pods should not be running on any nodes.")
@@ -271,13 +205,67 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))).
NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")
})
By("We should now be able to delete the daemon set.")
Expect(c.Extensions().DaemonSets(ns).Delete(dsName, nil)).NotTo(HaveOccurred())
It("should retry creating failed daemon pods", func() {
label := map[string]string{daemonsetNameLabel: dsName}
By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
_, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
Expect(err).NotTo(HaveOccurred())
By("Check that daemon pods launch on every node of the cluster.")
Expect(err).NotTo(HaveOccurred())
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start")
err = checkDaemonStatus(f, dsName)
Expect(err).NotTo(HaveOccurred())
By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
podList := listDaemonPods(c, ns, label)
pod := podList.Items[0]
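// Clearing ResourceVersion below skips the optimistic-concurrency check, so the status update cannot fail with a conflict.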
pod.ResourceVersion = ""
pod.Status.Phase = v1.PodFailed
_, err = c.Core().Pods(ns).UpdateStatus(&pod)
Expect(err).NotTo(HaveOccurred(), "error failing a daemon pod")
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")
})
})
func newDaemonSet(dsName, image string, label map[string]string) *extensions.DaemonSet {
return &extensions.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: dsName,
},
Spec: extensions.DaemonSetSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: label,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: dsName,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: 9376}},
},
},
},
},
},
}
}
func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList {
selector := labels.Set(label).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := c.Core().Pods(ns).List(options)
Expect(err).NotTo(HaveOccurred())
Expect(len(podList.Items)).To(BeNumerically(">", 0))
return podList
}
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
daemonSetLabels := map[string]string{}
otherLabels := map[string]string{}
@@ -354,7 +342,9 @@ func checkDaemonPodOnNodes(f *framework.Framework, selector map[string]string, n
nodesToPodCount := make(map[string]int)
for _, pod := range pods {
nodesToPodCount[pod.Spec.NodeName] += 1
if controller.IsPodActive(&pod) {
nodesToPodCount[pod.Spec.NodeName] += 1
}
}
framework.Logf("nodesToPodCount: %#v", nodesToPodCount)