From 8d880506fee441cdc742d61753a7a3c9215079a2 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Thu, 8 Feb 2018 16:40:56 +0800 Subject: [PATCH] Support cluster-level extended resources in kubelet and kube-scheduler Co-authored-by: Yang Guo Co-authored-by: Chun Chen --- pkg/kubelet/kubelet_node_status.go | 12 +- pkg/kubelet/kubelet_test.go | 56 ++++++--- pkg/kubelet/lifecycle/BUILD | 8 +- pkg/kubelet/lifecycle/predicate.go | 34 +++++- pkg/kubelet/lifecycle/predicate_test.go | 114 ++++++++++++++++++ pkg/scheduler/algorithm/predicates/BUILD | 1 + .../algorithm/predicates/metadata.go | 28 ++++- .../algorithm/predicates/predicates.go | 14 +++ .../algorithm/predicates/predicates_test.go | 25 +++- .../algorithm/scheduler_interface.go | 4 + pkg/scheduler/api/types.go | 20 +++ pkg/scheduler/api/v1/types.go | 20 +++ pkg/scheduler/api/v1/zz_generated.deepcopy.go | 21 ++++ pkg/scheduler/api/validation/BUILD | 4 + pkg/scheduler/api/validation/validation.go | 32 +++++ .../api/validation/validation_test.go | 15 +++ pkg/scheduler/api/zz_generated.deepcopy.go | 21 ++++ pkg/scheduler/core/extender.go | 39 ++++++ pkg/scheduler/core/extender_test.go | 27 +++++ pkg/scheduler/core/generic_scheduler.go | 6 + pkg/scheduler/factory/factory.go | 25 +++- pkg/scheduler/factory/factory_test.go | 84 +++++++++++++ pkg/scheduler/scheduler.go | 4 +- pkg/scheduler/scheduler_test.go | 32 +++-- test/integration/scheduler/extender_test.go | 25 +++- 25 files changed, 613 insertions(+), 58 deletions(-) create mode 100644 pkg/kubelet/lifecycle/predicate_test.go diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 1acf1565dc..faeeb77224 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -606,8 +606,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { } for _, removedResource := range removedDevicePlugins { - glog.V(2).Infof("Remove capacity for %s", removedResource) - delete(node.Status.Capacity, v1.ResourceName(removedResource)) + glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource) + // Set the capacity of the removed resource to 0 instead of + // removing the resource from the node status. This is to indicate + // that the resource is managed by device plugin and had been + // registered before. + // + // This is required to differentiate the device plugin managed + // resources and the cluster-level resources, which are absent in + // node status. 
+ node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI) } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 535d8598ca..f1aff9153d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -583,8 +583,10 @@ func TestHandlePluginResources(t *testing.T) { kl := testKubelet.kubelet adjustedResource := v1.ResourceName("domain1.com/adjustedResource") - unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce") + emptyResource := v1.ResourceName("domain2.com/emptyResource") + missingResource := v1.ResourceName("domain2.com/missingResource") failedResource := v1.ResourceName("domain2.com/failedResource") + resourceQuantity0 := *resource.NewQuantity(int64(0), resource.DecimalSI) resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI) resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI) resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI) @@ -592,9 +594,9 @@ func TestHandlePluginResources(t *testing.T) { nodes := []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{ - adjustedResource: resourceQuantity1, - unadjustedResouce: resourceQuantity1, - v1.ResourcePods: allowedPodQuantity, + adjustedResource: resourceQuantity1, + emptyResource: resourceQuantity0, + v1.ResourcePods: allowedPodQuantity, }}}, } kl.nodeInfo = testNodeInfo{nodes: nodes} @@ -607,6 +609,7 @@ func TestHandlePluginResources(t *testing.T) { // quantity unchanged. updateResourceMap := map[v1.ResourceName]resource.Quantity{ adjustedResource: resourceQuantity2, + emptyResource: resourceQuantity0, failedResource: resourceQuantityInvalid, } pod := attrs.Pod @@ -634,7 +637,7 @@ func TestHandlePluginResources(t *testing.T) { // pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc // adjusts node.allocatableResource for this resource to a sufficient value. - fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName), + fittingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), Containers: []v1.Container{{Resources: v1.ResourceRequirements{ Limits: v1.ResourceList{ adjustedResource: resourceQuantity2, @@ -644,14 +647,30 @@ func TestHandlePluginResources(t *testing.T) { }, }}}, } - // pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit. - exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), + // pod requiring emptyResource (extended resources with 0 allocatable) will + // not pass PredicateAdmit. + emptyPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), Containers: []v1.Container{{Resources: v1.ResourceRequirements{ Limits: v1.ResourceList{ - unadjustedResouce: resourceQuantity2, + emptyResource: resourceQuantity2, }, Requests: v1.ResourceList{ - unadjustedResouce: resourceQuantity2, + emptyResource: resourceQuantity2, + }, + }}}, + } + // pod requiring missingResource will pass PredicateAdmit. + // + // Extended resources missing in node status are ignored in PredicateAdmit. + // This is required to support extended resources that are not managed by + // device plugin, such as cluster-level resources. 
+ missingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName), + Containers: []v1.Container{{Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + missingResource: resourceQuantity2, + }, + Requests: v1.ResourceList{ + missingResource: resourceQuantity2, }, }}}, } @@ -666,21 +685,18 @@ func TestHandlePluginResources(t *testing.T) { }, }}}, } - pods := []*v1.Pod{ - podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec), - podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec), - podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec), - } - // The latter two pod should be rejected. - fittingPod := pods[0] - exceededPod := pods[1] - failedPod := pods[2] - kl.HandlePodAdditions(pods) + fittingPod := podWithUIDNameNsSpec("1", "fittingpod", "foo", fittingPodSpec) + emptyPod := podWithUIDNameNsSpec("2", "emptypod", "foo", emptyPodSpec) + missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec) + failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec) + + kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod}) // Check pod status stored in the status map. checkPodStatus(t, kl, fittingPod, v1.PodPending) - checkPodStatus(t, kl, exceededPod, v1.PodFailed) + checkPodStatus(t, kl, emptyPod, v1.PodFailed) + checkPodStatus(t, kl, missingPod, v1.PodPending) checkPodStatus(t, kl, failedPod, v1.PodFailed) } diff --git a/pkg/kubelet/lifecycle/BUILD b/pkg/kubelet/lifecycle/BUILD index 96af196fe8..872a195913 100644 --- a/pkg/kubelet/lifecycle/BUILD +++ b/pkg/kubelet/lifecycle/BUILD @@ -18,6 +18,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/kubelet/lifecycle", deps = [ + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", @@ -34,12 +35,17 @@ go_library( go_test( name = "go_default_test", - srcs = ["handlers_test.go"], + srcs = [ + "handlers_test.go", + "predicate_test.go", + ], embed = [":go_default_library"], deps = [ "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/scheduler/schedulercache:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", ], ) diff --git a/pkg/kubelet/lifecycle/predicate.go b/pkg/kubelet/lifecycle/predicate.go index e3890055f1..e917acab37 100644 --- a/pkg/kubelet/lifecycle/predicate.go +++ b/pkg/kubelet/lifecycle/predicate.go @@ -20,7 +20,9 @@ import ( "fmt" "github.com/golang/glog" + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -77,7 +79,18 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult Message: message, } } - fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo) + + // Remove the requests of the extended resources that are missing in the + // node info. This is required to support cluster-level resources, which + // are extended resources unknown to nodes. + // + // Caveat: If a pod was manually bound to a node (e.g., static pod) where a + // node-level extended resource it requires is not found, then kubelet will + // not fail admission while it should. 
This issue will be addressed with + // the Resource Class API in the future. + podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo) + + fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo) if err != nil { message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err) glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) @@ -141,3 +154,22 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult Admit: true, } } + +func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) *v1.Pod { + podCopy := pod.DeepCopy() + for i, c := range pod.Spec.Containers { + // We only handle requests in Requests but not Limits because the + // PodFitsResources predicate, to which the result pod will be passed, + // does not use Limits. + podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList) + for rName, rQuant := range c.Resources.Requests { + if v1helper.IsExtendedResourceName(rName) { + if _, found := nodeInfo.AllocatableResource().ScalarResources[rName]; !found { + continue + } + } + podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant + } + } + return podCopy +} diff --git a/pkg/kubelet/lifecycle/predicate_test.go b/pkg/kubelet/lifecycle/predicate_test.go new file mode 100644 index 0000000000..325bafe021 --- /dev/null +++ b/pkg/kubelet/lifecycle/predicate_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package lifecycle + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" +) + +var ( + quantity = *resource.NewQuantity(1, resource.DecimalSI) + extendedResourceName1 = "example.com/er1" + extendedResourceName2 = "example.com/er2" +) + +func TestRemoveMissingExtendedResources(t *testing.T) { + for _, test := range []struct { + desc string + pod *v1.Pod + node *v1.Node + + expectedPod *v1.Pod + }{ + { + desc: "requests in Limits should be ignored", + pod: makeTestPod( + v1.ResourceList{}, // Requests + v1.ResourceList{"foo.com/bar": quantity}, // Limits + ), + node: makeTestNode( + v1.ResourceList{"foo.com/baz": quantity}, // Allocatable + ), + expectedPod: makeTestPod( + v1.ResourceList{}, // Requests + v1.ResourceList{"foo.com/bar": quantity}, // Limits + ), + }, + { + desc: "requests for resources available in node should not be removed", + pod: makeTestPod( + v1.ResourceList{"foo.com/bar": quantity}, // Requests + v1.ResourceList{}, // Limits + ), + node: makeTestNode( + v1.ResourceList{"foo.com/bar": quantity}, // Allocatable + ), + expectedPod: makeTestPod( + v1.ResourceList{"foo.com/bar": quantity}, // Requests + v1.ResourceList{}), // Limits + }, + { + desc: "requests for resources unavailable in node should be removed", + pod: makeTestPod( + v1.ResourceList{"foo.com/bar": quantity}, // Requests + v1.ResourceList{}, // Limits + ), + node: makeTestNode( + v1.ResourceList{"foo.com/baz": quantity}, // Allocatable + ), + expectedPod: makeTestPod( + v1.ResourceList{}, // Requests + v1.ResourceList{}, // Limits + ), + }, + } { + nodeInfo := schedulercache.NewNodeInfo() + nodeInfo.SetNode(test.node) + pod := removeMissingExtendedResources(test.pod, nodeInfo) + if !reflect.DeepEqual(pod, test.expectedPod) { + t.Errorf("%s: Expected pod\n%v\ngot\n%v\n", test.desc, test.expectedPod, pod) + } + } +} + +func makeTestPod(requests, limits v1.ResourceList) *v1.Pod { + return &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: requests, + Limits: limits, + }, + }, + }, + }, + } +} + +func makeTestNode(allocatable v1.ResourceList) *v1.Node { + return &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: allocatable, + }, + } +} diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index d4a43dcd85..d85c822872 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -61,6 +61,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index f4ab6cad79..a83e153e5a 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -22,6 +22,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/schedulercache" schedutil "k8s.io/kubernetes/pkg/scheduler/util" @@ -53,6 +54,13 @@ type predicateMetadata struct { serviceAffinityInUse bool serviceAffinityMatchingPodList []*v1.Pod 
serviceAffinityMatchingPodServices []*v1.Service + // ignoredExtendedResources is a set of extended resource names that will + // be ignored in the PodFitsResources predicate. + // + // They can be scheduler extender managed resources, the consumption of + // which should be accounted only by the extenders. This set is synthesized + // from scheduler extender configuration and does not change per pod. + ignoredExtendedResources sets.String } // Ensure that predicateMetadata implements algorithm.PredicateMetadata. @@ -71,6 +79,17 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMe predicateMetadataProducers[predicateName] = precomp } +// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a +// PredicateMetadataProducer that creates predicate metadata with the provided +// options for extended resources. +// +// See the comments in "predicateMetadata" for the explanation of the options. +func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) { + RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) { + pm.ignoredExtendedResources = ignoredExtendedResources + }) +} + // NewPredicateMetadataFactory creates a PredicateMetadataFactory. func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.PredicateMetadataProducer { factory := &PredicateMetadataFactory{ @@ -170,10 +189,11 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache // its maps and slices, but it does not copy the contents of pointer values. func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { newPredMeta := &predicateMetadata{ - pod: meta.pod, - podBestEffort: meta.podBestEffort, - podRequest: meta.podRequest, - serviceAffinityInUse: meta.serviceAffinityInUse, + pod: meta.pod, + podBestEffort: meta.podBestEffort, + podRequest: meta.podRequest, + serviceAffinityInUse: meta.serviceAffinityInUse, + ignoredExtendedResources: meta.ignoredExtendedResources, } newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...) newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{} diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index faa926a229..d5a6c60262 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" @@ -712,9 +713,15 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber))) } + // No extended resources should be ignored by default. + ignoredExtendedResources := sets.NewString() + var podRequest *schedulercache.Resource if predicateMeta, ok := meta.(*predicateMetadata); ok { podRequest = predicateMeta.podRequest + if predicateMeta.ignoredExtendedResources != nil { + ignoredExtendedResources = predicateMeta.ignoredExtendedResources + } } else { // We couldn't parse metadata - fallback to computing it. 
podRequest = GetResourceRequest(pod) @@ -743,6 +750,13 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s } for rName, rQuant := range podRequest.ScalarResources { + if v1helper.IsExtendedResourceName(rName) { + // If this resource is one of the extended resources that should be + // ignored, we will skip checking it. + if ignoredExtendedResources.Has(string(rName)) { + continue + } + } if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] { predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName])) } diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index c132f72399..7f91a3b4b6 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -27,6 +27,7 @@ import ( storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -93,11 +94,12 @@ func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) func TestPodFitsResources(t *testing.T) { enoughPodsTests := []struct { - pod *v1.Pod - nodeInfo *schedulercache.NodeInfo - fits bool - test string - reasons []algorithm.PredicateFailureReason + pod *v1.Pod + nodeInfo *schedulercache.NodeInfo + fits bool + test string + reasons []algorithm.PredicateFailureReason + ignoredExtendedResources sets.String }{ { pod: &v1.Pod{}, @@ -323,12 +325,23 @@ func TestPodFitsResources(t *testing.T) { test: "hugepages resource allocatable enforced for multiple containers", reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(hugePageResourceA, 6, 2, 5)}, }, + { + pod: newResourcePod( + schedulercache.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}), + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})), + fits: true, + ignoredExtendedResources: sets.NewString(string(extendedResourceB)), + test: "skip checking ignored extended resource", + }, } for _, test := range enoughPodsTests { node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) - fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) + RegisterPredicateMetadataProducerWithExtendedResourceOptions(test.ignoredExtendedResources) + meta := PredicateMetadata(test.pod, nil) + fits, reasons, err := PodFitsResources(test.pod, meta, test.nodeInfo) if err != nil { t.Errorf("%s: unexpected error: %v", test.test, err) } diff --git a/pkg/scheduler/algorithm/scheduler_interface.go b/pkg/scheduler/algorithm/scheduler_interface.go index d64e884260..f86015b491 100644 --- a/pkg/scheduler/algorithm/scheduler_interface.go +++ b/pkg/scheduler/algorithm/scheduler_interface.go @@ -41,6 +41,10 @@ type SchedulerExtender interface { // IsBinder returns whether this extender is configured for the Bind method. 
IsBinder() bool + + // IsInterested returns true if at least one extended resource requested by + // this pod is managed by this extender. + IsInterested(pod *v1.Pod) bool } // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 090e8780a1..cfc8d219ec 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -151,6 +151,16 @@ type LabelPreference struct { Presence bool } +// ExtenderManagedResource describes the arguments of extended resources +// managed by an extender. +type ExtenderManagedResource struct { + // Name is the extended resource name. + Name v1.ResourceName + // IgnoredByScheduler indicates whether kube-scheduler should ignore this + // resource when applying predicates. + IgnoredByScheduler bool +} + // ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty, // it is assumed that the extender chose not to provide that extension. type ExtenderConfig struct { @@ -178,6 +188,16 @@ type ExtenderConfig struct { // so the scheduler should only send minimal information about the eligible nodes // assuming that the extender already cached full details of all nodes in the cluster NodeCacheCapable bool + // ManagedResources is a list of extended resources that are managed by + // this extender. + // - A pod will be sent to the extender on the Filter, Prioritize and Bind + // (if the extender is the binder) phases iff the pod requests at least + // one of the extended resources in this list. If empty or unspecified, + // all pods will be sent to this extender. + // - If IgnoredByScheduler is set to true for a resource, kube-scheduler + // will skip checking the resource in predicates. + // +optional + ManagedResources []ExtenderManagedResource } // ExtenderArgs represents the arguments needed by the extender to filter/prioritize diff --git a/pkg/scheduler/api/v1/types.go b/pkg/scheduler/api/v1/types.go index 8e99956c40..32ac257958 100644 --- a/pkg/scheduler/api/v1/types.go +++ b/pkg/scheduler/api/v1/types.go @@ -125,6 +125,16 @@ type LabelPreference struct { Presence bool `json:"presence"` } +// ExtenderManagedResource describes the arguments of extended resources +// managed by an extender. +type ExtenderManagedResource struct { + // Name is the extended resource name. + Name apiv1.ResourceName `json:"name,casttype=ResourceName"` + // IgnoredByScheduler indicates whether kube-scheduler should ignore this + // resource when applying predicates. + IgnoredByScheduler bool `json:"ignoredByScheduler,omitempty"` +} + // ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty, // it is assumed that the extender chose not to provide that extension. type ExtenderConfig struct { @@ -152,6 +162,16 @@ type ExtenderConfig struct { // so the scheduler should only send minimal information about the eligible nodes // assuming that the extender already cached full details of all nodes in the cluster NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"` + // ManagedResources is a list of extended resources that are managed by + // this extender. + // - A pod will be sent to the extender on the Filter, Prioritize and Bind + // (if the extender is the binder) phases iff the pod requests at least + // one of the extended resources in this list. If empty or unspecified, + // all pods will be sent to this extender. 
+ // - If IgnoredByScheduler is set to true for a resource, kube-scheduler + // will skip checking the resource in predicates. + // +optional + ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"` } // ExtenderArgs represents the arguments needed by the extender to filter/prioritize diff --git a/pkg/scheduler/api/v1/zz_generated.deepcopy.go b/pkg/scheduler/api/v1/zz_generated.deepcopy.go index fc7ab765bf..98f8fcbf7e 100644 --- a/pkg/scheduler/api/v1/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/v1/zz_generated.deepcopy.go @@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) { (*in).DeepCopyInto(*out) } } + if in.ManagedResources != nil { + in, out := &in.ManagedResources, &out.ManagedResources + *out = make([]ExtenderManagedResource, len(*in)) + copy(*out, *in) + } return } @@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource. +func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource { + if in == nil { + return nil + } + out := new(ExtenderManagedResource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { { diff --git a/pkg/scheduler/api/validation/BUILD b/pkg/scheduler/api/validation/BUILD index 8a7fb0ecc3..2cee0ff303 100644 --- a/pkg/scheduler/api/validation/BUILD +++ b/pkg/scheduler/api/validation/BUILD @@ -11,8 +11,12 @@ go_library( srcs = ["validation.go"], importpath = "k8s.io/kubernetes/pkg/scheduler/api/validation", deps = [ + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/scheduler/api:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", ], ) diff --git a/pkg/scheduler/api/validation/validation.go b/pkg/scheduler/api/validation/validation.go index d8eb954c5f..5e9e53bec6 100644 --- a/pkg/scheduler/api/validation/validation.go +++ b/pkg/scheduler/api/validation/validation.go @@ -17,9 +17,14 @@ limitations under the License. 
package validation import ( + "errors" "fmt" + "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" ) @@ -35,6 +40,7 @@ func ValidatePolicy(policy schedulerapi.Policy) error { } binders := 0 + extenderManagedResources := sets.NewString() for _, extender := range policy.ExtenderConfigs { if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 { validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix)) @@ -42,9 +48,35 @@ func ValidatePolicy(policy schedulerapi.Policy) error { if extender.BindVerb != "" { binders++ } + for _, resource := range extender.ManagedResources { + errs := validateExtendedResourceName(resource.Name) + if len(errs) != 0 { + validationErrors = append(validationErrors, errs...) + } + if extenderManagedResources.Has(string(resource.Name)) { + validationErrors = append(validationErrors, fmt.Errorf("Duplicate extender managed resource name %s", string(resource.Name))) + } + extenderManagedResources.Insert(string(resource.Name)) + } } if binders > 1 { validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders)) } return utilerrors.NewAggregate(validationErrors) } + +// validateExtendedResourceName checks whether the specified name is a valid +// extended resource name. +func validateExtendedResourceName(name v1.ResourceName) []error { + var validationErrors []error + for _, msg := range validation.IsQualifiedName(string(name)) { + validationErrors = append(validationErrors, errors.New(msg)) + } + if len(validationErrors) != 0 { + return validationErrors + } + if !v1helper.IsExtendedResourceName(name) { + validationErrors = append(validationErrors, fmt.Errorf("%s is an invalid extended resource name", name)) + } + return validationErrors +} diff --git a/pkg/scheduler/api/validation/validation_test.go b/pkg/scheduler/api/validation/validation_test.go index 482bf92f66..ede1b2c421 100644 --- a/pkg/scheduler/api/validation/validation_test.go +++ b/pkg/scheduler/api/validation/validation_test.go @@ -69,6 +69,21 @@ func TestValidatePolicy(t *testing.T) { }}, expected: errors.New("Only one extender can implement bind, found 2"), }, + { + policy: api.Policy{ + ExtenderConfigs: []api.ExtenderConfig{ + {URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}}, + {URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}}, + }}, + expected: errors.New("Duplicate extender managed resource name foo.com/bar"), + }, + { + policy: api.Policy{ + ExtenderConfigs: []api.ExtenderConfig{ + {URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "kubernetes.io/foo"}}}, + }}, + expected: errors.New("kubernetes.io/foo is an invalid extended resource name"), + }, } for _, test := range tests { diff --git a/pkg/scheduler/api/zz_generated.deepcopy.go b/pkg/scheduler/api/zz_generated.deepcopy.go index 62f6d6d297..1986933b93 100644 --- a/pkg/scheduler/api/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/zz_generated.deepcopy.go @@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) { (*in).DeepCopyInto(*out) } } + if in.ManagedResources != nil { + in, out 
:= &in.ManagedResources, &out.ManagedResources + *out = make([]ExtenderManagedResource, len(*in)) + copy(*out, *in) + } return } @@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource. +func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource { + if in == nil { + return nil + } + out := new(ExtenderManagedResource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { { diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 76e5878131..022db87f5b 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -26,6 +26,7 @@ import ( "k8s.io/api/core/v1" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/sets" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -46,6 +47,7 @@ type HTTPExtender struct { weight int client *http.Client nodeCacheCapable bool + managedResources sets.String } func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) { @@ -85,6 +87,10 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx Transport: transport, Timeout: config.HTTPTimeout, } + managedResources := sets.NewString() + for _, r := range config.ManagedResources { + managedResources.Insert(string(r.Name)) + } return &HTTPExtender{ extenderURL: config.URLPrefix, filterVerb: config.FilterVerb, @@ -93,6 +99,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx weight: config.Weight, client: client, nodeCacheCapable: config.NodeCacheCapable, + managedResources: managedResources, }, nil } @@ -252,3 +259,35 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{}) return json.NewDecoder(resp.Body).Decode(result) } + +// IsInterested returns true if at least one extended resource requested by +// this pod is managed by this extender. 
+func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool { + if h.managedResources.Len() == 0 { + return true + } + if h.hasManagedResources(pod.Spec.Containers) { + return true + } + if h.hasManagedResources(pod.Spec.InitContainers) { + return true + } + return false +} + +func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool { + for i := range containers { + container := &containers[i] + for resourceName := range container.Resources.Requests { + if h.managedResources.Has(string(resourceName)) { + return true + } + } + for resourceName := range container.Resources.Limits { + if h.managedResources.Has(string(resourceName)) { + return true + } + } + } + return false +} diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 69cf8c54d1..766ee2dec5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -110,6 +110,7 @@ type FakeExtender struct { weight int nodeCacheCapable bool filteredNodes []*v1.Node + unInterested bool } func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { @@ -183,6 +184,10 @@ func (f *FakeExtender) IsBinder() bool { return true } +func (f *FakeExtender) IsInterested(pod *v1.Pod) bool { + return !f.unInterested +} + var _ algorithm.SchedulerExtender = &FakeExtender{} func TestGenericSchedulerWithExtenders(t *testing.T) { @@ -304,6 +309,28 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { expectedHost: "machine2", // machine2 has higher score name: "test 7", }, + { + // Scheduler is expected to not send pod to extender in + // Filter/Prioritize phases if the extender is not interested in + // the pod. + // + // If scheduler sends the pod by mistake, the test will fail + // because of the errors from errorPredicateExtender and/or + // errorPrioritizerExtender. 
+ predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{errorPredicateExtender}, + prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}}, + unInterested: true, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectsErr: false, + expectedHost: "machine2", // machine2 has higher score + name: "test 8", + }, } for _, test := range tests { diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index b4dcea6a9f..8ebc7641de 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -354,6 +354,9 @@ func findNodesThatFit( if len(filtered) > 0 && len(extenders) != 0 { for _, extender := range extenders { + if !extender.IsInterested(pod) { + continue + } filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo) if err != nil { return []*v1.Node{}, FailedPredicateMap{}, err @@ -624,6 +627,9 @@ func PrioritizeNodes( if len(extenders) != 0 && nodes != nil { combinedScores := make(map[string]int, len(nodeNameToInfo)) for _, extender := range extenders { + if !extender.IsInterested(pod) { + continue + } wg.Add(1) go func(ext algorithm.SchedulerExtender) { defer wg.Done() diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 648f6a9995..407d5b85c1 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -969,6 +969,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler extenders := make([]algorithm.SchedulerExtender, 0) if len(policy.ExtenderConfigs) != 0 { + ignoredExtendedResources := sets.NewString() for ii := range policy.ExtenderConfigs { glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii]) extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii]) @@ -976,7 +977,13 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return nil, err } extenders = append(extenders, extender) + for _, r := range policy.ExtenderConfigs[ii].ManagedResources { + if r.IgnoredByScheduler { + ignoredExtendedResources.Insert(string(r.Name)) + } + } } + predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources) } // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value. // Give it higher precedence than scheduler CLI configuration when it is provided. @@ -992,14 +999,22 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return c.CreateFromKeys(predicateKeys, priorityKeys, extenders) } -// getBinder returns an extender that supports bind or a default binder. -func (c *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { +// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod. 
+func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder { + var extenderBinder algorithm.SchedulerExtender for i := range extenders { if extenders[i].IsBinder() { - return extenders[i] + extenderBinder = extenders[i] + break } } - return &binder{c.client} + defaultBinder := &binder{c.client} + return func(pod *v1.Pod) scheduler.Binder { + if extenderBinder != nil && extenderBinder.IsInterested(pod) { + return extenderBinder + } + return defaultBinder + } } // Creates a scheduler from a set of registered fit predicate keys and priority keys. @@ -1051,7 +1066,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // The scheduler only needs to consider schedulable nodes. NodeLister: &nodeLister{c.nodeLister}, Algorithm: algo, - Binder: c.getBinder(extenders), + GetBinder: c.getBinderFunc(extenders), PodConditionUpdater: &podConditionUpdater{c.client}, PodPreemptor: &podPreemptor{c.client}, WaitForCacheSync: func() bool { diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index ca7ba5b5e7..1ddce5808f 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -17,6 +17,8 @@ limitations under the License. package factory import ( + "errors" + "fmt" "net/http" "net/http/httptest" "reflect" @@ -533,3 +535,85 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh enableEquivalenceCache, ) } + +type fakeExtender struct { + isBinder bool + interestedPodName string +} + +func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) { + return nil, nil, nil +} + +func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) { + return nil, 0, nil +} + +func (f *fakeExtender) Bind(binding *v1.Binding) error { + if f.isBinder { + return nil + } + return errors.New("not a binder") +} + +func (f *fakeExtender) IsBinder() bool { + return f.isBinder +} + +func (f *fakeExtender) IsInterested(pod *v1.Pod) bool { + return pod != nil && pod.Name == f.interestedPodName +} + +func TestGetBinderFunc(t *testing.T) { + for _, test := range []struct { + podName string + extenders []algorithm.SchedulerExtender + + expectedBinderType string + }{ + // Expect to return the default binder because the extender is not a + // binder, even though it's interested in the pod. + { + podName: "pod0", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod0"}, + }, + expectedBinderType: "*factory.binder", + }, + // Expect to return the fake binder because one of the extenders is a + // binder and it's interested in the pod. + { + podName: "pod0", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod0"}, + &fakeExtender{isBinder: true, interestedPodName: "pod0"}, + }, + expectedBinderType: "*factory.fakeExtender", + }, + // Expect to return the default binder because one of the extenders is + // a binder but the binder is not interested in the pod. 
+ { + podName: "pod1", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod1"}, + &fakeExtender{isBinder: true, interestedPodName: "pod0"}, + }, + expectedBinderType: "*factory.binder", + }, + } { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: test.podName, + }, + } + + f := &configFactory{} + binderFunc := f.getBinderFunc(test.extenders) + binder := binderFunc(pod) + + binderType := fmt.Sprintf("%s", reflect.TypeOf(binder)) + if binderType != test.expectedBinderType { + t.Errorf("Expected binder %q but got %q", test.expectedBinderType, binderType) + } + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 85b4458fd5..ebab58d45b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -107,7 +107,7 @@ type Config struct { Ecache *core.EquivalenceCache NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm - Binder Binder + GetBinder func(pod *v1.Pod) Binder // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. @@ -403,7 +403,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { bindingStart := time.Now() // If binding succeeded then PodScheduled condition will be updated in apiserver so that // it's atomic with setting host. - err := sched.config.Binder.Bind(b) + err := sched.config.GetBinder(assumed).Bind(b) if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil { glog.Errorf("scheduler cache FinishBinding failed: %v", err) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 440c9f76ae..e15675364f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -195,10 +195,12 @@ func TestScheduler(t *testing.T) { []*v1.Node{&testNode}, ), Algorithm: item.algo, - Binder: fakeBinder{func(b *v1.Binding) error { - gotBinding = b - return item.injectBindError - }}, + GetBinder: func(pod *v1.Pod) Binder { + return fakeBinder{func(b *v1.Binding) error { + gotBinding = b + return item.injectBindError + }} + }, PodConditionUpdater: fakePodConditionUpdater{}, Error: func(p *v1.Pod, err error) { gotPod = p @@ -543,10 +545,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. 
SchedulerCache: scache, NodeLister: nodeLister, Algorithm: algo, - Binder: fakeBinder{func(b *v1.Binding) error { - bindingChan <- b - return nil - }}, + GetBinder: func(pod *v1.Pod) Binder { + return fakeBinder{func(b *v1.Binding) error { + bindingChan <- b + return nil + }} + }, NextPod: func() *v1.Pod { return clientcache.Pop(queuedPodStore).(*v1.Pod) }, @@ -588,11 +592,13 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc SchedulerCache: scache, NodeLister: nodeLister, Algorithm: algo, - Binder: fakeBinder{func(b *v1.Binding) error { - time.Sleep(bindingTime) - bindingChan <- b - return nil - }}, + GetBinder: func(pod *v1.Pod) Binder { + return fakeBinder{func(b *v1.Binding) error { + time.Sleep(bindingTime) + bindingChan <- b + return nil + }} + }, WaitForCacheSync: func() bool { return true }, diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index b9b284ce1a..38369f960d 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -47,9 +47,10 @@ import ( ) const ( - filter = "filter" - prioritize = "prioritize" - bind = "bind" + filter = "filter" + prioritize = "prioritize" + bind = "bind" + extendedResourceName = "foo.com/bar" ) type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error) @@ -343,6 +344,12 @@ func TestSchedulerExtender(t *testing.T) { BindVerb: bind, Weight: 4, EnableHTTPS: false, + ManagedResources: []schedulerapi.ExtenderManagedResource{ + { + Name: extendedResourceName, + IgnoredByScheduler: true, + }, + }, }, { URLPrefix: es3.URL, @@ -420,7 +427,17 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "extender-test-pod"}, Spec: v1.PodSpec{ - Containers: []v1.Container{{Name: "container", Image: e2e.GetPauseImageName(cs)}}, + Containers: []v1.Container{ + { + Name: "container", + Image: e2e.GetPauseImageName(cs), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + extendedResourceName: *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + }, + }, }, }
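
Note (not part of the patch): for readers who want to try the new ExtenderConfig.ManagedResources field end to end, the standalone Go sketch below shows one way a scheduler Policy could declare an extender-managed, scheduler-ignored extended resource and check it with the ValidatePolicy helper added above. The extender URL and the "example.com/foo" resource name are made-up values for illustration; only the type and field names come from this patch.

// Hypothetical usage sketch, not part of the patch. It builds a Policy whose
// extender manages the cluster-level resource "example.com/foo" and asks the
// scheduler to skip that resource in PodFitsResources, then validates it.
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	"k8s.io/kubernetes/pkg/scheduler/api/validation"
)

func main() {
	policy := schedulerapi.Policy{
		ExtenderConfigs: []schedulerapi.ExtenderConfig{
			{
				URLPrefix:      "http://127.0.0.1:8888/extender", // assumed extender endpoint
				FilterVerb:     "filter",
				PrioritizeVerb: "prioritize",
				Weight:         1,
				// The extender accounts for this resource itself, so
				// kube-scheduler ignores it when applying predicates.
				ManagedResources: []schedulerapi.ExtenderManagedResource{
					{
						Name:               v1.ResourceName("example.com/foo"),
						IgnoredByScheduler: true,
					},
				},
			},
		},
	}

	// ValidatePolicy (extended by this patch) rejects non-extended or
	// duplicate extender-managed resource names.
	if err := validation.ValidatePolicy(policy); err != nil {
		fmt.Printf("invalid policy: %v\n", err)
		return
	}
	fmt.Println("policy is valid; pods requesting example.com/foo are sent to the extender")
}

With such a policy, only pods that request example.com/foo are sent to this extender in the Filter/Prioritize/Bind phases (per IsInterested), and kube-scheduler leaves the fit check for that resource entirely to the extender.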