Merge pull request #36238 from resouer/eclass-2-dev

Automatic merge from submit-queue (batch tested with PRs 48043, 48200, 49139, 36238, 49130)

Implement equivalence cache by caching and re-using predicate result

This is the last part of #30844. I opened a new PR instead of overwriting the old one because we changed a basic assumption: equivalence cache items can now be invalidated per individual predicate.

The idea of this PR is based on the discussion in https://github.com/kubernetes/kubernetes/issues/32024
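To make the idea concrete, here is a minimal sketch of the cache shape that discussion converges on: a three-level map keyed by node, predicate name, and equivalence hash. The type and field names below are illustrative stand-ins, not the PR's exact `EquivalenceCache` types:

```go
package main

import "fmt"

// predicateResult records one cached predicate run for a pod equivalence
// class: whether the class fits the node and, if not, why.
type predicateResult struct {
	Fit         bool
	FailReasons []string
}

// equivCache maps node name -> predicate name -> equivalence hash -> result,
// so invalidation can target a single predicate on a single node without
// dropping unrelated entries.
type equivCache map[string]map[string]map[uint64]predicateResult

// lookup reads through the nested maps; reading a missing key in Go simply
// yields the zero value, so no nil checks are needed on the read path.
func (ec equivCache) lookup(node, predicate string, hash uint64) (predicateResult, bool) {
	r, ok := ec[node][predicate][hash]
	return r, ok
}

func main() {
	ec := equivCache{
		"node-1": {"GeneralPredicates": {42: {Fit: true}}},
	}
	if r, ok := ec.lookup("node-1", "GeneralPredicates", 42); ok {
		fmt.Printf("cache hit: fit=%v\n", r.Fit)
	}
}
```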

- [x] Pods belonging to the same controllerRef are considered equivalent
- [x] `podFitsOnNode` will use the cached predicate result when one is available (see the lookup sketch after this list)
- [x] The equivalence cache is updated whenever a fresh predicate run completes
- [x] `factory.go` invalidates specific predicate cache entries based on which object changed
- [x] Since `schedule` and `bind` are asynchronous, we optimistically invalidate the affected cache entries before `bind` (a sketch of this ordering follows the scheduler wiring diff below)
- [x] Full unit tests for the affected files
- [x] e2e tests verifying the cache update/invalidation workflow
- [x] Performance test results

- [x] Some related nit fixes were expected to cause `needs-rebase`, so they were split out into #36060 #35968 #37512
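Reusing the types from the sketch above, the lookup-then-fill flow behind the `podFitsOnNode` checkbox is roughly the following. `runPredicateWithCache` and its signature are illustrative; the real code threads this through predicate metadata and the cache's own locking:

```go
// runPredicateWithCache returns the cached result for this pod class if one
// exists; otherwise it runs the predicate and records the fresh result, which
// is the "update on a fresh predicate run" step from the checklist.
func runPredicateWithCache(ec equivCache, node, predicate string, hash uint64,
	run func() (bool, []string)) (bool, []string) {
	if r, ok := ec.lookup(node, predicate, hash); ok {
		return r.Fit, r.FailReasons // cache hit: skip the predicate entirely
	}
	fit, reasons := run() // cache miss: do a fresh predicate run
	if ec[node] == nil {
		ec[node] = map[string]map[uint64]predicateResult{}
	}
	if ec[node][predicate] == nil {
		ec[node][predicate] = map[uint64]predicateResult{}
	}
	ec[node][predicate][hash] = predicateResult{Fit: fit, FailReasons: reasons}
	return fit, reasons
}
```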

cc @wojtek-t @davidopp
Kubernetes Submit Queue 2017-07-19 01:57:32 -07:00 committed by GitHub
commit 2faf7ff2bc
16 changed files with 650 additions and 29 deletions


@ -120,6 +120,12 @@ const (
//
// Add priority to pods. Priority affects scheduling and preemption of pods.
PodPriority utilfeature.Feature = "PodPriority"
// owner: @resouer
// alpha: v1.8
//
// Enable equivalence class cache for scheduler.
EnableEquivalenceClassCache utilfeature.Feature = "EnableEquivalenceClassCache"
)
func init() {
@ -144,6 +150,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
LocalStorageCapacityIsolation: {Default: false, PreRelease: utilfeature.Alpha},
DebugContainers: {Default: false, PreRelease: utilfeature.Alpha},
PodPriority: {Default: false, PreRelease: utilfeature.Alpha},
EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:


@ -18,6 +18,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/configz:go_default_library",
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
@ -33,6 +34,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",


@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -39,6 +40,7 @@ import (
clientv1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@ -104,6 +106,7 @@ func CreateScheduler(
statefulSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
)
// Rebuild the configurator with a default Create(...) method.


@ -37,6 +37,8 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)
const enableEquivalenceCache = true
func TestCompatibility_v1_Scheduler(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
schedulerFiles := map[string]struct {
@ -432,6 +434,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue


@ -158,7 +158,8 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod (i.e. equivalenceHash),
// on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) {
if len(predicateKeys) == 0 {
return


@ -16,7 +16,9 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api/helper:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",


@ -20,6 +20,7 @@ package factory
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
@ -40,7 +41,9 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/helper"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@ -56,6 +59,14 @@ const (
maximalGetBackoff = time.Minute
)
var (
serviceAffinitySet = sets.NewString("ServiceAffinity")
maxPDVolumeCountPredicateSet = sets.NewString("MaxPDVolumeCountPredicate")
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity")
generalPredicatesSets = sets.NewString("GeneralPredicates")
noDiskConflictSet = sets.NewString("NoDiskConflict")
)
// ConfigFactory is the default implementation of the scheduler.Configurator interface.
// TODO make this private if possible, so that only its interface is externally used.
type ConfigFactory struct {
@ -99,6 +110,9 @@ type ConfigFactory struct {
// Equivalence class cache
equivalencePodCache *core.EquivalenceCache
// Enable equivalence class cache
enableEquivalenceClassCache bool
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
@ -115,6 +129,7 @@ func NewConfigFactory(
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
hardPodAffinitySymmetricWeight int,
enableEquivalenceClassCache bool,
) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
@ -133,6 +148,7 @@ func NewConfigFactory(
StopEverything: stopEverything,
schedulerName: schedulerName,
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: enableEquivalenceClassCache,
}
c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
@ -201,11 +217,154 @@ func NewConfigFactory(
)
c.nodeLister = nodeInformer.Lister()
// TODO(harryz) need to fill all the handlers here and below for equivalence cache
// Adds and deletes of PVs affect the equivalence cache items
// related to persistent volumes
pvInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: it relies on the count of PVs.
AddFunc: c.onPvAdd,
DeleteFunc: c.onPvDelete,
},
0,
)
c.pVLister = pvInformer.Lister()
// This is for MaxPDVolumeCountPredicate: adding or deleting a PVC affects the PV count once it is bound.
pvcInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
DeleteFunc: c.onPvcDelete,
},
0,
)
c.pVCLister = pvcInformer.Lister()
// This is for ServiceAffinity: the predicate is affected when the service's selector is updated.
// Also, if a new service is added, the equivalence cache becomes invalid since
// existing pods may be "captured" by this service, changing the predicate result.
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onServiceAdd,
UpdateFunc: c.onServiceUpdate,
DeleteFunc: c.onServiceDelete,
},
0,
)
c.serviceLister = serviceInformer.Lister()
// The existing equivalence cache should not be affected by adds/deletes of RCs, Deployments, etc.;
// invalidation only makes sense when a pod is scheduled or deleted
return c
}
func (c *ConfigFactory) onPvAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj)
return
}
c.invalidatePredicatesForPv(pv)
}
}
func (c *ConfigFactory) onPvDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var pv *v1.PersistentVolume
switch t := obj.(type) {
case *v1.PersistentVolume:
pv = t
case cache.DeletedFinalStateUnknown:
var ok bool
pv, ok = t.Obj.(*v1.PersistentVolume)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t)
return
}
c.invalidatePredicatesForPv(pv)
}
}
func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate")
if pv.Spec.AWSElasticBlockStore != nil {
invalidPredicates.Insert("MaxEBSVolumeCount")
}
if pv.Spec.GCEPersistentDisk != nil {
invalidPredicates.Insert("MaxGCEPDVolumeCount")
}
if pv.Spec.AzureDisk != nil {
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
}
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
func (c *ConfigFactory) onPvcAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj)
return
}
c.invalidatePredicatesForPvc(pvc)
}
}
func (c *ConfigFactory) onPvcDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var pvc *v1.PersistentVolumeClaim
switch t := obj.(type) {
case *v1.PersistentVolumeClaim:
pvc = t
case cache.DeletedFinalStateUnknown:
var ok bool
pvc, ok = t.Obj.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t)
return
}
c.invalidatePredicatesForPvc(pvc)
}
}
func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
if pvc.Spec.VolumeName != "" {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet)
}
}
func (c *ConfigFactory) onServiceAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
}
}
func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
if c.enableEquivalenceClassCache {
// TODO(resouer) We may need to invalidate this for a specific group of pods only
oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service)
if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
}
}
}
func (c *ConfigFactory) onServiceDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
}
}
// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister {
return c.nodeLister
@ -229,7 +388,6 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
return c.scheduledPodLister
}
// TODO(resouer) need to update all the handlers here and below for equivalence cache
func (c *ConfigFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
@ -240,6 +398,8 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) {
if err := c.schedulerCache.AddPod(pod); err != nil {
glog.Errorf("scheduler cache AddPod failed: %v", err)
}
// NOTE: Invalidating the equivalence cache for addPodToCache is already
// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
}
func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
@ -257,6 +417,29 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
}
func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
if c.enableEquivalenceClassCache {
// If the pod does not have a bound node, updating the equivalence cache is meaningless;
// if the pod's bound node has changed, that case is handled by pod add & delete.
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
// MatchInterPodAffinity needs to be reconsidered for this node,
// as well as all nodes in the same failure domain.
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
matchInterPodAffinitySet)
}
// If the requested container resources changed, invalidate GeneralPredicates for this node
if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
predicates.GetResourceRequest(oldPod)) {
c.equivalencePodCache.InvalidateCachedPredicateItem(
newPod.Spec.NodeName, generalPredicatesSets)
}
}
}
}
func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
@ -278,6 +461,29 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
if err := c.schedulerCache.RemovePod(pod); err != nil {
glog.Errorf("scheduler cache RemovePod failed: %v", err)
}
c.invalidateCachedPredicatesOnDeletePod(pod)
}
func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
if c.enableEquivalenceClassCache {
// Part of this case is the same as pod add.
c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
// MatchInterPodAffinity needs to be reconsidered for this node,
// as well as all nodes in the same failure domain.
// TODO(resouer) can we do this only for nodes in the same failure domain?
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
matchInterPodAffinitySet)
// If this pod uses these volumes, the cached NoDiskConflict results become invalid.
for _, volume := range pod.Spec.Volumes {
if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
volume.RBD != nil || volume.ISCSI != nil {
c.equivalencePodCache.InvalidateCachedPredicateItem(
pod.Spec.NodeName, noDiskConflictSet)
}
}
}
}
func (c *ConfigFactory) addNodeToCache(obj interface{}) {
@ -290,6 +496,8 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
if err := c.schedulerCache.AddNode(node); err != nil {
glog.Errorf("scheduler cache AddNode failed: %v", err)
}
// NOTE: adding a new node does not affect existing predicate results in the equivalence cache
}
func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
@ -307,6 +515,64 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
glog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
}
func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
if c.enableEquivalenceClassCache {
// Update the equivalence cache based on the node update.
// TODO(resouer): consider lazily initializing this set
invalidPredicates := sets.NewString()
oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
if oldErr != nil {
glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
}
newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
if newErr != nil {
glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
}
if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
invalidPredicates.Insert("GeneralPredicates") // "PodFitsResources"
}
if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
invalidPredicates.Insert("GeneralPredicates", "ServiceAffinity") // "PodSelectorMatches"
for k, v := range oldNode.GetLabels() {
// any label can be a pod's topology key, so we have to invalidate in all cases
if v != newNode.GetLabels()[k] {
invalidPredicates.Insert("MatchInterPodAffinity")
}
// NoVolumeZoneConflict is only affected by zone-related label changes
if k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion {
if v != newNode.GetLabels()[k] {
invalidPredicates.Insert("NoVolumeZoneConflict")
}
}
}
}
if !reflect.DeepEqual(oldTaints, newTaints) {
invalidPredicates.Insert("PodToleratesNodeTaints")
}
if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, cond := range oldNode.Status.Conditions {
oldConditions[cond.Type] = cond.Status
}
for _, cond := range newNode.Status.Conditions {
newConditions[cond.Type] = cond.Status
}
if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
invalidPredicates.Insert("CheckNodeMemoryPressure")
}
if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
invalidPredicates.Insert("CheckNodeDiskPressure")
}
}
c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
}
}
func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
@ -328,6 +594,9 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
if err := c.schedulerCache.RemoveNode(node); err != nil {
glog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName())
}
}
// Create creates a scheduler with the default algorithm provider.
@ -424,11 +693,17 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err
}
// TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
// Init equivalence class cache
if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil {
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
glog.Info("Created equivalence class cache")
}
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
Ecache: f.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodePredicateLister{f.nodeLister},
Algorithm: algo,
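The hunk above hands the cache to `scheduler.Config` as `Ecache`. Since `bind` runs asynchronously, the invalidation for a newly assumed pod has to happen before the bind is issued; here is a hedged sketch of that ordering, where the interface and function are illustrative stand-ins for the PR's `InvalidateCachedPredicateItemForPodAdd`:

```go
// invalidator is the minimal view of the cache this step needs; in the PR it
// corresponds to EquivalenceCache.InvalidateCachedPredicateItemForPodAdd.
type invalidator interface {
	invalidateForPodAdd(podName, nodeName string)
}

// assumeAndBind drops the affected cache entries *before* kicking off the
// asynchronous bind, so a concurrent scheduling cycle cannot reuse predicate
// results (e.g. GeneralPredicates on the target node) that the just-assumed
// pod is about to break.
func assumeAndBind(ec invalidator, podName, nodeName string, bind func() error) error {
	ec.invalidateForPodAdd(podName, nodeName) // optimistic invalidation first
	return bind()                             // then the async bind can proceed
}
```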


@ -40,6 +40,8 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
const enableEquivalenceCache = true
func TestCreate(t *testing.T) {
handler := utiltesting.FakeHandler{
StatusCode: 500,
@ -62,6 +64,7 @@ func TestCreate(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
factory.Create()
}
@ -93,6 +96,7 @@ func TestCreateFromConfig(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
// Pre-register some predicate and priority functions
@ -151,6 +155,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
// Pre-register some predicate and priority functions
@ -210,6 +215,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
configData = []byte(`{}`)
@ -266,6 +272,7 @@ func TestDefaultErrorFunc(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
@ -378,6 +385,7 @@ func TestResponsibleForPod(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
// factory of "foo-scheduler"
factoryFooScheduler := NewConfigFactory(
@ -392,6 +400,7 @@ func TestResponsibleForPod(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
// scheduler annotations to be tested
schedulerFitsDefault := "default-scheduler"
@ -461,6 +470,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
-1,
enableEquivalenceCache,
)
_, err := factory.Create()
if err == nil {
@ -506,6 +516,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
test.hardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
_, err := factory.Create()
if err == nil {


@ -411,7 +411,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
framework.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
}
CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
scheduling.CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
By("Waiting for new node to appear and annotating it")
framework.WaitForGroupSize(minMig, int32(minSize+1))
@ -907,26 +907,6 @@ func doPut(url, content string) (string, error) {
return strBody, nil
}
func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) {
By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: framework.GetPauseImageName(f.ClientSet),
Replicas: replicas,
HostPorts: map[string]int{"port1": 4321},
NodeSelector: nodeSelector,
}
err := framework.RunRC(*config)
if expectRunning {
framework.ExpectNoError(err)
}
}
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) {
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
request := int64(1024 * 1024 * megabytes / replicas)


@ -2462,6 +2462,15 @@ func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, la
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
}
func AddOrUpdateLabelOnNodeAndReturnOldValue(c clientset.Interface, nodeName string, labelKey, labelValue string) string {
var oldValue string
node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
ExpectNoError(err)
oldValue = node.Labels[labelKey]
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
return oldValue
}
func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) {
By("verifying the node has the label " + labelKey + " " + labelValue)
node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{})


@ -10,6 +10,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"equivalence_cache_predicates.go",
"events.go",
"framework.go",
"nvidia-gpus.go",


@ -0,0 +1,286 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduling
import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
_ "github.com/stretchr/testify/assert"
)
var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
var cs clientset.Interface
var nodeList *v1.NodeList
var masterNodes sets.String
var systemPodsNo int
var ns string
f := framework.NewDefaultFramework("equivalence-cache")
ignoreLabels := framework.ImagePullerLabels
BeforeEach(func() {
cs = f.ClientSet
ns = f.Namespace.Name
framework.WaitForAllNodesHealthy(cs, time.Minute)
masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs)
framework.ExpectNoError(framework.CheckTestingNSDeletedExcept(cs, ns))
// Every test case in this suite assumes that cluster add-on pods stay stable and
// cannot be run in parallel with any other test that touches Nodes or Pods.
// This is because we need precise control over what's running in the cluster.
systemPods, err := framework.GetPodsInNamespace(cs, ns, ignoreLabels)
Expect(err).NotTo(HaveOccurred())
systemPodsNo = 0
for _, pod := range systemPods {
if !masterNodes.Has(pod.Spec.NodeName) && pod.DeletionTimestamp == nil {
systemPodsNo++
}
}
err = framework.WaitForPodsRunningReady(cs, api.NamespaceSystem, int32(systemPodsNo), int32(systemPodsNo), framework.PodReadyBeforeTimeout, ignoreLabels)
Expect(err).NotTo(HaveOccurred())
for _, node := range nodeList.Items {
framework.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name)
framework.PrintAllKubeletPods(cs, node.Name)
}
})
// This test verifies that GeneralPredicates works as expected:
// When a replica pod (with HostPorts) is scheduled to a node, it invalidates the GeneralPredicates cache on this node,
// so that subsequent replica pods with the same host port claim will be rejected.
// We enforce all replica pods bind to the same node so there will always be conflicts.
It("validates GeneralPredicates is properly invalidated when a pod is scheduled [Slow]", func() {
By("Launching a RC with two replica pods with HostPorts")
nodeName := getNodeThatCanRunPodWithoutToleration(f)
rcName := "host-port"
// bind all replicas to the same node
nodeSelector := map[string]string{"kubernetes.io/hostname": nodeName}
By("One pod should be scheduled, the other should be rejected")
// CreateNodeSelectorPods creates an RC with host port 4321
WaitForSchedulerAfterAction(f, func() error {
err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false)
return err
}, rcName, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, rcName)
// the first replica pod is scheduled, and the second pod will be rejected.
verifyResult(cs, 1, 1, ns)
})
// This test verifies that MatchInterPodAffinity works as expected.
// The equivalence cache does not handle inter-pod affinity (anti-affinity) specially (unless a node label changes),
// because the current predicate algorithm ensures a newly scheduled pod does not break existing affinity in the cluster.
It("validates pod affinity works properly when new replica pod is scheduled", func() {
// create a pod running with label {security: S1}, and choose this node
nodeName, _ := runAndKeepPodWithLabelAndGetNodeName(f)
By("Trying to apply a random label on the found node.")
// we need to use real failure domains, since the scheduler only knows about them
k := "failure-domain.beta.kubernetes.io/zone"
v := "equivalence-e2e-test"
oldValue := framework.AddOrUpdateLabelOnNodeAndReturnOldValue(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
// restore the node label
defer framework.AddOrUpdateLabelOnNode(cs, nodeName, k, oldValue)
By("Trying to schedule RC with Pod Affinity should success.")
framework.WaitForStableCluster(cs, masterNodes)
affinityRCName := "with-pod-affinity-" + string(uuid.NewUUID())
replica := 2
labelsMap := map[string]string{
"name": affinityRCName,
}
affinity := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: k,
Namespaces: []string{ns},
},
},
},
}
rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, framework.GetPauseImageName(f.ClientSet))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, affinityRCName)
// The RC should run successfully.
// TODO: WaitForSchedulerAfterAction() can only be used to wait for a failure event,
// not for a successful RC, since no specific pod name can be provided.
_, err := cs.Core().ReplicationControllers(ns).Create(rc)
framework.ExpectNoError(err)
framework.ExpectNoError(framework.WaitForControlledPodsRunning(cs, ns, affinityRCName, api.Kind("ReplicationController")))
By("Remove node failure domain label")
framework.RemoveLabelOffNode(cs, nodeName, k)
By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
// use scale to create another equivalent pod and wait for failure event
WaitForSchedulerAfterAction(f, func() error {
err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false)
return err
}, affinityRCName, false)
// and this new pod should be rejected since the node label has been removed
verifyReplicasResult(cs, replica, 1, ns, affinityRCName)
})
// This test verifies that MatchInterPodAffinity (anti-affinity) is respected as expected.
It("validates pod anti-affinity works properly when new replica pod is scheduled", func() {
By("Launching two pods on two distinct nodes to get two node names")
CreateHostPortPods(f, "host-port", 2, true)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port")
podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
Expect(len(podList.Items)).To(Equal(2))
nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName}
Expect(nodeNames[0]).ToNot(Equal(nodeNames[1]))
By("Applying a random label to both nodes.")
k := "e2e.inter-pod-affinity.kubernetes.io/zone"
v := "equivalence-e2etest"
for _, nodeName := range nodeNames {
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
defer framework.RemoveLabelOffNode(cs, nodeName, k)
}
By("Trying to launch a pod with the service label on the selected nodes.")
// run a pod with label {"service": "S1"} and expect it to be running
runPausePod(f, pausePodConfig{
Name: "with-label-" + string(uuid.NewUUID()),
Labels: map[string]string{"service": "S1"},
NodeSelector: map[string]string{k: v}, // only launch on our two nodes
})
By("Trying to launch RC with podAntiAffinity on these two nodes should be rejected.")
labelRCName := "with-podantiaffinity-" + string(uuid.NewUUID())
replica := 2
labelsMap := map[string]string{
"name": labelRCName,
}
affinity := &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: k,
Namespaces: []string{ns},
},
},
},
}
rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity,
framework.GetPauseImageName(f.ClientSet), map[string]string{k: v})
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, labelRCName)
WaitForSchedulerAfterAction(f, func() error {
_, err := cs.Core().ReplicationControllers(ns).Create(rc)
return err
}, labelRCName, false)
// both replicas should be rejected, since podAntiAffinity forbids them from co-locating with the pod labeled {"service": "S1"}
verifyReplicasResult(cs, 0, replica, ns, labelRCName)
})
})
// getRCWithInterPodAffinity returns RC with given affinity rules.
func getRCWithInterPodAffinity(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string) *v1.ReplicationController {
return getRCWithInterPodAffinityNodeSelector(name, labelsMap, replica, affinity, image, map[string]string{})
}
// getRCWithInterPodAffinityNodeSelector returns RC with given affinity rules and node selector.
func getRCWithInterPodAffinityNodeSelector(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string, nodeSelector map[string]string) *v1.ReplicationController {
replicaInt32 := int32(replica)
return &v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ReplicationControllerSpec{
Replicas: &replicaInt32,
Selector: labelsMap,
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labelsMap,
},
Spec: v1.PodSpec{
Affinity: affinity,
Containers: []v1.Container{
{
Name: name,
Image: image,
},
},
DNSPolicy: v1.DNSDefault,
NodeSelector: nodeSelector,
},
},
},
}
}
func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) error {
By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: framework.GetPauseImageName(f.ClientSet),
Replicas: replicas,
HostPorts: map[string]int{"port1": 4321},
NodeSelector: nodeSelector,
}
err := framework.RunRC(*config)
if expectRunning {
return err
}
return nil
}


@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
@ -51,6 +52,8 @@ type pausePodConfig struct {
Resources *v1.ResourceRequirements
Tolerations []v1.Toleration
NodeName string
Ports []v1.ContainerPort
OwnerReferences []metav1.OwnerReference
}
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@ -725,9 +728,10 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Labels: conf.Labels,
Annotations: conf.Annotations,
Name: conf.Name,
Labels: conf.Labels,
Annotations: conf.Annotations,
OwnerReferences: conf.OwnerReferences,
},
Spec: v1.PodSpec{
NodeSelector: conf.NodeSelector,
@ -736,6 +740,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
{
Name: conf.Name,
Image: framework.GetPauseImageName(f.ClientSet),
Ports: conf.Ports,
},
},
Tolerations: conf.Tolerations,
@ -924,6 +929,32 @@ func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotSched
Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods)))
}
// verifyReplicasResult is a wrapper of verifyResult for a group of pods with the same "name: labelName" label, which means they belong to the same RC
func verifyReplicasResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled int, ns string, labelName string) {
allPods := getPodsByLabels(c, ns, map[string]string{"name": labelName})
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)
printed := false
printOnce := func(msg string) string {
if !printed {
printed = true
return msg
} else {
return ""
}
}
Expect(len(notScheduledPods)).To(Equal(expectedNotScheduled), printOnce(fmt.Sprintf("Not scheduled Pods: %#v", notScheduledPods)))
Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods)))
}
func getPodsByLabels(c clientset.Interface, ns string, labelsMap map[string]string) *v1.PodList {
selector := labels.SelectorFromSet(labels.Set(labelsMap))
allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
framework.ExpectNoError(err)
return allPods
}
func runAndKeepPodWithLabelAndGetNodeName(f *framework.Framework) (string, string) {
// launch a pod to find a node which can launch a pod. We intentionally do
// not just take the node list and choose the first of them. Depending on the


@ -370,6 +370,7 @@ func TestSchedulerExtender(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
if err != nil {


@ -50,6 +50,8 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
const enableEquivalenceCache = true
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
type nodeStateManager struct {
@ -257,6 +259,7 @@ func TestUnschedulableNodes(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
@ -540,6 +543,7 @@ func TestMultiScheduler(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
@ -626,6 +630,7 @@ func TestMultiScheduler(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
@ -736,6 +741,7 @@ func TestAllocatable(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {


@ -36,6 +36,8 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
const enableEquivalenceCache = true
// mustSetupScheduler starts the following components:
// - k8s api server (a.k.a. master)
// - scheduler
@ -74,6 +76,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
eventBroadcaster := record.NewBroadcaster()