diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json index cc1a9bd6fb..fb5e138135 100644 --- a/examples/scheduler-policy-config-with-extender.json +++ b/examples/scheduler-policy-config-with-extender.json @@ -20,6 +20,7 @@ "filterVerb": "filter", "bindVerb": "bind", "prioritizeVerb": "prioritize", + "preemptVerb": "preempt", "weight": 5, "enableHttps": false, "nodeCacheCapable": false diff --git a/pkg/scheduler/algorithm/scheduler_interface.go b/pkg/scheduler/algorithm/scheduler_interface.go index f86015b491..61c740c60a 100644 --- a/pkg/scheduler/algorithm/scheduler_interface.go +++ b/pkg/scheduler/algorithm/scheduler_interface.go @@ -29,7 +29,9 @@ type SchedulerExtender interface { // Filter based on extender-implemented predicate functions. The filtered list is // expected to be a subset of the supplied list. failedNodesMap optionally contains // the list of failed nodes and failure reasons. - Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) + Filter(pod *v1.Pod, + nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo, + ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) // Prioritize based on extender-implemented priority functions. The returned scores & weight // are used to compute the weighted score for an extender. The weighted scores are added to @@ -45,6 +47,23 @@ type SchedulerExtender interface { // IsInterested returns true if at least one extended resource requested by // this pod is managed by this extender. IsInterested(pod *v1.Pod) bool + + // ProcessPreemption returns nodes with their victim pods processed by the extender based on + // given: + // 1. Pod to schedule + // 2. Candidate nodes and victim pods (nodeToVictims) generated by the previous scheduling process. + // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled. + // The possible changes made by the extender may include: + // 1. Subset of given candidate nodes after preemption phase of extender. + // 2. A different set of victim pods for every given candidate node after preemption phase of extender. + ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, + ) (map[*v1.Node]*schedulerapi.Victims, error) + + // SupportsPreemption returns whether the scheduler extender supports preemption. + SupportsPreemption() bool } // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index cfc8d219ec..de985d9c82 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -168,6 +168,8 @@ type ExtenderConfig struct { URLPrefix string // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. FilterVerb string + // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender. + PreemptVerb string // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. PrioritizeVerb string // The numeric multiplier for the node scores that the prioritize call generates.
@@ -200,11 +202,48 @@ type ExtenderConfig struct { ManagedResources []ExtenderManagedResource } +// ExtenderPreemptionResult represents the result returned by preemption phase of extender. +type ExtenderPreemptionResult struct { + NodeNameToMetaVictims map[string]*MetaVictims +} + +// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes. +type ExtenderPreemptionArgs struct { + // Pod being scheduled + Pod *v1.Pod + // Victims map generated by scheduler preemption phase + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. + NodeToVictims map[*v1.Node]*Victims + NodeNameToMetaVictims map[string]*MetaVictims +} + +// Victims represents: +// pods: a group of pods expected to be preempted. +// numPDBViolations: the count of violations of PodDisruptionBudget +type Victims struct { + Pods []*v1.Pod + NumPDBViolations int +} + +// MetaPod represents an identifier for a v1.Pod +type MetaPod struct { + UID string +} + +// MetaVictims represents: +// pods: a group of pods expected to be preempted. +// Only Pod identifiers will be sent and users are expected to get v1.Pod in their own way. +// numPDBViolations: the count of violations of PodDisruptionBudget +type MetaVictims struct { + Pods []*MetaPod + NumPDBViolations int +} + // ExtenderArgs represents the arguments needed by the extender to filter/prioritize // nodes for a pod. type ExtenderArgs struct { // Pod being scheduled - Pod v1.Pod + Pod *v1.Pod // List of candidate nodes where the pod can be scheduled; to be populated // only if ExtenderConfig.NodeCacheCapable == false Nodes *v1.NodeList diff --git a/pkg/scheduler/api/v1/types.go b/pkg/scheduler/api/v1/types.go index 32ac257958..e1722cc9ca 100644 --- a/pkg/scheduler/api/v1/types.go +++ b/pkg/scheduler/api/v1/types.go @@ -142,6 +142,8 @@ type ExtenderConfig struct { URLPrefix string `json:"urlPrefix"` // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. FilterVerb string `json:"filterVerb,omitempty"` + // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender. + PreemptVerb string `json:"preemptVerb,omitempty"` // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. PrioritizeVerb string `json:"prioritizeVerb,omitempty"` // The numeric multiplier for the node scores that the prioritize call generates. @@ -178,7 +180,7 @@ type ExtenderConfig struct { // nodes for a pod. type ExtenderArgs struct { // Pod being scheduled - Pod apiv1.Pod `json:"pod"` + Pod *apiv1.Pod `json:"pod"` // List of candidate nodes where the pod can be scheduled; to be populated // only if ExtenderConfig.NodeCacheCapable == false Nodes *apiv1.NodeList `json:"nodes,omitempty"` @@ -187,6 +189,43 @@ type ExtenderArgs struct { NodeNames *[]string `json:"nodenames,omitempty"` } +// ExtenderPreemptionResult represents the result returned by preemption phase of extender. +type ExtenderPreemptionResult struct { + NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"` +} + +// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes.
+type ExtenderPreemptionArgs struct { + // Pod being scheduled + Pod *apiv1.Pod `json:"pod"` + // Victims map generated by scheduler preemption phase + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. + NodeToVictims map[*apiv1.Node]*Victims `json:"nodeToVictims,omitempty"` + NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"` +} + +// Victims represents: +// pods: a group of pods expected to be preempted. +// numPDBViolations: the count of violations of PodDisruptionBudget +type Victims struct { + Pods []*apiv1.Pod `json:"pods"` + NumPDBViolations int `json:"numPDBViolations"` +} + +// MetaPod represents an identifier for a v1.Pod +type MetaPod struct { + UID string `json:"uid"` +} + +// MetaVictims represents: +// pods: a group of pods expected to be preempted. +// Only Pod identifiers will be sent and users are expected to get v1.Pod in their own way. +// numPDBViolations: the count of violations of PodDisruptionBudget +type MetaVictims struct { + Pods []*MetaPod `json:"pods"` + NumPDBViolations int `json:"numPDBViolations"` +} + // FailedNodesMap represents the filtered out nodes, with node names and failure messages type FailedNodesMap map[string]string diff --git a/pkg/scheduler/api/v1/zz_generated.deepcopy.go b/pkg/scheduler/api/v1/zz_generated.deepcopy.go index 98f8fcbf7e..77efc9a175 100644 --- a/pkg/scheduler/api/v1/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/v1/zz_generated.deepcopy.go @@ -29,7 +29,15 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) { *out = *in - in.Pod.DeepCopyInto(&out.Pod) + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(core_v1.Pod) + (*in).DeepCopyInto(*out) + } + } if in.Nodes != nil { in, out := &in.Nodes, &out.Nodes if *in == nil { @@ -210,6 +218,85 @@ func (in FailedNodesMap) DeepCopy() FailedNodesMap { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { + *out = *in + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(core_v1.Pod) + (*in).DeepCopyInto(*out) + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*core_v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *core_v1.Node + } + } + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs. +func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs { + if in == nil { + return nil + } + out := new(ExtenderPreemptionArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) { + *out = *in + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*core_v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *core_v1.Node + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult. +func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { + if in == nil { + return nil + } + out := new(ExtenderPreemptionResult) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in @@ -283,6 +370,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaPod) DeepCopyInto(out *MetaPod) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod. +func (in *MetaPod) DeepCopy() *MetaPod { + if in == nil { + return nil + } + out := new(MetaPod) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaVictims) DeepCopyInto(out *MetaVictims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*MetaPod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(MetaPod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims. +func (in *MetaVictims) DeepCopy() *MetaVictims { + if in == nil { + return nil + } + out := new(MetaVictims) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policy) DeepCopyInto(out *Policy) { *out = *in @@ -483,3 +614,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Victims) DeepCopyInto(out *Victims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*core_v1.Pod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(core_v1.Pod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims. 
+func (in *Victims) DeepCopy() *Victims { + if in == nil { + return nil + } + out := new(Victims) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/scheduler/api/validation/validation_test.go b/pkg/scheduler/api/validation/validation_test.go index ede1b2c421..9c3a5cd355 100644 --- a/pkg/scheduler/api/validation/validation_test.go +++ b/pkg/scheduler/api/validation/validation_test.go @@ -61,6 +61,10 @@ func TestValidatePolicy(t *testing.T) { policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", FilterVerb: "filter"}}}, expected: nil, }, + { + policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", PreemptVerb: "preempt"}}}, + expected: nil, + }, { policy: api.Policy{ ExtenderConfigs: []api.ExtenderConfig{ diff --git a/pkg/scheduler/api/zz_generated.deepcopy.go b/pkg/scheduler/api/zz_generated.deepcopy.go index 1986933b93..b765a201d4 100644 --- a/pkg/scheduler/api/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/zz_generated.deepcopy.go @@ -29,7 +29,15 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) { *out = *in - in.Pod.DeepCopyInto(&out.Pod) + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(v1.Pod) + (*in).DeepCopyInto(*out) + } + } if in.Nodes != nil { in, out := &in.Nodes, &out.Nodes if *in == nil { @@ -210,6 +218,85 @@ func (in FailedNodesMap) DeepCopy() FailedNodesMap { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { + *out = *in + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(v1.Pod) + (*in).DeepCopyInto(*out) + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *v1.Node + } + } + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs. +func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs { + if in == nil { + return nil + } + out := new(ExtenderPreemptionArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) { + *out = *in + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *v1.Node + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult. +func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { + if in == nil { + return nil + } + out := new(ExtenderPreemptionResult) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in @@ -283,6 +370,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaPod) DeepCopyInto(out *MetaPod) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod. +func (in *MetaPod) DeepCopy() *MetaPod { + if in == nil { + return nil + } + out := new(MetaPod) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaVictims) DeepCopyInto(out *MetaVictims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*MetaPod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(MetaPod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims. +func (in *MetaVictims) DeepCopy() *MetaVictims { + if in == nil { + return nil + } + out := new(MetaVictims) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policy) DeepCopyInto(out *Policy) { *out = *in @@ -483,3 +614,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Victims) DeepCopyInto(out *Victims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*v1.Pod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(v1.Pod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims. 
+func (in *Victims) DeepCopy() *Victims { + if in == nil { + return nil + } + out := new(Victims) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index dd2823df3b..8163951d7d 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -27,6 +27,7 @@ go_test( "//vendor/k8s.io/api/apps/v1beta1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 022db87f5b..14bd455cc0 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -41,6 +41,7 @@ const ( // HTTPExtender implements the algorithm.SchedulerExtender interface. type HTTPExtender struct { extenderURL string + preemptVerb string filterVerb string prioritizeVerb string bindVerb string @@ -93,6 +94,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx } return &HTTPExtender{ extenderURL: config.URLPrefix, + preemptVerb: config.PreemptVerb, filterVerb: config.FilterVerb, prioritizeVerb: config.PrioritizeVerb, bindVerb: config.BindVerb, @@ -103,10 +105,126 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx }, nil } +// SupportsPreemption returns whether an extender supports preemption. +// An extender supports preemption if it has a preempt verb defined. +func (h *HTTPExtender) SupportsPreemption() bool { + return len(h.preemptVerb) > 0 +} + +// ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in the extender. +func (h *HTTPExtender) ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + var ( + result schedulerapi.ExtenderPreemptionResult + args *schedulerapi.ExtenderPreemptionArgs + ) + + if !h.SupportsPreemption() { + return nil, fmt.Errorf("preempt verb is not defined for extender %v but ProcessPreemption was called", h.extenderURL) + } + + if h.nodeCacheCapable { + // If the extender has cached node info, pass NodeNameToMetaVictims in args. + nodeNameToVictims := convertToNodeNameToMetaVictims(nodeToVictims) + args = &schedulerapi.ExtenderPreemptionArgs{ + Pod: pod, + NodeNameToMetaVictims: nodeNameToVictims, + } + } else { + args = &schedulerapi.ExtenderPreemptionArgs{ + Pod: pod, + NodeToVictims: nodeToVictims, + } + } + + if err := h.send(h.preemptVerb, args, &result); err != nil { + return nil, err + } + + // The extender will always return NodeNameToMetaVictims, + // so convert it back to NodeToVictims using nodeNameToInfo. + nodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo) + if err != nil { + return nil, err + } + return nodeToVictims, nil +} + +// convertToNodeToVictims converts from meta types to struct type.
+func (h *HTTPExtender) convertToNodeToVictims( + nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + nodeToVictims := map[*v1.Node]*schedulerapi.Victims{} + for nodeName, metaVictims := range nodeNameToMetaVictims { + victims := &schedulerapi.Victims{ + Pods: []*v1.Pod{}, + } + for _, metaPod := range metaVictims.Pods { + pod, err := h.restorePodFromNodeInfo(metaPod, nodeName, nodeNameToInfo) + if err != nil { + return nil, err + } + victims.Pods = append(victims.Pods, pod) + } + nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims + } + return nodeToVictims, nil +} + +// restorePodFromNodeInfo returns the v1.Pod object for the given MetaPod and node name. +// The v1.Pod object is restored from nodeInfo.Pods(). +// It returns an error if there is a cache inconsistency between the default scheduler and the extender, +// i.e. the pod or node is missing from nodeNameToInfo. +func (h *HTTPExtender) restorePodFromNodeInfo( + metaPod *schedulerapi.MetaPod, + nodeName string, + nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) { + if nodeInfo, ok := nodeNameToInfo[nodeName]; ok { + for _, pod := range nodeInfo.Pods() { + if string(pod.UID) == metaPod.UID { + return pod, nil + } + } + return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node", + h.extenderURL, metaPod, nodeInfo.Node().Name) + } else { + return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map", + h.extenderURL, nodeName) + } +} + +// convertToNodeNameToMetaVictims converts from struct type to meta types. +func convertToNodeNameToMetaVictims( + nodeToVictims map[*v1.Node]*schedulerapi.Victims, +) map[string]*schedulerapi.MetaVictims { + nodeNameToVictims := map[string]*schedulerapi.MetaVictims{} + for node, victims := range nodeToVictims { + metaVictims := &schedulerapi.MetaVictims{ + Pods: []*schedulerapi.MetaPod{}, + } + for _, pod := range victims.Pods { + metaPod := &schedulerapi.MetaPod{ + UID: string(pod.UID), + } + metaVictims.Pods = append(metaVictims.Pods, metaPod) + } + nodeNameToVictims[node.GetName()] = metaVictims + } + return nodeNameToVictims +} + // Filter based on extender implemented predicate functions. The filtered list is // expected to be a subset of the supplied list. failedNodesMap optionally contains // the list of failed nodes and failure reasons. -func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { +func (h *HTTPExtender) Filter( + pod *v1.Pod, + nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo, +) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { var ( result schedulerapi.ExtenderFilterResult nodeList *v1.NodeList @@ -133,7 +251,7 @@ func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[ } args = &schedulerapi.ExtenderArgs{ - Pod: *pod, + Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } @@ -193,7 +311,7 @@ func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.
} args = &schedulerapi.ExtenderArgs{ - Pod: *pod, + Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 766ee2dec5..89bef4d8a2 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -22,12 +22,14 @@ import ( "time" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util" ) type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error) @@ -111,22 +113,140 @@ type FakeExtender struct { nodeCacheCapable bool filteredNodes []*v1.Node unInterested bool + + // Cached node information for fake extender + cachedNodeNameToInfo map[string]*schedulercache.NodeInfo + cachedPDBs []*policy.PodDisruptionBudget +} + +// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side. +// Returns: +// 1. More victim pods (if any) amended by preemption phase of extender. +// 2. Number of violating victim (used to calculate PDB). +// 3. Fits or not after preemption phase on extender's side. +func (f *FakeExtender) selectVictimsOnNodeByExtender( + pod *v1.Pod, + node *v1.Node, +) ([]*v1.Pod, int, bool) { + // TODO(harry): add more test in generic_scheduler_test.go to verify this logic. + // If a extender support preemption but have no cached node info, let's run filter to make sure + // default scheduler's decision still stand with given pod and node. + if !f.nodeCacheCapable { + if fits, _ := f.runPredicate(pod, node); !fits { + return nil, 0, false + } + return []*v1.Pod{}, 0, true + } + + // Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available + // and get cached node info by given nodeName. + nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone() + + potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod} + + removePod := func(rp *v1.Pod) { + nodeInfoCopy.RemovePod(rp) + } + addPod := func(ap *v1.Pod) { + nodeInfoCopy.AddPod(ap) + } + // As the first step, remove all the lower priority pods from the node and + // check if the given pod can be scheduled. + podPriority := util.GetPodPriority(pod) + for _, p := range nodeInfoCopy.Pods() { + if util.GetPodPriority(p) < podPriority { + potentialVictims.Items = append(potentialVictims.Items, p) + removePod(p) + } + } + potentialVictims.Sort() + + // If the new pod does not fit after removing all the lower priority pods, + // we are almost done and this node is not suitable for preemption. + if fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()); !fits { + return nil, 0, false + } + + var victims []*v1.Pod + + // TODO(harry): handle PDBs in the future. + numViolatingVictim := 0 + + reprievePod := func(p *v1.Pod) bool { + addPod(p) + fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()) + if !fits { + removePod(p) + victims = append(victims, p) + } + return fits + } + + // For now, assume all potential victims to be non-violating. + // Now we try to reprieve non-violating victims. 
+ for _, p := range potentialVictims.Items { + reprievePod(p.(*v1.Pod)) + } + + return victims, numViolatingVictim, true +} + +func (f *FakeExtender) SupportsPreemption() bool { + // Assume preempt verb is always defined. + return true +} + +func (f *FakeExtender) ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{} + // We don't want to change the original nodeToVictims + for k, v := range nodeToVictims { + nodeToVictimsCopy[k] = v + } + + for node, victims := range nodeToVictimsCopy { + // Try to do preemption on extender side. + extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node) + // If it's unfit after extender's preemption, this node is unresolvable by preemption overall, + // let's remove it from potential preemption nodes. + if !fits { + delete(nodeToVictimsCopy, node) + } else { + // Append new victims to original victims + nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...) + nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations + } + } + return nodeToVictimsCopy, nil +} + +// runPredicate run predicates of extender one by one for given pod and node. +// Returns: fits or not. +func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) { + fits := true + var err error + for _, predicate := range f.predicates { + fits, err = predicate(pod, node) + if err != nil { + return false, err + } + if !fits { + break + } + } + return fits, nil } func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { filtered := []*v1.Node{} failedNodesMap := schedulerapi.FailedNodesMap{} for _, node := range nodes { - fits := true - for _, predicate := range f.predicates { - fit, err := predicate(pod, node) - if err != nil { - return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err - } - if !fit { - fits = false - break - } + fits, err := f.runPredicate(pod, node) + if err != nil { + return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err } if fits { filtered = append(filtered, node) @@ -340,7 +460,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { } cache := schedulercache.New(time.Duration(0), wait.NeverStop) for _, name := range test.nodes { - cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) + cache.AddNode(createNode(name)) } queue := NewSchedulingQueue() scheduler := NewGenericScheduler( @@ -362,3 +482,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { } } } + +func createNode(name string) *v1.Node { + return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}} +} diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index f7e9cddfd2..6f3a9a323c 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -54,13 +54,6 @@ type FitError struct { FailedPredicates FailedPredicateMap } -// Victims describes pod victims. -type Victims struct { - pods []*v1.Pod - numPDBViolations int -} - -// ErrNoNodesAvailable defines an error of no nodes available. 
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") const ( @@ -234,34 +227,64 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, if err != nil { return nil, nil, nil, err } - nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) + nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, + g.predicateMetaProducer, g.schedulingQueue, pdbs) if err != nil { return nil, nil, nil, err } - for len(nodeToVictims) > 0 { - node := pickOneNodeForPreemption(nodeToVictims) - if node == nil { - return nil, nil, nil, err - } - passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders) - if passes && pErr == nil { - // Lower priority pods nominated to run on this node, may no longer fit on - // this node. So, we should remove their nomination. Removing their - // nomination updates these pods and moves them to the active queue. It - // lets scheduler find another place for them. - nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name) - return node, nodeToVictims[node].pods, nominatedPods, err - } - if pErr != nil { - glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr) - } - // Remove the node from the map and try to pick a different node. - delete(nodeToVictims, node) + + // We will only check nodeToVictims with extenders that support preemption. + // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated + // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. + nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims) + if err != nil { + return nil, nil, nil, err + } + + candidateNode := pickOneNodeForPreemption(nodeToVictims) + if candidateNode == nil { + return nil, nil, nil, err + } + + // Lower priority pods nominated to run on this node, may no longer fit on + // this node. So, we should remove their nomination. Removing their + // nomination updates these pods and moves them to the active queue. It + // lets scheduler find another place for them. + nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) + if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok { + return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err + } else { + return nil, nil, nil, fmt.Errorf( + "preemption failed: the target node %s has been deleted from scheduler cache", + candidateNode.Name) } - return nil, nil, nil, err } -// GetLowerPriorityNominatedPods returns pods whose priority is smaller than the +// processPreemptionWithExtenders processes preemption with extenders +func (g *genericScheduler) processPreemptionWithExtenders( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, +) (map[*v1.Node]*schedulerapi.Victims, error) { + if len(nodeToVictims) > 0 { + for _, extender := range g.extenders { + if extender.SupportsPreemption() { + var err error + // Replace nodeToVictims with result after preemption from extender. + if nodeToVictims, err = extender.ProcessPreemption(pod, nodeToVictims, g.cachedNodeInfoMap); err != nil { + return nil, err + } + // If node list is empty, no preemption will happen, skip other extenders. 
+ if len(nodeToVictims) == 0 { + break + } + } + } + } + + return nodeToVictims, nil +} + +// getLowerPriorityNominatedPods returns pods whose priority is smaller than the // priority of the given "pod" and are nominated to run on the given node. // Note: We could possibly check if the nominated lower priority pods still fit // and return those that no longer fit, but that would require lots of @@ -270,6 +293,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, // small number of nominated pods per node. func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod { pods := g.schedulingQueue.WaitingPodsForNode(nodeName) + if len(pods) == 0 { return nil } @@ -683,7 +707,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf // 5. If there are still ties, the first such node is picked (sort of randomly). // The 'minNodes1' and 'minNodes2' are being reused here to save the memory // allocation and garbage collection time. -func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { +func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node { if len(nodesToVictims) == 0 { return nil } @@ -691,14 +715,14 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { var minNodes1 []*v1.Node lenNodes1 := 0 for node, victims := range nodesToVictims { - if len(victims.pods) == 0 { + if len(victims.Pods) == 0 { // We found a node that doesn't need any preemption. Return it! // This should happen rarely when one or more pods are terminated between // the time that scheduler tries to schedule the pod and the time that // preemption logic tries to find nodes for preemption. return node } - numPDBViolatingPods := victims.numPDBViolations + numPDBViolatingPods := victims.NumPDBViolations if numPDBViolatingPods < minNumPDBViolatingPods { minNumPDBViolatingPods = numPDBViolatingPods minNodes1 = nil @@ -722,7 +746,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { node := minNodes1[i] victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. - highestPodPriority := util.GetPodPriority(victims.pods[0]) + highestPodPriority := util.GetPodPriority(victims.Pods[0]) if highestPodPriority < minHighestPriority { minHighestPriority = highestPodPriority lenNodes2 = 0 @@ -743,7 +767,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { for i := 0; i < lenNodes2; i++ { var sumPriorities int64 node := minNodes2[i] - for _, pod := range nodesToVictims[node].pods { + for _, pod := range nodesToVictims[node].Pods { // We add MaxInt32+1 to all priorities to make all of them >= 0. 
This is // needed so that a node with a few pods with negative priority is not // picked over a node with a smaller number of pods with the same negative @@ -769,7 +793,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { lenNodes2 = 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] - numPods := len(nodesToVictims[node].pods) + numPods := len(nodesToVictims[node].Pods) if numPods < minNumPods { minNumPods = numPods lenNodes2 = 0 @@ -797,9 +821,9 @@ func selectNodesForPreemption(pod *v1.Pod, metadataProducer algorithm.PredicateMetadataProducer, queue SchedulingQueue, pdbs []*policy.PodDisruptionBudget, -) (map[*v1.Node]*Victims, error) { +) (map[*v1.Node]*schedulerapi.Victims, error) { - nodeNameToVictims := map[*v1.Node]*Victims{} + nodeToVictims := map[*v1.Node]*schedulerapi.Victims{} var resultLock sync.Mutex // We can use the same metadata producer for all nodes. @@ -813,50 +837,16 @@ func selectNodesForPreemption(pod *v1.Pod, pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs) if fits { resultLock.Lock() - victims := Victims{ - pods: pods, - numPDBViolations: numPDBViolations, + victims := schedulerapi.Victims{ + Pods: pods, + NumPDBViolations: numPDBViolations, } - nodeNameToVictims[potentialNodes[i]] = &victims + nodeToVictims[potentialNodes[i]] = &victims resultLock.Unlock() } } workqueue.Parallelize(16, len(potentialNodes), checkNode) - return nodeNameToVictims, nil -} - -func nodePassesExtendersForPreemption( - pod *v1.Pod, - nodeName string, - victims []*v1.Pod, - nodeNameToInfo map[string]*schedulercache.NodeInfo, - extenders []algorithm.SchedulerExtender) (bool, error) { - // If there are any extenders, run them and filter the list of candidate nodes. - if len(extenders) == 0 { - return true, nil - } - // Remove the victims from the corresponding nodeInfo and send nodes to the - // extenders for filtering. - originalNodeInfo := nodeNameToInfo[nodeName] - nodeInfoCopy := nodeNameToInfo[nodeName].Clone() - for _, victim := range victims { - nodeInfoCopy.RemovePod(victim) - } - nodeNameToInfo[nodeName] = nodeInfoCopy - defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }() - filteredNodes := []*v1.Node{nodeInfoCopy.Node()} - for _, extender := range extenders { - var err error - var failedNodesMap map[string]string - filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo) - if err != nil { - return false, err - } - if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 { - return false, nil - } - } - return true, nil + return nodeToVictims, nil } // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" @@ -996,6 +986,7 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat // (which is the case today), the !found case should never happen, but we'd prefer // to rely less on such assumptions in the code when checking does not impose // significant overhead. + // Also, we currently assume all failures returned by extender as resolvable. 
for _, failedPredicate := range failedPredicates { switch failedPredicate { case diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 9df65890f2..eaca9831f7 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -660,11 +660,11 @@ func TestZeroRequest(t *testing.T) { } } -func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string { +func printNodeToVictims(nodeToVictims map[*v1.Node]*schedulerapi.Victims) string { var output string for node, victims := range nodeToVictims { output += node.Name + ": [" - for _, pod := range victims.pods { + for _, pod := range victims.Pods { output += pod.Name + ", " } output += "]" @@ -672,15 +672,15 @@ func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string { return output } -func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error { +func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*schedulerapi.Victims) error { if len(expected) == len(nodeToPods) { for k, victims := range nodeToPods { if expPods, ok := expected[k.Name]; ok { - if len(victims.pods) != len(expPods) { + if len(victims.Pods) != len(expPods) { return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods)) } prevPriority := int32(math.MaxInt32) - for _, p := range victims.pods { + for _, p := range victims.Pods { // Check that pods are sorted by their priority. if *p.Spec.Priority > prevPriority { return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k) @@ -1284,11 +1284,20 @@ func TestPreempt(t *testing.T) { for _, pod := range test.pods { cache.AddPod(pod) } + cachedNodeInfoMap := map[string]*schedulercache.NodeInfo{} for _, name := range nodeNames { - cache.AddNode(makeNode(name, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5)) + node := makeNode(name, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5) + cache.AddNode(node) + + // Build a cached NodeInfo for each node to mock the extenders' node cache for preemption. + cachedNodeInfo := schedulercache.NewNodeInfo() + cachedNodeInfo.SetNode(node) + cachedNodeInfoMap[name] = cachedNodeInfo } extenders := []algorithm.SchedulerExtender{} for _, extender := range test.extenders { + // Set nodeInfoMap as the extenders' cached node information.
+ extender.cachedNodeNameToInfo = cachedNodeInfoMap extenders = append(extenders, extender) } scheduler := NewGenericScheduler( diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 38369f960d..6bb0154c82 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -130,7 +130,7 @@ func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*sched for _, nodeName := range *args.NodeNames { fits := true for _, predicate := range e.predicates { - fit, err := predicate(&args.Pod, + fit, err := predicate(args.Pod, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}) if err != nil { return &schedulerapi.ExtenderFilterResult{ @@ -169,7 +169,7 @@ func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.Extend for _, node := range args.Nodes.Items { fits := true for _, predicate := range e.predicates { - fit, err := predicate(&args.Pod, &node) + fit, err := predicate(args.Pod, &node) if err != nil { return &schedulerapi.ExtenderFilterResult{ Nodes: &v1.NodeList{}, @@ -217,7 +217,7 @@ func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.Ho continue } priorityFunc := prioritizer.function - prioritizedList, err := priorityFunc(&args.Pod, nodes) + prioritizedList, err := priorityFunc(args.Pod, nodes) if err != nil { return &schedulerapi.HostPriorityList{}, err }
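Note (not part of the diff): for readers implementing the new verb out of tree, below is a minimal sketch of an HTTP extender serving the preempt call. It assumes nodeCacheCapable: true, so the scheduler sends only pod and nodeNameToMetaVictims in ExtenderPreemptionArgs, and HTTPExtender.ProcessPreemption above converts the returned nodeNameToMetaVictims back to NodeToVictims. The local struct definitions mirror the wire types added in pkg/scheduler/api/v1; the raw-JSON pod field, the URL path /extender/preempt, the port, and the accept-everything policy are illustrative assumptions, not taken from this change.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// metaPod and metaVictims mirror the MetaPod/MetaVictims wire types added in
// pkg/scheduler/api/v1 above.
type metaPod struct {
	UID string `json:"uid"`
}

type metaVictims struct {
	Pods             []*metaPod `json:"pods"`
	NumPDBViolations int        `json:"numPDBViolations"`
}

// preemptionArgs is a trimmed-down ExtenderPreemptionArgs: with
// nodeCacheCapable: true the scheduler only sets pod and
// nodeNameToMetaVictims, so the pod is kept as raw JSON here.
type preemptionArgs struct {
	Pod                   json.RawMessage         `json:"pod"`
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

// preemptionResult mirrors ExtenderPreemptionResult.
type preemptionResult struct {
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

// handlePreempt accepts every candidate node and echoes the scheduler's
// proposed victims back unchanged.
func handlePreempt(w http.ResponseWriter, r *http.Request) {
	var args preemptionArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	result := preemptionResult{NodeNameToMetaVictims: args.NodeNameToMetaVictims}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(&result); err != nil {
		log.Printf("encoding preemption result: %v", err)
	}
}

func main() {
	// "preempt" must match the preemptVerb configured in the policy file,
	// e.g. urlPrefix http://127.0.0.1:8081/extender plus verb "preempt".
	http.HandleFunc("/extender/preempt", handlePreempt)
	log.Fatal(http.ListenAndServe(":8081", nil))
}

A real extender would instead run its own feasibility check per candidate node, delete entries it cannot free up, and append extra victim UIDs per node; returning fewer nodes simply narrows the set that pickOneNodeForPreemption chooses from.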