From 528bf7af3a454e5210a72ac8e021bb77927a7f8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Ole=C5=9B?= Date: Fri, 1 Jul 2016 19:02:51 +0200 Subject: [PATCH] Make Daemonset use GeneralPredicates fixes #21454, fixes #22205 --- pkg/controller/daemon/daemoncontroller.go | 41 ++++++------ .../daemon/daemoncontroller_test.go | 33 ++++++++-- .../algorithm/predicates/predicates.go | 36 ---------- .../algorithm/predicates/predicates_test.go | 3 +- test/e2e/daemon_set.go | 66 +++++++++++++++++++ 5 files changed, 116 insertions(+), 63 deletions(-) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index b5063f7976..fc0d102ae7 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -27,7 +27,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -41,12 +40,11 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" - "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) const ( @@ -670,25 +668,22 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { } func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool { - // Check if the node satisfies the daemon set's node selector. - nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector() - if !nodeSelector.Matches(labels.Set(node.Labels)) { - return false - } // If the daemon set specifies a node name, check that it matches with node.Name. if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) { return false } + // TODO: Move it to the predicates for _, c := range node.Status.Conditions { if c.Type == api.NodeOutOfDisk && c.Status == api.ConditionTrue { return false } } - newPod := &api.Pod{Spec: ds.Spec.Template.Spec} + newPod := &api.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta} newPod.Spec.NodeName = node.Name - pods := []*api.Pod{newPod} + + pods := []*api.Pod{} for _, m := range dsc.podStore.Indexer.List() { pod := m.(*api.Pod) @@ -705,19 +700,23 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte } pods = append(pods, pod) } - _, notFittingCPU, notFittingMemory, notFittingNvidiaGPU := predicates.CheckPodsExceedingFreeResources(pods, node.Status.Allocatable) - if len(notFittingCPU)+len(notFittingMemory)+len(notFittingNvidiaGPU) != 0 { - dsc.eventRecorder.Eventf(ds, api.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: insufficent free resources", node.ObjectMeta.Name) - return false - } - ports := sets.String{} - for _, pod := range pods { - if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 { - dsc.eventRecorder.Eventf(ds, api.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: host port conflict", node.ObjectMeta.Name) - return false + + nodeInfo := schedulercache.NewNodeInfo(pods...) 
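+	// Hand the node and its pods to the scheduler's GeneralPredicates
+	// (resource fit, host ports, node name, node selector/affinity) and let
+	// it decide whether the daemon pod fits, instead of the hand-rolled
+	// resource and host-port checks removed above.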
+ nodeInfo.SetNode(node) + fit, err := predicates.GeneralPredicates(newPod, nil, nodeInfo) + if err != nil { + if re, ok := err.(*predicates.PredicateFailureError); ok { + message := re.Error() + glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message) } + if re, ok := err.(*predicates.InsufficientResourceError); ok { + message := re.Error() + glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message) + } + message := fmt.Sprintf("GeneralPredicates failed due to %v.", err) + glog.Warningf("Predicate failed on Pod %s - %s", newPod.Name, message) } - return true + return fit } // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index 5dd4048aa7..eec7dbe5c8 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -90,6 +90,9 @@ func newNode(name string, label map[string]string) *api.Node { Conditions: []api.NodeCondition{ {Type: api.NodeReady, Status: api.ConditionTrue}, }, + Allocatable: api.ResourceList{ + api.ResourcePods: resource.MustParse("100"), + }, }, } } @@ -201,10 +204,8 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) { manager, podControl := newTestController() node := newNode("not-ready", nil) - node.Status = api.NodeStatus{ - Conditions: []api.NodeCondition{ - {Type: api.NodeReady, Status: api.ConditionFalse}, - }, + node.Status.Conditions = []api.NodeCondition{ + {Type: api.NodeReady, Status: api.ConditionFalse}, } manager.nodeStore.Add(node) ds := newDaemonSet("foo") @@ -238,6 +239,7 @@ func allocatableResources(memory, cpu string) api.ResourceList { return api.ResourceList{ api.ResourceMemory: resource.MustParse(memory), api.ResourceCPU: resource.MustParse(cpu), + api.ResourcePods: resource.MustParse("100"), } } @@ -558,3 +560,26 @@ func TestDSManagerNotReady(t *testing.T) { manager.podStoreSynced = alwaysReady syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } + +// Daemon with node affinity should launch pods on nodes matching affinity. 
+func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + daemon := newDaemonSet("foo") + affinity := map[string]string{ + api.AffinityAnnotationKey: fmt.Sprintf(` + {"nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [{ + "matchExpressions": [{ + "key": "color", + "operator": "In", + "values": ["%s"] + }] + }] + }}}`, simpleNodeLabel["color"]), + } + daemon.Spec.Template.ObjectMeta.Annotations = affinity + manager.dsStore.Add(daemon) + syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0) +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 728617d4ff..e448398f71 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -429,42 +429,6 @@ func getResourceRequest(pod *api.Pod) *resourceRequest { return &result } -func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory, notFittingNvidiaGPU []*api.Pod) { - totalMilliCPU := allocatable.Cpu().MilliValue() - totalMemory := allocatable.Memory().Value() - totalNvidiaGPU := allocatable.NvidiaGPU().Value() - milliCPURequested := int64(0) - memoryRequested := int64(0) - nvidiaGPURequested := int64(0) - for _, pod := range pods { - podRequest := getResourceRequest(pod) - fitsCPU := (totalMilliCPU - milliCPURequested) >= podRequest.milliCPU - fitsMemory := (totalMemory - memoryRequested) >= podRequest.memory - fitsNVidiaGPU := (totalNvidiaGPU - nvidiaGPURequested) >= podRequest.nvidiaGPU - if !fitsCPU { - // the pod doesn't fit due to CPU request - notFittingCPU = append(notFittingCPU, pod) - continue - } - if !fitsMemory { - // the pod doesn't fit due to Memory request - notFittingMemory = append(notFittingMemory, pod) - continue - } - if !fitsNVidiaGPU { - // the pod doesn't fit due to NvidiaGPU request - notFittingNvidiaGPU = append(notFittingNvidiaGPU, pod) - continue - } - // the pod fits - milliCPURequested += podRequest.milliCPU - memoryRequested += podRequest.memory - nvidiaGPURequested += podRequest.nvidiaGPU - fitting = append(fitting, pod) - } - return -} - func podName(pod *api.Pod) string { return pod.Namespace + "/" + pod.Name } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 9c0e1da0c2..7ce564650b 100755 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -1840,8 +1840,7 @@ func TestInterPodAffinity(t *testing.T) { ObjectMeta: api.ObjectMeta{ Labels: podLabel2, Annotations: map[string]string{ - api.AffinityAnnotationKey: ` - {"podAffinity": { + api.AffinityAnnotationKey: `{"podAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": [{ "labelSelector": { "matchExpressions": [{ diff --git a/test/e2e/daemon_set.go b/test/e2e/daemon_set.go index b412e29a75..de4a4545ba 100644 --- a/test/e2e/daemon_set.go +++ b/test/e2e/daemon_set.go @@ -197,6 +197,72 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() { Expect(c.DaemonSets(ns).Delete(dsName)).NotTo(HaveOccurred()) }) + + It("should run and stop complex daemon with node affinity", func() { + complexLabel := map[string]string{daemonsetNameLabel: dsName} + 
nodeSelector := map[string]string{daemonsetColorLabel: "blue"} + framework.Logf("Creating daemon with a node affinity %s", dsName) + affinity := map[string]string{ + api.AffinityAnnotationKey: fmt.Sprintf(` + {"nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [{ + "matchExpressions": [{ + "key": "%s", + "operator": "In", + "values": ["%s"] + }] + }] + }}}`, daemonsetColorLabel, nodeSelector[daemonsetColorLabel]), + } + _, err := c.DaemonSets(ns).Create(&extensions.DaemonSet{ + ObjectMeta: api.ObjectMeta{ + Name: dsName, + }, + Spec: extensions.DaemonSetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: complexLabel}, + Template: api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: complexLabel, + Annotations: affinity, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: dsName, + Image: image, + Ports: []api.ContainerPort{{ContainerPort: 9376}}, + }, + }, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Initially, daemon pods should not be running on any nodes.") + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel)) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on no nodes") + + By("Change label of node, check that daemon pod is launched.") + nodeList := framework.GetReadySchedulableNodesOrDie(f.Client) + Expect(len(nodeList.Items)).To(BeNumerically(">", 0)) + newNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector) + Expect(err).NotTo(HaveOccurred(), "error setting labels on node") + daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels) + Expect(len(daemonSetLabels)).To(Equal(1)) + err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, complexLabel, []string{newNode.Name})) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on new nodes") + + By("remove the node selector and wait for daemons to be unscheduled") + _, err = setDaemonSetNodeLabels(c, nodeList.Items[0].Name, map[string]string{}) + Expect(err).NotTo(HaveOccurred(), "error removing labels on node") + Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))). + NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes") + + By("We should now be able to delete the daemon set.") + Expect(c.DaemonSets(ns).Delete(dsName)).NotTo(HaveOccurred()) + + }) }) func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
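
The heart of the change is that nodeShouldRunDaemonPod now defers to the scheduler's GeneralPredicates instead of re-implementing resource and host-port checks inside the DaemonSet controller. A minimal sketch of that call pattern, assuming the same plugin/pkg/scheduler packages this patch imports (the fitsNode helper and its parameter names are illustrative and not part of the patch; the NewNodeInfo / SetNode / GeneralPredicates calls mirror the ones added to daemoncontroller.go):

package daemon

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// fitsNode is an illustrative helper (not part of the patch): given the pods
// already assigned to a node, it asks the scheduler's GeneralPredicates
// whether the prospective daemon pod fits that node.
func fitsNode(daemonPod *api.Pod, node *api.Node, podsOnNode []*api.Pod) bool {
	// Build the scheduler's per-node bookkeeping from the node's pods.
	nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
	nodeInfo.SetNode(node)

	// GeneralPredicates covers resource requests, host ports, node name,
	// and node selector/affinity in one call.
	fit, err := predicates.GeneralPredicates(daemonPod, nil, nodeInfo)
	if err != nil {
		// The predicate error explains why the pod does not fit
		// (insufficient resources, port conflict, selector mismatch, ...).
		glog.V(2).Infof("GeneralPredicates rejected pod %s on node %s: %v", daemonPod.Name, node.Name, err)
	}
	return fit
}

Because GeneralPredicates already enforces resource requests, host ports, node name, and node selector/affinity, the controller no longer needs CheckPodsExceedingFreeResources or AccumulateUniqueHostPorts, which is why this patch can remove them.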