mirror of https://github.com/k3s-io/k3s
Merge pull request #61445 from resouer/extender-priority
Automatic merge from submit-queue (batch tested with PRs 62063, 62169, 62155, 62139, 61445). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Schedule even if extender is not available when using extender **What this PR does / why we need it**: When using scheduler extender, if the extender is not available scheduling of all pods fail. We should let the scheduling happen but display error message that extender is failing. `IsIgnorable()` is added to extender to indicate: if scheduling of all pods should fail when it's unavailable **Backward compabtiility:** We use `IsIgnorable` instead of `IsCritical` so that when this flag is not set, the default value will be `false`, i.e. not ignorable, which consistent with the current behavior in existing extenders. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes: #60616 **Special notes for your reviewer**: kindly cc @ravisantoshgudimetla to see if this meets your expectation TODO: update the examples in kubernetes/examples, but the strategy there is not clear to me for now **Release note**: ```release-note Schedule even if extender is not available when using extender ```pull/8/head
commit
6d8df0c400
|
@ -64,6 +64,10 @@ type SchedulerExtender interface {
|
|||
|
||||
// SupportsPreemption returns if the scheduler extender support preemption or not.
|
||||
SupportsPreemption() bool
|
||||
|
||||
// IsIgnorable returns true indicates scheduling should not fail when this extender
|
||||
// is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
|
||||
IsIgnorable() bool
|
||||
}
|
||||
|
||||
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
|
||||
|
|
|
@ -192,6 +192,9 @@ type ExtenderConfig struct {
|
|||
// will skip checking the resource in predicates.
|
||||
// +optional
|
||||
ManagedResources []ExtenderManagedResource
|
||||
// Ignorable specifies if the extender is ignorable, i.e. scheduling should not
|
||||
// fail when the extender returns an error or is not reachable.
|
||||
Ignorable bool
|
||||
}
|
||||
|
||||
// ExtenderPreemptionResult represents the result returned by preemption phase of extender.
|
||||
|
|
|
@ -174,6 +174,9 @@ type ExtenderConfig struct {
|
|||
// will skip checking the resource in predicates.
|
||||
// +optional
|
||||
ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"`
|
||||
// Ignorable specifies if the extender is ignorable, i.e. scheduling should not
|
||||
// fail when the extender returns an error or is not reachable.
|
||||
Ignorable bool `json:"ignorable,omitempty"`
|
||||
}
|
||||
|
||||
// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
|
||||
|
|
|
@ -49,6 +49,7 @@ type HTTPExtender struct {
|
|||
client *http.Client
|
||||
nodeCacheCapable bool
|
||||
managedResources sets.String
|
||||
ignorable bool
|
||||
}
|
||||
|
||||
func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
|
||||
|
@ -102,9 +103,16 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
|
|||
client: client,
|
||||
nodeCacheCapable: config.NodeCacheCapable,
|
||||
managedResources: managedResources,
|
||||
ignorable: config.Ignorable,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IsIgnorable returns true indicates scheduling should not fail when this extender
|
||||
// is unavailable
|
||||
func (h *HTTPExtender) IsIgnorable() bool {
|
||||
return h.ignorable
|
||||
}
|
||||
|
||||
// SupportsPreemption returns if a extender support preemption.
|
||||
// A extender should have preempt verb defined and enabled its own node cache.
|
||||
func (h *HTTPExtender) SupportsPreemption() bool {
|
||||
|
@ -147,11 +155,12 @@ func (h *HTTPExtender) ProcessPreemption(
|
|||
|
||||
// Extender will always return NodeNameToMetaVictims.
|
||||
// So let's convert it to NodeToVictims by using NodeNameToInfo.
|
||||
nodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
|
||||
newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nodeToVictims, nil
|
||||
// Do not override nodeToVictims
|
||||
return newNodeToVictims, nil
|
||||
}
|
||||
|
||||
// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
|
||||
|
|
|
@ -113,12 +113,17 @@ type FakeExtender struct {
|
|||
nodeCacheCapable bool
|
||||
filteredNodes []*v1.Node
|
||||
unInterested bool
|
||||
ignorable bool
|
||||
|
||||
// Cached node information for fake extender
|
||||
cachedNodeNameToInfo map[string]*schedulercache.NodeInfo
|
||||
cachedPDBs []*policy.PodDisruptionBudget
|
||||
}
|
||||
|
||||
func (f *FakeExtender) IsIgnorable() bool {
|
||||
return f.ignorable
|
||||
}
|
||||
|
||||
func (f *FakeExtender) SupportsPreemption() bool {
|
||||
// Assume preempt verb is always defined.
|
||||
return true
|
||||
|
@ -141,7 +146,10 @@ func (f *FakeExtender) ProcessPreemption(
|
|||
|
||||
for node, victims := range nodeToVictimsCopy {
|
||||
// Try to do preemption on extender side.
|
||||
extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
|
||||
extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
|
||||
// let's remove it from potential preemption nodes.
|
||||
if !fits {
|
||||
|
@ -164,15 +172,18 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
|
|||
pod *v1.Pod,
|
||||
node *v1.Node,
|
||||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
) ([]*v1.Pod, int, bool) {
|
||||
// TODO(harry): add more test in generic_scheduler_test.go to verify this logic.
|
||||
) ([]*v1.Pod, int, bool, error) {
|
||||
// If a extender support preemption but have no cached node info, let's run filter to make sure
|
||||
// default scheduler's decision still stand with given pod and node.
|
||||
if !f.nodeCacheCapable {
|
||||
if fits, _ := f.runPredicate(pod, node); !fits {
|
||||
return nil, 0, false
|
||||
fits, err := f.runPredicate(pod, node)
|
||||
if err != nil {
|
||||
return nil, 0, false, err
|
||||
}
|
||||
return []*v1.Pod{}, 0, true
|
||||
if !fits {
|
||||
return nil, 0, false, nil
|
||||
}
|
||||
return []*v1.Pod{}, 0, true, nil
|
||||
}
|
||||
|
||||
// Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
|
||||
|
@ -200,8 +211,12 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
|
|||
|
||||
// If the new pod does not fit after removing all the lower priority pods,
|
||||
// we are almost done and this node is not suitable for preemption.
|
||||
if fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()); !fits {
|
||||
return nil, 0, false
|
||||
fits, err := f.runPredicate(pod, nodeInfoCopy.Node())
|
||||
if err != nil {
|
||||
return nil, 0, false, err
|
||||
}
|
||||
if !fits {
|
||||
return nil, 0, false, nil
|
||||
}
|
||||
|
||||
var victims []*v1.Pod
|
||||
|
@ -225,7 +240,7 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
|
|||
reprievePod(p.(*v1.Pod))
|
||||
}
|
||||
|
||||
return victims, numViolatingVictim, true
|
||||
return victims, numViolatingVictim, true, nil
|
||||
}
|
||||
|
||||
// runPredicate run predicates of extender one by one for given pod and node.
|
||||
|
@ -439,7 +454,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||
// Filter/Prioritize phases if the extender is not interested in
|
||||
// the pod.
|
||||
//
|
||||
// If scheduler sends the pod by mistake, the test will fail
|
||||
// If scheduler sends the pod by mistake, the test would fail
|
||||
// because of the errors from errorPredicateExtender and/or
|
||||
// errorPrioritizerExtender.
|
||||
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
|
||||
|
@ -456,6 +471,28 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||
expectedHost: "machine2", // machine2 has higher score
|
||||
name: "test 8",
|
||||
},
|
||||
{
|
||||
// Scheduling is expected to not fail in
|
||||
// Filter/Prioritize phases if the extender is not available and ignorable.
|
||||
//
|
||||
// If scheduler did not ignore the extender, the test would fail
|
||||
// because of the errors from errorPredicateExtender.
|
||||
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
|
||||
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{errorPredicateExtender},
|
||||
ignorable: true,
|
||||
},
|
||||
{
|
||||
predicates: []fitPredicate{machine1PredicateExtender},
|
||||
},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
expectsErr: false,
|
||||
expectedHost: "machine1",
|
||||
name: "test 9",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
|
|
@ -269,12 +269,25 @@ func (g *genericScheduler) processPreemptionWithExtenders(
|
|||
if len(nodeToVictims) > 0 {
|
||||
for _, extender := range g.extenders {
|
||||
if extender.SupportsPreemption() {
|
||||
var err error
|
||||
// Replace nodeToVictims with result after preemption from extender.
|
||||
if nodeToVictims, err = extender.ProcessPreemption(pod, nodeToVictims, g.cachedNodeInfoMap); err != nil {
|
||||
newNodeToVictims, err := extender.ProcessPreemption(
|
||||
pod,
|
||||
nodeToVictims,
|
||||
g.cachedNodeInfoMap,
|
||||
)
|
||||
if err != nil {
|
||||
if extender.IsIgnorable() {
|
||||
glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
|
||||
extender, err)
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// If node list is empty, no preemption will happen, skip other extenders.
|
||||
|
||||
// Replace nodeToVictims with new result after preemption. So the
|
||||
// rest of extenders can continue use it as parameter.
|
||||
nodeToVictims = newNodeToVictims
|
||||
|
||||
// If node list becomes empty, no preemption can happen regardless of other extenders.
|
||||
if len(nodeToVictims) == 0 {
|
||||
break
|
||||
}
|
||||
|
@ -384,7 +397,13 @@ func findNodesThatFit(
|
|||
}
|
||||
filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
|
||||
if err != nil {
|
||||
return []*v1.Node{}, FailedPredicateMap{}, err
|
||||
if extender.IsIgnorable() {
|
||||
glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
|
||||
extender, err)
|
||||
continue
|
||||
} else {
|
||||
return []*v1.Node{}, FailedPredicateMap{}, err
|
||||
}
|
||||
}
|
||||
|
||||
for failedNodeName, failedMsg := range failedMap {
|
||||
|
|
|
@ -1274,6 +1274,30 @@ func TestPreempt(t *testing.T) {
|
|||
expectedNode: "",
|
||||
expectedPods: []string{},
|
||||
},
|
||||
{
|
||||
name: "One scheduler extender allows only machine1, the other returns error but ignorable. Only machine1 would be chosen",
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: types.UID("pod1")}, Spec: v1.PodSpec{
|
||||
Containers: veryLargeContainers,
|
||||
Priority: &highPriority},
|
||||
},
|
||||
pods: []*v1.Pod{
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
|
||||
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
|
||||
},
|
||||
extenders: []*FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{errorPredicateExtender},
|
||||
ignorable: true,
|
||||
},
|
||||
{
|
||||
predicates: []fitPredicate{machine1PredicateExtender},
|
||||
},
|
||||
},
|
||||
expectedNode: "machine1",
|
||||
expectedPods: []string{"m1.1", "m1.2"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
|
|
@ -539,6 +539,11 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
|
|||
type fakeExtender struct {
|
||||
isBinder bool
|
||||
interestedPodName string
|
||||
ignorable bool
|
||||
}
|
||||
|
||||
func (f *fakeExtender) IsIgnorable() bool {
|
||||
return f.ignorable
|
||||
}
|
||||
|
||||
func (f *fakeExtender) ProcessPreemption(
|
||||
|
|
Loading…
Reference in New Issue