mirror of https://github.com/k3s-io/k3s
Merge pull request #33763 from jayunit100/sched-checkservice-predicateCache
Automatic merge from submit-queue Predicate cacheing and cleanup Fix to #31795 First pass @ cleanup and caching of the CheckServiceAffinity function. The cleanup IMO is necessary because the logic around the pod listing and the use of the "implicit selector" (which is reverse engineered to enable the homogenous pod groups). Should still pass the E2Es. @timothysc @wojtek-tpull/6/head
commit
b0a4216182
|
@ -76,7 +76,7 @@ func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
|
|||
return f, nil
|
||||
}
|
||||
|
||||
// GetPodServices gets the services that have the selector that match the labels on the given pod
|
||||
// GetPodServices gets the services that have the selector that match the labels on the given pod.
|
||||
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
|
||||
var selector labels.Selector
|
||||
|
||||
|
@ -91,10 +91,6 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
|
|||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
if len(services) == 0 {
|
||||
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package predicates
|
||||
|
||||
import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
// PredicateMetadataFactory wraps a pod lister; its GetMetadata method builds
// the metadata value that is handed to predicate functions.
type PredicateMetadataFactory struct {
|
||||
// podLister is supplied by the caller when the factory is constructed.
podLister algorithm.PodLister
|
||||
}
|
||||
|
||||
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
|
||||
factory := &PredicateMetadataFactory{
|
||||
podLister,
|
||||
}
|
||||
return factory.GetMetadata
|
||||
}
|
||||
|
||||
// GetMetadata returns the predicateMetadata used which will be used by various predicates.
|
||||
func (pfactory *PredicateMetadataFactory) GetMetadata(pod *api.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) interface{} {
|
||||
// If we cannot compute metadata, just return nil
|
||||
if pod == nil {
|
||||
return nil
|
||||
}
|
||||
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
predicateMetadata := &predicateMetadata{
|
||||
pod: pod,
|
||||
podBestEffort: isPodBestEffort(pod),
|
||||
podRequest: GetResourceRequest(pod),
|
||||
podPorts: GetUsedPorts(pod),
|
||||
matchingAntiAffinityTerms: matchingTerms,
|
||||
}
|
||||
for predicateName, precomputeFunc := range predicatePrecomputations {
|
||||
glog.V(4).Info("Precompute: %v", predicateName)
|
||||
precomputeFunc(predicateMetadata)
|
||||
}
|
||||
return predicateMetadata
|
||||
}
|
|
@ -36,6 +36,19 @@ import (
|
|||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
// predicatePrecomputations: Helper types/variables...
|
||||
type PredicateMetadataModifier func(pm *predicateMetadata)
|
||||
|
||||
var predicatePrecomputeRegisterLock sync.Mutex
|
||||
var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)
|
||||
|
||||
func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
|
||||
predicatePrecomputeRegisterLock.Lock()
|
||||
defer predicatePrecomputeRegisterLock.Unlock()
|
||||
predicatePrecomputations[predicateName] = precomp
|
||||
}
|
||||
|
||||
// Other types for predicate functions...
|
||||
// NodeInfo is implemented by anything that can resolve a node ID to an
// *api.Node.
type NodeInfo interface {
|
||||
// GetNodeInfo returns the node identified by nodeID, or an error.
GetNodeInfo(nodeID string) (*api.Node, error)
|
||||
}
|
||||
|
@ -77,34 +90,21 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
|
|||
return node.(*api.Node), nil
|
||||
}
|
||||
|
||||
// predicateMetadata is a type that is passed as metadata for predicate functions
|
||||
type predicateMetadata struct {
|
||||
podBestEffort bool
|
||||
podRequest *schedulercache.Resource
|
||||
podPorts map[int]bool
|
||||
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
|
||||
}
|
||||
|
||||
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
|
||||
// due to the way declarations are processed in predicate declaration unit tests.
|
||||
type matchingPodAntiAffinityTerm struct {
|
||||
term *api.PodAffinityTerm
|
||||
node *api.Node
|
||||
}
|
||||
|
||||
func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
|
||||
// If we cannot compute metadata, just return nil
|
||||
if pod == nil {
|
||||
return nil
|
||||
}
|
||||
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return &predicateMetadata{
|
||||
podBestEffort: isPodBestEffort(pod),
|
||||
podRequest: GetResourceRequest(pod),
|
||||
podPorts: GetUsedPorts(pod),
|
||||
matchingAntiAffinityTerms: matchingTerms,
|
||||
}
|
||||
// predicateMetadata carries per-pod data that is computed once and then
// passed as the metadata argument to predicate functions.
type predicateMetadata struct {
|
||||
// pod is the pod the metadata was computed for.
pod *api.Pod
|
||||
// podBestEffort holds the result of isPodBestEffort for the pod.
podBestEffort bool
|
||||
// podRequest holds the result of GetResourceRequest for the pod.
podRequest *schedulercache.Resource
|
||||
// podPorts holds the result of GetUsedPorts for the pod.
podPorts map[int]bool
|
||||
// matchingAntiAffinityTerms holds the result of getMatchingAntiAffinityTerms for the pod.
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
|
||||
// serviceAffinityMatchingPodList is filled in by the service-affinity
// precomputation; it stays nil when that precomputation has not run.
serviceAffinityMatchingPodList []*api.Pod
|
||||
// serviceAffinityMatchingPodServices is filled in by the service-affinity
// precomputation; it stays nil when that precomputation has not run.
serviceAffinityMatchingPodServices []*api.Service
|
||||
}
|
||||
|
||||
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
|
||||
|
@ -637,20 +637,42 @@ type ServiceAffinity struct {
|
|||
labels []string
|
||||
}
|
||||
|
||||
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) algorithm.FitPredicate {
|
||||
// serviceAffinityPrecomputation should be run once by the scheduler before looping through the Predicate. It is a helper function that
|
||||
// only should be referenced by NewServiceAffinityPredicate.
|
||||
func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) {
|
||||
if pm.pod == nil {
|
||||
glog.Errorf("Cannot precompute service affinity, a pod is required to caluculate service affinity.")
|
||||
return
|
||||
}
|
||||
|
||||
var errSvc, errList error
|
||||
// Store services which match the pod.
|
||||
pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
|
||||
selector := CreateSelectorFromLabels(pm.pod.Labels)
|
||||
// consider only the pods that belong to the same namespace
|
||||
allMatches, errList := s.podLister.List(selector)
|
||||
|
||||
// In the future maybe we will return them as part of the function.
|
||||
if errSvc != nil || errList != nil {
|
||||
glog.Errorf("Some Error were found while precomputing svc affinity: \nservices:%v , \npods:%v", errSvc, errList)
|
||||
}
|
||||
pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
|
||||
}
|
||||
|
||||
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) {
|
||||
affinity := &ServiceAffinity{
|
||||
podLister: podLister,
|
||||
serviceLister: serviceLister,
|
||||
nodeInfo: nodeInfo,
|
||||
labels: labels,
|
||||
}
|
||||
return affinity.CheckServiceAffinity
|
||||
return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation
|
||||
}
|
||||
|
||||
// The checkServiceAffinity predicate matches nodes in such a way to force that
|
||||
// ServiceAffinity.labels are homogenous for pods added to a node.
|
||||
// (i.e. it returns true IFF this pod can be added to this node, such
|
||||
// that all other pods in the same service are running on nodes w/
|
||||
// checkServiceAffinity is a predicate which matches nodes in such a way to force that
|
||||
// ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
|
||||
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
|
||||
// the same service are running on nodes with
|
||||
// the exact same ServiceAffinity.label values).
|
||||
//
|
||||
// Details:
|
||||
|
@ -660,46 +682,47 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
|
|||
// the match.
|
||||
// Otherwise:
|
||||
// Create an "implicit selector" which guarantees pods will land on nodes with similar values
|
||||
// for the affinity labels.
|
||||
// for the affinity labels.
|
||||
//
|
||||
// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
|
||||
// These backfilled labels in the selector "L" are defined like so:
|
||||
// - L is a label that the ServiceAffinity object needs as a matching constraints.
|
||||
// - L is not defined in the pod itself already.
|
||||
// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
|
||||
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
//
|
||||
// WARNING: This Predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed...
|
||||
// For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction.
|
||||
func (s *ServiceAffinity) checkServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var services []*api.Service
|
||||
var pods []*api.Pod
|
||||
if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
|
||||
services = pm.serviceAffinityMatchingPodServices
|
||||
pods = pm.serviceAffinityMatchingPodList
|
||||
} else {
|
||||
// Make the predicate resilient in case metadata is missing.
|
||||
pm = &predicateMetadata{pod: pod}
|
||||
s.serviceAffinityPrecomputation(pm)
|
||||
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
|
||||
}
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
return false, nil, fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
|
||||
affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
|
||||
|
||||
// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
|
||||
// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
|
||||
if len(s.labels) > len(affinityLabels) {
|
||||
services, err := s.serviceLister.GetPodServices(pod)
|
||||
if err == nil && len(services) > 0 {
|
||||
// just use the first service and get the other pods within the service
|
||||
// TODO: a separate predicate can be created that tries to handle all services for the pod
|
||||
selector := labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
servicePods, err := s.podLister.List(selector)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
// consider only the pods that belong to the same namespace
|
||||
nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
|
||||
if len(nsServicePods) > 0 {
|
||||
// consider any service pod and fetch the node its hosted on
|
||||
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
|
||||
if len(services) > 0 {
|
||||
if len(pods) > 0 {
|
||||
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels))
|
||||
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if the node matches the selector
|
||||
// Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
|
||||
if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
|
|
@ -119,6 +119,11 @@ func newResourceInitPod(pod *api.Pod, usage ...schedulercache.Resource) *api.Pod
|
|||
return pod
|
||||
}
|
||||
|
||||
func PredicateMetadata(p *api.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
pm := PredicateMetadataFactory{algorithm.FakePodLister{p}}
|
||||
return pm.GetMetadata(p, nodeInfo)
|
||||
}
|
||||
|
||||
func TestPodFitsResources(t *testing.T) {
|
||||
enoughPodsTests := []struct {
|
||||
pod *api.Pod
|
||||
|
@ -233,7 +238,6 @@ func TestPodFitsResources(t *testing.T) {
|
|||
for _, test := range enoughPodsTests {
|
||||
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
|
||||
test.nodeInfo.SetNode(&node)
|
||||
|
||||
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.test, err)
|
||||
|
@ -289,7 +293,6 @@ func TestPodFitsResources(t *testing.T) {
|
|||
for _, test := range notEnoughPodsTests {
|
||||
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
|
||||
test.nodeInfo.SetNode(&node)
|
||||
|
||||
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.test, err)
|
||||
|
@ -1310,22 +1313,38 @@ func TestServiceAffinity(t *testing.T) {
|
|||
},
|
||||
}
|
||||
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}
|
||||
|
||||
for _, test := range tests {
|
||||
nodes := []api.Node{node1, node2, node3, node4, node5}
|
||||
serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
|
||||
nodeInfo := schedulercache.NewNodeInfo()
|
||||
nodeInfo.SetNode(test.node)
|
||||
fits, reasons, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.test, err)
|
||||
}
|
||||
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
|
||||
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
|
||||
}
|
||||
if fits != test.fits {
|
||||
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
|
||||
testIt := func(skipPrecompute bool) {
|
||||
nodes := []api.Node{node1, node2, node3, node4, node5}
|
||||
nodeInfo := schedulercache.NewNodeInfo()
|
||||
nodeInfo.SetNode(test.node)
|
||||
nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
|
||||
// Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations.
|
||||
predicate, precompute := NewServiceAffinityPredicate(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels)
|
||||
// Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test.
|
||||
RegisterPredicatePrecomputation("checkServiceAffinity-unitTestPredicate", func(pm *predicateMetadata) {
|
||||
if !skipPrecompute {
|
||||
precompute(pm)
|
||||
}
|
||||
})
|
||||
if pmeta, ok := (PredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok {
|
||||
fits, reasons, err := predicate(test.pod, pmeta, nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.test, err)
|
||||
}
|
||||
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
|
||||
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
|
||||
}
|
||||
if fits != test.fits {
|
||||
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
|
||||
}
|
||||
} else {
|
||||
t.Errorf("Error casting.")
|
||||
}
|
||||
}
|
||||
|
||||
testIt(false) // Confirm that the predicate works with the precomputed data (better performance)
|
||||
testIt(true) // Confirm that the predicate works without precomputed data (resilience)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1586,7 +1605,6 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
|
|||
}
|
||||
return "", false
|
||||
},
|
||||
|
||||
FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
|
||||
if pv.Spec.AWSElasticBlockStore != nil {
|
||||
return pv.Spec.AWSElasticBlockStore.VolumeID, true
|
||||
|
@ -1652,7 +1670,7 @@ func TestPredicatesRegistered(t *testing.T) {
|
|||
if err == nil {
|
||||
functions = append(functions, fileFunctions...)
|
||||
} else {
|
||||
t.Errorf("unexpected error when parsing %s", filePath)
|
||||
t.Errorf("unexpected error %s when parsing %s", err, filePath)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,11 @@ func ExampleFindLabelsInSet() {
|
|||
},
|
||||
},
|
||||
|
||||
{}, // a third pod which will have no effect on anything.
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod3ThatWeWontSee",
|
||||
},
|
||||
},
|
||||
}
|
||||
fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
|
||||
AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)
|
||||
|
|
|
@ -54,6 +54,7 @@ type PriorityConfig struct {
|
|||
Weight int
|
||||
}
|
||||
|
||||
// EmptyMetadataProducer is a no-op metadata producer: it ignores both
// arguments and always returns nil.
|
||||
func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -66,6 +66,11 @@ func init() {
|
|||
return priorities.PriorityMetadata
|
||||
})
|
||||
|
||||
factory.RegisterPredicateMetadataProducerFactory(
|
||||
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
|
||||
return predicates.NewPredicateMetadataFactory(args.PodLister)
|
||||
})
|
||||
|
||||
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
|
||||
// Register the priority function so that its available
|
||||
// but do not include it as part of the default priorities
|
||||
|
|
|
@ -291,8 +291,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
|
||||
}
|
||||
scheduler := NewGenericScheduler(
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer,
|
||||
test.prioritizers, extenders)
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
|
||||
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
|
||||
if test.expectsErr {
|
||||
if err == nil {
|
||||
|
|
|
@ -364,10 +364,13 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
predicateMetaProducer, err := f.GetPredicateMetadataProducer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f.Run()
|
||||
|
||||
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityMetaProducer, priorityConfigs, extenders)
|
||||
|
||||
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
podBackoff := podBackoff{
|
||||
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
|
||||
clock: realClock{},
|
||||
|
@ -409,6 +412,14 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce
|
|||
return getPriorityMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return getPredicateMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
|
|
|
@ -78,7 +78,8 @@ var (
|
|||
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
|
||||
|
||||
// Registered metadata producers
|
||||
priorityMetadataProducer MetadataProducerFactory
|
||||
priorityMetadataProducer MetadataProducerFactory
|
||||
predicateMetadataProducer MetadataProducerFactory
|
||||
|
||||
// get equivalence pod function
|
||||
getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil
|
||||
|
@ -121,12 +122,16 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
|
|||
if policy.Argument != nil {
|
||||
if policy.Argument.ServiceAffinity != nil {
|
||||
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
|
||||
return predicates.NewServiceAffinityPredicate(
|
||||
predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
|
||||
args.PodLister,
|
||||
args.ServiceLister,
|
||||
args.NodeInfo,
|
||||
policy.Argument.ServiceAffinity.Labels,
|
||||
)
|
||||
|
||||
// Once we generate the predicate we should also Register the Precomputation
|
||||
predicates.RegisterPredicatePrecomputation(policy.Name, precomputationFunction)
|
||||
return predicate
|
||||
}
|
||||
} else if policy.Argument.LabelsPresence != nil {
|
||||
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
|
||||
|
@ -163,6 +168,12 @@ func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) {
|
|||
priorityMetadataProducer = factory
|
||||
}
|
||||
|
||||
func RegisterPredicateMetadataProducerFactory(factory MetadataProducerFactory) {
|
||||
schedulerFactoryMutex.Lock()
|
||||
defer schedulerFactoryMutex.Unlock()
|
||||
predicateMetadataProducer = factory
|
||||
}
|
||||
|
||||
// DEPRECATED
|
||||
// Use Map-Reduce pattern for priority functions.
|
||||
// Registers a priority function with the algorithm registry. Returns the name,
|
||||
|
@ -312,6 +323,16 @@ func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProd
|
|||
return priorityMetadataProducer(args), nil
|
||||
}
|
||||
|
||||
func getPredicateMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) {
|
||||
schedulerFactoryMutex.Lock()
|
||||
defer schedulerFactoryMutex.Unlock()
|
||||
|
||||
if predicateMetadataProducer == nil {
|
||||
return algorithm.EmptyMetadataProducer, nil
|
||||
}
|
||||
return predicateMetadataProducer(args), nil
|
||||
}
|
||||
|
||||
func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) {
|
||||
schedulerFactoryMutex.Lock()
|
||||
defer schedulerFactoryMutex.Unlock()
|
||||
|
|
|
@ -61,14 +61,15 @@ func (f *FitError) Error() string {
|
|||
}
|
||||
|
||||
type genericScheduler struct {
|
||||
cache schedulercache.Cache
|
||||
predicates map[string]algorithm.FitPredicate
|
||||
priorityMetaProducer algorithm.MetadataProducer
|
||||
prioritizers []algorithm.PriorityConfig
|
||||
extenders []algorithm.SchedulerExtender
|
||||
pods algorithm.PodLister
|
||||
lastNodeIndexLock sync.Mutex
|
||||
lastNodeIndex uint64
|
||||
cache schedulercache.Cache
|
||||
predicates map[string]algorithm.FitPredicate
|
||||
priorityMetaProducer algorithm.MetadataProducer
|
||||
predicateMetaProducer algorithm.MetadataProducer
|
||||
prioritizers []algorithm.PriorityConfig
|
||||
extenders []algorithm.SchedulerExtender
|
||||
pods algorithm.PodLister
|
||||
lastNodeIndexLock sync.Mutex
|
||||
lastNodeIndex uint64
|
||||
|
||||
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
|
||||
|
||||
|
@ -104,7 +105,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
|
|||
// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
|
||||
|
||||
trace.Step("Computing predicates")
|
||||
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
|
||||
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -153,7 +154,9 @@ func findNodesThatFit(
|
|||
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
||||
nodes []*api.Node,
|
||||
predicateFuncs map[string]algorithm.FitPredicate,
|
||||
extenders []algorithm.SchedulerExtender) ([]*api.Node, FailedPredicateMap, error) {
|
||||
extenders []algorithm.SchedulerExtender,
|
||||
metadataProducer algorithm.MetadataProducer,
|
||||
) ([]*api.Node, FailedPredicateMap, error) {
|
||||
var filtered []*api.Node
|
||||
failedPredicateMap := FailedPredicateMap{}
|
||||
|
||||
|
@ -163,11 +166,12 @@ func findNodesThatFit(
|
|||
// Create filtered list with enough space to avoid growing it
|
||||
// and allow assigning.
|
||||
filtered = make([]*api.Node, len(nodes))
|
||||
meta := predicates.PredicateMetadata(pod, nodeNameToInfo)
|
||||
errs := []error{}
|
||||
|
||||
var predicateResultLock sync.Mutex
|
||||
var filteredLen int32
|
||||
|
||||
// We can use the same metadata producer for all nodes.
|
||||
meta := metadataProducer(pod, nodeNameToInfo)
|
||||
checkNode := func(i int) {
|
||||
nodeName := nodes[i].Name
|
||||
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
|
||||
|
@ -381,15 +385,17 @@ func EqualPriorityMap(_ *api.Pod, _ interface{}, nodeInfo *schedulercache.NodeIn
|
|||
func NewGenericScheduler(
|
||||
cache schedulercache.Cache,
|
||||
predicates map[string]algorithm.FitPredicate,
|
||||
priorityMetaProducer algorithm.MetadataProducer,
|
||||
predicateMetaProducer algorithm.MetadataProducer,
|
||||
prioritizers []algorithm.PriorityConfig,
|
||||
priorityMetaProducer algorithm.MetadataProducer,
|
||||
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
|
||||
return &genericScheduler{
|
||||
cache: cache,
|
||||
predicates: predicates,
|
||||
priorityMetaProducer: priorityMetaProducer,
|
||||
prioritizers: prioritizers,
|
||||
extenders: extenders,
|
||||
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
|
||||
cache: cache,
|
||||
predicates: predicates,
|
||||
predicateMetaProducer: predicateMetaProducer,
|
||||
prioritizers: prioritizers,
|
||||
priorityMetaProducer: priorityMetaProducer,
|
||||
extenders: extenders,
|
||||
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,8 +277,7 @@ func TestGenericScheduler(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
|
||||
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
|
||||
prioritizers: []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
|
||||
nodes: []string{"1", "2"},
|
||||
expectsErr: true,
|
||||
|
@ -302,8 +301,8 @@ func TestGenericScheduler(t *testing.T) {
|
|||
}
|
||||
|
||||
scheduler := NewGenericScheduler(
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer,
|
||||
test.prioritizers, []algorithm.SchedulerExtender{})
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.SchedulerExtender{})
|
||||
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
|
||||
|
||||
if !reflect.DeepEqual(err, test.wErr) {
|
||||
|
@ -323,7 +322,7 @@ func TestFindFitAllError(t *testing.T) {
|
|||
"2": schedulercache.NewNodeInfo(),
|
||||
"1": schedulercache.NewNodeInfo(),
|
||||
}
|
||||
_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil)
|
||||
_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -357,7 +356,7 @@ func TestFindFitSomeError(t *testing.T) {
|
|||
nodeNameToInfo[name].SetNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
|
||||
}
|
||||
|
||||
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil)
|
||||
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
|
|
@ -388,6 +388,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
|
|||
predicateMap,
|
||||
algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.PriorityConfig{},
|
||||
algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.SchedulerExtender{})
|
||||
bindingChan := make(chan *api.Binding, 1)
|
||||
errChan := make(chan error, 1)
|
||||
|
|
Loading…
Reference in New Issue