From bfd950e47187988b16f29a2207d04a0dba4dc079 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Fri, 2 Feb 2018 11:24:20 -0800 Subject: [PATCH] Replace nominateNodeName annotation with PodStatus.NominatedNodeName in scheudler logic --- pkg/scheduler/core/generic_scheduler.go | 10 ++--- pkg/scheduler/core/generic_scheduler_test.go | 3 +- pkg/scheduler/core/scheduling_queue.go | 12 ++---- pkg/scheduler/core/scheduling_queue_test.go | 30 +++++++++----- pkg/scheduler/factory/factory.go | 41 ++++--------------- pkg/scheduler/scheduler.go | 9 ++-- pkg/scheduler/scheduler_test.go | 4 +- test/integration/scheduler/preemption_test.go | 19 ++++----- 8 files changed, 48 insertions(+), 80 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index da04ff45ad..31cc570d4e 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -61,11 +61,6 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") const ( NoNodeAvailableMsg = "0/%v nodes are available" - // NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods. - // The scheduler uses the annotation to find that the pod shouldn't preempt more pods - // when it gets to the head of scheduling queue again. - // See podEligibleToPreemptOthers() for more information. - NominatedNodeAnnotationKey = "scheduler.kubernetes.io/nominated-node-name" ) // Error returns detailed information of why the pod failed to fit on each node @@ -1001,8 +996,9 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat // We look at the node that is nominated for this pod and as long as there are // terminating pods on the node, we don't consider this for preempting more pods. func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool { - if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found { - if nodeInfo, found := nodeNameToInfo[nodeName]; found { + nomNodeName := pod.Status.NominatedNodeName + if len(nomNodeName) > 0 { + if nodeInfo, found := nodeNameToInfo[nomNodeName]; found { for _, p := range nodeInfo.Pods() { if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) { // There is a terminating pod on the nominated node. diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 5841907a96..81d675de79 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1318,8 +1318,7 @@ func TestPreempt(t *testing.T) { // Mark the victims for deletion and record the preemptor's nominated node name. now := metav1.Now() victim.DeletionTimestamp = &now - test.pod.Annotations = make(map[string]string) - test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name + test.pod.Status.NominatedNodeName = node.Name } // Call preempt again and make sure it doesn't preempt any more pods. node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 79dfee0a1d..55440c5345 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -397,10 +397,8 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName) for _, obj := range p.activeQ.List() { pod := obj.(*v1.Pod) - if pod.Annotations != nil { - if n, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok && n == nodeName { - pods = append(pods, pod) - } + if pod.Status.NominatedNodeName == nodeName { + pods = append(pods, pod) } } return pods @@ -420,11 +418,7 @@ type UnschedulablePodsMap struct { var _ = UnschedulablePods(&UnschedulablePodsMap{}) func NominatedNodeName(pod *v1.Pod) string { - nominatedNodeName, ok := pod.Annotations[NominatedNodeAnnotationKey] - if !ok { - return "" - } - return nominatedNodeName + return pod.Status.NominatedNodeName } // Add adds a pod to the unschedulable pods. diff --git a/pkg/scheduler/core/scheduling_queue_test.go b/pkg/scheduler/core/scheduling_queue_test.go index e69ae867cc..d906576135 100644 --- a/pkg/scheduler/core/scheduling_queue_test.go +++ b/pkg/scheduler/core/scheduling_queue_test.go @@ -41,19 +41,22 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{ Name: "mpp", Namespace: "ns2", Annotations: map[string]string{ - NominatedNodeAnnotationKey: "node1", "annot2": "val2", + "annot2": "val2", }, }, Spec: v1.PodSpec{ Priority: &mediumPriority, }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, }, v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "up", Namespace: "ns1", Annotations: map[string]string{ - NominatedNodeAnnotationKey: "node1", "annot2": "val2", + "annot2": "val2", }, }, Spec: v1.PodSpec{ @@ -67,6 +70,7 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{ Reason: v1.PodReasonUnschedulable, }, }, + NominatedNodeName: "node1", }, } @@ -217,9 +221,12 @@ func TestUnschedulablePodsMap(t *testing.T) { Name: "p0", Namespace: "ns1", Annotations: map[string]string{ - NominatedNodeAnnotationKey: "node1", "annot2": "val2", + "annot1": "val1", }, }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, }, { ObjectMeta: metav1.ObjectMeta{ @@ -235,27 +242,30 @@ func TestUnschedulablePodsMap(t *testing.T) { Name: "p2", Namespace: "ns2", Annotations: map[string]string{ - NominatedNodeAnnotationKey: "node3", "annot2": "val2", "annot3": "val3", + "annot2": "val2", "annot3": "val3", }, }, + Status: v1.PodStatus{ + NominatedNodeName: "node3", + }, }, { ObjectMeta: metav1.ObjectMeta{ Name: "p3", Namespace: "ns4", - Annotations: map[string]string{ - NominatedNodeAnnotationKey: "node1", - }, + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", }, }, } var updatedPods = make([]*v1.Pod, len(pods)) updatedPods[0] = pods[0].DeepCopy() - updatedPods[0].Annotations[NominatedNodeAnnotationKey] = "node3" + updatedPods[0].Status.NominatedNodeName = "node3" updatedPods[1] = pods[1].DeepCopy() - updatedPods[1].Annotations[NominatedNodeAnnotationKey] = "node3" + updatedPods[1].Status.NominatedNodeName = "node3" updatedPods[3] = pods[3].DeepCopy() - delete(updatedPods[3].Annotations, NominatedNodeAnnotationKey) + updatedPods[3].Status.NominatedNodeName = "" tests := []struct { podsToAdd []*v1.Pod diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index dfc5d6c3db..81ad49c305 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -19,7 +19,6 @@ limitations under the License. package factory import ( - "encoding/json" "fmt" "reflect" "time" @@ -30,7 +29,6 @@ import ( "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -1313,41 +1311,16 @@ func (p *podPreemptor) DeletePod(pod *v1.Pod) error { return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) } -func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error { +func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { podCopy := pod.DeepCopy() - if podCopy.Annotations == nil { - podCopy.Annotations = map[string]string{} - } - for k, v := range annotations { - podCopy.Annotations[k] = v - } - ret := &unstructured.Unstructured{} - ret.SetAnnotations(podCopy.Annotations) - patchData, err := json.Marshal(ret) - if err != nil { - return err - } - _, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status") - return error + podCopy.Status.NominatedNodeName = nominatedNodeName + _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) + return err } -func (p *podPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error { - podCopy := pod.DeepCopy() - if podCopy.Annotations == nil { +func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { + if len(pod.Status.NominatedNodeName) == 0 { return nil } - if _, exists := podCopy.Annotations[core.NominatedNodeAnnotationKey]; !exists { - return nil - } - // Note: Deleting the entry from the annotations and passing it to Patch() will - // not remove the annotation. That's why we set it to empty string. - podCopy.Annotations[core.NominatedNodeAnnotationKey] = "" - ret := &unstructured.Unstructured{} - ret.SetAnnotations(podCopy.Annotations) - patchData, err := json.Marshal(ret) - if err != nil { - return err - } - _, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status") - return error + return p.SetNominatedNodeName(pod, "") } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 788647618e..74590a0869 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -57,8 +57,8 @@ type PodConditionUpdater interface { type PodPreemptor interface { GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) DeletePod(pod *v1.Pod) error - UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error - RemoveNominatedNodeAnnotation(pod *v1.Pod) error + SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error + RemoveNominatedNodeName(pod *v1.Pod) error } // Scheduler watches for new unscheduled pods. It attempts to find @@ -226,8 +226,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e var nodeName = "" if node != nil { nodeName = node.Name - annotations := map[string]string{core.NominatedNodeAnnotationKey: nodeName} - err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations) + err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) if err != nil { glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err) return "", err @@ -245,7 +244,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e // but preemption logic does not find any node for it. In that case Preempt() // function of generic_scheduler.go returns the pod itself for removal of the annotation. for _, p := range nominatedPodsToClear { - rErr := sched.config.PodPreemptor.RemoveNominatedNodeAnnotation(p) + rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p) if rErr != nil { glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr) // We do not return as this error is not critical. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d2a5d14772..47bc6fed55 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -65,11 +65,11 @@ func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error { return nil } -func (fp fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error { +func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error { return nil } -func (fp fakePodPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error { +func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { return nil } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 6ad507353a..45ee3697d7 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -34,7 +34,6 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/features" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" - "k8s.io/kubernetes/pkg/scheduler/core" testutils "k8s.io/kubernetes/test/utils" "github.com/golang/glog" @@ -42,14 +41,13 @@ import ( var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300) -func waitForNominatedNodeAnnotation(cs clientset.Interface, pod *v1.Pod) error { +func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error { if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { pod, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) if err != nil { return false, err } - annot, found := pod.Annotations[core.NominatedNodeAnnotationKey] - if found && len(annot) > 0 { + if len(pod.Status.NominatedNodeName) > 0 { return true, nil } return false, err @@ -276,7 +274,7 @@ func TestPreemption(t *testing.T) { } // Also check that the preemptor pod gets the annotation for nominated node name. if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err) } } @@ -389,7 +387,7 @@ func TestPreemptionStarvation(t *testing.T) { t.Errorf("Error while creating the preempting pod: %v", err) } // Check that the preemptor pod gets the annotation for nominated node name. - if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err) } // Make sure that preemptor is scheduled after preemptions. @@ -462,7 +460,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { t.Errorf("Error while creating the medium priority pod: %v", err) } // Step 3. Check that nominated node name of the medium priority pod is set. - if err := waitForNominatedNodeAnnotation(cs, medPriPod); err != nil { + if err := waitForNominatedNodeName(cs, medPriPod); err != nil { t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err) } // Step 4. Create a high priority pod. @@ -480,7 +478,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { t.Errorf("Error while creating the high priority pod: %v", err) } // Step 5. Check that nominated node name of the high priority pod is set. - if err := waitForNominatedNodeAnnotation(cs, highPriPod); err != nil { + if err := waitForNominatedNodeName(cs, highPriPod); err != nil { t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err) } // And the nominated node name of the medium priority pod is cleared. @@ -489,8 +487,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { if err != nil { t.Errorf("Error getting the medium priority pod info: %v", err) } - n, found := pod.Annotations[core.NominatedNodeAnnotationKey] - if !found || len(n) == 0 { + if len(pod.Status.NominatedNodeName) == 0 { return true, nil } return false, err @@ -755,7 +752,7 @@ func TestPDBInPreemption(t *testing.T) { } // Also check that the preemptor pod gets the annotation for nominated node name. if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err) } }