Support cluster-level extended resources in kubelet and kube-scheduler

Co-authored-by: Yang Guo <ygg@google.com>
Co-authored-by: Chun Chen <chenchun.feed@gmail.com>
Yang Guo 2018-02-08 16:40:56 +08:00
parent c1a97c34c8
commit 8d880506fe
25 changed files with 613 additions and 58 deletions

View File

@@ -606,8 +606,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource)
// Set the capacity of the removed resource to 0 instead of
// removing the resource from the node status. This indicates
// that the resource is managed by a device plugin and has been
// registered before.
//
// This is required to differentiate device-plugin-managed
// resources from cluster-level resources, which are absent from
// node status.
node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
}
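This convention lets a consumer of node status tell the two kinds of extended resources apart. A minimal sketch, assuming a hypothetical resource name (node is the *v1.Node from above):

quantity, registered := node.Status.Capacity[v1.ResourceName("example.com/res")]
switch {
case !registered:
	// Absent from node status entirely: a cluster-level extended
	// resource, unknown to this kubelet.
case quantity.IsZero():
	// Present with capacity 0: a device-plugin resource whose plugin
	// was removed after registration.
default:
	// Present with positive capacity: served by an active device plugin.
}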

View File

@@ -583,8 +583,10 @@ func TestHandlePluginResources(t *testing.T) {
kl := testKubelet.kubelet
adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
emptyResource := v1.ResourceName("domain2.com/emptyResource")
missingResource := v1.ResourceName("domain2.com/missingResource")
failedResource := v1.ResourceName("domain2.com/failedResource")
resourceQuantity0 := *resource.NewQuantity(int64(0), resource.DecimalSI)
resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
@@ -593,7 +595,7 @@ func TestHandlePluginResources(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
adjustedResource: resourceQuantity1,
unadjustedResouce: resourceQuantity1,
emptyResource: resourceQuantity0,
v1.ResourcePods: allowedPodQuantity,
}}},
}
@@ -607,6 +609,7 @@ func TestHandlePluginResources(t *testing.T) {
// quantity unchanged.
updateResourceMap := map[v1.ResourceName]resource.Quantity{
adjustedResource: resourceQuantity2,
emptyResource: resourceQuantity0,
failedResource: resourceQuantityInvalid,
}
pod := attrs.Pod
@@ -634,7 +637,7 @@ func TestHandlePluginResources(t *testing.T) {
// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
// adjusts node.allocatableResource for this resource to a sufficient value.
fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
fittingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
adjustedResource: resourceQuantity2,
@@ -644,14 +647,30 @@ func TestHandlePluginResources(t *testing.T) {
},
}}},
}
// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
// pod requiring emptyResource (an extended resource with 0 allocatable) will
// not pass PredicateAdmit.
emptyPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
emptyResource: resourceQuantity2,
},
Requests: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
emptyResource: resourceQuantity2,
},
}}},
}
// pod requiring missingResource will pass PredicateAdmit.
//
// Extended resources missing in node status are ignored in PredicateAdmit.
// This is required to support extended resources that are not managed by
// a device plugin, such as cluster-level resources.
missingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
missingResource: resourceQuantity2,
},
Requests: v1.ResourceList{
missingResource: resourceQuantity2,
},
}}},
}
@@ -666,21 +685,18 @@ func TestHandlePluginResources(t *testing.T) {
},
}}},
}
pods := []*v1.Pod{
podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
}
// The latter two pod should be rejected.
fittingPod := pods[0]
exceededPod := pods[1]
failedPod := pods[2]
kl.HandlePodAdditions(pods)
fittingPod := podWithUIDNameNsSpec("1", "fittingpod", "foo", fittingPodSpec)
emptyPod := podWithUIDNameNsSpec("2", "emptypod", "foo", emptyPodSpec)
missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)
kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
// Check pod status stored in the status map.
checkPodStatus(t, kl, fittingPod, v1.PodPending)
checkPodStatus(t, kl, exceededPod, v1.PodFailed)
checkPodStatus(t, kl, emptyPod, v1.PodFailed)
checkPodStatus(t, kl, missingPod, v1.PodPending)
checkPodStatus(t, kl, failedPod, v1.PodFailed)
}

View File

@@ -18,6 +18,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/lifecycle",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
@@ -34,12 +35,17 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["handlers_test.go"],
srcs = [
"handlers_test.go",
"predicate_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
],
)

View File

@@ -20,7 +20,9 @@ import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@@ -77,7 +79,18 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Message: message,
}
}
fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., a static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission even though it should. This issue will be addressed
// with the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo)
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
@@ -141,3 +154,22 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Admit: true,
}
}
func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) *v1.Pod {
podCopy := pod.DeepCopy()
for i, c := range pod.Spec.Containers {
// We only handle the requests in Requests, not Limits, because the
// PodFitsResources predicate, to which the resulting pod will be passed,
// does not use Limits.
podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList)
for rName, rQuant := range c.Resources.Requests {
if v1helper.IsExtendedResourceName(rName) {
if _, found := nodeInfo.AllocatableResource().ScalarResources[rName]; !found {
continue
}
}
podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant
}
}
return podCopy
}
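The new predicate_test.go below exercises this stripping exhaustively; as a quick hedged illustration (resource name hypothetical), a request for a cluster-level resource disappears before GeneralPredicates runs, while requests the node knows about are kept:

nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&v1.Node{Status: v1.NodeStatus{
	Allocatable: v1.ResourceList{v1.ResourceCPU: resource.MustParse("4")},
}})
pod := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{{
	Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:            resource.MustParse("1"),
		"example.com/cluster-res": resource.MustParse("1"), // unknown to the node
	}},
}}}}
stripped := removeMissingExtendedResources(pod, nodeInfo)
// stripped still requests cpu but no longer requests
// example.com/cluster-res, so PodFitsResources cannot reject it for
// that resource.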

View File

@@ -0,0 +1,114 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)
var (
quantity = *resource.NewQuantity(1, resource.DecimalSI)
extendedResourceName1 = "example.com/er1"
extendedResourceName2 = "example.com/er2"
)
func TestRemoveMissingExtendedResources(t *testing.T) {
for _, test := range []struct {
desc string
pod *v1.Pod
node *v1.Node
expectedPod *v1.Pod
}{
{
desc: "requests in Limits should be ignored",
pod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{"foo.com/bar": quantity}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{"foo.com/bar": quantity}, // Limits
),
},
{
desc: "requests for resources available in node should not be removed",
pod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/bar": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}), // Limits
},
{
desc: "requests for resources unavailable in node should be removed",
pod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{}, // Limits
),
},
} {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
pod := removeMissingExtendedResources(test.pod, nodeInfo)
if !reflect.DeepEqual(pod, test.expectedPod) {
t.Errorf("%s: Expected pod\n%v\ngot\n%v\n", test.desc, test.expectedPod, pod)
}
}
}
func makeTestPod(requests, limits v1.ResourceList) *v1.Pod {
return &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
},
},
}
}
func makeTestNode(allocatable v1.ResourceList) *v1.Node {
return &v1.Node{
Status: v1.NodeStatus{
Allocatable: allocatable,
},
}
}

View File

@@ -61,6 +61,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@@ -53,6 +54,13 @@ type predicateMetadata struct {
serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
// ignoredExtendedResources is a set of extended resource names that will
// be ignored in the PodFitsResources predicate.
//
// They can be resources managed by scheduler extenders, whose consumption
// should be accounted for only by the extenders. This set is synthesized
// from the scheduler extender configuration and does not change per pod.
ignoredExtendedResources sets.String
}
// Ensure that predicateMetadata implements algorithm.PredicateMetadata.
@@ -71,6 +79,17 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMe
predicateMetadataProducers[predicateName] = precomp
}
// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a
// PredicateMetadataProducer that creates predicate metadata with the provided
// options for extended resources.
//
// See the comments in "predicateMetadata" for the explanation of the options.
func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
pm.ignoredExtendedResources = ignoredExtendedResources
})
}
// NewPredicateMetadataFactory creates a PredicateMetadataFactory.
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.PredicateMetadataProducer {
factory := &PredicateMetadataFactory{
@@ -174,6 +193,7 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
podBestEffort: meta.podBestEffort,
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
ignoredExtendedResources: meta.ignoredExtendedResources,
}
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
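Stepping back from the diff, a minimal usage sketch of the new RegisterPredicateMetadataProducerWithExtendedResourceOptions hook (the resource name is a placeholder): a component that knows certain extended resources are accounted for elsewhere registers them once at setup, and every predicateMetadata produced afterwards carries the set.

// Ignore example.com/foo in PodFitsResources; an extender (or another
// component) is expected to account for its consumption instead.
predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(
	sets.NewString("example.com/foo"),
)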

View File

@@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
@@ -712,9 +713,15 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
// No extended resources should be ignored by default.
ignoredExtendedResources := sets.NewString()
var podRequest *schedulercache.Resource
if predicateMeta, ok := meta.(*predicateMetadata); ok {
podRequest = predicateMeta.podRequest
if predicateMeta.ignoredExtendedResources != nil {
ignoredExtendedResources = predicateMeta.ignoredExtendedResources
}
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = GetResourceRequest(pod)
@@ -743,6 +750,13 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
}
for rName, rQuant := range podRequest.ScalarResources {
if v1helper.IsExtendedResourceName(rName) {
// If this resource is one of the extended resources that should be
// ignored, we will skip checking it.
if ignoredExtendedResources.Has(string(rName)) {
continue
}
}
if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
}

View File

@@ -27,6 +27,7 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@@ -98,6 +99,7 @@ func TestPodFitsResources(t *testing.T) {
fits bool
test string
reasons []algorithm.PredicateFailureReason
ignoredExtendedResources sets.String
}{
{
pod: &v1.Pod{},
@@ -323,12 +325,23 @@ func TestPodFitsResources(t *testing.T) {
test: "hugepages resource allocatable enforced for multiple containers",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(hugePageResourceA, 6, 2, 5)},
},
{
pod: newResourcePod(
schedulercache.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})),
fits: true,
ignoredExtendedResources: sets.NewString(string(extendedResourceB)),
test: "skip checking ignored extended resource",
},
}
for _, test := range enoughPodsTests {
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
RegisterPredicateMetadataProducerWithExtendedResourceOptions(test.ignoredExtendedResources)
meta := PredicateMetadata(test.pod, nil)
fits, reasons, err := PodFitsResources(test.pod, meta, test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}

View File

@@ -41,6 +41,10 @@ type SchedulerExtender interface {
// IsBinder returns whether this extender is configured for the Bind method.
IsBinder() bool
// IsInterested returns true if at least one extended resource requested by
// this pod is managed by this extender.
IsInterested(pod *v1.Pod) bool
}
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods

View File

@@ -151,6 +151,16 @@ type LabelPreference struct {
Presence bool
}
// ExtenderManagedResource describes the arguments of extended resources
// managed by an extender.
type ExtenderManagedResource struct {
// Name is the extended resource name.
Name v1.ResourceName
// IgnoredByScheduler indicates whether kube-scheduler should ignore this
// resource when applying predicates.
IgnoredByScheduler bool
}
// ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
// it is assumed that the extender chose not to provide that extension.
type ExtenderConfig struct {
@@ -178,6 +188,16 @@ type ExtenderConfig struct {
// so the scheduler should only send minimal information about the eligible nodes
// assuming that the extender already cached full details of all nodes in the cluster
NodeCacheCapable bool
// ManagedResources is a list of extended resources that are managed by
// this extender.
// - A pod will be sent to the extender in the Filter, Prioritize and Bind
// (if the extender is the binder) phases iff the pod requests at least
// one of the extended resources in this list. If empty or unspecified,
// all pods will be sent to this extender.
// - If IgnoredByScheduler is set to true for a resource, kube-scheduler
// will skip checking the resource in predicates.
// +optional
ManagedResources []ExtenderManagedResource
}
// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
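Putting the new fields together, a hedged sketch of a Policy that delegates a cluster-level resource to an extender (URL and resource name are placeholders):

policy := schedulerapi.Policy{
	ExtenderConfigs: []schedulerapi.ExtenderConfig{{
		URLPrefix:  "http://127.0.0.1:8888/extender",
		FilterVerb: "filter",
		ManagedResources: []schedulerapi.ExtenderManagedResource{{
			// Pods requesting this resource are sent to the extender,
			// and kube-scheduler skips checking it in predicates.
			Name:               "example.com/foo",
			IgnoredByScheduler: true,
		}},
	}},
}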

View File

@@ -125,6 +125,16 @@ type LabelPreference struct {
Presence bool `json:"presence"`
}
// ExtenderManagedResource describes the arguments of extended resources
// managed by an extender.
type ExtenderManagedResource struct {
// Name is the extended resource name.
Name apiv1.ResourceName `json:"name,casttype=ResourceName"`
// IgnoredByScheduler indicates whether kube-scheduler should ignore this
// resource when applying predicates.
IgnoredByScheduler bool `json:"ignoredByScheduler,omitempty"`
}
// ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
// it is assumed that the extender chose not to provide that extension.
type ExtenderConfig struct {
@@ -152,6 +162,16 @@ type ExtenderConfig struct {
// so the scheduler should only send minimal information about the eligible nodes
// assuming that the extender already cached full details of all nodes in the cluster
NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"`
// ManagedResources is a list of extended resources that are managed by
// this extender.
// - A pod will be sent to the extender in the Filter, Prioritize and Bind
// (if the extender is the binder) phases iff the pod requests at least
// one of the extended resources in this list. If empty or unspecified,
// all pods will be sent to this extender.
// - If IgnoredByScheduler is set to true for a resource, kube-scheduler
// will skip checking the resource in predicates.
// +optional
ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"`
}
// ExtenderArgs represents the arguments needed by the extender to filter/prioritize

View File

@@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) {
(*in).DeepCopyInto(*out)
}
}
if in.ManagedResources != nil {
in, out := &in.ManagedResources, &out.ManagedResources
*out = make([]ExtenderManagedResource, len(*in))
copy(*out, *in)
}
return
}
@@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource.
func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
if in == nil {
return nil
}
out := new(ExtenderManagedResource)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
{

View File

@@ -11,8 +11,12 @@ go_library(
srcs = ["validation.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/api/validation",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
],
)

View File

@@ -17,9 +17,14 @@ limitations under the License.
package validation
import (
"errors"
"fmt"
"k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)
@@ -35,6 +40,7 @@ func ValidatePolicy(policy schedulerapi.Policy) error {
}
binders := 0
extenderManagedResources := sets.NewString()
for _, extender := range policy.ExtenderConfigs {
if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 {
validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix))
@@ -42,9 +48,35 @@ func ValidatePolicy(policy schedulerapi.Policy) error {
if extender.BindVerb != "" {
binders++
}
for _, resource := range extender.ManagedResources {
errs := validateExtendedResourceName(resource.Name)
if len(errs) != 0 {
validationErrors = append(validationErrors, errs...)
}
if extenderManagedResources.Has(string(resource.Name)) {
validationErrors = append(validationErrors, fmt.Errorf("Duplicate extender managed resource name %s", string(resource.Name)))
}
extenderManagedResources.Insert(string(resource.Name))
}
}
if binders > 1 {
validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders))
}
return utilerrors.NewAggregate(validationErrors)
}
// validateExtendedResourceName checks whether the specified name is a valid
// extended resource name.
func validateExtendedResourceName(name v1.ResourceName) []error {
var validationErrors []error
for _, msg := range validation.IsQualifiedName(string(name)) {
validationErrors = append(validationErrors, errors.New(msg))
}
if len(validationErrors) != 0 {
return validationErrors
}
if !v1helper.IsExtendedResourceName(name) {
validationErrors = append(validationErrors, fmt.Errorf("%s is an invalid extended resource name", name))
}
return validationErrors
}
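As a rough guide to the behavior (inferred from IsQualifiedName and IsExtendedResourceName as of this change):

validateExtendedResourceName("example.com/foo")   // accepted: qualified name outside kubernetes.io
validateExtendedResourceName("kubernetes.io/foo") // rejected: the kubernetes.io namespace is reserved
validateExtendedResourceName("cpu")               // rejected: unprefixed names are native resources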

View File

@@ -69,6 +69,21 @@ func TestValidatePolicy(t *testing.T) {
}},
expected: errors.New("Only one extender can implement bind, found 2"),
},
{
policy: api.Policy{
ExtenderConfigs: []api.ExtenderConfig{
{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}},
{URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}},
}},
expected: errors.New("Duplicate extender managed resource name foo.com/bar"),
},
{
policy: api.Policy{
ExtenderConfigs: []api.ExtenderConfig{
{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "kubernetes.io/foo"}}},
}},
expected: errors.New("kubernetes.io/foo is an invalid extended resource name"),
},
}
for _, test := range tests {

View File

@@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) {
(*in).DeepCopyInto(*out)
}
}
if in.ManagedResources != nil {
in, out := &in.ManagedResources, &out.ManagedResources
*out = make([]ExtenderManagedResource, len(*in))
copy(*out, *in)
}
return
}
@@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource.
func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
if in == nil {
return nil
}
out := new(ExtenderManagedResource)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
{

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/api/core/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -46,6 +47,7 @@ type HTTPExtender struct {
weight int
client *http.Client
nodeCacheCapable bool
managedResources sets.String
}
func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
@@ -85,6 +87,10 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
Transport: transport,
Timeout: config.HTTPTimeout,
}
managedResources := sets.NewString()
for _, r := range config.ManagedResources {
managedResources.Insert(string(r.Name))
}
return &HTTPExtender{
extenderURL: config.URLPrefix,
filterVerb: config.FilterVerb,
@@ -93,6 +99,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
weight: config.Weight,
client: client,
nodeCacheCapable: config.NodeCacheCapable,
managedResources: managedResources,
}, nil
}
@@ -252,3 +259,35 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{})
return json.NewDecoder(resp.Body).Decode(result)
}
// IsInterested returns true if at least one extended resource requested by
// this pod is managed by this extender.
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
if h.managedResources.Len() == 0 {
return true
}
if h.hasManagedResources(pod.Spec.Containers) {
return true
}
if h.hasManagedResources(pod.Spec.InitContainers) {
return true
}
return false
}
func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
for i := range containers {
container := &containers[i]
for resourceName := range container.Resources.Requests {
if h.managedResources.Has(string(resourceName)) {
return true
}
}
for resourceName := range container.Resources.Limits {
if h.managedResources.Has(string(resourceName)) {
return true
}
}
}
return false
}
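A minimal sketch of the resulting behavior (URL and resource name hypothetical); note that IsInterested requires no network round trip:

ext, err := NewHTTPExtender(&schedulerapi.ExtenderConfig{
	URLPrefix:        "http://127.0.0.1:8888/extender",
	FilterVerb:       "filter",
	ManagedResources: []schedulerapi.ExtenderManagedResource{{Name: "example.com/foo"}},
})
if err != nil {
	panic(err)
}
pod := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{{
	Resources: v1.ResourceRequirements{
		Requests: v1.ResourceList{"example.com/foo": resource.MustParse("1")},
	},
}}}}
fmt.Println(ext.IsInterested(pod)) // true: the pod requests a managed resource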

View File

@@ -110,6 +110,7 @@ type FakeExtender struct {
weight int
nodeCacheCapable bool
filteredNodes []*v1.Node
unInterested bool
}
func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
@@ -183,6 +184,10 @@ func (f *FakeExtender) IsBinder() bool {
return true
}
func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
return !f.unInterested
}
var _ algorithm.SchedulerExtender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
@@ -304,6 +309,28 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
expectedHost: "machine2", // machine2 has higher score
name: "test 7",
},
{
// The scheduler is expected not to send the pod to the extender
// in the Filter/Prioritize phases if the extender is not
// interested in the pod.
//
// If the scheduler sends the pod by mistake, the test will fail
// because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender.
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
unInterested: true,
},
},
nodes: []string{"machine1", "machine2"},
expectsErr: false,
expectedHost: "machine2", // machine2 has higher score
name: "test 8",
},
}
for _, test := range tests {

View File

@@ -354,6 +354,9 @@ func findNodesThatFit(
if len(filtered) > 0 && len(extenders) != 0 {
for _, extender := range extenders {
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
if err != nil {
return []*v1.Node{}, FailedPredicateMap{}, err
@@ -624,6 +627,9 @@ func PrioritizeNodes(
if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo))
for _, extender := range extenders {
if !extender.IsInterested(pod) {
continue
}
wg.Add(1)
go func(ext algorithm.SchedulerExtender) {
defer wg.Done()

View File

@@ -969,6 +969,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
extenders := make([]algorithm.SchedulerExtender, 0)
if len(policy.ExtenderConfigs) != 0 {
ignoredExtendedResources := sets.NewString()
for ii := range policy.ExtenderConfigs {
glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
@@ -976,8 +977,14 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
return nil, err
}
extenders = append(extenders, extender)
for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources.Insert(string(r.Name))
}
}
}
predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
}
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
@@ -992,14 +999,22 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
// getBinder returns an extender that supports bind or a default binder.
func (c *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder {
// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
return extenders[i]
extenderBinder = extenders[i]
break
}
}
return &binder{c.client}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) scheduler.Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
return defaultBinder
}
}
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
@@ -1051,7 +1066,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
Binder: c.getBinder(extenders),
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
WaitForCacheSync: func() bool {

View File

@@ -17,6 +17,8 @@ limitations under the License.
package factory
import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
@@ -533,3 +535,85 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
enableEquivalenceCache,
)
}
type fakeExtender struct {
isBinder bool
interestedPodName string
}
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
return nil, nil, nil
}
func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
return nil, 0, nil
}
func (f *fakeExtender) Bind(binding *v1.Binding) error {
if f.isBinder {
return nil
}
return errors.New("not a binder")
}
func (f *fakeExtender) IsBinder() bool {
return f.isBinder
}
func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
return pod != nil && pod.Name == f.interestedPodName
}
func TestGetBinderFunc(t *testing.T) {
for _, test := range []struct {
podName string
extenders []algorithm.SchedulerExtender
expectedBinderType string
}{
// Expect to return the default binder because the extender is not a
// binder, even though it's interested in the pod.
{
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
},
expectedBinderType: "*factory.binder",
},
// Expect to return the fake binder because one of the extenders is a
// binder and it's interested in the pod.
{
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
expectedBinderType: "*factory.fakeExtender",
},
// Expect to return the default binder because one of the extenders is
// a binder but the binder is not interested in the pod.
{
podName: "pod1",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
expectedBinderType: "*factory.binder",
},
} {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: test.podName,
},
}
f := &configFactory{}
binderFunc := f.getBinderFunc(test.extenders)
binder := binderFunc(pod)
binderType := fmt.Sprintf("%s", reflect.TypeOf(binder))
if binderType != test.expectedBinderType {
t.Errorf("Expected binder %q but got %q", test.expectedBinderType, binderType)
}
}
}

View File

@@ -107,7 +107,7 @@ type Config struct {
Ecache *core.EquivalenceCache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
@@ -403,7 +403,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
bindingStart := time.Now()
// If binding succeeded then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err := sched.config.Binder.Bind(b)
err := sched.config.GetBinder(assumed).Bind(b)
if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
glog.Errorf("scheduler cache FinishBinding failed: %v", err)
}

View File

@@ -195,10 +195,12 @@ func TestScheduler(t *testing.T) {
[]*v1.Node{&testNode},
),
Algorithm: item.algo,
Binder: fakeBinder{func(b *v1.Binding) error {
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
}},
}}
},
PodConditionUpdater: fakePodConditionUpdater{},
Error: func(p *v1.Pod, err error) {
gotPod = p
@@ -543,10 +545,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
Binder: fakeBinder{func(b *v1.Binding) error {
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
}},
}}
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
@@ -588,11 +592,13 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
Binder: fakeBinder{func(b *v1.Binding) error {
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b
return nil
}},
}}
},
WaitForCacheSync: func() bool {
return true
},

View File

@@ -50,6 +50,7 @@ const (
filter = "filter"
prioritize = "prioritize"
bind = "bind"
extendedResourceName = "foo.com/bar"
)
type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
@@ -343,6 +344,12 @@ func TestSchedulerExtender(t *testing.T) {
BindVerb: bind,
Weight: 4,
EnableHTTPS: false,
ManagedResources: []schedulerapi.ExtenderManagedResource{
{
Name: extendedResourceName,
IgnoredByScheduler: true,
},
},
},
{
URLPrefix: es3.URL,
@@ -420,7 +427,17 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "extender-test-pod"},
Spec: v1.PodSpec{
Containers: []v1.Container{{Name: "container", Image: e2e.GetPauseImageName(cs)}},
Containers: []v1.Container{
{
Name: "container",
Image: e2e.GetPauseImageName(cs),
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
extendedResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
},
}