mirror of https://github.com/k3s-io/k3s

Merge pull request #58717 from resouer/extender-interface

Automatic merge from submit-queue (batch tested with PRs 60759, 60531, 60923, 60851, 58717). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Implement preemption for extender with a verb and new interface

**What this PR does / why we need it**: This is an alternative way of implementing #51656.

**Which issue(s) this PR fixes**: Fixes #51656

**Special notes for your reviewer**: We will also want to compare with #56296 to see which one is the best solution. See https://github.com/kubernetes/kubernetes/pull/56296#discussion_r163381235. cc @ravigadde @bsalamat

**Release note**:

```release-note
Implement preemption for extender with a verb and new interface
```
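For orientation, the new verb plugs into the scheduler policy alongside the existing filter and prioritize verbs. A minimal sketch, mirroring the validation test added in this PR; the `main` wrapper and exact package path are illustrative, not part of the change:

```go
package main

import (
	"fmt"

	// Import alias and path as used elsewhere in this PR; adjust to your vendoring.
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

func main() {
	// A policy with one extender that opts in to the new preempt verb.
	// Each verb is appended to URLPrefix, so preemption calls would go to
	// http://127.0.0.1:8081/extender/preempt.
	policy := schedulerapi.Policy{
		ExtenderConfigs: []schedulerapi.ExtenderConfig{{
			URLPrefix:   "http://127.0.0.1:8081/extender",
			FilterVerb:  "filter",
			PreemptVerb: "preempt",
		}},
	}
	fmt.Printf("%+v\n", policy)
}
```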
commit 14e3efe26a

@@ -29,7 +29,9 @@ type SchedulerExtender interface {
    // Filter based on extender-implemented predicate functions. The filtered list is
    // expected to be a subset of the supplied list. failedNodesMap optionally contains
    // the list of failed nodes and failure reasons.
    Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
    Filter(pod *v1.Pod,
        nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)

    // Prioritize based on extender-implemented priority functions. The returned scores & weight
    // are used to compute the weighted score for an extender. The weighted scores are added to

@@ -45,6 +47,23 @@ type SchedulerExtender interface {
    // IsInterested returns true if at least one extended resource requested by
    // this pod is managed by this extender.
    IsInterested(pod *v1.Pod) bool

    // ProcessPreemption returns nodes with their victim pods processed by extender based on
    // given:
    // 1. Pod to schedule
    // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
    // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
    // The possible changes made by extender may include:
    // 1. Subset of given candidate nodes after preemption phase of extender.
    // 2. A different set of victim pod for every given candidate node after preemption phase of extender.
    ProcessPreemption(
        pod *v1.Pod,
        nodeToVictims map[*v1.Node]*schedulerapi.Victims,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (map[*v1.Node]*schedulerapi.Victims, error)

    // SupportsPreemption returns if the scheduler extender support preemption or not.
    SupportsPreemption() bool
}

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
@@ -168,6 +168,8 @@ type ExtenderConfig struct {
    URLPrefix string
    // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender.
    FilterVerb string
    // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender.
    PreemptVerb string
    // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender.
    PrioritizeVerb string
    // The numeric multiplier for the node scores that the prioritize call generates.

@@ -200,11 +202,48 @@ type ExtenderConfig struct {
    ManagedResources []ExtenderManagedResource
}

// ExtenderPreemptionResult represents the result returned by preemption phase of extender.
type ExtenderPreemptionResult struct {
    NodeNameToMetaVictims map[string]*MetaVictims
}

// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes.
type ExtenderPreemptionArgs struct {
    // Pod being scheduled
    Pod *v1.Pod
    // Victims map generated by scheduler preemption phase
    // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims.
    NodeNameToVictims map[string]*Victims
    NodeNameToMetaVictims map[string]*MetaVictims
}

// Victims represents:
// pods: a group of pods expected to be preempted.
// numPDBViolations: the count of violations of PodDisruptionBudget
type Victims struct {
    Pods []*v1.Pod
    NumPDBViolations int
}

// MetaPod represent identifier for a v1.Pod
type MetaPod struct {
    UID string
}

// MetaVictims represents:
// pods: a group of pods expected to be preempted.
// Only Pod identifiers will be sent and user are expect to get v1.Pod in their own way.
// numPDBViolations: the count of violations of PodDisruptionBudget
type MetaVictims struct {
    Pods []*MetaPod
    NumPDBViolations int
}

// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
// nodes for a pod.
type ExtenderArgs struct {
    // Pod being scheduled
    Pod v1.Pod
    Pod *v1.Pod
    // List of candidate nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == false
    Nodes *v1.NodeList
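The split between `NodeNameToVictims` and `NodeNameToMetaVictims` follows the extender's `NodeCacheCapable` setting: a cache-capable extender only needs node names and victim pod UIDs, while other extenders receive full pod objects. A condensed sketch of how the scheduler side might assemble the arguments — `buildPreemptionArgs` is a hypothetical helper; the real logic lives in `HTTPExtender.ProcessPreemption` further down in this diff:

```go
package core // illustrative placement; the real code in this PR lives in the scheduler core package

import (
	"k8s.io/api/core/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// buildPreemptionArgs condenses what HTTPExtender.ProcessPreemption does when
// preparing the preempt call for a given extender.
func buildPreemptionArgs(
	pod *v1.Pod,
	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
	nodeCacheCapable bool,
) *schedulerapi.ExtenderPreemptionArgs {
	if !nodeCacheCapable {
		// Extenders without their own node cache get the full victim pod
		// objects, keyed by node name.
		nodeNameToVictims := map[string]*schedulerapi.Victims{}
		for node, victims := range nodeToVictims {
			nodeNameToVictims[node.GetName()] = victims
		}
		return &schedulerapi.ExtenderPreemptionArgs{Pod: pod, NodeNameToVictims: nodeNameToVictims}
	}
	// Cache-capable extenders only need node names and victim pod UIDs.
	nodeNameToMetaVictims := map[string]*schedulerapi.MetaVictims{}
	for node, victims := range nodeToVictims {
		meta := &schedulerapi.MetaVictims{Pods: []*schedulerapi.MetaPod{}}
		for _, p := range victims.Pods {
			meta.Pods = append(meta.Pods, &schedulerapi.MetaPod{UID: string(p.UID)})
		}
		nodeNameToMetaVictims[node.GetName()] = meta
	}
	return &schedulerapi.ExtenderPreemptionArgs{Pod: pod, NodeNameToMetaVictims: nodeNameToMetaVictims}
}
```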
@@ -142,6 +142,8 @@ type ExtenderConfig struct {
    URLPrefix string `json:"urlPrefix"`
    // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender.
    FilterVerb string `json:"filterVerb,omitempty"`
    // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender.
    PreemptVerb string `json:"preemptVerb,omitempty"`
    // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender.
    PrioritizeVerb string `json:"prioritizeVerb,omitempty"`
    // The numeric multiplier for the node scores that the prioritize call generates.

@@ -178,7 +180,7 @@ type ExtenderConfig struct {
// nodes for a pod.
type ExtenderArgs struct {
    // Pod being scheduled
    Pod apiv1.Pod `json:"pod"`
    Pod *apiv1.Pod `json:"pod"`
    // List of candidate nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == false
    Nodes *apiv1.NodeList `json:"nodes,omitempty"`

@@ -187,6 +189,43 @@ type ExtenderArgs struct {
    NodeNames *[]string `json:"nodenames,omitempty"`
}

// ExtenderPreemptionResult represents the result returned by preemption phase of extender.
type ExtenderPreemptionResult struct {
    NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes.
type ExtenderPreemptionArgs struct {
    // Pod being scheduled
    Pod *apiv1.Pod `json:"pod"`
    // Victims map generated by scheduler preemption phase
    // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims.
    NodeNameToVictims map[string]*Victims `json:"nodeToVictims,omitempty"`
    NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

// Victims represents:
// pods: a group of pods expected to be preempted.
// numPDBViolations: the count of violations of PodDisruptionBudget
type Victims struct {
    Pods []*apiv1.Pod `json:"pods"`
    NumPDBViolations int `json:"numPDBViolations"`
}

// MetaPod represent identifier for a v1.Pod
type MetaPod struct {
    UID string `json:"uid"`
}

// MetaVictims represents:
// pods: a group of pods expected to be preempted.
// Only Pod identifiers will be sent and user are expect to get v1.Pod in their own way.
// numPDBViolations: the count of violations of PodDisruptionBudget
type MetaVictims struct {
    Pods []*MetaPod `json:"pods"`
    NumPDBViolations int `json:"numPDBViolations"`
}

// FailedNodesMap represents the filtered out nodes, with node names and failure messages
type FailedNodesMap map[string]string
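On the other end of the wire, an extender that enables the preempt verb receives `ExtenderPreemptionArgs` as JSON and is expected to answer with `nodeNameToMetaVictims`. A standalone sketch of such a webhook, assuming a `NodeCacheCapable` extender (the local structs only mirror the JSON shape of the v1 types above, and the trivial echo policy is purely illustrative):

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// Local mirrors of the wire format defined by the v1 types in this diff.
// A real extender would import the scheduler API package instead.
type metaPod struct {
	UID string `json:"uid"`
}

type metaVictims struct {
	Pods             []*metaPod `json:"pods"`
	NumPDBViolations int        `json:"numPDBViolations"`
}

type preemptionArgs struct {
	// The pod being scheduled is omitted here for brevity.
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

type preemptionResult struct {
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

func preempt(w http.ResponseWriter, r *http.Request) {
	var args preemptionArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Trivial policy: accept every candidate node and its proposed victims as-is.
	result := preemptionResult{NodeNameToMetaVictims: args.NodeNameToMetaVictims}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(&result); err != nil {
		log.Printf("encoding preemption result: %v", err)
	}
}

func main() {
	// The path must match URLPrefix + "/" + PreemptVerb from the policy,
	// e.g. urlPrefix "http://127.0.0.1:8081/extender" with preemptVerb "preempt".
	http.HandleFunc("/extender/preempt", preempt)
	log.Fatal(http.ListenAndServe(":8081", nil))
}
```

Dropping an entry from the returned map is how an extender vetoes preemption on that node, and an empty map means no preemption will happen at all, as the comments in the scheduler-side diff below spell out.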
@ -29,7 +29,15 @@ import (
|
|||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) {
|
||||
*out = *in
|
||||
in.Pod.DeepCopyInto(&out.Pod)
|
||||
if in.Pod != nil {
|
||||
in, out := &in.Pod, &out.Pod
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(core_v1.Pod)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
}
|
||||
if in.Nodes != nil {
|
||||
in, out := &in.Nodes, &out.Nodes
|
||||
if *in == nil {
|
||||
|
@ -188,6 +196,83 @@ func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
|
||||
*out = *in
|
||||
if in.Pod != nil {
|
||||
in, out := &in.Pod, &out.Pod
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(core_v1.Pod)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
}
|
||||
if in.NodeNameToVictims != nil {
|
||||
in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
|
||||
*out = make(map[string]*Victims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(Victims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
if in.NodeNameToMetaVictims != nil {
|
||||
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
|
||||
*out = make(map[string]*MetaVictims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(MetaVictims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs.
|
||||
func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(ExtenderPreemptionArgs)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) {
|
||||
*out = *in
|
||||
if in.NodeNameToMetaVictims != nil {
|
||||
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
|
||||
*out = make(map[string]*MetaVictims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(MetaVictims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult.
|
||||
func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(ExtenderPreemptionResult)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
|
||||
{
|
||||
|
@ -283,6 +368,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *MetaPod) DeepCopyInto(out *MetaPod) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod.
|
||||
func (in *MetaPod) DeepCopy() *MetaPod {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(MetaPod)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *MetaVictims) DeepCopyInto(out *MetaVictims) {
|
||||
*out = *in
|
||||
if in.Pods != nil {
|
||||
in, out := &in.Pods, &out.Pods
|
||||
*out = make([]*MetaPod, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] == nil {
|
||||
(*out)[i] = nil
|
||||
} else {
|
||||
(*out)[i] = new(MetaPod)
|
||||
(*in)[i].DeepCopyInto((*out)[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims.
|
||||
func (in *MetaVictims) DeepCopy() *MetaVictims {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(MetaVictims)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Policy) DeepCopyInto(out *Policy) {
|
||||
*out = *in
|
||||
|
@ -483,3 +612,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity {
|
|||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Victims) DeepCopyInto(out *Victims) {
|
||||
*out = *in
|
||||
if in.Pods != nil {
|
||||
in, out := &in.Pods, &out.Pods
|
||||
*out = make([]*core_v1.Pod, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] == nil {
|
||||
(*out)[i] = nil
|
||||
} else {
|
||||
(*out)[i] = new(core_v1.Pod)
|
||||
(*in)[i].DeepCopyInto((*out)[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims.
|
||||
func (in *Victims) DeepCopy() *Victims {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(Victims)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -61,6 +61,10 @@ func TestValidatePolicy(t *testing.T) {
|
|||
policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", FilterVerb: "filter"}}},
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", PreemptVerb: "preempt"}}},
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
policy: api.Policy{
|
||||
ExtenderConfigs: []api.ExtenderConfig{
|
||||
|
|
|
@ -29,7 +29,15 @@ import (
|
|||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) {
|
||||
*out = *in
|
||||
in.Pod.DeepCopyInto(&out.Pod)
|
||||
if in.Pod != nil {
|
||||
in, out := &in.Pod, &out.Pod
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(v1.Pod)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
}
|
||||
if in.Nodes != nil {
|
||||
in, out := &in.Nodes, &out.Nodes
|
||||
if *in == nil {
|
||||
|
@ -188,6 +196,83 @@ func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
|
||||
*out = *in
|
||||
if in.Pod != nil {
|
||||
in, out := &in.Pod, &out.Pod
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(v1.Pod)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
}
|
||||
if in.NodeNameToVictims != nil {
|
||||
in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
|
||||
*out = make(map[string]*Victims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(Victims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
if in.NodeNameToMetaVictims != nil {
|
||||
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
|
||||
*out = make(map[string]*MetaVictims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(MetaVictims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs.
|
||||
func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(ExtenderPreemptionArgs)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) {
|
||||
*out = *in
|
||||
if in.NodeNameToMetaVictims != nil {
|
||||
in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims
|
||||
*out = make(map[string]*MetaVictims, len(*in))
|
||||
for key, val := range *in {
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(MetaVictims)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult.
|
||||
func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(ExtenderPreemptionResult)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
|
||||
{
|
||||
|
@ -283,6 +368,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *MetaPod) DeepCopyInto(out *MetaPod) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod.
|
||||
func (in *MetaPod) DeepCopy() *MetaPod {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(MetaPod)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *MetaVictims) DeepCopyInto(out *MetaVictims) {
|
||||
*out = *in
|
||||
if in.Pods != nil {
|
||||
in, out := &in.Pods, &out.Pods
|
||||
*out = make([]*MetaPod, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] == nil {
|
||||
(*out)[i] = nil
|
||||
} else {
|
||||
(*out)[i] = new(MetaPod)
|
||||
(*in)[i].DeepCopyInto((*out)[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims.
|
||||
func (in *MetaVictims) DeepCopy() *MetaVictims {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(MetaVictims)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Policy) DeepCopyInto(out *Policy) {
|
||||
*out = *in
|
||||
|
@ -483,3 +612,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity {
|
|||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Victims) DeepCopyInto(out *Victims) {
|
||||
*out = *in
|
||||
if in.Pods != nil {
|
||||
in, out := &in.Pods, &out.Pods
|
||||
*out = make([]*v1.Pod, len(*in))
|
||||
for i := range *in {
|
||||
if (*in)[i] == nil {
|
||||
(*out)[i] = nil
|
||||
} else {
|
||||
(*out)[i] = new(v1.Pod)
|
||||
(*in)[i].DeepCopyInto((*out)[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims.
|
||||
func (in *Victims) DeepCopy() *Victims {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(Victims)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ go_test(
|
|||
"//vendor/k8s.io/api/apps/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
|
|
|
@ -41,6 +41,7 @@ const (
|
|||
// HTTPExtender implements the algorithm.SchedulerExtender interface.
|
||||
type HTTPExtender struct {
|
||||
extenderURL string
|
||||
preemptVerb string
|
||||
filterVerb string
|
||||
prioritizeVerb string
|
||||
bindVerb string
|
||||
|
@ -93,6 +94,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
|
|||
}
|
||||
return &HTTPExtender{
|
||||
extenderURL: config.URLPrefix,
|
||||
preemptVerb: config.PreemptVerb,
|
||||
filterVerb: config.FilterVerb,
|
||||
prioritizeVerb: config.PrioritizeVerb,
|
||||
bindVerb: config.BindVerb,
|
||||
|
@ -103,10 +105,139 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
|
|||
}, nil
|
||||
}
|
||||
|
||||
// SupportsPreemption returns if a extender support preemption.
|
||||
// A extender should have preempt verb defined and enabled its own node cache.
|
||||
func (h *HTTPExtender) SupportsPreemption() bool {
|
||||
return len(h.preemptVerb) > 0
|
||||
}
|
||||
|
||||
// ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
|
||||
func (h *HTTPExtender) ProcessPreemption(
|
||||
pod *v1.Pod,
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
var (
|
||||
result schedulerapi.ExtenderPreemptionResult
|
||||
args *schedulerapi.ExtenderPreemptionArgs
|
||||
)
|
||||
|
||||
if !h.SupportsPreemption() {
|
||||
return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
|
||||
}
|
||||
|
||||
if h.nodeCacheCapable {
|
||||
// If extender has cached node info, pass NodeNameToMetaVictims in args.
|
||||
nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
|
||||
args = &schedulerapi.ExtenderPreemptionArgs{
|
||||
Pod: pod,
|
||||
NodeNameToMetaVictims: nodeNameToMetaVictims,
|
||||
}
|
||||
} else {
|
||||
nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
|
||||
args = &schedulerapi.ExtenderPreemptionArgs{
|
||||
Pod: pod,
|
||||
NodeNameToVictims: nodeNameToVictims,
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.send(h.preemptVerb, args, &result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Extender will always return NodeNameToMetaVictims.
|
||||
// So let's convert it to NodeToVictims by using NodeNameToInfo.
|
||||
nodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nodeToVictims, nil
|
||||
}
|
||||
|
||||
// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
|
||||
// such as UIDs and names, to object pointers.
|
||||
func (h *HTTPExtender) convertToNodeToVictims(
|
||||
nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
|
||||
for nodeName, metaVictims := range nodeNameToMetaVictims {
|
||||
victims := &schedulerapi.Victims{
|
||||
Pods: []*v1.Pod{},
|
||||
}
|
||||
for _, metaPod := range metaVictims.Pods {
|
||||
pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
victims.Pods = append(victims.Pods, pod)
|
||||
}
|
||||
nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims
|
||||
}
|
||||
return nodeToVictims, nil
|
||||
}
|
||||
|
||||
// convertPodUIDToPod returns v1.Pod object for given MetaPod and node name.
|
||||
// The v1.Pod object is restored by nodeInfo.Pods().
|
||||
// It should return error if there's cache inconsistency between default scheduler and extender
|
||||
// so that this pod or node is missing from nodeNameToInfo.
|
||||
func (h *HTTPExtender) convertPodUIDToPod(
|
||||
metaPod *schedulerapi.MetaPod,
|
||||
nodeName string,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) {
|
||||
var nodeInfo *schedulercache.NodeInfo
|
||||
if nodeInfo, ok := nodeNameToInfo[nodeName]; ok {
|
||||
for _, pod := range nodeInfo.Pods() {
|
||||
if string(pod.UID) == metaPod.UID {
|
||||
return pod, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
|
||||
h.extenderURL, metaPod, nodeInfo.Node().Name)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map",
|
||||
h.extenderURL, nodeInfo.Node().Name)
|
||||
}
|
||||
|
||||
// convertToNodeNameToMetaVictims converts from struct type to meta types.
|
||||
func convertToNodeNameToMetaVictims(
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
) map[string]*schedulerapi.MetaVictims {
|
||||
nodeNameToVictims := map[string]*schedulerapi.MetaVictims{}
|
||||
for node, victims := range nodeToVictims {
|
||||
metaVictims := &schedulerapi.MetaVictims{
|
||||
Pods: []*schedulerapi.MetaPod{},
|
||||
}
|
||||
for _, pod := range victims.Pods {
|
||||
metaPod := &schedulerapi.MetaPod{
|
||||
UID: string(pod.UID),
|
||||
}
|
||||
metaVictims.Pods = append(metaVictims.Pods, metaPod)
|
||||
}
|
||||
nodeNameToVictims[node.GetName()] = metaVictims
|
||||
}
|
||||
return nodeNameToVictims
|
||||
}
|
||||
|
||||
// convertToNodeNameToVictims converts from node type to node name as key.
|
||||
func convertToNodeNameToVictims(
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
) map[string]*schedulerapi.Victims {
|
||||
nodeNameToVictims := map[string]*schedulerapi.Victims{}
|
||||
for node, victims := range nodeToVictims {
|
||||
nodeNameToVictims[node.GetName()] = victims
|
||||
}
|
||||
return nodeNameToVictims
|
||||
}
|
||||
|
||||
// Filter based on extender implemented predicate functions. The filtered list is
|
||||
// expected to be a subset of the supplied list. failedNodesMap optionally contains
|
||||
// the list of failed nodes and failure reasons.
|
||||
func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
|
||||
func (h *HTTPExtender) Filter(
|
||||
pod *v1.Pod,
|
||||
nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
|
||||
var (
|
||||
result schedulerapi.ExtenderFilterResult
|
||||
nodeList *v1.NodeList
|
||||
|
@ -133,7 +264,7 @@ func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[
|
|||
}
|
||||
|
||||
args = &schedulerapi.ExtenderArgs{
|
||||
Pod: *pod,
|
||||
Pod: pod,
|
||||
Nodes: nodeList,
|
||||
NodeNames: nodeNames,
|
||||
}
|
||||
|
@ -193,7 +324,7 @@ func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.
|
|||
}
|
||||
|
||||
args = &schedulerapi.ExtenderArgs{
|
||||
Pod: *pod,
|
||||
Pod: pod,
|
||||
Nodes: nodeList,
|
||||
NodeNames: nodeNames,
|
||||
}
|
||||
|
|
|
@ -22,12 +22,14 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
|
||||
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
|
||||
|
@ -111,22 +113,145 @@ type FakeExtender struct {
|
|||
nodeCacheCapable bool
|
||||
filteredNodes []*v1.Node
|
||||
unInterested bool
|
||||
|
||||
// Cached node information for fake extender
|
||||
cachedNodeNameToInfo map[string]*schedulercache.NodeInfo
|
||||
cachedPDBs []*policy.PodDisruptionBudget
|
||||
}
|
||||
|
||||
func (f *FakeExtender) SupportsPreemption() bool {
|
||||
// Assume preempt verb is always defined.
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *FakeExtender) ProcessPreemption(
|
||||
pod *v1.Pod,
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{}
|
||||
// We don't want to change the original nodeToVictims
|
||||
for k, v := range nodeToVictims {
|
||||
// In real world implementation, extender's user should have his own way to get node object
|
||||
// by name if needed (e.g. query kube-apiserver etc).
|
||||
//
|
||||
// For test purpose, we just use node from parameters directly.
|
||||
nodeToVictimsCopy[k] = v
|
||||
}
|
||||
|
||||
for node, victims := range nodeToVictimsCopy {
|
||||
// Try to do preemption on extender side.
|
||||
extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
|
||||
// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
|
||||
// let's remove it from potential preemption nodes.
|
||||
if !fits {
|
||||
delete(nodeToVictimsCopy, node)
|
||||
} else {
|
||||
// Append new victims to original victims
|
||||
nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
|
||||
nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations
|
||||
}
|
||||
}
|
||||
return nodeToVictimsCopy, nil
|
||||
}
|
||||
|
||||
// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
|
||||
// Returns:
|
||||
// 1. More victim pods (if any) amended by preemption phase of extender.
|
||||
// 2. Number of violating victim (used to calculate PDB).
|
||||
// 3. Fits or not after preemption phase on extender's side.
|
||||
func (f *FakeExtender) selectVictimsOnNodeByExtender(
|
||||
pod *v1.Pod,
|
||||
node *v1.Node,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) ([]*v1.Pod, int, bool) {
|
||||
// TODO(harry): add more test in generic_scheduler_test.go to verify this logic.
|
||||
// If a extender support preemption but have no cached node info, let's run filter to make sure
|
||||
// default scheduler's decision still stand with given pod and node.
|
||||
if !f.nodeCacheCapable {
|
||||
if fits, _ := f.runPredicate(pod, node); !fits {
|
||||
return nil, 0, false
|
||||
}
|
||||
return []*v1.Pod{}, 0, true
|
||||
}
|
||||
|
||||
// Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
|
||||
// and get cached node info by given node name.
|
||||
nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone()
|
||||
|
||||
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
|
||||
|
||||
removePod := func(rp *v1.Pod) {
|
||||
nodeInfoCopy.RemovePod(rp)
|
||||
}
|
||||
addPod := func(ap *v1.Pod) {
|
||||
nodeInfoCopy.AddPod(ap)
|
||||
}
|
||||
// As the first step, remove all the lower priority pods from the node and
|
||||
// check if the given pod can be scheduled.
|
||||
podPriority := util.GetPodPriority(pod)
|
||||
for _, p := range nodeInfoCopy.Pods() {
|
||||
if util.GetPodPriority(p) < podPriority {
|
||||
potentialVictims.Items = append(potentialVictims.Items, p)
|
||||
removePod(p)
|
||||
}
|
||||
}
|
||||
potentialVictims.Sort()
|
||||
|
||||
// If the new pod does not fit after removing all the lower priority pods,
|
||||
// we are almost done and this node is not suitable for preemption.
|
||||
if fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()); !fits {
|
||||
return nil, 0, false
|
||||
}
|
||||
|
||||
var victims []*v1.Pod
|
||||
|
||||
// TODO(harry): handle PDBs in the future.
|
||||
numViolatingVictim := 0
|
||||
|
||||
reprievePod := func(p *v1.Pod) bool {
|
||||
addPod(p)
|
||||
fits, _ := f.runPredicate(pod, nodeInfoCopy.Node())
|
||||
if !fits {
|
||||
removePod(p)
|
||||
victims = append(victims, p)
|
||||
}
|
||||
return fits
|
||||
}
|
||||
|
||||
// For now, assume all potential victims to be non-violating.
|
||||
// Now we try to reprieve non-violating victims.
|
||||
for _, p := range potentialVictims.Items {
|
||||
reprievePod(p.(*v1.Pod))
|
||||
}
|
||||
|
||||
return victims, numViolatingVictim, true
|
||||
}
|
||||
|
||||
// runPredicate run predicates of extender one by one for given pod and node.
|
||||
// Returns: fits or not.
|
||||
func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
|
||||
fits := true
|
||||
var err error
|
||||
for _, predicate := range f.predicates {
|
||||
fits, err = predicate(pod, node)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !fits {
|
||||
break
|
||||
}
|
||||
}
|
||||
return fits, nil
|
||||
}
|
||||
|
||||
func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
|
||||
filtered := []*v1.Node{}
|
||||
failedNodesMap := schedulerapi.FailedNodesMap{}
|
||||
for _, node := range nodes {
|
||||
fits := true
|
||||
for _, predicate := range f.predicates {
|
||||
fit, err := predicate(pod, node)
|
||||
if err != nil {
|
||||
return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err
|
||||
}
|
||||
if !fit {
|
||||
fits = false
|
||||
break
|
||||
}
|
||||
fits, err := f.runPredicate(pod, node)
|
||||
if err != nil {
|
||||
return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err
|
||||
}
|
||||
if fits {
|
||||
filtered = append(filtered, node)
|
||||
|
@ -340,7 +465,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||
}
|
||||
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
|
||||
for _, name := range test.nodes {
|
||||
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
|
||||
cache.AddNode(createNode(name))
|
||||
}
|
||||
queue := NewSchedulingQueue()
|
||||
scheduler := NewGenericScheduler(
|
||||
|
@ -362,3 +487,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createNode(name string) *v1.Node {
|
||||
return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
||||
}
|
||||
|
|
|
@ -54,13 +54,7 @@ type FitError struct {
|
|||
FailedPredicates FailedPredicateMap
|
||||
}
|
||||
|
||||
// Victims describes pod victims.
|
||||
type Victims struct {
|
||||
pods []*v1.Pod
|
||||
numPDBViolations int
|
||||
}
|
||||
|
||||
// ErrNoNodesAvailable defines an error of no nodes available.
|
||||
// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
|
||||
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
|
||||
|
||||
const (
|
||||
|
@ -234,34 +228,64 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
|
|||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)
|
||||
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
|
||||
g.predicateMetaProducer, g.schedulingQueue, pdbs)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
for len(nodeToVictims) > 0 {
|
||||
node := pickOneNodeForPreemption(nodeToVictims)
|
||||
if node == nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders)
|
||||
if passes && pErr == nil {
|
||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||
// this node. So, we should remove their nomination. Removing their
|
||||
// nomination updates these pods and moves them to the active queue. It
|
||||
// lets scheduler find another place for them.
|
||||
nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
|
||||
return node, nodeToVictims[node].pods, nominatedPods, err
|
||||
}
|
||||
if pErr != nil {
|
||||
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
|
||||
}
|
||||
// Remove the node from the map and try to pick a different node.
|
||||
delete(nodeToVictims, node)
|
||||
|
||||
// We will only check nodeToVictims with extenders that support preemption.
|
||||
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
|
||||
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
|
||||
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, err
|
||||
|
||||
candidateNode := pickOneNodeForPreemption(nodeToVictims)
|
||||
if candidateNode == nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||
// this node. So, we should remove their nomination. Removing their
|
||||
// nomination updates these pods and moves them to the active queue. It
|
||||
// lets scheduler find another place for them.
|
||||
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
|
||||
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
|
||||
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
|
||||
}
|
||||
|
||||
return nil, nil, nil, fmt.Errorf(
|
||||
"preemption failed: the target node %s has been deleted from scheduler cache",
|
||||
candidateNode.Name)
|
||||
}
|
||||
|
||||
// GetLowerPriorityNominatedPods returns pods whose priority is smaller than the
|
||||
// processPreemptionWithExtenders processes preemption with extenders
|
||||
func (g *genericScheduler) processPreemptionWithExtenders(
|
||||
pod *v1.Pod,
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
if len(nodeToVictims) > 0 {
|
||||
for _, extender := range g.extenders {
|
||||
if extender.SupportsPreemption() {
|
||||
var err error
|
||||
// Replace nodeToVictims with result after preemption from extender.
|
||||
if nodeToVictims, err = extender.ProcessPreemption(pod, nodeToVictims, g.cachedNodeInfoMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// If node list is empty, no preemption will happen, skip other extenders.
|
||||
if len(nodeToVictims) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nodeToVictims, nil
|
||||
}
|
||||
|
||||
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
|
||||
// priority of the given "pod" and are nominated to run on the given node.
|
||||
// Note: We could possibly check if the nominated lower priority pods still fit
|
||||
// and return those that no longer fit, but that would require lots of
|
||||
|
@ -270,6 +294,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
|
|||
// small number of nominated pods per node.
|
||||
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
|
||||
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
|
||||
|
||||
if len(pods) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -683,7 +708,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
|
|||
// 5. If there are still ties, the first such node is picked (sort of randomly).
|
||||
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
|
||||
// allocation and garbage collection time.
|
||||
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
|
||||
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
|
||||
if len(nodesToVictims) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -691,14 +716,14 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
|
|||
var minNodes1 []*v1.Node
|
||||
lenNodes1 := 0
|
||||
for node, victims := range nodesToVictims {
|
||||
if len(victims.pods) == 0 {
|
||||
if len(victims.Pods) == 0 {
|
||||
// We found a node that doesn't need any preemption. Return it!
|
||||
// This should happen rarely when one or more pods are terminated between
|
||||
// the time that scheduler tries to schedule the pod and the time that
|
||||
// preemption logic tries to find nodes for preemption.
|
||||
return node
|
||||
}
|
||||
numPDBViolatingPods := victims.numPDBViolations
|
||||
numPDBViolatingPods := victims.NumPDBViolations
|
||||
if numPDBViolatingPods < minNumPDBViolatingPods {
|
||||
minNumPDBViolatingPods = numPDBViolatingPods
|
||||
minNodes1 = nil
|
||||
|
@ -722,7 +747,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
|
|||
node := minNodes1[i]
|
||||
victims := nodesToVictims[node]
|
||||
// highestPodPriority is the highest priority among the victims on this node.
|
||||
highestPodPriority := util.GetPodPriority(victims.pods[0])
|
||||
highestPodPriority := util.GetPodPriority(victims.Pods[0])
|
||||
if highestPodPriority < minHighestPriority {
|
||||
minHighestPriority = highestPodPriority
|
||||
lenNodes2 = 0
|
||||
|
@ -743,7 +768,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
|
|||
for i := 0; i < lenNodes2; i++ {
|
||||
var sumPriorities int64
|
||||
node := minNodes2[i]
|
||||
for _, pod := range nodesToVictims[node].pods {
|
||||
for _, pod := range nodesToVictims[node].Pods {
|
||||
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
|
||||
// needed so that a node with a few pods with negative priority is not
|
||||
// picked over a node with a smaller number of pods with the same negative
|
||||
|
@ -769,7 +794,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
|
|||
lenNodes2 = 0
|
||||
for i := 0; i < lenNodes1; i++ {
|
||||
node := minNodes1[i]
|
||||
numPods := len(nodesToVictims[node].pods)
|
||||
numPods := len(nodesToVictims[node].Pods)
|
||||
if numPods < minNumPods {
|
||||
minNumPods = numPods
|
||||
lenNodes2 = 0
|
||||
|
@ -797,9 +822,9 @@ func selectNodesForPreemption(pod *v1.Pod,
|
|||
metadataProducer algorithm.PredicateMetadataProducer,
|
||||
queue SchedulingQueue,
|
||||
pdbs []*policy.PodDisruptionBudget,
|
||||
) (map[*v1.Node]*Victims, error) {
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
|
||||
nodeNameToVictims := map[*v1.Node]*Victims{}
|
||||
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
|
||||
var resultLock sync.Mutex
|
||||
|
||||
// We can use the same metadata producer for all nodes.
|
||||
|
@ -813,50 +838,16 @@ func selectNodesForPreemption(pod *v1.Pod,
|
|||
pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
|
||||
if fits {
|
||||
resultLock.Lock()
|
||||
victims := Victims{
|
||||
pods: pods,
|
||||
numPDBViolations: numPDBViolations,
|
||||
victims := schedulerapi.Victims{
|
||||
Pods: pods,
|
||||
NumPDBViolations: numPDBViolations,
|
||||
}
|
||||
nodeNameToVictims[potentialNodes[i]] = &victims
|
||||
nodeToVictims[potentialNodes[i]] = &victims
|
||||
resultLock.Unlock()
|
||||
}
|
||||
}
|
||||
workqueue.Parallelize(16, len(potentialNodes), checkNode)
|
||||
return nodeNameToVictims, nil
|
||||
}
|
||||
|
||||
func nodePassesExtendersForPreemption(
|
||||
pod *v1.Pod,
|
||||
nodeName string,
|
||||
victims []*v1.Pod,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
extenders []algorithm.SchedulerExtender) (bool, error) {
|
||||
// If there are any extenders, run them and filter the list of candidate nodes.
|
||||
if len(extenders) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
// Remove the victims from the corresponding nodeInfo and send nodes to the
|
||||
// extenders for filtering.
|
||||
originalNodeInfo := nodeNameToInfo[nodeName]
|
||||
nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
|
||||
for _, victim := range victims {
|
||||
nodeInfoCopy.RemovePod(victim)
|
||||
}
|
||||
nodeNameToInfo[nodeName] = nodeInfoCopy
|
||||
defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }()
|
||||
filteredNodes := []*v1.Node{nodeInfoCopy.Node()}
|
||||
for _, extender := range extenders {
|
||||
var err error
|
||||
var failedNodesMap map[string]string
|
||||
filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
return nodeToVictims, nil
|
||||
}
|
||||
|
||||
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
|
||||
|
@ -996,6 +987,7 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
|
|||
// (which is the case today), the !found case should never happen, but we'd prefer
|
||||
// to rely less on such assumptions in the code when checking does not impose
|
||||
// significant overhead.
|
||||
// Also, we currently assume all failures returned by extender as resolvable.
|
||||
for _, failedPredicate := range failedPredicates {
|
||||
switch failedPredicate {
|
||||
case
|
||||
|
|
|
@ -660,11 +660,11 @@ func TestZeroRequest(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string {
|
||||
func printNodeToVictims(nodeToVictims map[*v1.Node]*schedulerapi.Victims) string {
|
||||
var output string
|
||||
for node, victims := range nodeToVictims {
|
||||
output += node.Name + ": ["
|
||||
for _, pod := range victims.pods {
|
||||
for _, pod := range victims.Pods {
|
||||
output += pod.Name + ", "
|
||||
}
|
||||
output += "]"
|
||||
|
@ -672,15 +672,15 @@ func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string {
|
|||
return output
|
||||
}
|
||||
|
||||
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error {
|
||||
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*schedulerapi.Victims) error {
|
||||
if len(expected) == len(nodeToPods) {
|
||||
for k, victims := range nodeToPods {
|
||||
if expPods, ok := expected[k.Name]; ok {
|
||||
if len(victims.pods) != len(expPods) {
|
||||
if len(victims.Pods) != len(expPods) {
|
||||
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods))
|
||||
}
|
||||
prevPriority := int32(math.MaxInt32)
|
||||
for _, p := range victims.pods {
|
||||
for _, p := range victims.Pods {
|
||||
// Check that pods are sorted by their priority.
|
||||
if *p.Spec.Priority > prevPriority {
|
||||
return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k)
|
||||
|
@ -883,7 +883,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
nodes := []*v1.Node{}
|
||||
for _, n := range test.nodes {
|
||||
node := makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5)
|
||||
node := makeNode(n, 1000*5, priorityutil.DefaultMemoryRequest*5)
|
||||
node.ObjectMeta.Labels = map[string]string{"hostname": node.Name}
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
|
@ -1282,11 +1282,20 @@ func TestPreempt(t *testing.T) {
|
|||
for _, pod := range test.pods {
|
||||
cache.AddPod(pod)
|
||||
}
|
||||
cachedNodeInfoMap := map[string]*schedulercache.NodeInfo{}
|
||||
for _, name := range nodeNames {
|
||||
cache.AddNode(makeNode(name, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5))
|
||||
node := makeNode(name, 1000*5, priorityutil.DefaultMemoryRequest*5)
|
||||
cache.AddNode(node)
|
||||
|
||||
// Set nodeInfo to extenders to mock extenders' cache for preemption.
|
||||
cachedNodeInfo := schedulercache.NewNodeInfo()
|
||||
cachedNodeInfo.SetNode(node)
|
||||
cachedNodeInfoMap[name] = cachedNodeInfo
|
||||
}
|
||||
extenders := []algorithm.SchedulerExtender{}
|
||||
for _, extender := range test.extenders {
|
||||
// Set nodeInfoMap as extenders cached node information.
|
||||
extender.cachedNodeNameToInfo = cachedNodeInfoMap
|
||||
extenders = append(extenders, extender)
|
||||
}
|
||||
scheduler := NewGenericScheduler(
|
||||
|
|
|
@ -541,11 +541,30 @@ type fakeExtender struct {
|
|||
interestedPodName string
|
||||
}
|
||||
|
||||
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
|
||||
func (f *fakeExtender) ProcessPreemption(
|
||||
pod *v1.Pod,
|
||||
nodeToVictims map[*v1.Node]*schedulerapi.Victims,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) (map[*v1.Node]*schedulerapi.Victims, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeExtender) SupportsPreemption() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (f *fakeExtender) Filter(
|
||||
pod *v1.Pod,
|
||||
nodes []*v1.Node,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
|
||||
func (f *fakeExtender) Prioritize(
|
||||
pod *v1.Pod,
|
||||
nodes []*v1.Node,
|
||||
) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -130,7 +130,7 @@ func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*sched
|
|||
for _, nodeName := range *args.NodeNames {
|
||||
fits := true
|
||||
for _, predicate := range e.predicates {
|
||||
fit, err := predicate(&args.Pod,
|
||||
fit, err := predicate(args.Pod,
|
||||
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
|
||||
if err != nil {
|
||||
return &schedulerapi.ExtenderFilterResult{
|
||||
|
@ -169,7 +169,7 @@ func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.Extend
|
|||
for _, node := range args.Nodes.Items {
|
||||
fits := true
|
||||
for _, predicate := range e.predicates {
|
||||
fit, err := predicate(&args.Pod, &node)
|
||||
fit, err := predicate(args.Pod, &node)
|
||||
if err != nil {
|
||||
return &schedulerapi.ExtenderFilterResult{
|
||||
Nodes: &v1.NodeList{},
|
||||
|
@ -217,7 +217,7 @@ func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.Ho
|
|||
continue
|
||||
}
|
||||
priorityFunc := prioritizer.function
|
||||
prioritizedList, err := priorityFunc(&args.Pod, nodes)
|
||||
prioritizedList, err := priorityFunc(args.Pod, nodes)
|
||||
if err != nil {
|
||||
return &schedulerapi.HostPriorityList{}, err
|
||||
}
|
||||
|
|