From 7bdf54a30ac3a73b37014cc221b20c6c557ed944 Mon Sep 17 00:00:00 2001
From: Janet Kuo
Date: Wed, 1 Mar 2017 16:04:54 -0800
Subject: [PATCH 1/2] In DaemonSet e2e test, don't check nodes with NoSchedule taints

---
 pkg/controller/daemon/daemoncontroller.go | 23 ++++++-----
 test/e2e/BUILD                            |  2 +
 test/e2e/daemon_set.go                    | 49 ++++++++++++++++-------
 3 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index 04f822c1f2..dcb5fcde70 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -785,9 +785,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 // Returns true when a daemonset should continue running on a node if a daemonset pod is already
 // running on that node.
 func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
-    newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
-    newPod.Namespace = ds.Namespace
-    newPod.Spec.NodeName = node.Name
+    newPod := NewPod(ds, node.Name)

     critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(newPod)
     // Because these bools require an && of all their required conditions, we start
@@ -864,14 +862,14 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
     nodeInfo := schedulercache.NewNodeInfo(pods...)
     nodeInfo.SetNode(node)

-    _, reasons, err := daemonSetPredicates(newPod, nodeInfo)
+    _, reasons, err := Predicates(newPod, nodeInfo)
     if err != nil {
-        glog.Warningf("daemonSetPredicates failed on ds '%s/%s' due to unexpected error: %v", ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
+        glog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
         return false, false, false, err
     }

     for _, r := range reasons {
-        glog.V(4).Infof("daemonSetPredicates failed on ds '%s/%s' for reason: %v", ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
+        glog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
         switch reason := r.(type) {
         case *predicates.InsufficientResourceError:
             dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.Error())
@@ -908,7 +906,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
             predicates.ErrPodAffinityNotMatch,
             predicates.ErrServiceAffinityViolated:
             glog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
-            return false, false, false, fmt.Errorf("unexpected reason: daemonSetPredicates should not return reason %s", reason.GetReason())
+            return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
         default:
             glog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
             wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
@@ -922,9 +920,16 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
     return
 }

-// daemonSetPredicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
+func NewPod(ds *extensions.DaemonSet, nodeName string) *v1.Pod {
+    newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
+    newPod.Namespace = ds.Namespace
+    newPod.Spec.NodeName = nodeName
+    return newPod
+}
+
+// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
 // and PodToleratesNodeTaints predicate
-func daemonSetPredicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
     var predicateFails []algorithm.PredicateFailureReason
     critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(pod)

diff --git a/test/e2e/BUILD b/test/e2e/BUILD
index 5d45906fc2..55d89a2f9e 100644
--- a/test/e2e/BUILD
+++ b/test/e2e/BUILD
@@ -128,6 +128,7 @@ go_library(
         "//pkg/cloudprovider/providers/gce:go_default_library",
         "//pkg/cloudprovider/providers/vsphere:go_default_library",
         "//pkg/controller:go_default_library",
+        "//pkg/controller/daemon:go_default_library",
         "//pkg/controller/deployment/util:go_default_library",
         "//pkg/controller/endpoint:go_default_library",
         "//pkg/controller/job:go_default_library",
@@ -149,6 +150,7 @@ go_library(
         "//pkg/util/version:go_default_library",
         "//pkg/volume/util/volumehelper:go_default_library",
         "//plugin/pkg/admission/serviceaccount:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//test/e2e/chaosmonkey:go_default_library",
         "//test/e2e/common:go_default_library",
         "//test/e2e/framework:go_default_library",
diff --git a/test/e2e/daemon_set.go b/test/e2e/daemon_set.go
index 40e0f8fd48..4cd6521621 100644
--- a/test/e2e/daemon_set.go
+++ b/test/e2e/daemon_set.go
@@ -33,7 +33,9 @@ import (
     extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
     "k8s.io/kubernetes/pkg/controller"
+    "k8s.io/kubernetes/pkg/controller/daemon"
     "k8s.io/kubernetes/pkg/kubectl"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
     "k8s.io/kubernetes/test/e2e/framework"

"github.com/onsi/ginkgo" @@ -109,12 +111,12 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { label := map[string]string{daemonsetNameLabel: dsName} By(fmt.Sprintf("Creating simple DaemonSet %q", dsName)) - _, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) + ds, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) Expect(err).NotTo(HaveOccurred()) By("Check that daemon pods launch on every node of the cluster.") Expect(err).NotTo(HaveOccurred()) - err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start") err = checkDaemonStatus(f, dsName) Expect(err).NotTo(HaveOccurred()) @@ -124,7 +126,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { pod := podList.Items[0] err = c.Core().Pods(ns).Delete(pod.Name, nil) Expect(err).NotTo(HaveOccurred()) - err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive") }) @@ -212,12 +214,12 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { label := map[string]string{daemonsetNameLabel: dsName} By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName)) - _, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) + ds, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) Expect(err).NotTo(HaveOccurred()) By("Check that daemon pods launch on every node of the cluster.") Expect(err).NotTo(HaveOccurred()) - err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start") err = checkDaemonStatus(f, dsName) Expect(err).NotTo(HaveOccurred()) @@ -229,7 +231,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { pod.Status.Phase = v1.PodFailed _, err = c.Core().Pods(ns).UpdateStatus(&pod) Expect(err).NotTo(HaveOccurred(), "error failing a daemon pod") - err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive") }) @@ -237,16 +239,16 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { label := map[string]string{daemonsetNameLabel: dsName} framework.Logf("Creating simple daemon set %s", dsName) - _, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) + ds, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label)) Expect(err).NotTo(HaveOccurred()) By("Check that daemon pods launch on every node of the cluster.") Expect(err).NotTo(HaveOccurred()) - err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start") By("Update daemon pods image.") - ds, err := c.Extensions().DaemonSets(ns).Get(dsName, metav1.GetOptions{}) + ds, err = c.Extensions().DaemonSets(ns).Get(dsName, metav1.GetOptions{}) ds.Spec.Template.Spec.Containers[0].Image = redisImage _, 
         _, err = c.Extensions().DaemonSets(ns).Update(ds)
         Expect(err).NotTo(HaveOccurred())
@@ -257,7 +259,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {

         By("Check that daemon pods are still running on every node of the cluster.")
         Expect(err).NotTo(HaveOccurred())
-        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
+        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds))
         Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start")
     })

@@ -265,16 +267,16 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
         label := map[string]string{daemonsetNameLabel: dsName}

         framework.Logf("Creating simple daemon set %s", dsName)
-        _, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
+        ds, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
         Expect(err).NotTo(HaveOccurred())

         By("Check that daemon pods launch on every node of the cluster.")
         Expect(err).NotTo(HaveOccurred())
-        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
+        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds))
         Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start")

         By("Update daemon pods image.")
-        ds, err := c.Extensions().DaemonSets(ns).Get(dsName, metav1.GetOptions{})
+        ds, err = c.Extensions().DaemonSets(ns).Get(dsName, metav1.GetOptions{})
         ds.Spec.Template.Spec.Containers[0].Image = redisImage
         ds.Spec.UpdateStrategy = extensions.DaemonSetUpdateStrategy{Type: extensions.RollingUpdateDaemonSetStrategyType}
         _, err = c.Extensions().DaemonSets(ns).Update(ds)
@@ -286,7 +288,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {

         By("Check that daemon pods are still running on every node of the cluster.")
         Expect(err).NotTo(HaveOccurred())
-        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
+        err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label, ds))
         Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start")
     })

@@ -421,18 +423,35 @@ func checkDaemonPodOnNodes(f *framework.Framework, selector map[string]string, n
     }
 }

-func checkRunningOnAllNodes(f *framework.Framework, selector map[string]string) func() (bool, error) {
+func checkRunningOnAllNodes(f *framework.Framework, selector map[string]string, ds *extensions.DaemonSet) func() (bool, error) {
     return func() (bool, error) {
         nodeList, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{})
         framework.ExpectNoError(err)
         nodeNames := make([]string, 0)
         for _, node := range nodeList.Items {
+            if !canScheduleOnNode(node, ds) {
+                framework.Logf("DaemonSet pods can't tolerate node %s with taints %+v, skip checking this node", node.Name, node.Spec.Taints)
+                continue
+            }
             nodeNames = append(nodeNames, node.Name)
         }
         return checkDaemonPodOnNodes(f, selector, nodeNames)()
     }
 }

+// canScheduleOnNode checks if a given DaemonSet can schedule pods on the given node
+func canScheduleOnNode(node v1.Node, ds *extensions.DaemonSet) bool {
+    newPod := daemon.NewPod(ds, node.Name)
+    nodeInfo := schedulercache.NewNodeInfo()
+    nodeInfo.SetNode(&node)
+    fit, _, err := daemon.Predicates(newPod, nodeInfo)
+    if err != nil {
+        framework.Failf("Can't test DaemonSet predicates for node %s: %v", node.Name, err)
+        return false
+    }
+    return fit
+}
+
 func checkRunningOnNoNodes(f *framework.Framework, selector map[string]string) func() (bool, error) {
     return checkDaemonPodOnNodes(f, selector, make([]string, 0))
 }
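Reviewer note (not part of the series): for anyone who wants to poke at the newly exported daemon.NewPod and daemon.Predicates helpers outside the e2e framework, the standalone sketch below runs the same check that canScheduleOnNode performs, against one plain node and one node carrying a NoSchedule taint the DaemonSet's pods do not tolerate. The program, its fits/newNode helpers, the dedicated=gpu taint, and the allocatable pod count are all invented for illustration, and the import paths assume the 1.6-era tree this series targets.

// Reviewer sketch, not part of the patch: exercises daemon.NewPod and
// daemon.Predicates the same way the e2e canScheduleOnNode helper does.
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/kubernetes/pkg/api/v1"
    extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
    "k8s.io/kubernetes/pkg/controller/daemon"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// fits reports whether the DaemonSet's pod would pass the controller's
// scheduling predicates (including PodToleratesNodeTaints) on the node.
func fits(ds *extensions.DaemonSet, node *v1.Node) bool {
    pod := daemon.NewPod(ds, node.Name)
    nodeInfo := schedulercache.NewNodeInfo()
    nodeInfo.SetNode(node)
    fit, _, err := daemon.Predicates(pod, nodeInfo)
    if err != nil {
        fmt.Printf("predicate error on node %s: %v\n", node.Name, err)
        return false
    }
    return fit
}

// newNode builds a minimal node for the sketch, optionally tainted.
func newNode(name string, taints []v1.Taint) *v1.Node {
    return &v1.Node{
        ObjectMeta: metav1.ObjectMeta{Name: name},
        Spec:       v1.NodeSpec{Taints: taints},
        Status: v1.NodeStatus{
            // Give the node room for pods so GeneralPredicates doesn't
            // reject the pod for capacity reasons.
            Allocatable: v1.ResourceList{v1.ResourcePods: resource.MustParse("100")},
        },
    }
}

func main() {
    ds := &extensions.DaemonSet{} // pod template carries no tolerations
    plain := newNode("plain", nil)
    // "dedicated=gpu:NoSchedule" is an arbitrary taint made up for this sketch.
    tainted := newNode("tainted", []v1.Taint{
        {Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule},
    })

    // The pod should fit the plain node but not the tainted one; that is the
    // distinction checkRunningOnAllNodes now uses to decide which nodes to check.
    fmt.Println("fits plain node:", fits(ds, plain))
    fmt.Println("fits tainted node:", fits(ds, tainted))
}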
From 16b88e7e14bf3618fa0bdcd0c6dd483c1ced10a0 Mon Sep 17 00:00:00 2001
From: Janet Kuo
Date: Thu, 2 Mar 2017 11:03:27 -0800
Subject: [PATCH 2/2] Add unit test for daemonset with network unavailable node

---
 pkg/controller/daemon/daemoncontroller_test.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go
index 4262357f17..c6869bce2e 100644
--- a/pkg/controller/daemon/daemoncontroller_test.go
+++ b/pkg/controller/daemon/daemoncontroller_test.go
@@ -448,6 +448,22 @@ func TestSufficientCapacityNodeDaemonLaunchesPod(t *testing.T) {
     syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
 }

+// DaemonSet should launch a pod on a node with the NetworkUnavailable condition.
+func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) {
+    manager, podControl, _ := newTestController()
+
+    node := newNode("network-unavailable", nil)
+    node.Status.Conditions = []v1.NodeCondition{
+        {Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue},
+    }
+    manager.nodeStore.Add(node)
+
+    ds := newDaemonSet("simple")
+    manager.dsStore.Add(ds)
+
+    syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
 // DaemonSets not take any actions when being deleted
 func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
     podSpec := resourcePodSpec("not-too-much-mem", "75M", "75m")
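Reviewer note (not part of the series): PATCH 2/2 covers the NetworkUnavailable case, but the NoSchedule-taint case that motivated PATCH 1/2 could get the same unit-level coverage. A possible companion test is sketched below, reusing only helpers already used in this file (newTestController, newNode, newDaemonSet, syncAndValidateDaemonSets); the test name and the dedicated=gpu taint are made up, and the expectation assumes an untolerated NoSchedule taint yields zero pod creations for that node.

// Sketch only, not part of this series: the taint counterpart of the test
// above. A node with a NoSchedule taint that the DaemonSet's pods do not
// tolerate should get no daemon pod, which is the behavior the e2e change
// in PATCH 1/2 relies on. The taint key/value are invented for illustration.
func TestTaintedNodeDaemonDoesNotLaunchUntoleratedPod(t *testing.T) {
    manager, podControl, _ := newTestController()

    node := newNode("tainted", nil)
    node.Spec.Taints = []v1.Taint{
        {Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule},
    }
    manager.nodeStore.Add(node)

    ds := newDaemonSet("untolerating")
    manager.dsStore.Add(ds)

    // Expect no creations and no deletions for this node.
    syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
}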