From faaa485b35415b4cff938ff4c17a0c08ed918dfb Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sat, 2 Jun 2018 08:38:58 +0800 Subject: [PATCH 1/3] Updated helper funcs to use nodename. --- pkg/controller/daemon/util/BUILD | 4 +- pkg/controller/daemon/util/daemonset_util.go | 87 +++-- .../daemon/util/daemonset_util_test.go | 367 +++++++++++++++--- 3 files changed, 370 insertions(+), 88 deletions(-) diff --git a/pkg/controller/daemon/util/BUILD b/pkg/controller/daemon/util/BUILD index 9a64e9a62e..3d92cbd88a 100644 --- a/pkg/controller/daemon/util/BUILD +++ b/pkg/controller/daemon/util/BUILD @@ -14,7 +14,6 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", "//pkg/features:go_default_library", - "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", @@ -45,9 +44,12 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/apis:go_default_library", + "//pkg/scheduler/algorithm:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index e39f1620f1..ffb7033bd3 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -29,7 +29,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/scheduler/algorithm" ) @@ -136,19 +135,20 @@ func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod) ([]*v1.Pod, []* return availablePods, unavailablePods } -// ReplaceDaemonSetPodHostnameNodeAffinity replaces the 'kubernetes.io/hostname' NodeAffinity term with -// the given "nodeName" in the "affinity" terms. -func ReplaceDaemonSetPodHostnameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity { +// ReplaceDaemonSetPodNodeNameNodeAffinity replaces the RequiredDuringSchedulingIgnoredDuringExecution +// NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName. +// Note that this function assumes that no NodeAffinity conflicts with the selected nodeName. +func ReplaceDaemonSetPodNodeNameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity { + nodeSelReq := v1.NodeSelectorRequirement{ + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{nodename}, + } + nodeSelector := &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: kubeletapis.LabelHostname, - Operator: v1.NodeSelectorOpIn, - Values: []string{nodename}, - }, - }, + MatchFields: []v1.NodeSelectorRequirement{nodeSelReq}, }, }, } @@ -175,28 +175,12 @@ func ReplaceDaemonSetPodHostnameNodeAffinity(affinity *v1.Affinity, nodename str return affinity } - nodeSelectorTerms := []v1.NodeSelectorTerm{} - - // Removes hostname node selector, as only the target hostname will take effect. - for _, term := range nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { - exps := []v1.NodeSelectorRequirement{} - for _, exp := range term.MatchExpressions { - if exp.Key != kubeletapis.LabelHostname { - exps = append(exps, exp) - } - } - - if len(exps) > 0 { - term.MatchExpressions = exps - nodeSelectorTerms = append(nodeSelectorTerms, term) - } - } - - // Adds target hostname NodeAffinity term. - nodeSelectorTerms = append(nodeSelectorTerms, nodeSelector.NodeSelectorTerms[0]) - // Replace node selector with the new one. - nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms + nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{nodeSelReq}, + }, + } return affinity } @@ -225,3 +209,42 @@ func AppendNoScheduleTolerationIfNotExist(tolerations []v1.Toleration) []v1.Tole return tolerations } + +// GetTargetNodeName get the target node name of DaemonSet pods. If `.spec.NodeName` is not empty (nil), +// return `.spec.NodeName`; otherwise, retrieve node name of pending pods from NodeAffinity. Return error +// if failed to retrieve node name from `.spec.NodeName` and NodeAffinity. +func GetTargetNodeName(pod *v1.Pod) (string, error) { + if len(pod.Spec.NodeName) != 0 { + return pod.Spec.NodeName, nil + } + + // If ScheduleDaemonSetPods was enabled before, retrieve node name of unscheduled pods from NodeAffinity + if pod.Spec.Affinity == nil || + pod.Spec.Affinity.NodeAffinity == nil || + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + return "", fmt.Errorf("no spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution for pod %s/%s", + pod.Namespace, pod.Name) + } + + terms := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + if len(terms) < 1 { + return "", fmt.Errorf("no nodeSelectorTerms in requiredDuringSchedulingIgnoredDuringExecution of pod %s/%s", + pod.Namespace, pod.Name) + } + + for _, term := range terms { + for _, exp := range term.MatchFields { + if exp.Key == algorithm.NodeFieldSelectorKeyNodeName && + exp.Operator == v1.NodeSelectorOpIn { + if len(exp.Values) != 1 { + return "", fmt.Errorf("the matchFields value of '%s' is not unique for pod %s/%s", + algorithm.NodeFieldSelectorKeyNodeName, pod.Namespace, pod.Name) + } + + return exp.Values[0], nil + } + } + } + + return "", fmt.Errorf("no node name found for pod %s/%s", pod.Namespace, pod.Name) +} diff --git a/pkg/controller/daemon/util/daemonset_util_test.go b/pkg/controller/daemon/util/daemonset_util_test.go index cf23be4015..5ed8d99902 100644 --- a/pkg/controller/daemon/util/daemonset_util_test.go +++ b/pkg/controller/daemon/util/daemonset_util_test.go @@ -24,8 +24,11 @@ import ( "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + "k8s.io/kubernetes/pkg/scheduler/algorithm" ) func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { @@ -171,7 +174,7 @@ func int64Ptr(i int) *int64 { return &li } -func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { +func TestReplaceDaemonSetPodNodeNameNodeAffinity(t *testing.T) { tests := []struct { affinity *v1.Affinity hostname string @@ -181,6 +184,25 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { affinity: nil, hostname: "host_1", expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ @@ -197,6 +219,24 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { }, }, }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, }, { affinity: &v1.Affinity{ @@ -235,9 +275,9 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { - MatchExpressions: []v1.NodeSelectorRequirement{ + MatchFields: []v1.NodeSelectorRequirement{ { - Key: kubeletapis.LabelHostname, + Key: algorithm.NodeFieldSelectorKeyNodeName, Operator: v1.NodeSelectorOpIn, Values: []string{"host_1"}, }, @@ -254,55 +294,9 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { - MatchExpressions: []v1.NodeSelectorRequirement{ + MatchFields: []v1.NodeSelectorRequirement{ { - Key: "not-host-label", - Operator: v1.NodeSelectorOpIn, - Values: []string{"label_value_1", "label_value_2"}, - }, - }, - }, - }, - }, - }, - }, - hostname: "host_1", - expected: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "not-host-label", - Operator: v1.NodeSelectorOpIn, - Values: []string{"label_value_1", "label_value_2"}, - }, - }, - }, - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: kubeletapis.LabelHostname, - Operator: v1.NodeSelectorOpIn, - Values: []string{"host_1"}, - }, - }, - }, - }, - }, - }, - }, - }, - { - affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: kubeletapis.LabelHostname, + Key: algorithm.NodeFieldSelectorKeyNodeName, Operator: v1.NodeSelectorOpIn, Values: []string{"host_1", "host_2"}, }, @@ -314,13 +308,157 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { }, hostname: "host_1", expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: nil, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { MatchExpressions: []v1.NodeSelectorRequirement{ { - Key: kubeletapis.LabelHostname, + Key: "hostname", + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_2"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpNotIn, + Values: []string{"host_2"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + // NOTE: Only `metadata.name` is valid key in `MatchFields` in 1.11; + // added this case for compatibility: the feature works as normal + // when new Keys introduced. + Key: "metadata.foo", + Operator: v1.NodeSelectorOpIn, + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, Operator: v1.NodeSelectorOpIn, Values: []string{"host_1"}, }, @@ -333,10 +471,129 @@ func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { }, } - for _, test := range tests { - got := ReplaceDaemonSetPodHostnameNodeAffinity(test.affinity, test.hostname) + for i, test := range tests { + got := ReplaceDaemonSetPodNodeNameNodeAffinity(test.affinity, test.hostname) if !reflect.DeepEqual(test.expected, got) { - t.Errorf("Failed to append NodeAffinity, got: %v, expected: %v", got, test.expected) + t.Errorf("Failed to append NodeAffinity in case %d, got: %v, expected: %v", + i, got, test.expected) } } } + +func forEachFeatureGate(t *testing.T, tf func(t *testing.T), gates ...utilfeature.Feature) { + for _, fg := range gates { + func() { + enabled := utilfeature.DefaultFeatureGate.Enabled(fg) + defer func() { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled)) + }() + + for _, f := range []bool{true, false} { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f)) + t.Run(fmt.Sprintf("%v (%t)", fg, f), tf) + } + }() + } +} + +func TestGetTargetNodeName(t *testing.T) { + testFun := func(t *testing.T) { + tests := []struct { + pod *v1.Pod + nodeName string + expectedErr bool + }{ + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + }, + }, + nodeName: "node-1", + }, + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"node-1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + nodeName: "node-1", + }, + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"node-1", "node-2"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedErr: true, + }, + { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod4", + Namespace: "default", + }, + Spec: v1.PodSpec{}, + }, + expectedErr: true, + }, + } + + for _, test := range tests { + got, err := GetTargetNodeName(test.pod) + if test.expectedErr != (err != nil) { + t.Errorf("Unexpected error, expectedErr: %v, err: %v", test.expectedErr, err) + } else if !test.expectedErr { + if test.nodeName != got { + t.Errorf("Failed to get target node name, got: %v, expected: %v", got, test.nodeName) + } + } + } + } + + forEachFeatureGate(t, testFun, features.ScheduleDaemonSetPods) +} From 8180e1e60f647719cc83aeb205d44dd7d6ec9603 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sat, 2 Jun 2018 08:39:13 +0800 Subject: [PATCH 2/3] Eanbled schedule DaemonSet Pods by default scheduler. --- pkg/controller/daemon/daemon_controller.go | 43 ++++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 10fa761485..9b65db1cd1 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -762,7 +762,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, e return cm.ClaimPods(pods) } -// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes. // This also reconciles ControllerRef by adopting/orphaning. // Note that returned Pods are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. @@ -774,9 +774,16 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[s // Group Pods by Node name. nodeToDaemonPods := make(map[string][]*v1.Pod) for _, pod := range claimedPods { - nodeName := pod.Spec.NodeName + nodeName, err := util.GetTargetNodeName(pod) + if err != nil { + glog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v", + pod.Namespace, pod.Name, ds.Namespace, ds.Name) + continue + } + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod) } + return nodeToDaemonPods, nil } @@ -850,7 +857,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. // Sort the daemon pods by creation time, so the oldest is preserved. if len(daemonPodsRunning) > 1 { - sort.Sort(podByCreationTimestamp(daemonPodsRunning)) + sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning)) for i := 1; i < len(daemonPodsRunning); i++ { podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name) } @@ -870,7 +877,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( // which nodes should not run a Pod of ds but currently running one, it calls function // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds. func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error { - // Find out which nodes are running the daemon pods controlled by ds. + // Find out the pods which are created for the nodes by DaemonSet. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) @@ -962,9 +969,12 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod podTemplate := &template - if false /*disabled for 1.10*/ && utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { + if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { podTemplate = template.DeepCopy() - podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodHostnameNodeAffinity( + // The pod's NodeAffinity will be updated to make sure the Pod is bound + // to the target node by default scheduler. It is safe to do so because there + // should be no conflicting node affinity with the target node. + podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity( podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) podTemplate.Spec.Tolerations = util.AppendNoScheduleTolerationIfNotExist(podTemplate.Spec.Tolerations) @@ -1098,7 +1108,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, hash currentNumberScheduled++ // Sort the daemon pods by creation time, so that the oldest is first. daemonPods, _ := nodeToDaemonPods[node.Name] - sort.Sort(podByCreationTimestamp(daemonPods)) + sort.Sort(podByCreationTimestampAndPhase(daemonPods)) pod := daemonPods[0] if podutil.IsPodReady(pod) { numberReady++ @@ -1414,7 +1424,7 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit var predicateFails []algorithm.PredicateFailureReason // If ScheduleDaemonSetPods is enabled, only check nodeSelector and nodeAffinity. - if false /*disabled for 1.10*/ && utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { + if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { fit, reasons, err := nodeSelectionPredicates(pod, nil, nodeInfo) if err != nil { return false, predicateFails, err @@ -1466,12 +1476,21 @@ func (o byCreationTimestamp) Less(i, j int) bool { return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) } -type podByCreationTimestamp []*v1.Pod +type podByCreationTimestampAndPhase []*v1.Pod -func (o podByCreationTimestamp) Len() int { return len(o) } -func (o podByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o podByCreationTimestampAndPhase) Len() int { return len(o) } +func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o podByCreationTimestampAndPhase) Less(i, j int) bool { + // Scheduled Pod first + if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 { + return true + } + + if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 { + return false + } -func (o podByCreationTimestamp) Less(i, j int) bool { if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) { return o[i].Name < o[j].Name } From 9fd848e5ec31720f099feb4d5e918975a3a0eb99 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sat, 2 Jun 2018 08:39:28 +0800 Subject: [PATCH 3/3] Updated integration test. --- test/integration/daemonset/BUILD | 9 + test/integration/daemonset/daemonset_test.go | 394 ++++++++++++++++--- 2 files changed, 351 insertions(+), 52 deletions(-) diff --git a/test/integration/daemonset/BUILD b/test/integration/daemonset/BUILD index 138dddfb5b..a1c6053705 100644 --- a/test/integration/daemonset/BUILD +++ b/test/integration/daemonset/BUILD @@ -14,23 +14,32 @@ go_test( ], tags = ["integration"], deps = [ + "//pkg/api/legacyscheme:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/controller/daemon:go_default_library", + "//pkg/features:go_default_library", + "//pkg/scheduler:go_default_library", + "//pkg/scheduler/algorithm:go_default_library", + "//pkg/scheduler/algorithmprovider:go_default_library", + "//pkg/scheduler/factory:go_default_library", "//pkg/util/metrics:go_default_library", "//test/integration/framework:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 5c8cf8ee27..0a7a72ca16 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -24,19 +24,30 @@ import ( apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" appstyped "k8s.io/client-go/kubernetes/typed/apps/v1" + clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" corev1typed "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/daemon" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler" + "k8s.io/kubernetes/pkg/scheduler/algorithm" + "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + "k8s.io/kubernetes/pkg/scheduler/factory" "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/test/integration/framework" ) @@ -69,6 +80,62 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS return server, closeFn, dc, informers, clientSet } +func setupScheduler( + t *testing.T, + cs clientset.Interface, + informerFactory informers.SharedInformerFactory, + stopCh chan struct{}, +) { + // If ScheduleDaemonSetPods is disabled, do not start scheduler. + if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { + return + } + + schedulerConfigFactory := factory.NewConfigFactory( + v1.DefaultSchedulerName, + cs, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Apps().V1beta1().StatefulSets(), + informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), + v1.DefaultHardPodAffinitySymmetricWeight, + true, + false, + ) + + schedulerConfig, err := schedulerConfigFactory.Create() + if err != nil { + t.Fatalf("Couldn't create scheduler config: %v", err) + } + + schedulerConfig.StopEverything = stopCh + + eventBroadcaster := record.NewBroadcaster() + schedulerConfig.Recorder = eventBroadcaster.NewRecorder( + legacyscheme.Scheme, + v1.EventSource{Component: v1.DefaultSchedulerName}, + ) + eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{ + Interface: cs.CoreV1().Events(""), + }) + + sched, err := scheduler.NewFromConfigurator( + &scheduler.FakeConfigurator{Config: schedulerConfig}, nil...) + if err != nil { + t.Fatalf("error creating scheduler: %v", err) + } + + algorithmprovider.ApplyFeatureGates() + + go sched.Run() +} + func testLabels() map[string]string { return map[string]string{"name": "test"} } @@ -162,6 +229,12 @@ func updateStrategies() []*apps.DaemonSetUpdateStrategy { return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()} } +func featureGates() []utilfeature.Feature { + return []utilfeature.Feature{ + features.ScheduleDaemonSetPods, + } +} + func allocatableResources(memory, cpu string) v1.ResourceList { return v1.ResourceList{ v1.ResourceMemory: resource.MustParse(memory), @@ -242,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady( t.Errorf("controllerRef.Controller is not set to true") } - if !podutil.IsPodReady(pod) { + if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 { podCopy := pod.DeepCopy() podCopy.Status = v1.PodStatus{ Phase: v1.PodRunning, @@ -261,6 +334,44 @@ func validateDaemonSetPodsAndMarkReady( } } +// podUnschedulable returns a condition function that returns true if the given pod +// gets unschedulable status. +func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond != nil && cond.Status == v1.ConditionFalse && + cond.Reason == v1.PodReasonUnschedulable, nil + } +} + +// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns +// an error if it does not become unschedulable within the given timeout. +func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { + return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name)) +} + +// waitForPodUnschedule waits for a pod to fail scheduling and returns +// an error if it does not become unschedulable within the timeout duration (30 seconds). +func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { + return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second) +} + +// waitForPodsCreated waits for number of pods are created. +func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error { + return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) { + objects := podInformer.GetIndexer().List() + return len(objects) == num, nil + }) +} + func validateDaemonSetStatus( dsClient appstyped.DaemonSetInterface, dsName string, @@ -302,6 +413,22 @@ func validateFailedPlacementEvent(eventClient corev1typed.EventInterface, t *tes } } +func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) { + for _, fg := range featureGates() { + func() { + enabled := utilfeature.DefaultFeatureGate.Enabled(fg) + defer func() { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled)) + }() + + for _, f := range []bool{true, false} { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f)) + t.Run(fmt.Sprintf("%v (%t)", fg, f), tf) + } + }() + } +} + func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) { for _, strategy := range updateStrategies() { t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy), @@ -310,69 +437,152 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe } func TestOneNodeDaemonLaunchesPod(t *testing.T) { - forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - server, closeFn, dc, informers, clientset := setup(t) - defer closeFn() - ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t) - defer framework.DeleteTestingNamespace(ns, server, t) + forEachFeatureGate(t, func(t *testing.T) { + forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { + server, closeFn, dc, informers, clientset := setup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t) + defer framework.DeleteTestingNamespace(ns, server, t) - dsClient := clientset.AppsV1().DaemonSets(ns.Name) - podClient := clientset.CoreV1().Pods(ns.Name) - nodeClient := clientset.CoreV1().Nodes() - podInformer := informers.Core().V1().Pods().Informer() + dsClient := clientset.AppsV1().DaemonSets(ns.Name) + podClient := clientset.CoreV1().Pods(ns.Name) + nodeClient := clientset.CoreV1().Nodes() + podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - informers.Start(stopCh) - go dc.Run(5, stopCh) - defer close(stopCh) + stopCh := make(chan struct{}) + defer close(stopCh) - ds := newDaemonSet("foo", ns.Name) - ds.Spec.UpdateStrategy = *strategy - _, err := dsClient.Create(ds) - if err != nil { - t.Fatalf("Failed to create DaemonSet: %v", err) - } - defer cleanupDaemonSets(t, clientset, ds) + informers.Start(stopCh) + go dc.Run(5, stopCh) - _, err = nodeClient.Create(newNode("single-node", nil)) - if err != nil { - t.Fatalf("Failed to create node: %v", err) - } + // Start Scheduler + setupScheduler(t, clientset, informers, stopCh) - validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t) - validateDaemonSetStatus(dsClient, ds.Name, 1, t) + ds := newDaemonSet("foo", ns.Name) + ds.Spec.UpdateStrategy = *strategy + _, err := dsClient.Create(ds) + if err != nil { + t.Fatalf("Failed to create DaemonSet: %v", err) + } + defer cleanupDaemonSets(t, clientset, ds) + + _, err = nodeClient.Create(newNode("single-node", nil)) + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t) + validateDaemonSetStatus(dsClient, ds.Name, 1, t) + }) }) } func TestSimpleDaemonSetLaunchesPods(t *testing.T) { - forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - server, closeFn, dc, informers, clientset := setup(t) - defer closeFn() - ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t) - defer framework.DeleteTestingNamespace(ns, server, t) + forEachFeatureGate(t, func(t *testing.T) { + forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { + server, closeFn, dc, informers, clientset := setup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t) + defer framework.DeleteTestingNamespace(ns, server, t) - dsClient := clientset.AppsV1().DaemonSets(ns.Name) - podClient := clientset.CoreV1().Pods(ns.Name) - nodeClient := clientset.CoreV1().Nodes() - podInformer := informers.Core().V1().Pods().Informer() + dsClient := clientset.AppsV1().DaemonSets(ns.Name) + podClient := clientset.CoreV1().Pods(ns.Name) + nodeClient := clientset.CoreV1().Nodes() + podInformer := informers.Core().V1().Pods().Informer() - stopCh := make(chan struct{}) - informers.Start(stopCh) - go dc.Run(5, stopCh) - defer close(stopCh) + stopCh := make(chan struct{}) + defer close(stopCh) - ds := newDaemonSet("foo", ns.Name) - ds.Spec.UpdateStrategy = *strategy - _, err := dsClient.Create(ds) - if err != nil { - t.Fatalf("Failed to create DaemonSet: %v", err) - } - defer cleanupDaemonSets(t, clientset, ds) + informers.Start(stopCh) + go dc.Run(5, stopCh) - addNodes(nodeClient, 0, 5, nil, t) + // Start Scheduler + setupScheduler(t, clientset, informers, stopCh) - validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t) - validateDaemonSetStatus(dsClient, ds.Name, 5, t) + ds := newDaemonSet("foo", ns.Name) + ds.Spec.UpdateStrategy = *strategy + _, err := dsClient.Create(ds) + if err != nil { + t.Fatalf("Failed to create DaemonSet: %v", err) + } + defer cleanupDaemonSets(t, clientset, ds) + + addNodes(nodeClient, 0, 5, nil, t) + + validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t) + validateDaemonSetStatus(dsClient, ds.Name, 5, t) + }) + }) +} + +func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { + forEachFeatureGate(t, func(t *testing.T) { + forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { + server, closeFn, dc, informers, clientset := setup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + dsClient := clientset.AppsV1().DaemonSets(ns.Name) + podClient := clientset.CoreV1().Pods(ns.Name) + nodeClient := clientset.CoreV1().Nodes() + podInformer := informers.Core().V1().Pods().Informer() + + stopCh := make(chan struct{}) + defer close(stopCh) + + informers.Start(stopCh) + go dc.Run(5, stopCh) + + // Start Scheduler + setupScheduler(t, clientset, informers, stopCh) + + ds := newDaemonSet("foo", ns.Name) + ds.Spec.UpdateStrategy = *strategy + + ds.Spec.Template.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "zone", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test"}, + }, + }, + }, + { + MatchFields: []v1.NodeSelectorRequirement{ + { + Key: algorithm.NodeFieldSelectorKeyNodeName, + Operator: v1.NodeSelectorOpIn, + Values: []string{"node-1"}, + }, + }, + }, + }, + }, + }, + } + + _, err := dsClient.Create(ds) + if err != nil { + t.Fatalf("Failed to create DaemonSet: %v", err) + } + defer cleanupDaemonSets(t, clientset, ds) + + addNodes(nodeClient, 0, 2, nil, t) + // Two nodes with labels + addNodes(nodeClient, 2, 2, map[string]string{ + "zone": "test", + }, t) + addNodes(nodeClient, 4, 2, nil, t) + + validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t) + validateDaemonSetStatus(dsClient, ds.Name, 3, t) + }) }) } @@ -389,9 +599,13 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) go dc.Run(5, stopCh) - defer close(stopCh) + + // Start Scheduler + setupScheduler(t, clientset, informers, stopCh) ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -399,6 +613,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { if err != nil { t.Fatalf("Failed to create DaemonSet: %v", err) } + defer cleanupDaemonSets(t, clientset, ds) node := newNode("single-node", nil) @@ -427,9 +642,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) { eventClient := clientset.CoreV1().Events(ns.Namespace) stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) go dc.Run(5, stopCh) - defer close(stopCh) ds := newDaemonSet("foo", ns.Name) ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m") @@ -450,3 +666,77 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) { validateFailedPlacementEvent(eventClient, t) }) } + +// TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods" +// feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource +// on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource. +func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) { + enabled := utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) + defer func() { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", + features.ScheduleDaemonSetPods, enabled)) + }() + + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.ScheduleDaemonSetPods, true)) + + forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { + server, closeFn, dc, informers, clientset := setup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("insufficient-capacity", server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + dsClient := clientset.AppsV1().DaemonSets(ns.Name) + podClient := clientset.CoreV1().Pods(ns.Name) + podInformer := informers.Core().V1().Pods().Informer() + nodeClient := clientset.CoreV1().Nodes() + stopCh := make(chan struct{}) + defer close(stopCh) + + informers.Start(stopCh) + go dc.Run(5, stopCh) + + // Start Scheduler + setupScheduler(t, clientset, informers, stopCh) + + ds := newDaemonSet("foo", ns.Name) + ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m") + ds.Spec.UpdateStrategy = *strategy + ds, err := dsClient.Create(ds) + if err != nil { + t.Fatalf("Failed to create DaemonSet: %v", err) + } + + defer cleanupDaemonSets(t, clientset, ds) + + node := newNode("node-with-limited-memory", nil) + node.Status.Allocatable = allocatableResources("100M", "200m") + _, err = nodeClient.Create(node) + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + if err := waitForPodsCreated(podInformer, 1); err != nil { + t.Errorf("Failed to wait for pods created: %v", err) + } + + objects := podInformer.GetIndexer().List() + for _, object := range objects { + pod := object.(*v1.Pod) + if err := waitForPodUnschedulable(clientset, pod); err != nil { + t.Errorf("Failed to wait for unschedulable status of pod %+v", pod) + } + } + + node1 := newNode("node-with-enough-memory", nil) + node1.Status.Allocatable = allocatableResources("200M", "2000m") + _, err = nodeClient.Create(node1) + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + // When ScheduleDaemonSetPods enabled, 2 pods are created. But only one + // of two Pods is scheduled by default scheduler. + validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t) + validateDaemonSetStatus(dsClient, ds.Name, 1, t) + }) +}