/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package predicates

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"

    "github.com/golang/glog"
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/cache"
    "k8s.io/kubernetes/pkg/kubelet/qos"
    "k8s.io/kubernetes/pkg/labels"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/util/workqueue"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
    priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// NodeInfo represents anything that can get a node object by node name.
type NodeInfo interface {
    GetNodeInfo(nodeID string) (*api.Node, error)
}

// PersistentVolumeInfo represents anything that can get a persistent volume object by PV ID.
type PersistentVolumeInfo interface {
    GetPersistentVolumeInfo(pvID string) (*api.PersistentVolume, error)
}

// PersistentVolumeClaimInfo represents anything that can get a PVC object by namespace and PVC ID.
type PersistentVolumeClaimInfo interface {
    GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error)
}

// CachedNodeInfo implements NodeInfo on top of a cached node store.
type CachedNodeInfo struct {
    *cache.StoreToNodeLister
}

// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
    node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
    if err != nil {
        return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
    }

    if !exists {
        return nil, fmt.Errorf("node '%v' not found", id)
    }

    return node.(*api.Node), nil
}
// predicateMetadata is a type that is passed as metadata for predicate functions
type predicateMetadata struct {
    podBestEffort             bool
    podRequest                *schedulercache.Resource
    podPorts                  map[int]bool
    matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
}

type matchingPodAntiAffinityTerm struct {
    term *api.PodAffinityTerm
    node *api.Node
}

func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
    // If we cannot compute metadata, just return nil
    if pod == nil {
        return nil
    }
    matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
    if err != nil {
        return nil
    }
    return &predicateMetadata{
        podBestEffort:             isPodBestEffort(pod),
        podRequest:                getResourceRequest(pod),
        podPorts:                  getUsedPorts(pod),
        matchingAntiAffinityTerms: matchingTerms,
    }
}
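// Illustrative sketch (assumption about caller wiring, not code in this file): the
// metadata is intended to be computed once per pod and handed unchanged to every
// predicate evaluated for that pod, e.g.:
//
//    meta := PredicateMetadata(pod, nodeInfoMap)
//    fit, reasons, err := PodFitsResources(pod, meta, nodeInfoMap["node-1"])
//
// "node-1", pod, and nodeInfoMap are hypothetical caller-scope values used only here.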
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
    // fast path if there are no conflict-checking targets.
    if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil {
        return false
    }

    for _, existingVolume := range pod.Spec.Volumes {
        // Same GCE disk mounted by multiple pods conflicts unless all pods mount it read-only.
        if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
            disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
            if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
                return true
            }
        }

        if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
            if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
                return true
            }
        }

        if volume.RBD != nil && existingVolume.RBD != nil {
            mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
            emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
            // Two RBD images are the same if they share the same Ceph monitor, are in the same RADOS pool, and have the same image name.
            // Only one read-write mount is permitted for the same RBD image.
            // The same RBD image mounted by multiple pods conflicts unless all pods mount the image read-only.
            if haveSame(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
                return true
            }
        }
    }

    return false
}
// NoDiskConflict evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
// can't be scheduled there.
// This is GCE, Amazon EBS, and Ceph RBD specific for now:
// - GCE PD allows multiple mounts as long as they're all read-only
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least one monitor, and match pool and image.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    for _, v := range pod.Spec.Volumes {
        for _, ev := range nodeInfo.Pods() {
            if isVolumeConflict(v, ev) {
                return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
            }
        }
    }
    return true, nil, nil
}
type MaxPDVolumeCountChecker struct {
    filter     VolumeFilter
    maxVolumes int
    pvInfo     PersistentVolumeInfo
    pvcInfo    PersistentVolumeClaimInfo
}

// VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps
type VolumeFilter struct {
    // Filter normal volumes
    FilterVolume           func(vol *api.Volume) (id string, relevant bool)
    FilterPersistentVolume func(pv *api.PersistentVolume) (id string, relevant bool)
}

// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
// number of volumes which match a filter that it requests, and those that are already present. The
// maximum number is configurable to accommodate different systems.
//
// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
// the maximum.
func NewMaxPDVolumeCountPredicate(filter VolumeFilter, maxVolumes int, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
    c := &MaxPDVolumeCountChecker{
        filter:     filter,
        maxVolumes: maxVolumes,
        pvInfo:     pvInfo,
        pvcInfo:    pvcInfo,
    }
    return c.predicate
}
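// Illustrative sketch (assumption, not existing wiring in this file): a caller could
// build an EBS-specific volume-count predicate with a hypothetical limit of 39 volumes
// and evaluate it like any other FitPredicate:
//
//    maxEBS := NewMaxPDVolumeCountPredicate(EBSVolumeFilter, 39, pvInfo, pvcInfo)
//    fit, reasons, err := maxEBS(pod, PredicateMetadata(pod, nodeInfoMap), nodeInfo)
//
// pvInfo, pvcInfo, pod, nodeInfoMap, and nodeInfo are hypothetical caller-scope values.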
func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace string, filteredVolumes map[string]bool) error {
    for _, vol := range volumes {
        if id, ok := c.filter.FilterVolume(&vol); ok {
            filteredVolumes[id] = true
        } else if vol.PersistentVolumeClaim != nil {
            pvcName := vol.PersistentVolumeClaim.ClaimName
            if pvcName == "" {
                return fmt.Errorf("PersistentVolumeClaim had no name")
            }
            pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
            if err != nil {
                // if the PVC is not found, log the error and count the PV towards the PV limit
                // generate a random volume ID since it's required for de-dup
                utilruntime.HandleError(fmt.Errorf("Unable to look up PVC info for %s/%s, assuming PVC matches predicate when counting limits: %v", namespace, pvcName, err))
                source := rand.NewSource(time.Now().UnixNano())
                generatedID := "missingPVC" + strconv.Itoa(rand.New(source).Intn(1000000))
                filteredVolumes[generatedID] = true
                return nil
            }

            if pvc == nil {
                return fmt.Errorf("PersistentVolumeClaim not found: %q", pvcName)
            }

            pvName := pvc.Spec.VolumeName
            if pvName == "" {
                return fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
            }

            pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
            if err != nil {
                // if the PV is not found, log the error
                // and count the PV towards the PV limit
                // generate a random volume ID since it's required for de-dup
                utilruntime.HandleError(fmt.Errorf("Unable to look up PV info for %s/%s/%s, assuming PV matches predicate when counting limits: %v", namespace, pvcName, pvName, err))
                source := rand.NewSource(time.Now().UnixNano())
                generatedID := "missingPV" + strconv.Itoa(rand.New(source).Intn(1000000))
                filteredVolumes[generatedID] = true
                return nil
            }

            if pv == nil {
                return fmt.Errorf("PersistentVolume not found: %q", pvName)
            }

            if id, ok := c.filter.FilterPersistentVolume(pv); ok {
                filteredVolumes[id] = true
            }
        }
    }

    return nil
}
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    // If a pod doesn't have any volume attached to it, the predicate will always be true.
    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
    if len(pod.Spec.Volumes) == 0 {
        return true, nil, nil
    }

    newVolumes := make(map[string]bool)
    if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
        return false, nil, err
    }

    // quick return
    if len(newVolumes) == 0 {
        return true, nil, nil
    }

    // count unique volumes
    existingVolumes := make(map[string]bool)
    for _, existingPod := range nodeInfo.Pods() {
        if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
            return false, nil, err
        }
    }
    numExistingVolumes := len(existingVolumes)

    // filter out already-mounted volumes
    for k := range existingVolumes {
        if _, ok := newVolumes[k]; ok {
            delete(newVolumes, k)
        }
    }
    numNewVolumes := len(newVolumes)

    if numExistingVolumes+numNewVolumes > c.maxVolumes {
        // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
        return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
    }

    return true, nil, nil
}
// EBSVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes
var EBSVolumeFilter VolumeFilter = VolumeFilter{
    FilterVolume: func(vol *api.Volume) (string, bool) {
        if vol.AWSElasticBlockStore != nil {
            return vol.AWSElasticBlockStore.VolumeID, true
        }
        return "", false
    },

    FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
        if pv.Spec.AWSElasticBlockStore != nil {
            return pv.Spec.AWSElasticBlockStore.VolumeID, true
        }
        return "", false
    },
}

// GCEPDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes
var GCEPDVolumeFilter VolumeFilter = VolumeFilter{
    FilterVolume: func(vol *api.Volume) (string, bool) {
        if vol.GCEPersistentDisk != nil {
            return vol.GCEPersistentDisk.PDName, true
        }
        return "", false
    },

    FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
        if pv.Spec.GCEPersistentDisk != nil {
            return pv.Spec.GCEPersistentDisk.PDName, true
        }
        return "", false
    },
}
type VolumeZoneChecker struct {
    pvInfo  PersistentVolumeInfo
    pvcInfo PersistentVolumeClaimInfo
}

// VolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given
// that some volumes may have zone scheduling constraints. The requirement is that any
// volume zone-labels must match the equivalent zone-labels on the node. It is OK for
// the node to have more zone-label constraints (for example, a hypothetical replicated
// volume might allow region-wide access)
//
// Currently this is only supported with PersistentVolumeClaims, and looks to the labels
// only on the bound PersistentVolume.
//
// Working with volumes declared inline in the pod specification (i.e. not
// using a PersistentVolume) is likely to be harder, as it would require
// determining the zone of a volume during scheduling, and that is likely to
// require calling out to the cloud provider. It seems that we are moving away
// from inline volume declarations anyway.
func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
    c := &VolumeZoneChecker{
        pvInfo:  pvInfo,
        pvcInfo: pvcInfo,
    }
    return c.predicate
}
func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    // If a pod doesn't have any volume attached to it, the predicate will always be true.
    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
    if len(pod.Spec.Volumes) == 0 {
        return true, nil, nil
    }

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    nodeConstraints := make(map[string]string)
    for k, v := range node.ObjectMeta.Labels {
        if k != unversioned.LabelZoneFailureDomain && k != unversioned.LabelZoneRegion {
            continue
        }
        nodeConstraints[k] = v
    }

    if len(nodeConstraints) == 0 {
        // The node has no zone constraints, so we're OK to schedule.
        // In practice, when using zones, all nodes must be labeled with zone labels.
        // We want to fast-path this case though.
        return true, nil, nil
    }

    namespace := pod.Namespace
    manifest := &(pod.Spec)
    for i := range manifest.Volumes {
        volume := &manifest.Volumes[i]
        if volume.PersistentVolumeClaim != nil {
            pvcName := volume.PersistentVolumeClaim.ClaimName
            if pvcName == "" {
                return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
            }
            pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
            if err != nil {
                return false, nil, err
            }

            if pvc == nil {
                return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
            }

            pvName := pvc.Spec.VolumeName
            if pvName == "" {
                return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
            }

            pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
            if err != nil {
                return false, nil, err
            }

            if pv == nil {
                return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
            }

            for k, v := range pv.ObjectMeta.Labels {
                if k != unversioned.LabelZoneFailureDomain && k != unversioned.LabelZoneRegion {
                    continue
                }
                nodeV, _ := nodeConstraints[k]
                if v != nodeV {
                    glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
                    return false, []algorithm.PredicateFailureReason{ErrVolumeZoneConflict}, nil
                }
            }
        }
    }

    return true, nil, nil
}
func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
    result := schedulercache.Resource{}
    for _, container := range pod.Spec.Containers {
        requests := container.Resources.Requests
        result.Memory += requests.Memory().Value()
        result.MilliCPU += requests.Cpu().MilliValue()
        result.NvidiaGPU += requests.NvidiaGPU().Value()
    }

    // take max_resource(sum_pod, any_init_container)
    for _, container := range pod.Spec.InitContainers {
        requests := container.Resources.Requests
        if mem := requests.Memory().Value(); mem > result.Memory {
            result.Memory = mem
        }
        if cpu := requests.Cpu().MilliValue(); cpu > result.MilliCPU {
            result.MilliCPU = cpu
        }
    }

    return &result
}
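// Illustrative example (assumed request values): for a pod whose two app containers
// request 100m CPU / 200Mi memory each and whose single init container requests
// 300m CPU / 100Mi memory, the effective request computed above is
// max(200m, 300m) = 300m CPU and max(400Mi, 100Mi) = 400Mi memory, since init
// containers run one at a time before the app containers start.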
func podName(pod *api.Pod) string {
    return pod.Namespace + "/" + pod.Name
}
// PodFitsResources checks whether a node has enough allocatable resources (CPU, memory,
// GPU) and a free pod slot to run the pod, accumulating a failure reason per resource.
func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []algorithm.PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    var podRequest *schedulercache.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podRequest = predicateMeta.podRequest
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = getResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU))
    }
    if glog.V(10) {
        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
        // not logged. There is visible performance gain from it.
        glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
            podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
    }
    return len(predicateFails) == 0, predicateFails, nil
}
// nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms,
// terms are ORed, and an empty list of terms will match nothing.
func nodeMatchesNodeSelectorTerms(node *api.Node, nodeSelectorTerms []api.NodeSelectorTerm) bool {
    for _, req := range nodeSelectorTerms {
        nodeSelector, err := api.NodeSelectorRequirementsAsSelector(req.MatchExpressions)
        if err != nil {
            glog.V(10).Infof("Failed to parse MatchExpressions: %+v, regarding as not match.", req.MatchExpressions)
            return false
        }
        if nodeSelector.Matches(labels.Set(node.Labels)) {
            return true
        }
    }
    return false
}

// The pod can only schedule onto nodes that satisfy requirements in both NodeAffinity and nodeSelector.
func podMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
    // Check if node.Labels match pod.Spec.NodeSelector.
    if len(pod.Spec.NodeSelector) > 0 {
        selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
        if !selector.Matches(labels.Set(node.Labels)) {
            return false
        }
    }

    // Parse required node affinity scheduling requirements
    // and check if the current node matches the requirements.
    affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
    if err != nil {
        glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
        return false
    }

    // 1. nil NodeSelector matches all nodes (i.e. does not filter out any nodes)
    // 2. nil []NodeSelectorTerm (equivalent to non-nil empty NodeSelector) matches no nodes
    // 3. zero-length non-nil []NodeSelectorTerm matches no nodes also, just for simplicity
    // 4. nil []NodeSelectorRequirement (equivalent to non-nil empty NodeSelectorTerm) matches no nodes
    // 5. zero-length non-nil []NodeSelectorRequirement matches no nodes also, just for simplicity
    // 6. non-nil empty NodeSelectorRequirement is not allowed
    nodeAffinityMatches := true
    if affinity != nil && affinity.NodeAffinity != nil {
        nodeAffinity := affinity.NodeAffinity
        // if no required NodeAffinity requirements, will do no-op, means select all nodes.
        // TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
        if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
            // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
            return true
        }

        // Match node selector for requiredDuringSchedulingRequiredDuringExecution.
        // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
        // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
        //     nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
        //     glog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
        //     nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
        // }

        // Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
        if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
            nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
            glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
            nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
        }
    }
    return nodeAffinityMatches
}
// PodSelectorMatches checks whether the node's labels satisfy the pod's node selector
// and required node affinity.
func PodSelectorMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if podMatchesNodeLabels(pod, node) {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}

// PodFitsHost checks whether the pod requests a specific node name and, if so, whether
// it matches the current node.
func PodFitsHost(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    if len(pod.Spec.NodeName) == 0 {
        return true, nil, nil
    }
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if pod.Spec.NodeName == node.Name {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{ErrPodNotMatchHostName}, nil
}
type NodeLabelChecker struct {
    labels   []string
    presence bool
}

func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicate {
    labelChecker := &NodeLabelChecker{
        labels:   labels,
        presence: presence,
    }
    return labelChecker.CheckNodeLabelPresence
}

// CheckNodeLabelPresence checks whether all of the specified labels exist on a node or not, regardless of their value
// If "presence" is false, then returns false if any of the requested labels matches any of the node's labels,
// otherwise returns true.
// If "presence" is true, then returns false if any of the requested labels does not match any of the node's labels,
// otherwise returns true.
//
// Consider the cases where the nodes are placed in regions/zones/racks and these are identified by labels
// In some cases, it is required that only nodes that are part of ANY of the defined regions/zones/racks be selected
//
// Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var exists bool
    nodeLabels := labels.Set(node.Labels)
    for _, label := range n.labels {
        exists = nodeLabels.Has(label)
        if (exists && !n.presence) || (!exists && n.presence) {
            return false, []algorithm.PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
        }
    }
    return true, nil, nil
}
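// Illustrative sketch (assumed label key, not part of this file's wiring): to keep pods
// off nodes that carry a "retiring" label regardless of its value, a caller could build
// the predicate with presence=false and evaluate it like any other FitPredicate:
//
//    avoidRetiring := NewNodeLabelPredicate([]string{"retiring"}, false)
//    fit, reasons, err := avoidRetiring(pod, PredicateMetadata(pod, nodeInfoMap), nodeInfo)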
type ServiceAffinity struct {
    podLister     algorithm.PodLister
    serviceLister algorithm.ServiceLister
    nodeInfo      NodeInfo
    labels        []string
}

func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) algorithm.FitPredicate {
    affinity := &ServiceAffinity{
        podLister:     podLister,
        serviceLister: serviceLister,
        nodeInfo:      nodeInfo,
        labels:        labels,
    }
    return affinity.CheckServiceAffinity
}

// CheckServiceAffinity ensures that only the nodes that match the specified labels are considered for scheduling.
// The set of labels to be considered are provided to the struct (ServiceAffinity).
// The pod is checked for the labels and any missing labels are then checked in the node
// that hosts the service pods (peers) for the given pod.
//
// We add an implicit selector requiring some particular value V for label L to a pod, if:
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a node that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var affinitySelector labels.Selector

    // check if the pod being scheduled has the affinity labels specified in its NodeSelector
    affinityLabels := map[string]string{}
    nodeSelector := labels.Set(pod.Spec.NodeSelector)
    labelsExist := true
    for _, l := range s.labels {
        if nodeSelector.Has(l) {
            affinityLabels[l] = nodeSelector.Get(l)
        } else {
            // the current pod does not specify all the labels, look in the existing service pods
            labelsExist = false
        }
    }

    // skip looking at other pods in the service if the current pod defines all the required affinity labels
    if !labelsExist {
        services, err := s.serviceLister.GetPodServices(pod)
        if err == nil {
            // just use the first service and get the other pods within the service
            // TODO: a separate predicate can be created that tries to handle all services for the pod
            selector := labels.SelectorFromSet(services[0].Spec.Selector)
            servicePods, err := s.podLister.List(selector)
            if err != nil {
                return false, nil, err
            }
            // consider only the pods that belong to the same namespace
            nsServicePods := []*api.Pod{}
            for _, nsPod := range servicePods {
                if nsPod.Namespace == pod.Namespace {
                    nsServicePods = append(nsServicePods, nsPod)
                }
            }
            if len(nsServicePods) > 0 {
                // consider any service pod and fetch the node it's hosted on
                otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
                if err != nil {
                    return false, nil, err
                }
                for _, l := range s.labels {
                    // If the pod being scheduled has the label value specified, do not override it
                    if _, exists := affinityLabels[l]; exists {
                        continue
                    }
                    if labels.Set(otherNode.Labels).Has(l) {
                        affinityLabels[l] = labels.Set(otherNode.Labels).Get(l)
                    }
                }
            }
        }
    }

    // if there are no existing pods in the service, consider all nodes
    if len(affinityLabels) == 0 {
        affinitySelector = labels.Everything()
    } else {
        affinitySelector = labels.Set(affinityLabels).AsSelector()
    }

    // check if the node matches the selector
    if affinitySelector.Matches(labels.Set(node.Labels)) {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil
}
// PodFitsHostPorts checks whether any host port requested by the pod is already in use
// by a pod on the node.
func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var wantPorts map[int]bool
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        wantPorts = predicateMeta.podPorts
    } else {
        // We couldn't parse metadata - fallback to computing it.
        wantPorts = getUsedPorts(pod)
    }
    if len(wantPorts) == 0 {
        return true, nil, nil
    }

    // TODO: Aggregate it at the NodeInfo level.
    existingPorts := getUsedPorts(nodeInfo.Pods()...)
    for wport := range wantPorts {
        if wport != 0 && existingPorts[wport] {
            return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
        }
    }
    return true, nil, nil
}

func getUsedPorts(pods ...*api.Pod) map[int]bool {
    ports := make(map[int]bool)
    for _, pod := range pods {
        for j := range pod.Spec.Containers {
            container := &pod.Spec.Containers[j]
            for k := range container.Ports {
                podPort := &container.Ports[k]
                // "0" is explicitly ignored in PodFitsHostPorts,
                // which is the only function that uses this value.
                if podPort.HostPort != 0 {
                    ports[int(podPort.HostPort)] = true
                }
            }
        }
    }
    return ports
}
// search two arrays and return true if they have at least one common element; return false otherwise
func haveSame(a1, a2 []string) bool {
    for _, val1 := range a1 {
        for _, val2 := range a2 {
            if val1 == val2 {
                return true
            }
        }
    }
    return false
}
// GeneralPredicates checks the basic predicates that every pod must satisfy
// (PodFitsResources, PodFitsHost, PodFitsHostPorts, PodSelectorMatches),
// accumulating the failure reasons from all of them.
func GeneralPredicates(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var predicateFails []algorithm.PredicateFailureReason
    fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = PodFitsHost(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = PodSelectorMatches(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    return len(predicateFails) == 0, predicateFails, nil
}
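// Illustrative sketch (assumption about caller behavior, not code in this file): because
// the failure reasons are accumulated rather than short-circuited, a caller can report
// every reason a pod was rejected on a node:
//
//    fit, reasons, err := GeneralPredicates(pod, PredicateMetadata(pod, nodeInfoMap), nodeInfo)
//    if err == nil && !fit {
//        for _, r := range reasons {
//            glog.Infof("pod %s rejected: %v", podName(pod), r)
//        }
//    }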
type PodAffinityChecker struct {
    info           NodeInfo
    podLister      algorithm.PodLister
    failureDomains priorityutil.Topologies
}

func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failureDomains []string) algorithm.FitPredicate {
    checker := &PodAffinityChecker{
        info:           info,
        podLister:      podLister,
        failureDomains: priorityutil.Topologies{DefaultKeys: failureDomains},
    }
    return checker.InterPodAffinityMatches
}

func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) {
        return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
    }

    // Now check if <pod> requirements will be satisfied on this node.
    affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
    if err != nil {
        return false, nil, err
    }
    if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
        return true, nil, nil
    }
    if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) {
        return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
    }

    if glog.V(10) {
        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
        // not logged. There is visible performance gain from it.
        glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
            podName(pod), node.Name)
    }
    return true, nil, nil
}
// anyPodMatchesPodAffinityTerm checks if any of the given pods can match the specific podAffinityTerm.
// The first return value indicates whether a matching pod exists on a node that matches the topology key,
// while the second return value indicates whether a matching pod exists anywhere.
// TODO: Do we really need any pod matching, or all pods matching? I think the latter.
func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, term *api.PodAffinityTerm) (bool, bool, error) {
    matchingPodExists := false
    for _, existingPod := range allPods {
        match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, pod, term)
        if err != nil {
            return false, matchingPodExists, err
        }
        if match {
            matchingPodExists = true
            existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
            if err != nil {
                return false, matchingPodExists, err
            }
            if c.failureDomains.NodesHaveSameTopologyKey(node, existingPodNode, term.TopologyKey) {
                return true, matchingPodExists, nil
            }
        }
    }
    return false, matchingPodExists, nil
}

func getPodAffinityTerms(podAffinity *api.PodAffinity) (terms []api.PodAffinityTerm) {
    if podAffinity != nil {
        if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
            terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
        }
        // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
        //if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
        //    terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
        //}
    }
    return terms
}

func getPodAntiAffinityTerms(podAntiAffinity *api.PodAntiAffinity) (terms []api.PodAffinityTerm) {
    if podAntiAffinity != nil {
        if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
            terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
        }
        // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
        //if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
        //    terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
        //}
    }
    return terms
}
func getMatchingAntiAffinityTerms(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) ([]matchingPodAntiAffinityTerm, error) {
    allNodeNames := make([]string, 0, len(nodeInfoMap))
    for name := range nodeInfoMap {
        allNodeNames = append(allNodeNames, name)
    }

    var lock sync.Mutex
    var result []matchingPodAntiAffinityTerm
    var firstError error
    appendResult := func(toAppend []matchingPodAntiAffinityTerm) {
        lock.Lock()
        defer lock.Unlock()
        result = append(result, toAppend...)
    }
    catchError := func(err error) {
        lock.Lock()
        defer lock.Unlock()
        if firstError == nil {
            firstError = err
        }
    }

    processNode := func(i int) {
        nodeInfo := nodeInfoMap[allNodeNames[i]]
        node := nodeInfo.Node()
        if node == nil {
            catchError(fmt.Errorf("node not found"))
            return
        }
        var nodeResult []matchingPodAntiAffinityTerm
        for _, existingPod := range nodeInfo.PodsWithAffinity() {
            affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
            if err != nil {
                catchError(err)
                return
            }
            if affinity == nil {
                continue
            }
            for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
                match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
                if err != nil {
                    catchError(err)
                    return
                }
                if match {
                    nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node})
                }
            }
        }
        if len(nodeResult) > 0 {
            appendResult(nodeResult)
        }
    }
    workqueue.Parallelize(16, len(allNodeNames), processNode)
    return result, firstError
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *api.Pod, allPods []*api.Pod) ([]matchingPodAntiAffinityTerm, error) {
    var result []matchingPodAntiAffinityTerm
    for _, existingPod := range allPods {
        affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
        if err != nil {
            return nil, err
        }
        // Guard against pods without any affinity annotation before dereferencing.
        if affinity != nil && affinity.PodAntiAffinity != nil {
            existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
            if err != nil {
                return nil, err
            }
            for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
                match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
                if err != nil {
                    return nil, err
                }
                if match {
                    result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
                }
            }
        }
    }
    return result, nil
}
// Checks if scheduling the pod onto this node would break any anti-affinity
// rules indicated by the existing pods.
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *api.Pod, meta interface{}, node *api.Node) bool {
    var matchingTerms []matchingPodAntiAffinityTerm
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        matchingTerms = predicateMeta.matchingAntiAffinityTerms
    } else {
        allPods, err := c.podLister.List(labels.Everything())
        if err != nil {
            glog.V(10).Infof("Failed to get all pods, %+v", err)
            return false
        }
        if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil {
            glog.V(10).Infof("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
            return false
        }
    }
    for _, term := range matchingTerms {
        if c.failureDomains.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
            glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v",
                podName(pod), node.Name, term.term)
            return false
        }
    }
    if glog.V(10) {
        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
        // not logged. There is visible performance gain from it.
        glog.Infof("Schedule Pod %+v on Node %+v is allowed, existing pods anti-affinity rules satisfied.",
            podName(pod), node.Name)
    }
    return true
}
// Checks if scheduling the pod onto this node would break any rules of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *api.Pod, node *api.Node, affinity *api.Affinity) bool {
    allPods, err := c.podLister.List(labels.Everything())
    if err != nil {
        return false
    }

    // Check all affinity terms.
    for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
        termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
        if err != nil {
            glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
                podName(pod), node.Name, term, err)
            return false
        }
        if !termMatches {
            // If the requirement matches a pod's own labels and namespace, and there are
            // no other such pods, then disregard the requirement. This is necessary to
            // not block forever because the first pod of the collection can't be scheduled.
            match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &term)
            if err != nil || !match || matchingPodExists {
                glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
                    podName(pod), node.Name, term, err)
                return false
            }
        }
    }

    // Check all anti-affinity terms.
    for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
        termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
        if err != nil || termMatches {
            glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
                podName(pod), node.Name, term, err)
            return false
        }
    }

    if glog.V(10) {
        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
        // not logged. There is visible performance gain from it.
        glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod affinity/anti-affinity constraints satisfied.",
            podName(pod), node.Name)
    }
    return true
}
// PodToleratesNodeTaints checks whether the pod's tolerations tolerate the node's taints.
func PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
    if err != nil {
        return false, nil, err
    }

    tolerations, err := api.GetTolerationsFromPodAnnotations(pod.Annotations)
    if err != nil {
        return false, nil, err
    }

    if tolerationsToleratesTaints(tolerations, taints) {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}
func tolerationsToleratesTaints(tolerations []api.Toleration, taints []api.Taint) bool {
    // If the taint list is nil/empty, it is tolerated by all tolerations by default.
    if len(taints) == 0 {
        return true
    }

    // The taint list isn't nil/empty, a nil/empty toleration list can't tolerate them.
    if len(tolerations) == 0 {
        return false
    }

    for i := range taints {
        taint := &taints[i]
        // skip taints that have effect PreferNoSchedule, since it is for priorities
        if taint.Effect == api.TaintEffectPreferNoSchedule {
            continue
        }

        if !api.TaintToleratedByTolerations(taint, tolerations) {
            return false
        }
    }

    return true
}
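// Illustrative sketch (assumed taint/toleration values, for explanation only): a node
// tainted with NoSchedule is only schedulable for pods whose tolerations cover the taint,
// while PreferNoSchedule taints are skipped above and only affect prioritization:
//
//    taints := []api.Taint{{Key: "dedicated", Value: "gpu", Effect: api.TaintEffectNoSchedule}}
//    tolerations := []api.Toleration{{Key: "dedicated", Operator: api.TolerationOpEqual, Value: "gpu", Effect: api.TaintEffectNoSchedule}}
//    ok := tolerationsToleratesTaints(tolerations, taints) // true: the toleration covers the taint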
// Determine if a pod is scheduled with best-effort QoS
func isPodBestEffort(pod *api.Pod) bool {
    return qos.GetPodQOS(pod) == qos.BestEffort
}

// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var podBestEffort bool
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podBestEffort = predicateMeta.podBestEffort
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podBestEffort = isPodBestEffort(pod)
    }

    // pod is not a BestEffort pod
    if !podBestEffort {
        return true, nil, nil
    }

    // is the node under memory pressure?
    for _, cond := range node.Status.Conditions {
        if cond.Type == api.NodeMemoryPressure && cond.Status == api.ConditionTrue {
            return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
        }
    }

    return true, nil, nil
}
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    // is the node under disk pressure?
    for _, cond := range node.Status.Conditions {
        if cond.Type == api.NodeDiskPressure && cond.Status == api.ConditionTrue {
            return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
        }
    }

    return true, nil, nil
}