mirror of https://github.com/k3s-io/k3s
Part 1 of pr #33763: cleanup CheckServiceAffinity in preparation for
predicate injection support, Update metadata structpull/6/head
parent
71249bb82b
commit
182e89b84e
|
@ -637,38 +637,36 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
|
|||
return affinity.CheckServiceAffinity
|
||||
}
|
||||
|
||||
// CheckServiceAffinity ensures that only the nodes that match the specified labels are considered for scheduling.
|
||||
// The set of labels to be considered are provided to the struct (ServiceAffinity).
|
||||
// The pod is checked for the labels and any missing labels are then checked in the node
|
||||
// that hosts the service pods (peers) for the given pod.
|
||||
// The checkServiceAffinity predicate matches nodes in such a way to force that
|
||||
// ServiceAffinity.labels are homogenous for pods added to a node.
|
||||
// (i.e. it returns true IFF this pod can be added to this node, such
|
||||
// that all other pods in the same service are running on nodes w/
|
||||
// the exact same ServiceAffinity.label values).
|
||||
//
|
||||
// We add an implicit selector requiring some particular value V for label L to a pod, if:
|
||||
// - L is listed in the ServiceAffinity object that is passed into the function
|
||||
// - the pod does not have any NodeSelector for L
|
||||
// - some other pod from the same service is already scheduled onto a node that has value V for label L
|
||||
// Details:
|
||||
//
|
||||
// If (the svc affinity labels are not a subset of pod's label selectors )
|
||||
// The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate
|
||||
// the match.
|
||||
// Otherwise:
|
||||
// Create an "implicit selector" which gaurantees pods will land on nodes with similar values
|
||||
// for the affinity labels.
|
||||
// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
|
||||
// These backfilled labels in the selector "L" are defined like so:
|
||||
// - L is a label that the ServiceAffinity object needs as a matching constraints.
|
||||
// - L is not defined in the pod itself already.
|
||||
// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
|
||||
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
return false, nil, fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
var affinitySelector labels.Selector
|
||||
|
||||
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
|
||||
affinityLabels := map[string]string{}
|
||||
nodeSelector := labels.Set(pod.Spec.NodeSelector)
|
||||
labelsExist := true
|
||||
for _, l := range s.labels {
|
||||
if nodeSelector.Has(l) {
|
||||
affinityLabels[l] = nodeSelector.Get(l)
|
||||
} else {
|
||||
// the current pod does not specify all the labels, look in the existing service pods
|
||||
labelsExist = false
|
||||
}
|
||||
}
|
||||
affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
|
||||
|
||||
// skip looking at other pods in the service if the current pod defines all the required affinity labels
|
||||
if !labelsExist {
|
||||
// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
|
||||
if len(s.labels) > len(affinityLabels) {
|
||||
services, err := s.serviceLister.GetPodServices(pod)
|
||||
if err == nil && len(services) > 0 {
|
||||
// just use the first service and get the other pods within the service
|
||||
|
@ -679,40 +677,20 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
|
|||
return false, nil, err
|
||||
}
|
||||
// consider only the pods that belong to the same namespace
|
||||
nsServicePods := []*api.Pod{}
|
||||
for _, nsPod := range servicePods {
|
||||
if nsPod.Namespace == pod.Namespace {
|
||||
nsServicePods = append(nsServicePods, nsPod)
|
||||
}
|
||||
}
|
||||
nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
|
||||
if len(nsServicePods) > 0 {
|
||||
// consider any service pod and fetch the node its hosted on
|
||||
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
for _, l := range s.labels {
|
||||
// If the pod being scheduled has the label value specified, do not override it
|
||||
if _, exists := affinityLabels[l]; exists {
|
||||
continue
|
||||
}
|
||||
if labels.Set(otherNode.Labels).Has(l) {
|
||||
affinityLabels[l] = labels.Set(otherNode.Labels).Get(l)
|
||||
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if there are no existing pods in the service, consider all nodes
|
||||
if len(affinityLabels) == 0 {
|
||||
affinitySelector = labels.Everything()
|
||||
} else {
|
||||
affinitySelector = labels.Set(affinityLabels).AsSelector()
|
||||
}
|
||||
|
||||
// check if the node matches the selector
|
||||
if affinitySelector.Matches(labels.Set(node.Labels)) {
|
||||
if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
|
||||
return true, nil, nil
|
||||
}
|
||||
return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package predicates
|
||||
|
||||
import "k8s.io/kubernetes/pkg/labels"
|
||||
import "k8s.io/kubernetes/pkg/api"
|
||||
|
||||
// FindLabelsInSet gets as many key/value pairs as possible out of a label set.
|
||||
func FindLabelsInSet(labelsToKeep []string, selector labels.Set) map[string]string {
|
||||
aL := make(map[string]string)
|
||||
for _, l := range labelsToKeep {
|
||||
if selector.Has(l) {
|
||||
aL[l] = selector.Get(l)
|
||||
}
|
||||
}
|
||||
return aL
|
||||
}
|
||||
|
||||
// AddUnsetLabelsToMap backfills missing values with values we find in a map.
|
||||
func AddUnsetLabelsToMap(aL map[string]string, labelsToAdd []string, labelSet labels.Set) {
|
||||
for _, l := range labelsToAdd {
|
||||
// if the label is already there, dont overwrite it.
|
||||
if _, exists := aL[l]; exists {
|
||||
continue
|
||||
}
|
||||
// otherwise, backfill this label.
|
||||
if labelSet.Has(l) {
|
||||
aL[l] = labelSet.Get(l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FilterPodsByNamespace filters pods outside a namespace from the given list.
|
||||
func FilterPodsByNamespace(pods []*api.Pod, ns string) []*api.Pod {
|
||||
filtered := []*api.Pod{}
|
||||
for _, nsPod := range pods {
|
||||
if nsPod.Namespace == ns {
|
||||
filtered = append(filtered, nsPod)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
// CreateSelectorFromLabels is used to define a selector that corresponds to the keys in a map.
|
||||
func CreateSelectorFromLabels(aL map[string]string) labels.Selector {
|
||||
if aL == nil || len(aL) == 0 {
|
||||
return labels.Everything()
|
||||
}
|
||||
return labels.Set(aL).AsSelector()
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package predicates
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
// ExampleUtils is a https://blog.golang.org/examples styled unit test.
|
||||
func ExampleFindLabelsInSet() {
|
||||
labelSubset := labels.Set{}
|
||||
labelSubset["label1"] = "value1"
|
||||
labelSubset["label2"] = "value2"
|
||||
// Lets make believe that these pods are on the cluster.
|
||||
// Utility functions will inspect their labels, filter them, and so on.
|
||||
nsPods := []*api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod1",
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{
|
||||
"label1": "wontSeeThis",
|
||||
"label2": "wontSeeThis",
|
||||
"label3": "will_see_this",
|
||||
},
|
||||
},
|
||||
}, // first pod which will be used via the utilities
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod2",
|
||||
Namespace: "ns1",
|
||||
},
|
||||
},
|
||||
|
||||
{}, // a third pod which will have no effect on anything.
|
||||
}
|
||||
fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
|
||||
AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)
|
||||
fmt.Println(labelSubset)
|
||||
|
||||
for _, pod := range FilterPodsByNamespace(nsPods, "ns1") {
|
||||
fmt.Print(pod.Name, ",")
|
||||
}
|
||||
// Output:
|
||||
// will_see_this
|
||||
// label1=value1,label2=value2,label3=will_see_this
|
||||
// pod1,pod2,
|
||||
}
|
|
@ -28,7 +28,8 @@ type priorityMetadata struct {
|
|||
affinity *api.Affinity
|
||||
}
|
||||
|
||||
func PriorityMetadata(pod *api.Pod) interface{} {
|
||||
// PriorityMetadata is a MetadataProducer. Node info can be nil.
|
||||
func PriorityMetadata(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
// If we cannot compute metadata, just return nil
|
||||
if pod == nil {
|
||||
return nil
|
||||
|
|
|
@ -39,7 +39,7 @@ type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedule
|
|||
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error
|
||||
|
||||
// MetdataProducer is a function that computes metadata for a given pod.
|
||||
type MetadataProducer func(pod *api.Pod) interface{}
|
||||
type MetadataProducer func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{}
|
||||
|
||||
// DEPRECATED
|
||||
// Use Map-Reduce pattern for priority functions.
|
||||
|
@ -54,7 +54,7 @@ type PriorityConfig struct {
|
|||
Weight int
|
||||
}
|
||||
|
||||
func EmptyMetadataProducer(pod *api.Pod) interface{} {
|
||||
func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -113,8 +113,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
|
|||
}
|
||||
|
||||
trace.Step("Prioritizing")
|
||||
meta := g.priorityMetaProducer(pod)
|
||||
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, meta, g.prioritizers, filteredNodes, g.extenders)
|
||||
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
|
||||
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue