Replace the NominatedNodeName annotation with PodStatus.NominatedNodeName in scheduler logic

pull/6/head
Bobby (Babak) Salamat 2018-02-02 11:24:20 -08:00
parent 49bf442175
commit bfd950e471
8 changed files with 48 additions and 80 deletions

View File

@ -61,11 +61,6 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const (
NoNodeAvailableMsg = "0/%v nodes are available"
// NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods.
// The scheduler uses the annotation to find that the pod shouldn't preempt more pods
// when it gets to the head of scheduling queue again.
// See podEligibleToPreemptOthers() for more information.
NominatedNodeAnnotationKey = "scheduler.kubernetes.io/nominated-node-name"
)
// Error returns detailed information of why the pod failed to fit on each node
@ -1001,8 +996,9 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.

View File

@ -1318,8 +1318,7 @@ func TestPreempt(t *testing.T) {
// Mark the victims for deletion and record the preemptor's nominated node name.
now := metav1.Now()
victim.DeletionTimestamp = &now
test.pod.Annotations = make(map[string]string)
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
test.pod.Status.NominatedNodeName = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))

View File

@ -397,12 +397,10 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName)
for _, obj := range p.activeQ.List() {
pod := obj.(*v1.Pod)
if pod.Annotations != nil {
if n, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok && n == nodeName {
if pod.Status.NominatedNodeName == nodeName {
pods = append(pods, pod)
}
}
}
return pods
}
@ -420,11 +418,7 @@ type UnschedulablePodsMap struct {
var _ = UnschedulablePods(&UnschedulablePodsMap{})
func NominatedNodeName(pod *v1.Pod) string {
nominatedNodeName, ok := pod.Annotations[NominatedNodeAnnotationKey]
if !ok {
return ""
}
return nominatedNodeName
return pod.Status.NominatedNodeName
}
// Add adds a pod to the unschedulable pods.

View File

@ -41,19 +41,22 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
Name: "mpp",
Namespace: "ns2",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot2": "val2",
},
},
Spec: v1.PodSpec{
Priority: &mediumPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "up",
Namespace: "ns1",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot2": "val2",
},
},
Spec: v1.PodSpec{
@ -67,6 +70,7 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
Reason: v1.PodReasonUnschedulable,
},
},
NominatedNodeName: "node1",
},
}
@ -217,9 +221,12 @@ func TestUnschedulablePodsMap(t *testing.T) {
Name: "p0",
Namespace: "ns1",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot1": "val1",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
@ -235,27 +242,30 @@ func TestUnschedulablePodsMap(t *testing.T) {
Name: "p2",
Namespace: "ns2",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node3", "annot2": "val2", "annot3": "val3",
"annot2": "val2", "annot3": "val3",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node3",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "p3",
Namespace: "ns4",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1",
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
}
var updatedPods = make([]*v1.Pod, len(pods))
updatedPods[0] = pods[0].DeepCopy()
updatedPods[0].Annotations[NominatedNodeAnnotationKey] = "node3"
updatedPods[0].Status.NominatedNodeName = "node3"
updatedPods[1] = pods[1].DeepCopy()
updatedPods[1].Annotations[NominatedNodeAnnotationKey] = "node3"
updatedPods[1].Status.NominatedNodeName = "node3"
updatedPods[3] = pods[3].DeepCopy()
delete(updatedPods[3].Annotations, NominatedNodeAnnotationKey)
updatedPods[3].Status.NominatedNodeName = ""
tests := []struct {
podsToAdd []*v1.Pod

View File

@ -19,7 +19,6 @@ limitations under the License.
package factory
import (
"encoding/json"
"fmt"
"reflect"
"time"
@ -30,7 +29,6 @@ import (
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -1313,41 +1311,16 @@ func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error {
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
podCopy.Annotations = map[string]string{}
}
for k, v := range annotations {
podCopy.Annotations[k] = v
}
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
}
func (p *podPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
if _, exists := podCopy.Annotations[core.NominatedNodeAnnotationKey]; !exists {
return nil
}
// Note: Deleting the entry from the annotations and passing it to Patch() will
// not remove the annotation. That's why we set it to empty string.
podCopy.Annotations[core.NominatedNodeAnnotationKey] = ""
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
return p.SetNominatedNodeName(pod, "")
}

View File

@ -57,8 +57,8 @@ type PodConditionUpdater interface {
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
RemoveNominatedNodeAnnotation(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
@ -226,8 +226,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
var nodeName = ""
if node != nil {
nodeName = node.Name
annotations := map[string]string{core.NominatedNodeAnnotationKey: nodeName}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
@ -245,7 +244,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeAnnotation(p)
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.

View File

@ -65,11 +65,11 @@ func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
return nil
}
func (fp fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
return nil
}
func (fp fakePodPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
return nil
}

View File

@ -34,7 +34,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/core"
testutils "k8s.io/kubernetes/test/utils"
"github.com/golang/glog"
@ -42,14 +41,13 @@ import (
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
func waitForNominatedNodeAnnotation(cs clientset.Interface, pod *v1.Pod) error {
func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
annot, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if found && len(annot) > 0 {
if len(pod.Status.NominatedNodeName) > 0 {
return true, nil
}
return false, err
@ -276,7 +274,7 @@ func TestPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}
@ -389,7 +387,7 @@ func TestPreemptionStarvation(t *testing.T) {
t.Errorf("Error while creating the preempting pod: %v", err)
}
// Check that the preemptor pod gets the annotation for nominated node name.
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
// Make sure that preemptor is scheduled after preemptions.
@ -462,7 +460,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
t.Errorf("Error while creating the medium priority pod: %v", err)
}
// Step 3. Check that nominated node name of the medium priority pod is set.
if err := waitForNominatedNodeAnnotation(cs, medPriPod); err != nil {
if err := waitForNominatedNodeName(cs, medPriPod); err != nil {
t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err)
}
// Step 4. Create a high priority pod.
@ -480,7 +478,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
t.Errorf("Error while creating the high priority pod: %v", err)
}
// Step 5. Check that nominated node name of the high priority pod is set.
if err := waitForNominatedNodeAnnotation(cs, highPriPod); err != nil {
if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err)
}
// And the nominated node name of the medium priority pod is cleared.
@ -489,8 +487,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
if err != nil {
t.Errorf("Error getting the medium priority pod info: %v", err)
}
n, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if !found || len(n) == 0 {
if len(pod.Status.NominatedNodeName) == 0 {
return true, nil
}
return false, err
@ -755,7 +752,7 @@ func TestPDBInPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}