Merge pull request #50949 from bsalamat/preemption_eviction

Automatic merge from submit-queue

Add pod preemption to the scheduler

**What this PR does / why we need it**:
This is the last of a series of PRs to add priority-based preemption to the scheduler. This PR connects the preemption logic to the scheduler workflow.
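
For orientation, the sketch below shows roughly how a caller of the scheduler's `ScheduleAlgorithm` is expected to use the new `Preempt` hook after a failed scheduling attempt. It is not the code in this PR: the `tryPreempt` helper is hypothetical, and persisting the `NominatedNodeName` annotation through the API server is omitted.

```go
package example

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/core"
)

// tryPreempt is a hypothetical helper, not code from this PR. When Schedule fails with
// a FitError, the scheduler can ask the algorithm to preempt: it receives a nominated
// node plus the victims, records the nomination on the preemptor, and deletes the victims.
func tryPreempt(client clientset.Interface, alg algorithm.ScheduleAlgorithm, pod *v1.Pod,
	nodes algorithm.NodeLister, schedErr error) error {
	node, victims, err := alg.Preempt(pod, nodes, schedErr)
	if err != nil || node == nil {
		// Preemption failed or would not help this pod.
		return err
	}
	// Remember where this pod preempted so that it is not considered for further
	// preemption while its victims terminate (see NominatedNodeAnnotationKey).
	// Persisting this annotation via the API server is omitted in this sketch.
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	pod.Annotations[core.NominatedNodeAnnotationKey] = node.Name
	// Delete the victims; they get their normal graceful termination period.
	for _, victim := range victims {
		if err := client.CoreV1().Pods(victim.Namespace).Delete(victim.Name, &metav1.DeleteOptions{}); err != nil {
			return fmt.Errorf("deleting victim %v/%v: %v", victim.Namespace, victim.Name, err)
		}
	}
	return nil
}
```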

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #48646

**Special notes for your reviewer**:
This PR includes other PRs which are under review (#50805, #50405, #50190). All the new code is located in 43627afdf9.

**Release note**:

```release-note
Add priority-based preemption to the scheduler.
```

ref #47604

/assign @davidopp 

@kubernetes/sig-scheduling-pr-reviews
Kubernetes Submit Queue 2017-09-08 14:19:42 -07:00 committed by GitHub
commit f695a3120a
34 changed files with 1900 additions and 91 deletions


@ -330,7 +330,8 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule("get", "update", "patch", "delete").Groups(legacyGroup).Resources("endpoints").Names("kube-scheduler").RuleOrDie(),
// fundamental resources
rbac.NewRule(Read...).Groups(legacyGroup).Resources("nodes", "pods").RuleOrDie(),
rbac.NewRule(Read...).Groups(legacyGroup).Resources("nodes").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbac.NewRule("create").Groups(legacyGroup).Resources("pods/binding", "bindings").RuleOrDie(),
rbac.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// things that select pods


@ -580,8 +580,16 @@ items:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- watch


@ -36,6 +36,7 @@ go_library(
"testutil.go",
],
deps = [
"//pkg/features:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/core:go_default_library",
@ -47,6 +48,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",


@ -25,6 +25,11 @@ import (
var (
// The predicateName tries to be consistent with the predicate name used in DefaultAlgorithmProvider defined in
// defaults.go (which tends to be stable for backward compatibility)
// NOTE: If you add a new predicate failure error for a predicate that can never
// be made to pass by removing pods, or you change an existing predicate so that
// it can never be made to pass by removing pods, you need to add the predicate
// failure error in nodesWherePreemptionMightHelp() in scheduler/core/generic_scheduler.go
ErrDiskConflict = newPredicateFailureError("NoDiskConflict")
ErrVolumeZoneConflict = newPredicateFailureError("NoVolumeZoneConflict")
ErrNodeSelectorNotMatch = newPredicateFailureError("MatchNodeSelector")


@ -40,8 +40,8 @@ type matchingPodAntiAffinityTerm struct {
node *v1.Node
}
// NOTE: When new fields are added/removed or logic is changed, please make sure
// that RemovePod and AddPod functions are updated to work with the new changes.
// NOTE: When new fields are added/removed or logic is changed, please make sure that
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
@ -54,6 +54,9 @@ type predicateMetadata struct {
serviceAffinityMatchingPodServices []*v1.Service
}
// Ensure that predicateMetadata implements algorithm.PredicateMetadata.
var _ algorithm.PredicateMetadata = &predicateMetadata{}
// PredicateMetadataProducer: Helper types/variables...
type PredicateMetadataProducer func(pm *predicateMetadata)
@ -66,7 +69,7 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMe
predicateMetadataProducers[predicateName] = precomp
}
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.PredicateMetadataProducer {
factory := &PredicateMetadataFactory{
podLister,
}
@ -74,7 +77,7 @@ func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.Metada
}
// GetMetadata returns the predicateMetadata which will be used by various predicates.
func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) interface{} {
func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) algorithm.PredicateMetadata {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
@ -159,3 +162,27 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
}
return nil
}
// ShallowCopy copies a metadata struct into a new struct and creates a copy of
// its maps and slices, but it does not copy the contents of pointer values.
func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
newPredMeta := &predicateMetadata{
pod: meta.pod,
podBestEffort: meta.podBestEffort,
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
}
newPredMeta.podPorts = map[int]bool{}
for k, v := range meta.podPorts {
newPredMeta.podPorts[k] = v
}
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
for k, v := range meta.matchingAntiAffinityTerms {
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
}
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...)
return (algorithm.PredicateMetadata)(newPredMeta)
}


@ -355,3 +355,46 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
}
}
}
// TestPredicateMetadata_ShallowCopy tests the ShallowCopy function. It is based
// on the idea that shallow-copy should produce an object that is deep-equal to the original
// object.
func TestPredicateMetadata_ShallowCopy(t *testing.T) {
source := predicateMetadata{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "testns",
},
},
podBestEffort: true,
podRequest: &schedulercache.Resource{
MilliCPU: 1000,
Memory: 300,
AllowedPodNumber: 4,
},
podPorts: map[int]bool{1234: true, 456: false},
matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{
"term1": {
{
term: &v1.PodAffinityTerm{TopologyKey: "node"},
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
},
},
},
},
serviceAffinityInUse: true,
serviceAffinityMatchingPodList: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}},
},
serviceAffinityMatchingPodServices: []*v1.Service{
{ObjectMeta: metav1.ObjectMeta{Name: "service1"}},
},
}
if !reflect.DeepEqual(source.ShallowCopy().(*predicateMetadata), &source) {
t.Errorf("Copy is not equal to source!")
}
}


@ -45,6 +45,10 @@ import (
"github.com/golang/glog"
)
const (
MatchInterPodAffinity = "MatchInterPodAffinity"
)
// NodeInfo: Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error)
@ -152,7 +156,7 @@ func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image.
// - ISCSI forbids if any two pods share at least same IQN, LUN and Target
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
@ -250,7 +254,7 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []v1.Volume, namespace s
return nil
}
func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
@ -371,7 +375,7 @@ func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolum
return c.predicate
}
func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
@ -529,7 +533,7 @@ func podName(pod *v1.Pod) string {
// PodFitsResources checks if a node has sufficient resources, such as CPU, memory, GPU, and opaque int resources, to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -658,7 +662,7 @@ func podMatchesNodeLabels(pod *v1.Pod, node *v1.Node) bool {
}
// PodMatchNodeSelector checks if a pod node selector matches the node label.
func PodMatchNodeSelector(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodMatchNodeSelector(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -670,7 +674,7 @@ func PodMatchNodeSelector(pod *v1.Pod, meta interface{}, nodeInfo *schedulercach
}
// PodFitsHost checks if a pod spec node name matches the current node.
func PodFitsHost(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodFitsHost(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if len(pod.Spec.NodeName) == 0 {
return true, nil, nil
}
@ -709,7 +713,7 @@ func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicat
// Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -792,7 +796,7 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
//
// WARNING: This Predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed...
// For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction.
func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var services []*v1.Service
var pods []*v1.Pod
if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
@ -804,6 +808,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
s.serviceAffinityMetadataProducer(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
}
filteredPods := nodeInfo.FilterOutPods(pods)
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -813,8 +818,8 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
if len(s.labels) > len(affinityLabels) {
if len(services) > 0 {
if len(pods) > 0 {
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
if len(filteredPods) > 0 {
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(filteredPods[0].Spec.NodeName)
if err != nil {
return false, nil, err
}
@ -830,7 +835,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
}
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var wantPorts map[int]bool
if predicateMeta, ok := meta.(*predicateMetadata); ok {
wantPorts = predicateMeta.podPorts
@ -871,7 +876,7 @@ func haveSame(a1, a2 []string) bool {
// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
// that only non-critical pods need, and EssentialPredicates are the predicates that all pods, including critical pods, need.
func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func GeneralPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo)
if err != nil {
@ -893,7 +898,7 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N
}
// noncriticalPredicates are the predicates that only non-critical pods need
func noncriticalPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func noncriticalPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
if err != nil {
@ -907,7 +912,7 @@ func noncriticalPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercac
}
// EssentialPredicates are the predicates that all pods, including critical pods, need
func EssentialPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func EssentialPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := PodFitsHost(pod, meta, nodeInfo)
if err != nil {
@ -953,7 +958,7 @@ func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister) algor
// InterPodAffinityMatches checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
// First return value indicates whether a pod can be scheduled on the specified node while the second return value indicates the
// predicate failure reasons if the pod cannot be scheduled on the specified node.
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -1138,7 +1143,7 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
// Checks if scheduling the pod onto this node would break any anti-affinity
// rules indicated by the existing pods.
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) bool {
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) bool {
node := nodeInfo.Node()
if node == nil {
return false
@ -1246,7 +1251,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
}
// PodToleratesNodeTaints checks if a pod's tolerations can tolerate the node's taints
func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodToleratesNodeTaints(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
@ -1254,7 +1259,7 @@ func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulerca
}
// PodToleratesNodeNoExecuteTaints checks if a pod's tolerations can tolerate the node's NoExecute taints
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
})
@ -1279,7 +1284,7 @@ func isPodBestEffort(pod *v1.Pod) bool {
// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var podBestEffort bool
if predicateMeta, ok := meta.(*predicateMetadata); ok {
podBestEffort = predicateMeta.podBestEffort
@ -1301,7 +1306,7 @@ func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *s
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// check if node is under disk pressure
if nodeInfo.DiskPressureCondition() == v1.ConditionTrue {
return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
@ -1311,7 +1316,7 @@ func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *sch
// CheckNodeConditionPredicate checks if a pod can be scheduled on a node reporting out-of-disk,
// network-unavailable, or not-ready conditions. Only node conditions are accounted for in this predicate.
func CheckNodeConditionPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func CheckNodeConditionPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
reasons := []algorithm.PredicateFailureReason{}
if nodeInfo == nil || nodeInfo.Node() == nil {
@ -1359,7 +1364,7 @@ func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolum
return c.predicate
}
func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
return true, nil, nil
}


@ -142,7 +142,7 @@ func newResourceInitPod(pod *v1.Pod, usage ...schedulercache.Resource) *v1.Pod {
return pod
}
func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) algorithm.PredicateMetadata {
pm := PredicateMetadataFactory{schedulertesting.FakePodLister{p}}
return pm.GetMetadata(p, nodeInfo)
}
@ -3015,7 +3015,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
nodeInfo.SetNode(&node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
var meta interface{} = nil
var meta algorithm.PredicateMetadata = nil
if !test.nometa {
meta = PredicateMetadata(test.pod, nodeInfoMap)


@ -47,6 +47,10 @@ type SchedulerExtender interface {
// onto machines.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate


@ -27,8 +27,7 @@ import (
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the error.
// TODO: Change interface{} to a specific type.
type FitPredicate func(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
// PriorityMapFunction is a function that computes per-node results for a given node.
// TODO: Figure out the exact API of this method.
@ -41,7 +40,12 @@ type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *scheduler
// TODO: Change interface{} to a specific type.
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error
// MetadataProducer is a function that computes metadata for a given pod.
// PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) PredicateMetadata
// MetadataProducer is a function that computes metadata for a given pod. This
// is now used only for priority functions. For predicates, please use PredicateMetadataProducer.
// TODO: Rename this once we have a specific type for priority metadata producer.
type MetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{}
// DEPRECATED
@ -57,6 +61,11 @@ type PriorityConfig struct {
Weight int
}
// EmptyPredicateMetadataProducer returns a no-op PredicateMetadataProducer.
func EmptyPredicateMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) PredicateMetadata {
return nil
}
// EmptyMetadataProducer returns a no-op MetadataProducer type.
func EmptyMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
return nil
@ -147,3 +156,9 @@ type EmptyStatefulSetLister struct{}
func (f EmptyStatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*apps.StatefulSet, err error) {
return nil, nil
}
type PredicateMetadata interface {
ShallowCopy() PredicateMetadata
AddPod(addedPod *v1.Pod, nodeInfo *schedulercache.NodeInfo) error
RemovePod(deletedPod *v1.Pod) error
}
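
To illustrate the signature change, here is a minimal sketch (modeled on the fake predicates in the scheduler's own tests, not code added by this PR) of a FitPredicate written against the new typed metadata parameter; predicates that do not use the metadata can simply ignore it.

```go
package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// nodeIsEmptyPredicate passes only for nodes that run no pods. It follows the new
// FitPredicate signature; ErrFakePredicate is the failure reason the scheduler's
// tests use for toy predicates like this one.
func nodeIsEmptyPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	if len(nodeInfo.Pods()) == 0 {
		return true, nil, nil
	}
	return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}

// The function satisfies algorithm.FitPredicate and could be registered like any other predicate.
var _ algorithm.FitPredicate = nodeIsEmptyPredicate
```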


@ -52,7 +52,7 @@ const (
func init() {
// Register functions that extract metadata used by predicates and priorities computations.
factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
func(args factory.PluginFactoryArgs) algorithm.PredicateMetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})
factory.RegisterPriorityMetadataProducerFactory(
@ -155,7 +155,7 @@ func defaultPredicates() sets.String {
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
"MatchInterPodAffinity",
predicates.MatchInterPodAffinity,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},


@ -45,6 +45,7 @@ go_library(
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//plugin/pkg/scheduler/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/golang/groupcache/lru:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",


@ -183,6 +183,8 @@ func (f *FakeExtender) IsBinder() bool {
return true
}
var _ algorithm.SchedulerExtender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct {
name string
@ -314,7 +316,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {


@ -18,6 +18,7 @@ package core
import (
"fmt"
"math"
"sort"
"strings"
"sync"
@ -32,6 +33,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"github.com/golang/glog"
)
@ -45,7 +47,14 @@ type FitError struct {
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const NoNodeAvailableMsg = "No nodes are available that match all of the following predicates"
const (
NoNodeAvailableMsg = "No nodes are available that match all of the predicates"
// NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods.
// The scheduler uses the annotation to determine that the pod should not preempt more pods
// when it gets to the head of the scheduling queue again.
// See podEligibleToPreemptOthers() for more information.
NominatedNodeAnnotationKey = "NominatedNodeName"
)
// Error returns detailed information about why the pod failed to fit on each node.
func (f *FitError) Error() string {
@ -73,7 +82,7 @@ type genericScheduler struct {
equivalenceCache *EquivalenceCache
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
pods algorithm.PodLister
@ -159,6 +168,65 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
return priorityList[ix].Host, nil
}
// Preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes, preempts the pods on it, and returns the
// node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil
}
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return nil, nil, err
}
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
return nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
return nil, nil, nil
}
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
if err != nil {
return nil, nil, err
}
for len(nodeToPods) > 0 {
node := pickOneNodeForPreemption(nodeToPods)
if node == nil {
return nil, nil, err
}
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
if passes && pErr == nil {
return node, nodeToPods[node], err
}
if pErr != nil {
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
}
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
}
return nil, nil, err
}
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(
@ -167,7 +235,7 @@ func findNodesThatFit(
nodes []*v1.Node,
predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.MetadataProducer,
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
@ -232,7 +300,7 @@ func findNodesThatFit(
}
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
func podFitsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) {
var (
equivalenceHash uint64
@ -422,11 +490,288 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
}, nil
}
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with minimum highest priority victim is picked.
// 2. Ties are broken by sum of priorities of all victims.
// 3. If there are still ties, node with the minimum number of victims is picked.
// 4. If there are still ties, the first such node is picked (sort of randomly).
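// For example, with victims of priorities {100} on node A and {100, 0} on node B,
// criterion 1 ties (the highest-priority victim is 100 on both) and criterion 2
// picks node A, whose victims have the smaller priority sum.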
// TODO(bsalamat): Try to reuse the "nodeScore" slices in order to save GC time.
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
type nodeScore struct {
node *v1.Node
highestPriority int32
sumPriorities int64
numPods int
}
if len(nodesToPods) == 0 {
return nil
}
minHighestPriority := int32(math.MaxInt32)
minPriorityScores := []*nodeScore{}
for node, pods := range nodesToPods {
if len(pods) == 0 {
// We found a node that doesn't need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
return node
}
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
minPriorityScores = nil
}
if highestPodPriority == minHighestPriority {
minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
}
}
if len(minPriorityScores) == 1 {
return minPriorityScores[0].node
}
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
minSumPriorityScores := []*nodeScore{}
for _, nodeScore := range minPriorityScores {
var sumPriorities int64
for _, pod := range nodesToPods[nodeScore.node] {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
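// For example, without the shift a node with two priority -5 victims (sum -10)
// would beat a node with a single priority -5 victim (sum -5); with the shift the
// single-victim node has the smaller sum, as intended.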
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
minSumPriorityScores = nil
}
nodeScore.sumPriorities = sumPriorities
if sumPriorities == minSumPriorities {
minSumPriorityScores = append(minSumPriorityScores, nodeScore)
}
}
if len(minSumPriorityScores) == 1 {
return minSumPriorityScores[0].node
}
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
minNumPodScores := []*nodeScore{}
for _, nodeScore := range minSumPriorityScores {
if nodeScore.numPods < minNumPods {
minNumPods = nodeScore.numPods
minNumPodScores = nil
}
if nodeScore.numPods == minNumPods {
minNumPodScores = append(minNumPodScores, nodeScore)
}
}
// At this point, even if there is more than one node with the same score,
// return the first one.
if len(minNumPodScores) > 0 {
return minNumPodScores[0].node
}
glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil
}
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
potentialNodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {
nodeNameToPods := map[*v1.Node][]*v1.Pod{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
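// Each parallel check works on its own shallow copy of the metadata, so the
// AddPod/RemovePod calls made while selecting victims on this node do not
// affect the checks running for other nodes.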
var metaCopy algorithm.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
if fits {
resultLock.Lock()
nodeNameToPods[potentialNodes[i]] = pods
resultLock.Unlock()
}
}
workqueue.Parallelize(16, len(potentialNodes), checkNode)
return nodeNameToPods, nil
}
func nodePassesExtendersForPreemption(
pod *v1.Pod,
nodeName string,
victims []*v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
extenders []algorithm.SchedulerExtender) (bool, error) {
// If there are any extenders, run them and filter the list of candidate nodes.
if len(extenders) == 0 {
return true, nil
}
// Remove the victims from the corresponding nodeInfo and send nodes to the
// extenders for filtering.
originalNodeInfo := nodeNameToInfo[nodeName]
nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
for _, victim := range victims {
nodeInfoCopy.RemovePod(victim)
}
nodeNameToInfo[nodeName] = nodeInfoCopy
defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }()
filteredNodes := []*v1.Node{nodeInfoCopy.Node()}
for _, extender := range extenders {
var err error
var failedNodesMap map[string]string
filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo)
if err != nil {
return false, err
}
if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 {
return false, nil
}
}
return true, nil
}
// selectVictimsOnNode finds minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
// their priority and then, starting from the highest priority one, tries to keep as
// many of them as possible while checking that "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
// TODO(bsalamat): Add support for PodDisruptionBudget.
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
potentialVictims.Sort()
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption. The only condition
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, false
}
victims := []*v1.Pod{}
// Try to reprieve as many pods as possible starting from the highest priority one.
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
}
}
return victims, true
}
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
potentialNodes := []*v1.Node{}
for _, node := range nodes {
unresolvableReasonExist := false
failedPredicates, found := failedPredicatesMap[node.Name]
// If we assume that the scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we'd prefer
// to rely less on such assumptions in the code when the check does not impose
// significant overhead.
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition:
unresolvableReasonExist = true
break
// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
}
}
if !found || !unresolvableReasonExist {
glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}
// podEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods. If this pod has already preempted other pods and
// those are still in their graceful termination period, it shouldn't be
// considered for further preemption.
// We look at the node that is nominated for this pod, and as long as there are
// terminating pods on that node, we don't consider this pod for preempting more pods.
// TODO(bsalamat): Revisit this algorithm once scheduling by priority is added.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.
return false
}
}
}
}
return true
}
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.MetadataProducer,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {


@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
algorithmpriorities "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@ -41,15 +42,15 @@ import (
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
func falsePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func falsePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func truePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func truePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func matchesPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func matchesPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@ -60,7 +61,7 @@ func matchesPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func hasNoPodsPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func hasNoPodsPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if len(nodeInfo.Pods()) == 0 {
return true, nil, nil
}
@ -307,8 +308,7 @@ func TestGenericScheduler(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@ -328,7 +328,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
}
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -362,7 +362,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -392,10 +392,13 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node {
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
"pods": *resource.NewQuantity(100, resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
"pods": *resource.NewQuantity(100, resource.DecimalSI),
},
},
}
@ -544,3 +547,674 @@ func TestZeroRequest(t *testing.T) {
}
}
}
func printNodeToPods(nodeToPods map[*v1.Node][]*v1.Pod) string {
var output string
for node, pods := range nodeToPods {
output += node.Name + ": ["
for _, pod := range pods {
output += pod.Name + ", "
}
output += "]"
}
return output
}
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node][]*v1.Pod) error {
if len(expected) == len(nodeToPods) {
for k, pods := range nodeToPods {
if expPods, ok := expected[k.Name]; ok {
if len(pods) != len(expPods) {
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
prevPriority := int32(math.MaxInt32)
for _, p := range pods {
// Check that pods are sorted by their priority.
if *p.Spec.Priority > prevPriority {
return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k)
}
prevPriority = *p.Spec.Priority
if _, ok := expPods[p.Name]; !ok {
return fmt.Errorf("test [%v]: pod %v was not expected. Expected: %v", testName, p.Name, expPods)
}
}
} else {
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
}
} else {
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, printNodeToPods(nodeToPods))
}
return nil
}
type FakeNodeInfo v1.Node
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
node := v1.Node(n)
return &node, nil
}
func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) algorithm.PredicateMetadata {
return algorithmpredicates.NewPredicateMetadataFactory(schedulertesting.FakePodLister{p})(p, nodeInfo)
}
var smallContainers = []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest, 10)),
},
},
},
}
var mediumContainers = []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*2, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest*2, 10)),
},
},
},
}
var largeContainers = []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*3, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest*3, 10)),
},
},
},
}
var veryLargeContainers = []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*5, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest*5, 10)),
},
},
},
}
var negPriority, lowPriority, midPriority, highPriority, veryHighPriority = int32(-100), int32(0), int32(100), int32(1000), int32(10000)
// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes
// that podFitsOnNode works correctly and is tested separately.
func TestSelectNodesForPreemption(t *testing.T) {
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
nodes []string
pod *v1.Pod
pods []*v1.Pod
expected map[string]map[string]bool // Map from node name to the set of pod names that should be preempted.
addAffinityPredicate bool
}{
{
name: "a pod that does not fit on any machine",
predicates: map[string]algorithm.FitPredicate{"matches": falsePredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "a pod that fits with no preemption",
predicates: map[string]algorithm.FitPredicate{"matches": truePredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {}, "machine2": {}},
},
{
name: "a pod that fits on one machine with no preemption",
predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {}},
},
{
name: "a pod that fits on both machines when lower priority pods are preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {"b": true}},
},
{
name: "a pod that would fit on the machines, but other pods running are higher priority",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &lowPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "medium priority pod is preempted, but lower priority one stays as it is small",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true}, "machine2": {"c": true}},
},
{
name: "mixed priority pods are preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
},
{
name: "pod with anti-affinity is preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "machine1",
Labels: map[string]string{"pod": "preemptor"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1", Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {}},
addAffinityPredicate: true,
},
}
for _, test := range tests {
nodes := []*v1.Node{}
for _, n := range test.nodes {
node := makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5)
node.ObjectMeta.Labels = map[string]string{"hostname": node.Name}
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
if err != nil {
t.Error(err)
}
if err := checkPreemptionVictims(test.name, test.expected, nodeToPods); err != nil {
t.Error(err)
}
}
}
// TestPickOneNodeForPreemption tests pickOneNodeForPreemption.
func TestPickOneNodeForPreemption(t *testing.T) {
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
nodes []string
pod *v1.Pod
pods []*v1.Pod
expected []string // any of the items is valid
}{
{
name: "No node needs preemption",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}},
expected: []string{"machine1"},
},
{
name: "a pod that fits on both machines when lower priority pods are preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: []string{"machine1", "machine2"},
},
{
name: "a pod that fits on a machine with no preemption",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: []string{"machine3"},
},
{
name: "machine with min highest priority pod is picked",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine3"}},
},
expected: []string{"machine3"},
},
{
name: "when highest priorities are the same, minimum sum of priorities is picked",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}},
},
expected: []string{"machine2"},
},
{
name: "when highest priority and sum are the same, minimum number of pods is picked",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.4"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &negPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine3"}},
},
expected: []string{"machine2"},
},
{
// pickOneNodeForPreemption adjusts pod priorities when finding the sum of the victims. This
// test ensures that the logic works correctly.
name: "sum of adjusted priorities is considered",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.2"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &negPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine3"}},
},
expected: []string{"machine2"},
},
{
name: "non-overlapping lowest high priority, sum priorities, and number of pods",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2", "machine3", "machine4"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &veryHighPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.4"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m4.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine4"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m4.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine4"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m4.3"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine4"}},
{ObjectMeta: metav1.ObjectMeta{Name: "m4.4"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine4"}},
},
expected: []string{"machine1"},
},
}
for _, test := range tests {
nodes := []*v1.Node{}
for _, n := range test.nodes {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
if node.Name == nodeName {
found = true
break
}
}
if !found {
t.Errorf("test [%v]: unexpected node: %v", test.name, node)
}
}
}
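
The cases above spell out the selection order the tests expect from pickOneNodeForPreemption: prefer the node whose victims have the lowest highest priority, break ties by the smallest sum of victim priorities, then by the fewest victims (a node that needs no victims at all, as in the "no preemption" case, wins outright). The sketch below replays just the three tie-breakers on plain data; it is a simplified illustration under those assumptions, not the scheduler's implementation, and the candidate type and pickNode helper are invented for the example.

```go
package main

import "fmt"

// candidate is a hypothetical summary of one node's proposed victims, reduced
// to the three quantities the tests above compare.
type candidate struct {
	node            string
	highestPriority int32 // highest priority among the victims
	prioritySum     int64 // sum of the victims' priorities
	numVictims      int
}

// pickNode applies the tie-breaking order the tests describe: lowest
// highest-priority victim, then lowest priority sum, then fewest victims.
func pickNode(cands []candidate) candidate {
	best := cands[0]
	for _, c := range cands[1:] {
		switch {
		case c.highestPriority < best.highestPriority:
			best = c
		case c.highestPriority == best.highestPriority && c.prioritySum < best.prioritySum:
			best = c
		case c.highestPriority == best.highestPriority && c.prioritySum == best.prioritySum && c.numVictims < best.numVictims:
			best = c
		}
	}
	return best
}

func main() {
	cands := []candidate{
		{node: "machine1", highestPriority: 100, prioritySum: 250, numVictims: 3},
		{node: "machine2", highestPriority: 100, prioritySum: 200, numVictims: 2},
		{node: "machine3", highestPriority: 50, prioritySum: 300, numVictims: 4},
	}
	// machine3 wins: its highest-priority victim is the lowest of the three.
	fmt.Println(pickNode(cands).node)
}
```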
func TestNodesWherePreemptionMightHelp(t *testing.T) {
// Prepare 4 node names.
nodeNames := []string{}
for i := 1; i < 5; i++ {
nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i))
}
tests := []struct {
name string
failedPredMap FailedPredicateMap
pod *v1.Pod
expected map[string]bool // set of expected node names. Value is ignored.
}{
{
name: "No node should be attempted",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
"machine3": []algorithm.PredicateFailureReason{predicates.ErrTaintsTolerationsNotMatch},
"machine4": []algorithm.PredicateFailureReason{predicates.ErrNodeLabelPresenceViolated},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
expected: map[string]bool{},
},
{
name: "pod affinity should be tried",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
"machine3": []algorithm.PredicateFailureReason{predicates.ErrNodeUnschedulable},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
expected: map[string]bool{"machine1": true, "machine4": true},
},
{
name: "pod with both pod affinity and anti-affinity should be tried",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrPodAffinityNotMatch},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
},
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"blah", "foo"},
},
},
},
TopologyKey: "region",
},
},
},
}}},
expected: map[string]bool{"machine1": true, "machine3": true, "machine4": true},
},
{
name: "Mix of failed predicates works fine",
failedPredMap: FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch, predicates.ErrNodeOutOfDisk, predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrPodNotMatchHostName, predicates.ErrDiskConflict},
"machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)},
"machine4": []algorithm.PredicateFailureReason{},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
expected: map[string]bool{"machine3": true, "machine4": true},
},
}
for _, test := range tests {
nodes := nodesWherePreemptionMightHelp(test.pod, makeNodeList(nodeNames), test.failedPredMap)
if len(test.expected) != len(nodes) {
t.Errorf("test [%v]:number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", test.name, len(test.expected), len(nodes), nodes)
}
for _, node := range nodes {
if _, found := test.expected[node.Name]; !found {
t.Errorf("test [%v]: node %v is not expected.", test.name, node.Name)
}
}
}
}
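
The expectations above separate predicate failures that preemption can never fix (node selector, host name, taints, node labels, unschedulable node) from ones it might (insufficient resources, disk conflict, pod affinity). A rough sketch of that filtering rule follows; the reason strings and the potentialNodes helper are placeholders for the illustration, not the identifiers used by nodesWherePreemptionMightHelp.

```go
package main

import "fmt"

// unresolvable lists failure reasons that evicting pods cannot fix, mirroring
// the "No node should be attempted" case above. The strings are placeholders
// for this sketch, not the scheduler's actual reason names.
var unresolvable = map[string]bool{
	"NodeSelectorNotMatch":      true,
	"PodNotMatchHostName":       true,
	"TaintsTolerationsNotMatch": true,
	"NodeLabelPresenceViolated": true,
	"NodeUnschedulable":         true,
}

// potentialNodes keeps only the nodes whose every failure reason could, in
// principle, be resolved by preempting pods.
func potentialNodes(failed map[string][]string) []string {
	var out []string
	for node, reasons := range failed {
		helps := true
		for _, r := range reasons {
			if unresolvable[r] {
				helps = false
				break
			}
		}
		if helps {
			out = append(out, node)
		}
	}
	return out
}

func main() {
	failed := map[string][]string{
		"machine1": {"NodeSelectorNotMatch", "InsufficientMemory"},
		"machine3": {"InsufficientMemory"},
		"machine4": {},
	}
	// Prints machine3 and machine4 (map iteration order is not deterministic).
	fmt.Println(potentialNodes(failed))
}
```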
func TestPreempt(t *testing.T) {
failedPredMap := FailedPredicateMap{
"machine1": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 500, 300)},
"machine2": []algorithm.PredicateFailureReason{predicates.ErrDiskConflict},
"machine3": []algorithm.PredicateFailureReason{predicates.NewInsufficientResourceError(v1.ResourceMemory, 1000, 600, 400)},
}
// Prepare 3 node names.
nodeNames := []string{}
for i := 1; i < 4; i++ {
nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i))
}
tests := []struct {
name string
pod *v1.Pod
pods []*v1.Pod
extenders []*FakeExtender
expectedNode string
expectedPods []string // list of preempted pods
}{
{
name: "basic preemption logic",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "One node doesn't need any preemption",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
expectedNode: "machine3",
expectedPods: []string{},
},
{
name: "Scheduler extenders allow only machine1, otherwise machine3 would have been chosen",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
extenders: []*FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
},
{
predicates: []fitPredicate{machine1PredicateExtender},
},
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "Scheduler extenders do not allow any preemption",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Spec: v1.PodSpec{
Containers: veryLargeContainers,
Priority: &highPriority},
},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
extenders: []*FakeExtender{
{
predicates: []fitPredicate{falsePredicateExtender},
},
},
expectedNode: "",
expectedPods: []string{},
},
}
for _, test := range tests {
stop := make(chan struct{})
cache := schedulercache.New(time.Duration(0), stop)
for _, pod := range test.pods {
cache.AddPod(pod)
}
for _, name := range nodeNames {
cache.AddNode(makeNode(name, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
extenders := []algorithm.SchedulerExtender{}
for _, extender := range test.extenders {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
// Call Preempt and check the expected results.
node, victims, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
if (node != nil && node.Name != test.expectedNode) || (node == nil && len(test.expectedNode) != 0) {
t.Errorf("test [%v]: expected node: %v, got: %v", test.name, test.expectedNode, node)
}
if len(victims) != len(test.expectedPods) {
t.Errorf("test [%v]: expected %v pods, got %v.", test.name, len(test.expectedPods), len(victims))
}
for _, victim := range victims {
found := false
for _, expPod := range test.expectedPods {
if expPod == victim.Name {
found = true
break
}
}
if !found {
t.Errorf("test [%v]: pod %v is not expected to be a victim.", test.name, victim.Name)
}
// Mark the victims for deletion and record the preemptor's nominated node name.
now := metav1.Now()
victim.DeletionTimestamp = &now
test.pod.Annotations = make(map[string]string)
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{test.pod, failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
if node != nil && len(victims) > 0 {
t.Errorf("test [%v]: didn't expect any more preemption. Node %v is selected for preemption.", test.name, node)
}
close(stop)
}
}

View File

@ -716,6 +716,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
Algorithm: algo,
Binder: f.getBinder(extenders),
PodConditionUpdater: &podConditionUpdater{f.client},
PodPreemptor: &podPreemptor{f.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced)
},
@ -753,7 +754,7 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce
return getPriorityMetadataProducer(*pluginArgs)
}
func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) {
func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
@ -991,3 +992,28 @@ func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) er
}
return nil
}
type podPreemptor struct {
Client clientset.Interface
}
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
// TODO(bsalamat): change this to patch PodStatus to avoid overwriting potential pending status updates.
func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
podCopy.Annotations = map[string]string{}
}
for k, v := range annotations {
podCopy.Annotations[k] = v
}
_, err := p.Client.CoreV1().Pods(podCopy.Namespace).UpdateStatus(podCopy)
return err
}
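
The TODO above notes that a full update can overwrite concurrent changes to the pod. One way to narrow the write is to send only the annotation keys being set; this is sketched below under the assumption that a strategic-merge patch of metadata.annotations is acceptable for the nominated-node annotation. The helper name is hypothetical and this is an illustrative alternative, not what the PR ships.

```go
package factory

import (
	"encoding/json"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	clientset "k8s.io/client-go/kubernetes"
)

// patchPodAnnotations sends only the annotation keys being set, leaving other
// concurrent changes to the pod untouched. Hypothetical helper for
// illustration; it is not part of this PR.
func patchPodAnnotations(client clientset.Interface, pod *v1.Pod, annotations map[string]string) error {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": annotations,
		},
	}
	data, err := json.Marshal(patch)
	if err != nil {
		return err
	}
	_, err = client.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.StrategicMergePatchType, data)
	return err
}
```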

View File

@ -226,11 +226,11 @@ func TestCreateFromEmptyConfig(t *testing.T) {
factory.CreateFromConfig(policy)
}
func PredicateOne(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PredicateOne(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func PredicateTwo(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PredicateTwo(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}

View File

@ -47,8 +47,12 @@ type PluginFactoryArgs struct {
}
// MetadataProducerFactory produces MetadataProducer from the given args.
// TODO: Rename this to PriorityMetadataProducerFactory.
type MetadataProducerFactory func(PluginFactoryArgs) algorithm.MetadataProducer
// PredicateMetadataProducerFactory produces PredicateMetadataProducer from the given args.
type PredicateMetadataProducerFactory func(PluginFactoryArgs) algorithm.PredicateMetadataProducer
// A FitPredicateFactory produces a FitPredicate from the given args.
type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate
@ -80,7 +84,7 @@ var (
// Registered metadata producers
priorityMetadataProducer MetadataProducerFactory
predicateMetadataProducer MetadataProducerFactory
predicateMetadataProducer PredicateMetadataProducerFactory
// get equivalence pod function
getEquivalencePodFunc algorithm.GetEquivalencePodFunc
@ -181,7 +185,7 @@ func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) {
priorityMetadataProducer = factory
}
func RegisterPredicateMetadataProducerFactory(factory MetadataProducerFactory) {
func RegisterPredicateMetadataProducerFactory(factory PredicateMetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
predicateMetadataProducer = factory
@ -343,12 +347,12 @@ func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProd
return priorityMetadataProducer(args), nil
}
func getPredicateMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) {
func getPredicateMetadataProducer(args PluginFactoryArgs) (algorithm.PredicateMetadataProducer, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
if predicateMetadataProducer == nil {
return algorithm.EmptyMetadataProducer, nil
return algorithm.EmptyPredicateMetadataProducer, nil
}
return predicateMetadataProducer(args), nil
}

View File

@ -23,10 +23,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
@ -48,6 +50,14 @@ type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
@ -66,7 +76,7 @@ func (sched *Scheduler) StopEverything() {
type Configurator interface {
GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error)
GetPriorityMetadataProducer() (algorithm.MetadataProducer, error)
GetPredicateMetadataProducer() (algorithm.MetadataProducer, error)
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
GetHardPodAffinitySymmetricWeight() int
GetSchedulerName() string
@ -102,6 +112,8 @@ type Config struct {
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -176,6 +188,41 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
return host, err
}
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
return "", nil
}
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
if err != nil {
glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
if node == nil {
return "", err
}
glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
}
return node.Name, err
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
@ -258,6 +305,13 @@ func (sched *Scheduler) scheduleOne() {
suggestedHost, err := sched.schedule(pod)
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
sched.preempt(pod, fitError)
}
return
}
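
Because the eviction and annotation side effects now go through the PodPreemptor interface, they can be faked when unit-testing the preempt path. Below is a minimal sketch of such a test double, written as if it lived in the scheduler package next to the interface; the type and its fields are invented for the example.

```go
package scheduler

import "k8s.io/api/core/v1"

// fakePodPreemptor records the calls the scheduler makes during preemption.
// It is an illustrative test double, not part of this PR.
type fakePodPreemptor struct {
	deleted     []string
	annotations map[string]string
}

// Compile-time check that the fake satisfies the PodPreemptor interface.
var _ PodPreemptor = &fakePodPreemptor{}

func (f *fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { return pod, nil }

func (f *fakePodPreemptor) DeletePod(pod *v1.Pod) error {
	f.deleted = append(f.deleted, pod.Name)
	return nil
}

func (f *fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
	if f.annotations == nil {
		f.annotations = map[string]string{}
	}
	for k, v := range annots {
		f.annotations[k] = v
	}
	return nil
}
```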

View File

@ -103,6 +103,10 @@ func (es mockScheduler) Prioritizers() []algorithm.PriorityConfig {
return nil
}
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
return nil, nil, nil
}
func TestScheduler(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
@ -500,7 +504,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
scache,
nil,
predicateMap,
algorithm.EmptyMetadataProducer,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
@ -536,7 +540,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
scache,
nil,
predicateMap,
algorithm.EmptyMetadataProducer,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})

View File

@ -193,7 +193,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
n.AddPod(pod)
}
// Assumes that lock is already acquired.
@ -208,7 +208,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.removePod(pod); err != nil {
if err := n.RemovePod(pod); err != nil {
return err
}
if len(n.pods) == 0 && n.node == nil {

View File

@ -187,7 +187,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
usedPorts: make(map[int]bool),
}
for _, pod := range pods {
ni.addPod(pod)
ni.AddPod(pod)
}
return ni
}
@ -319,8 +319,8 @@ func hasPodAffinityConstraints(pod *v1.Pod) bool {
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
@ -351,8 +351,8 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
n.generation++
}
// removePod subtracts pod information to this NodeInfo.
func (n *NodeInfo) removePod(pod *v1.Pod) error {
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
k1, err := getPodKey(pod)
if err != nil {
return err
@ -478,6 +478,37 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
return nil
}
// FilterOutPods receives a list of pods and filters out those that are assigned
// to this NodeInfo's node but are no longer tracked in this NodeInfo's pod list.
//
// Preemption logic simulates removal of pods on a node by removing them from the
// corresponding NodeInfo. In order for the simulation to work, we call this method
// on the pods returned from SchedulerCache, so that predicate functions see
// only the pods that are not removed from the NodeInfo.
func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
node := n.Node()
if node == nil {
return pods
}
filtered := make([]*v1.Pod, 0, len(pods))
for _, p := range pods {
if p.Spec.NodeName == node.Name {
// If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo.
podKey, _ := getPodKey(p)
for _, np := range n.Pods() {
npodkey, _ := getPodKey(np)
if npodkey == podKey {
filtered = append(filtered, p)
break
}
}
} else {
filtered = append(filtered, p)
}
}
return filtered
}
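
A small usage sketch of FilterOutPods: a pod assigned to the NodeInfo's node but no longer tracked in it (a simulated victim) is dropped, while pods on other nodes pass through. It assumes the existing NewNodeInfo and SetNode helpers in schedulercache; the pod and node names are arbitrary.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
	kept := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kept", Namespace: "ns"}, Spec: v1.PodSpec{NodeName: "machine1"}}
	victim := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "victim", Namespace: "ns"}, Spec: v1.PodSpec{NodeName: "machine1"}}
	other := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "other", Namespace: "ns"}, Spec: v1.PodSpec{NodeName: "machine2"}}

	// The NodeInfo tracks only "kept"; "victim" stands in for a pod already
	// removed from it by the preemption simulation.
	info := schedulercache.NewNodeInfo(kept)
	_ = info.SetNode(node)

	// Prints "kept" and "other"; "victim" is filtered out.
	for _, p := range info.FilterOutPods([]*v1.Pod{kept, victim, other}) {
		fmt.Println(p.Name)
	}
}
```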
// getPodKey returns the string key of a pod.
func getPodKey(pod *v1.Pod) (string, error) {
return clientcache.MetaNamespaceKeyFunc(pod)

View File

@ -27,7 +27,7 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI
if _, ok := nodeNameToInfo[nodeName]; !ok {
nodeNameToInfo[nodeName] = NewNodeInfo()
}
nodeNameToInfo[nodeName].addPod(pod)
nodeNameToInfo[nodeName].AddPod(pod)
}
for _, node := range nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {

View File

@ -45,7 +45,7 @@ func (fc *FakeConfigurator) GetPriorityMetadataProducer() (algorithm.MetadataPro
}
// GetPredicateMetadataProducer is not implemented yet.
func (fc *FakeConfigurator) GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) {
func (fc *FakeConfigurator) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) {
return nil, fmt.Errorf("not implemented")
}

View File

@ -8,9 +8,16 @@ load(
go_test(
name = "go_default_test",
srcs = ["backoff_utils_test.go"],
srcs = [
"backoff_utils_test.go",
"utils_test.go",
],
library = ":go_default_library",
deps = ["//vendor/k8s.io/apimachinery/pkg/types:go_default_library"],
deps = [
"//pkg/apis/scheduling:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
go_library(
@ -23,6 +30,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/apis/scheduling:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -17,7 +17,10 @@ limitations under the License.
package util
import (
"sort"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
// GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
@ -46,3 +49,49 @@ func GetPodFullName(pod *v1.Pod) string {
// (DNS subdomain format).
return pod.Name + "_" + pod.Namespace
}
// GetPodPriority returns the priority of the given pod.
func GetPodPriority(pod *v1.Pod) int32 {
if pod.Spec.Priority != nil {
return *pod.Spec.Priority
}
// When the priority of a running pod is nil, it means it was created at a time
// when there was no global default priority class and the priority class
// name of the pod was empty. So, we resolve to the static default priority.
return scheduling.DefaultPriorityWhenNoDefaultClassExists
}
// SortableList is a list that implements sort.Interface.
type SortableList struct {
Items []interface{}
CompFunc LessFunc
}
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type LessFunc func(item1, item2 interface{}) bool
var _ = sort.Interface(&SortableList{})
func (l *SortableList) Len() int { return len(l.Items) }
func (l *SortableList) Less(i, j int) bool {
return l.CompFunc(l.Items[i], l.Items[j])
}
func (l *SortableList) Swap(i, j int) {
l.Items[i], l.Items[j] = l.Items[j], l.Items[i]
}
// Sort sorts the items in the list using the given CompFunc. Item1 is placed
// before Item2 when CompFunc(Item1, Item2) returns true.
func (l *SortableList) Sort() {
sort.Sort(l)
}
// HigherPriorityPod returns true when the priority of the first pod is higher than
// the second one. It takes arguments of the type "interface{}" to be used with
// SortableList, but expects those arguments to be *v1.Pod.
func HigherPriorityPod(pod1, pod2 interface{}) bool {
return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod))
}
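
A brief usage sketch of the helpers above: fill a SortableList with pods and sort them so the highest priority comes first. It assumes the plugin/pkg/scheduler/util import path for this package; the pod names and priority values are arbitrary.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	low, high := int32(10), int32(1000)
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "low"}, Spec: v1.PodSpec{Priority: &low}},
		{ObjectMeta: metav1.ObjectMeta{Name: "no-priority"}}, // resolves to the static default
		{ObjectMeta: metav1.ObjectMeta{Name: "high"}, Spec: v1.PodSpec{Priority: &high}},
	}

	list := util.SortableList{CompFunc: util.HigherPriorityPod}
	for _, p := range pods {
		list.Items = append(list.Items, p)
	}
	list.Sort()

	// Highest priority first; where "no-priority" lands depends on the
	// static default priority value.
	for _, item := range list.Items {
		p := item.(*v1.Pod)
		fmt.Println(p.Name, util.GetPodPriority(p))
	}
}
```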

View File

@ -0,0 +1,95 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
// TestGetPodPriority tests GetPodPriority function.
func TestGetPodPriority(t *testing.T) {
p := int32(20)
tests := []struct {
name string
pod *v1.Pod
expectedPriority int32
}{
{
name: "no priority pod resolves to static default priority",
pod: &v1.Pod{
Spec: v1.PodSpec{Containers: []v1.Container{
{Name: "container", Image: "image"}},
},
},
expectedPriority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
},
{
name: "pod with priority resolves correctly",
pod: &v1.Pod{
Spec: v1.PodSpec{Containers: []v1.Container{
{Name: "container", Image: "image"}},
Priority: &p,
},
},
expectedPriority: p,
},
}
for _, test := range tests {
if GetPodPriority(test.pod) != test.expectedPriority {
t.Errorf("expected pod priority: %v, got %v", test.expectedPriority, GetPodPriority(test.pod))
}
}
}
// TestSortableList tests SortableList by storing pods in the list and sorting
// them by their priority.
func TestSortableList(t *testing.T) {
higherPriority := func(pod1, pod2 interface{}) bool {
return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod))
}
podList := SortableList{CompFunc: higherPriority}
// Add a few Pods with different priorities from lowest to highest priority.
for i := 0; i < 10; i++ {
var p int32 = int32(i)
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
Priority: &p,
},
}
podList.Items = append(podList.Items, pod)
}
podList.Sort()
if len(podList.Items) != 10 {
t.Errorf("expected length of list was 10, got: %v", len(podList.Items))
}
var prevPriority = int32(10)
for _, p := range podList.Items {
if *p.(*v1.Pod).Spec.Priority >= prevPriority {
t.Errorf("Pods are not sorted. Current pod priority is %v, while the previous one was %v.", *p.(*v1.Pod).Spec.Priority, prevPriority)
}
prevPriority = *p.(*v1.Pod).Spec.Priority
}
}

View File

@ -15,6 +15,7 @@ go_library(
"nvidia-gpus.go",
"opaque_resource.go",
"predicates.go",
"preemption.go",
"priorities.go",
"rescheduler.go",
],
@ -33,6 +34,7 @@ go_library(
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/api/scheduling/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -52,6 +52,7 @@ type pausePodConfig struct {
NodeName string
Ports []v1.ContainerPort
OwnerReferences []metav1.OwnerReference
PriorityClassName string
}
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@ -555,8 +556,9 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
Ports: conf.Ports,
},
},
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
PriorityClassName: conf.PriorityClassName,
},
}
if conf.Resources != nil {

View File

@ -0,0 +1,128 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduling
import (
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/api/scheduling/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
_ "github.com/stretchr/testify/assert"
)
var _ = SIGDescribe("SchedulerPreemption [Serial] [Feature:PodPreemption]", func() {
var cs clientset.Interface
var nodeList *v1.NodeList
var ns string
f := framework.NewDefaultFramework("sched-preemption")
lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
lowPriorityClassName := f.BaseName + "-low-priority"
mediumPriorityClassName := f.BaseName + "-medium-priority"
highPriorityClassName := f.BaseName + "-high-priority"
AfterEach(func() {
})
BeforeEach(func() {
cs = f.ClientSet
ns = f.Namespace.Name
nodeList = &v1.NodeList{}
_, err := f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: highPriorityClassName}, Value: highPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
_, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: mediumPriorityClassName}, Value: mediumPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
_, err = f.ClientSet.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: lowPriorityClassName}, Value: lowPriority})
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
framework.WaitForAllNodesHealthy(cs, time.Minute)
masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs)
err = framework.CheckTestingNSDeletedExcept(cs, ns)
framework.ExpectNoError(err)
})
// This test verifies that when a higher priority pod is created and no node with
// enough resources is found, scheduler preempts a lower priority pod to schedule
// the high priority pod.
It("validates basic preemption works", func() {
var podRes v1.ResourceList
// Create one pod per node that uses a lot of the node's resources.
By("Create pods that use 60% of node resources.")
pods := make([]*v1.Pod, len(nodeList.Items))
for i, node := range nodeList.Items {
cpuAllocatable, found := node.Status.Allocatable["cpu"]
Expect(found).To(Equal(true))
milliCPU := cpuAllocatable.MilliValue() * 40 / 100
memAllocatable, found := node.Status.Allocatable["memory"]
Expect(found).To(Equal(true))
memory := memAllocatable.Value() * 60 / 100
podRes = v1.ResourceList{}
podRes[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(milliCPU), resource.DecimalSI)
podRes[v1.ResourceMemory] = *resource.NewQuantity(int64(memory), resource.BinarySI)
// make the first pod low priority and the rest medium priority.
priorityName := mediumPriorityClassName
if i == 0 {
priorityName = lowPriorityClassName
}
pods[i] = createPausePod(f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%v", i, priorityName),
PriorityClassName: priorityName,
Resources: &v1.ResourceRequirements{
Requests: podRes,
},
})
framework.Logf("Created pod: %v", pods[i].Name)
}
By("Wait for pods to be scheduled.")
for _, pod := range pods {
framework.ExpectNoError(framework.WaitForPodRunningInNamespace(cs, pod))
}
By("Run a high priority pod that use 60% of a node resources.")
// Create a high priority pod and make sure it is scheduled.
runPausePod(f, pausePodConfig{
Name: "preemptor-pod",
PriorityClassName: highPriorityClassName,
Resources: &v1.ResourceRequirements{
Requests: podRes,
},
})
// Make sure that the lowest priority pod is deleted.
preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(pods[0].Name, metav1.GetOptions{})
podDeleted := (err != nil && errors.IsNotFound(err)) ||
(err == nil && preemptedPod.DeletionTimestamp != nil)
Expect(podDeleted).To(BeTrue())
// Other pods (mid priority ones) should be present.
for i := 1; i < len(pods); i++ {
livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(pods[i].Name, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(livePod.DeletionTimestamp).To(BeNil())
}
})
})
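
For reference, the two API calls the test leans on, creating a PriorityClass and a pod that references it through PriorityClassName, reduce to roughly the following. The helper, names, priority value, and pause image are placeholders for illustration; error handling is collapsed.

```go
package scheduling

import (
	"k8s.io/api/core/v1"
	"k8s.io/api/scheduling/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// createClassAndPod creates a PriorityClass and a pod that references it by
// name, as the e2e test above does. Hypothetical helper; the names, value,
// and pause image are placeholders.
func createClassAndPod(cs clientset.Interface, ns string) error {
	if _, err := cs.SchedulingV1alpha1().PriorityClasses().Create(&v1alpha1.PriorityClass{
		ObjectMeta: metav1.ObjectMeta{Name: "example-high-priority"},
		Value:      1000,
	}); err != nil {
		return err
	}
	_, err := cs.CoreV1().Pods(ns).Create(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "example-preemptor"},
		Spec: v1.PodSpec{
			PriorityClassName: "example-high-priority",
			Containers:        []v1.Container{{Name: "pause", Image: "k8s.gcr.io/pause"}},
		},
	})
	return err
}
```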

View File

@ -21,12 +21,14 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/features:go_default_library",
"//plugin/cmd/kube-scheduler/app:go_default_library",
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithmprovider:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/core:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//test/e2e/framework:go_default_library",
@ -37,6 +39,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",

View File

@ -51,7 +51,7 @@ func TestNodeAffinity(t *testing.T) {
}
// Create a pod with node affinity.
podName := "pod-with-node-affinity"
pod, err := runPausePod(context.clientSet, &pausePodConfig{
pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: podName,
Namespace: context.ns.Name,
Affinity: &v1.Affinity{
@ -72,7 +72,7 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
})
}))
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}
@ -110,11 +110,11 @@ func TestPodAffinity(t *testing.T) {
// Add a pod with a label and wait for it to schedule.
labelKey := "service"
labelValue := "S1"
_, err = runPausePod(context.clientSet, &pausePodConfig{
_, err = runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: "attractor-pod",
Namespace: context.ns.Name,
Labels: map[string]string{labelKey: labelValue},
})
}))
if err != nil {
t.Fatalf("Error running the attractor pod: %v", err)
}
@ -125,7 +125,7 @@ func TestPodAffinity(t *testing.T) {
}
// Add a new pod with affinity to the attractor pod.
podName := "pod-with-podaffinity"
pod, err := runPausePod(context.clientSet, &pausePodConfig{
pod, err := runPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{
Name: podName,
Namespace: context.ns.Name,
Affinity: &v1.Affinity{
@ -158,7 +158,7 @@ func TestPodAffinity(t *testing.T) {
},
},
},
})
}))
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}

View File

@ -24,9 +24,11 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -36,15 +38,18 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
)
const enableEquivalenceCache = true
@ -56,11 +61,11 @@ type nodeStateManager struct {
makeUnSchedulable nodeMutationFunc
}
func PredicateOne(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PredicateOne(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func PredicateTwo(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
func PredicateTwo(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
@ -457,13 +462,13 @@ func TestMultiScheduler(t *testing.T) {
}
defaultScheduler := "default-scheduler"
testPodFitsDefault, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler})
testPodFitsDefault, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
fooScheduler := "foo-scheduler"
testPodFitsFoo, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler})
testPodFitsFoo, err := createPausePod(context.clientSet, initPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
@ -647,3 +652,251 @@ func TestAllocatable(t *testing.T) {
t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name)
}
}
// TestPreemption tests a few preemption scenarios.
func TestPreemption(t *testing.T) {
// Enable PodPriority feature gate.
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.PodPriority))
// Initialize scheduler.
context := initTest(t, "preemption")
defer cleanupTest(t, context)
cs := context.clientSet
lowPriority, mediumPriority, highPriority := int32(100), int32(200), int32(300)
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
}
tests := []struct {
description string
existingPods []*v1.Pod
pod *v1.Pod
preemptedPodIndexes map[int]struct{}
}{
{
description: "basic pod preemption",
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
description: "preemption is performed to satisfy anti-affinity",
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(cs, &pausePodConfig{
Name: "pod-1", Namespace: context.ns.Name,
Priority: &lowPriority,
Labels: map[string]string{"pod": "p1"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "preemptor"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"p0"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
},
{
// This is similar to the previous case only pod-1 is high priority.
description: "preemption is not performed when anti-affinity is not satisfied",
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(cs, &pausePodConfig{
Name: "pod-1", Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "p1"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"preemptor"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "preemptor"},
Resources: defaultPodRes,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "pod",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"p0"},
},
},
},
TopologyKey: "node",
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{},
},
}
// Create a node with some resources and a label.
nodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
}
node, err := createNode(context.clientSet, "node1", nodeRes)
if err != nil {
t.Fatalf("Error creating nodes: %v", err)
}
nodeLabels := map[string]string{"node": node.Name}
if err = testutils.AddLabelsToNode(context.clientSet, node.Name, nodeLabels); err != nil {
t.Fatalf("Cannot add labels to node: %v", err)
}
if err = waitForNodeLabels(context.clientSet, node.Name, nodeLabels); err != nil {
t.Fatalf("Adding labels to node didn't succeed: %v", err)
}
for _, test := range tests {
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
for i, p := range test.existingPods {
pods[i], err = runPausePod(cs, p)
if err != nil {
t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
}
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
// Wait for preemption of pods and make sure the other ones are not preempted.
for i, p := range pods {
if _, found := test.preemptedPodIndexes[i]; found {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
t.Errorf("Test [%v]: Pod %v is not getting evicted.", test.description, p.Name)
}
} else {
if p.DeletionTimestamp != nil {
t.Errorf("Test [%v]: Didn't expect pod %v to get preempted.", test.description, p.Name)
}
}
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := context.clientSet.CoreV1().Pods(context.ns.Name).Get("preemptor-pod", metav1.GetOptions{})
if err != nil {
t.Errorf("Test [%v]: error getting pod: %v", test.description, err)
}
annot, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if found && len(annot) > 0 {
return true, nil
}
return false, err
}); err != nil {
t.Errorf("Test [%v]: Pod annotation did not get set.", test.description)
}
}
// Cleanup
pods = append(pods, preemptor)
for _, p := range pods {
err = cs.CoreV1().Pods(p.Namespace).Delete(p.Name, metav1.NewDeleteOptions(0))
if err != nil && !errors.IsNotFound(err) {
t.Errorf("Test [%v]: error, %v, while deleting pod during test.", test.description, err)
}
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podDeleted(cs, p.Namespace, p.Name))
if err != nil {
t.Errorf("Test [%v]: error, %v, while waiting for pod to get deleted.", test.description, err)
}
}
}
}

View File

@ -205,6 +205,7 @@ type pausePodConfig struct {
Tolerations []v1.Toleration
NodeName string
SchedulerName string
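// Priority is copied directly into the pod spec's Priority field.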
Priority *int32
}
// initPausePod initializes a pod API object from the given config. It is used
@ -213,6 +214,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
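// Setting the namespace on the object itself lets callers pass the initialized pod directly to helpers that read pod.Namespace, such as createPausePod and runPausePod below.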
Namespace: conf.Namespace,
Labels: conf.Labels,
Annotations: conf.Annotations,
},
@ -228,6 +230,7 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
SchedulerName: conf.SchedulerName,
Priority: conf.Priority,
},
}
if conf.Resources != nil {
@ -238,9 +241,8 @@ func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
// createPausePod creates the given pause pod in the cluster and returns its
// pointer and error status.
func createPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
p := initPausePod(cs, conf)
return cs.CoreV1().Pods(conf.Namespace).Create(p)
func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
return cs.CoreV1().Pods(p.Namespace).Create(p)
}
// createPausePodWithResource creates a pod with "Pause" image and the given
@ -262,22 +264,21 @@ func createPausePodWithResource(cs clientset.Interface, podName string, nsName s
},
}
}
return createPausePod(cs, &conf)
return createPausePod(cs, initPausePod(cs, &conf))
}
// runPausePod creates the given pause pod, waits until it is scheduled, and
// returns its pointer and error status.
func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) {
p := initPausePod(cs, conf)
pod, err := cs.CoreV1().Pods(conf.Namespace).Create(p)
func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
return nil, fmt.Errorf("Error creating pause pod: %v", err)
}
if err = waitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(conf.Namespace).Get(conf.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("Error getting pod %v info: %v", conf.Name, err)
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("Error getting pod %v info: %v", pod.Name, err)
}
return pod, nil
}
@ -285,7 +286,10 @@ func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error)
// podDeleted returns true if a pod is not found in the given namespace or is marked for deletion.
func podDeleted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
_, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
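// A pod that is marked for deletion is treated as deleted so that callers do not block while graceful termination completes.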
if pod.DeletionTimestamp != nil {
return true, nil
}
if errors.IsNotFound(err) {
return true, nil
}
@ -293,6 +297,20 @@ func podDeleted(c clientset.Interface, podNamespace, podName string) wait.Condit
}
}
// podIsGettingEvicted returns true if the pod's deletion timestamp is set.
func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pod.DeletionTimestamp != nil {
return true, nil
}
return false, nil
}
}
// podScheduled returns true if a node is assigned to the given pod.
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {