From 5adb2bad4534b25c567220a21a242d2745e6cbf0 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Thu, 8 Mar 2018 10:09:54 +0800 Subject: [PATCH] Task 2: Schedule DaemonSet Pods by default scheduler. Signed-off-by: Da K. Ma --- pkg/controller/daemon/BUILD | 2 + pkg/controller/daemon/daemon_controller.go | 61 ++++++- .../daemon/daemon_controller_test.go | 116 ++++++++++++ pkg/controller/daemon/util/BUILD | 2 + pkg/controller/daemon/util/daemonset_util.go | 66 +++++++ .../daemon/util/daemonset_util_test.go | 172 ++++++++++++++++++ pkg/features/kube_features.go | 4 +- 7 files changed, 419 insertions(+), 4 deletions(-) diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 8dddb58b12..81c11c383d 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -68,6 +68,8 @@ go_test( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/controller:go_default_library", + "//pkg/features:go_default_library", + "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/securitycontext:go_default_library", diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 100b5d19f3..8200ba1ff8 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -934,7 +934,22 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod for i := pos; i < pos+batchSize; i++ { go func(ix int) { defer createWait.Done() - err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, metav1.NewControllerRef(ds, controllerKind)) + var err error + + podTemplate := &template + + if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { + podTemplate = template.DeepCopy() + podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodHostnameNodeAffinity( + podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) + + err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate, + ds, metav1.NewControllerRef(ds, controllerKind)) + } else { + err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate, + ds, metav1.NewControllerRef(ds, controllerKind)) + } + if err != nil && errors.IsTimeout(err) { // Pod is created but its initialization has timed out. // If the initialization is successful eventually, the @@ -1267,6 +1282,9 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps. return false, false, false, err } + // TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason, + // e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning" + // into one result, e.g. selectedNode. var insufficientResourceErr error for _, r := range reasons { glog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason()) @@ -1341,11 +1359,50 @@ func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod { return newPod } +// nodeSelectionPredicates runs a set of predicates that select candidate nodes for the DaemonSet; +// the predicates include: +// - PodFitsHost: checks pod's NodeName against node +// - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node +func nodeSelectionPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + var predicateFails []algorithm.PredicateFailureReason + fit, reasons, err := predicates.PodFitsHost(pod, meta, nodeInfo) + if err != nil { + return false, predicateFails, err + } + if !fit { + predicateFails = append(predicateFails, reasons...) + } + + fit, reasons, err = predicates.PodMatchNodeSelector(pod, meta, nodeInfo) + if err != nil { + return false, predicateFails, err + } + if !fit { + predicateFails = append(predicateFails, reasons...) + } + return len(predicateFails) == 0, predicateFails, nil +} + // Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates // and PodToleratesNodeTaints predicate func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { var predicateFails []algorithm.PredicateFailureReason - critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(pod) + + // If ScheduleDaemonSetPods is enabled, only check nodeSelector and nodeAffinity. + if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { + fit, reasons, err := nodeSelectionPredicates(pod, nil, nodeInfo) + if err != nil { + return false, predicateFails, err + } + if !fit { + predicateFails = append(predicateFails, reasons...) + } + + return len(predicateFails) == 0, predicateFails, nil + } + + critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && + kubelettypes.IsCriticalPod(pod) fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo) if err != nil { diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index ce4c6c5e7f..4424dda018 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -43,6 +43,8 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/features" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/securitycontext" @@ -267,6 +269,31 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template * return nil } +func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil { + return fmt.Errorf("failed to create pod for DaemonSet") + } + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: template.Labels, + Namespace: namespace, + }, + } + + pod.Name = names.SimpleNameGenerator.GenerateName(fmt.Sprintf("%p-", pod)) + + if err := legacyscheme.Scheme.Convert(&template.Spec, &pod.Spec, nil); err != nil { + return fmt.Errorf("unable to convert pod template: %v", err) + } + + f.podStore.Update(pod) + f.podIDMap[pod.Name] = pod + return nil +} + func (f *fakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { f.Lock() defer f.Unlock() @@ -423,6 +450,95 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { } } +// When ScheduleDaemonSetPods is enabled, DaemonSets without node selectors should +// launch pods on every node by NodeAffinity. +func TestSimpleDaemonSetScheduleDaemonSetPodsLaunchesPods(t *testing.T) { + enabled := utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) + // Rollback feature gate. + defer func() { + if !enabled { + utilfeature.DefaultFeatureGate.Set("ScheduleDaemonSetPods=false") + } + }() + + utilfeature.DefaultFeatureGate.Set("ScheduleDaemonSetPods=true") + + nodeNum := 5 + + for _, strategy := range updateStrategies() { + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = *strategy + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, nodeNum, nil) + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, nodeNum, 0, 0) + + // Check for ScheduleDaemonSetPods feature + if len(podControl.podIDMap) != nodeNum { + t.Fatalf("failed to create pods for DaemonSet when enabled ScheduleDaemonSetPods.") + } + + nodeMap := make(map[string]*v1.Node) + for _, node := range manager.nodeStore.List() { + n := node.(*v1.Node) + nodeMap[n.Name] = n + } + + if len(nodeMap) != nodeNum { + t.Fatalf("not enough nodes in the store, expected: %v, got: %v", + nodeNum, len(nodeMap)) + } + + for _, pod := range podControl.podIDMap { + if len(pod.Spec.NodeName) != 0 { + t.Fatalf("the hostname of pod %v should be empty, but got %s", + pod.Name, pod.Spec.NodeName) + } + if pod.Spec.Affinity == nil { + t.Fatalf("the Affinity of pod %s is nil.", pod.Name) + } + if pod.Spec.Affinity.NodeAffinity == nil { + t.Fatalf("the NodeAffinity of pod %s is nil.", pod.Name) + } + + nodeSelector := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution + if nodeSelector == nil { + t.Fatalf("the node selector of pod %s is nil.", pod.Name) + } + if len(nodeSelector.NodeSelectorTerms) != 1 { + t.Fatalf("incorrect node selector terms number of pod %s, expected: 1, got: %d.", + pod.Name, len(nodeSelector.NodeSelectorTerms)) + } + + if len(nodeSelector.NodeSelectorTerms[0].MatchExpressions) != 1 { + t.Fatalf("incorrect expression number of pod %s node selector term, expected: 1, got: %d.", + pod.Name, len(nodeSelector.NodeSelectorTerms[0].MatchExpressions)) + } + + exp := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0] + if exp.Key == kubeletapis.LabelHostname { + if exp.Operator != v1.NodeSelectorOpIn { + t.Fatalf("the operation of hostname NodeAffinity is not %v", v1.NodeSelectorOpIn) + } + + if len(exp.Values) != 1 { + t.Fatalf("incorrect hostname in node affinity: expected 1, got %v", len(exp.Values)) + } + + delete(nodeMap, exp.Values[0]) + } + } + + if len(nodeMap) != 0 { + t.Fatalf("did not foud pods on nodes %+v", nodeMap) + } + } + +} + // Simulate a cluster with 100 nodes, but simulate a limit (like a quota limit) // of 10 pods, and verify that the ds doesn't make 100 create calls per sync pass func TestSimpleDaemonSetPodCreateErrors(t *testing.T) { diff --git a/pkg/controller/daemon/util/BUILD b/pkg/controller/daemon/util/BUILD index 0f0baa655a..b33d553a95 100644 --- a/pkg/controller/daemon/util/BUILD +++ b/pkg/controller/daemon/util/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", "//pkg/features:go_default_library", + "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", @@ -43,6 +44,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", + "//pkg/kubelet/apis:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index f6510ea9be..20a0889d19 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -28,6 +28,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/scheduler/algorithm" ) @@ -133,3 +134,68 @@ func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod) ([]*v1.Pod, []* } return availablePods, unavailablePods } + +// ReplaceDaemonSetPodHostnameNodeAffinity replaces the 'kubernetes.io/hostname' NodeAffinity term with +// the given "nodeName" in the "affinity" terms. +func ReplaceDaemonSetPodHostnameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity { + nodeSelector := &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{nodename}, + }, + }, + }, + }, + } + + if affinity == nil { + return &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: nodeSelector, + }, + } + } + + if affinity.NodeAffinity == nil { + affinity.NodeAffinity = &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: nodeSelector, + } + return affinity + } + + nodeAffinity := affinity.NodeAffinity + + if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nodeSelector + return affinity + } + + nodeSelectorTerms := []v1.NodeSelectorTerm{} + + // Removes hostname node selector, as only the target hostname will take effect. + for _, term := range nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + exps := []v1.NodeSelectorRequirement{} + for _, exp := range term.MatchExpressions { + if exp.Key != kubeletapis.LabelHostname { + exps = append(exps, exp) + } + } + + if len(exps) > 0 { + term.MatchExpressions = exps + nodeSelectorTerms = append(nodeSelectorTerms, term) + } + } + + // Adds target hostname NodeAffinity term. + nodeSelectorTerms = append(nodeSelectorTerms, nodeSelector.NodeSelectorTerms[0]) + + // Replace node selector with the new one. + nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms + + return affinity +} diff --git a/pkg/controller/daemon/util/daemonset_util_test.go b/pkg/controller/daemon/util/daemonset_util_test.go index 21060362ec..cf23be4015 100644 --- a/pkg/controller/daemon/util/daemonset_util_test.go +++ b/pkg/controller/daemon/util/daemonset_util_test.go @@ -18,12 +18,14 @@ package util import ( "fmt" + "reflect" "testing" "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/testapi" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { @@ -168,3 +170,173 @@ func int64Ptr(i int) *int64 { li := int64(i) return &li } + +func TestReplaceDaemonSetPodHostnameNodeAffinity(t *testing.T) { + tests := []struct { + affinity *v1.Affinity + hostname string + expected *v1.Affinity + }{ + { + affinity: nil, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "not-host-label", + Operator: v1.NodeSelectorOpIn, + Values: []string{"label_value_1", "label_value_2"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "not-host-label", + Operator: v1.NodeSelectorOpIn, + Values: []string{"label_value_1", "label_value_2"}, + }, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + { + affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1", "host_2"}, + }, + }, + }, + }, + }, + }, + }, + hostname: "host_1", + expected: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: kubeletapis.LabelHostname, + Operator: v1.NodeSelectorOpIn, + Values: []string{"host_1"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + got := ReplaceDaemonSetPodHostnameNodeAffinity(test.affinity, test.hostname) + if !reflect.DeepEqual(test.expected, got) { + t.Errorf("Failed to append NodeAffinity, got: %v, expected: %v", got, test.expected) + } + } +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 10a58493df..768cc600f7 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -242,7 +242,7 @@ const ( // alpha: v1.10 // // Schedule DaemonSet Pods by default scheduler instead of DaemonSet controller - NoDaemonSetScheduler utilfeature.Feature = "NoDaemonSetScheduler" + ScheduleDaemonSetPods utilfeature.Feature = "ScheduleDaemonSetPods" // owner: @mikedanese // alpha: v1.10 @@ -308,7 +308,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS SupportIPVSProxyMode: {Default: true, PreRelease: utilfeature.Beta}, SupportPodPidsLimit: {Default: false, PreRelease: utilfeature.Alpha}, HyperVContainer: {Default: false, PreRelease: utilfeature.Alpha}, - NoDaemonSetScheduler: {Default: false, PreRelease: utilfeature.Alpha}, + ScheduleDaemonSetPods: {Default: false, PreRelease: utilfeature.Alpha}, TokenRequest: {Default: false, PreRelease: utilfeature.Alpha}, CRIContainerLogRotation: {Default: false, PreRelease: utilfeature.Alpha}, GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta},