From e0a7d96632b9b4daa5133df63f1ed16c93cbee6f Mon Sep 17 00:00:00 2001 From: Mayank Kumar Date: Mon, 1 Oct 2018 14:57:17 -0700 Subject: [PATCH] Move informer event handlers to Scheduler --- pkg/scheduler/BUILD | 12 +- pkg/scheduler/equivalence/BUILD | 13 + pkg/scheduler/eventhandlers.go | 481 +++++++++++++++++++ pkg/scheduler/eventhandlers_test.go | 265 ++++++++++ pkg/scheduler/factory/BUILD | 4 - pkg/scheduler/factory/factory.go | 437 +---------------- pkg/scheduler/factory/factory_test.go | 237 --------- pkg/scheduler/scheduler.go | 4 +- pkg/scheduler/scheduler_test.go | 11 +- test/integration/daemonset/daemonset_test.go | 33 +- test/integration/scheduler/scheduler_test.go | 18 +- test/integration/scheduler/util.go | 17 +- test/integration/util/util.go | 14 + 13 files changed, 856 insertions(+), 690 deletions(-) create mode 100644 pkg/scheduler/equivalence/BUILD create mode 100644 pkg/scheduler/eventhandlers.go create mode 100644 pkg/scheduler/eventhandlers_test.go diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index ada94b40b2..5a587510e6 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -3,12 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "eventhandlers.go", "scheduler.go", "testutil.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler", visibility = ["//visibility:public"], deps = [ + "//pkg/kubelet/apis:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", @@ -22,8 +24,10 @@ go_library( "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", @@ -32,6 +36,7 @@ go_library( "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -39,7 +44,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["scheduler_test.go"], + srcs = [ + "eventhandlers_test.go", + "scheduler_test.go", + ], embed = [":go_default_library"], deps = [ "//pkg/api/legacyscheme:go_default_library", @@ -53,6 +61,7 @@ go_test( "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", + "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -88,6 +97,7 @@ filegroup( "//pkg/scheduler/api:all-srcs", "//pkg/scheduler/apis/config:all-srcs", "//pkg/scheduler/core:all-srcs", + "//pkg/scheduler/equivalence:all-srcs", "//pkg/scheduler/factory:all-srcs", "//pkg/scheduler/internal/cache:all-srcs", "//pkg/scheduler/internal/queue:all-srcs", diff --git a/pkg/scheduler/equivalence/BUILD b/pkg/scheduler/equivalence/BUILD new file mode 100644 index 0000000000..6df04e38cd --- /dev/null +++ b/pkg/scheduler/equivalence/BUILD @@ -0,0 +1,13 @@ +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go new file mode 100644 index 0000000000..5c00f04b9d --- /dev/null +++ b/pkg/scheduler/eventhandlers.go @@ -0,0 +1,481 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "k8s.io/klog" + "reflect" + + "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + appsinformers "k8s.io/client-go/informers/apps/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" + storageinformers "k8s.io/client-go/informers/storage/v1" + "k8s.io/client-go/tools/cache" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" +) + +func (sched *Scheduler) onPvAdd(obj interface{}) { + // Pods created when there are no PVs available will be stuck in + // unschedulable queue. But unbound PVs created for static provisioning and + // delay binding storage class are skipped in PV controller dynamic + // provisiong and binding process, will not trigger events to schedule pod + // again. So we need to move pods to active queue on PV add for this + // scenario. + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onPvUpdate(old, new interface{}) { + // Scheduler.bindVolumesWorker may fail to update assumed pod volume + // bindings due to conflicts if PVs are updated by PV controller or other + // parties, then scheduler will add pod back to unschedulable queue. We + // need to move pods to active queue on PV update for this scenario. + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onPvcAdd(obj interface{}) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onPvcUpdate(old, new interface{}) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onStorageClassAdd(obj interface{}) { + sc, ok := obj.(*storagev1.StorageClass) + if !ok { + klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj) + return + } + + // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these + // PVCs have specified StorageClass name, creating StorageClass objects + // with late binding will cause predicates to pass, so we need to move pods + // to active queue. + // We don't need to invalidate cached results because results will not be + // cached for pod that has unbound immediate PVCs. + if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { + sched.config.SchedulingQueue.MoveAllToActiveQueue() + } +} + +func (sched *Scheduler) onServiceAdd(obj interface{}) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) onServiceDelete(obj interface{}) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) addNodeToCache(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + klog.Errorf("cannot convert to *v1.Node: %v", obj) + return + } + + if err := sched.config.SchedulerCache.AddNode(node); err != nil { + klog.Errorf("scheduler cache AddNode failed: %v", err) + } + + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*v1.Node) + if !ok { + klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj) + return + } + newNode, ok := newObj.(*v1.Node) + if !ok { + klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj) + return + } + + if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil { + klog.Errorf("scheduler cache UpdateNode failed: %v", err) + } + + // Only activate unschedulable pods if the node became more schedulable. + // We skip the node property comparison when there is no unschedulable pods in the queue + // to save processing cycles. We still trigger a move to active queue to cover the case + // that a pod being processed by the scheduler is determined unschedulable. We want this + // pod to be reevaluated when a change in the cluster happens. + if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) { + sched.config.SchedulingQueue.MoveAllToActiveQueue() + } +} + +func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { + var node *v1.Node + switch t := obj.(type) { + case *v1.Node: + node = t + case cache.DeletedFinalStateUnknown: + var ok bool + node, ok = t.Obj.(*v1.Node) + if !ok { + klog.Errorf("cannot convert to *v1.Node: %v", t.Obj) + return + } + default: + klog.Errorf("cannot convert to *v1.Node: %v", t) + return + } + // NOTE: Updates must be written to scheduler cache before invalidating + // equivalence cache, because we could snapshot equivalence cache after the + // invalidation and then snapshot the cache itself. If the cache is + // snapshotted before updates are written, we would update equivalence + // cache with stale information which is based on snapshot of old cache. + if err := sched.config.SchedulerCache.RemoveNode(node); err != nil { + klog.Errorf("scheduler cache RemoveNode failed: %v", err) + } +} +func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { + if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) + } +} + +func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { + pod := newObj.(*v1.Pod) + if sched.skipPodUpdate(pod) { + return + } + if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) + } +} + +func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = obj.(*v1.Pod) + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) + return + } + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) + return + } + if err := sched.config.SchedulingQueue.Delete(pod); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) + } + if sched.config.VolumeBinder != nil { + // Volume binder only wants to keep unassigned pods + sched.config.VolumeBinder.DeletePodBindings(pod) + } +} + +func (sched *Scheduler) addPodToCache(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert to *v1.Pod: %v", obj) + return + } + + if err := sched.config.SchedulerCache.AddPod(pod); err != nil { + klog.Errorf("scheduler cache AddPod failed: %v", err) + } + + sched.config.SchedulingQueue.AssignedPodAdded(pod) +} + +func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj) + return + } + newPod, ok := newObj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj) + return + } + + // NOTE: Updates must be written to scheduler cache before invalidating + // equivalence cache, because we could snapshot equivalence cache after the + // invalidation and then snapshot the cache itself. If the cache is + // snapshotted before updates are written, we would update equivalence + // cache with stale information which is based on snapshot of old cache. + if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { + klog.Errorf("scheduler cache UpdatePod failed: %v", err) + } + + sched.config.SchedulingQueue.AssignedPodUpdated(newPod) +} + +func (sched *Scheduler) deletePodFromCache(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj) + return + } + default: + klog.Errorf("cannot convert to *v1.Pod: %v", t) + return + } + // NOTE: Updates must be written to scheduler cache before invalidating + // equivalence cache, because we could snapshot equivalence cache after the + // invalidation and then snapshot the cache itself. If the cache is + // snapshotted before updates are written, we would update equivalence + // cache with stale information which is based on snapshot of old cache. + if err := sched.config.SchedulerCache.RemovePod(pod); err != nil { + klog.Errorf("scheduler cache RemovePod failed: %v", err) + } + + sched.config.SchedulingQueue.MoveAllToActiveQueue() +} + +// assignedPod selects pods that are assigned (scheduled and running). +func assignedPod(pod *v1.Pod) bool { + return len(pod.Spec.NodeName) != 0 +} + +// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler. +func responsibleForPod(pod *v1.Pod, schedulerName string) bool { + return schedulerName == pod.Spec.SchedulerName +} + +// isZoneRegionLabel check if given key of label is zone or region label. +func isZoneRegionLabel(k string) bool { + return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion +} + +// skipPodUpdate checks whether the specified pod update should be ignored. +// This function will return true if +// - The pod has already been assumed, AND +// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations +// updated. +func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool { + // Non-assumed pods should never be skipped. + isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) + return false + } + if !isAssumed { + return false + } + + // Gets the assumed pod from the cache. + assumedPod, err := sched.config.SchedulerCache.GetPod(pod) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) + return false + } + + // Compares the assumed pod in the cache with the pod update. If they are + // equal (with certain fields excluded), this pod update will be skipped. + f := func(pod *v1.Pod) *v1.Pod { + p := pod.DeepCopy() + // ResourceVersion must be excluded because each object update will + // have a new resource version. + p.ResourceVersion = "" + // Spec.NodeName must be excluded because the pod assumed in the cache + // is expected to have a node assigned while the pod update may nor may + // not have this field set. + p.Spec.NodeName = "" + // Annotations must be excluded for the reasons described in + // https://github.com/kubernetes/kubernetes/issues/52914. + p.Annotations = nil + return p + } + assumedPodCopy, podCopy := f(assumedPod), f(pod) + if !reflect.DeepEqual(assumedPodCopy, podCopy) { + return false + } + klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name) + return true +} + +// AddAllEventHandlers is a helper function used in tests and in Scheduler +// to add event handlers for various informers. +func AddAllEventHandlers( + sched *Scheduler, + schedulerName string, + nodeInformer coreinformers.NodeInformer, + podInformer coreinformers.PodInformer, + pvInformer coreinformers.PersistentVolumeInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + replicationControllerInformer coreinformers.ReplicationControllerInformer, + replicaSetInformer appsinformers.ReplicaSetInformer, + statefulSetInformer appsinformers.StatefulSetInformer, + serviceInformer coreinformers.ServiceInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + storageClassInformer storageinformers.StorageClassInformer, +) { + // scheduled pod cache + podInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return assignedPod(t) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return assignedPod(pod) + } + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) + return false + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: sched.addPodToCache, + UpdateFunc: sched.updatePodInCache, + DeleteFunc: sched.deletePodFromCache, + }, + }, + ) + // unscheduled pod queue + podInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return !assignedPod(t) && responsibleForPod(t, schedulerName) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return !assignedPod(pod) && responsibleForPod(pod, schedulerName) + } + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) + return false + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: sched.addPodToSchedulingQueue, + UpdateFunc: sched.updatePodInSchedulingQueue, + DeleteFunc: sched.deletePodFromSchedulingQueue, + }, + }, + ) + + nodeInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.addNodeToCache, + UpdateFunc: sched.updateNodeInCache, + DeleteFunc: sched.deleteNodeFromCache, + }, + ) + + // On add and delete of PVs, it will affect equivalence cache items + // related to persistent volume + pvInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + // MaxPDVolumeCountPredicate: since it relies on the counts of PV. + AddFunc: sched.onPvAdd, + UpdateFunc: sched.onPvUpdate, + }, + ) + + // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound. + pvcInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.onPvcAdd, + UpdateFunc: sched.onPvcUpdate, + }, + ) + + // This is for ServiceAffinity: affected by the selector of the service is updated. + // Also, if new service is added, equivalence cache will also become invalid since + // existing pods may be "captured" by this service and change this predicate result. + serviceInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.onServiceAdd, + UpdateFunc: sched.onServiceUpdate, + DeleteFunc: sched.onServiceDelete, + }, + ) + + storageClassInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: sched.onStorageClassAdd, + }, + ) +} + +func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool { + if nodeSpecUnschedulableChanged(newNode, oldNode) { + return true + } + if nodeAllocatableChanged(newNode, oldNode) { + return true + } + if nodeLabelsChanged(newNode, oldNode) { + return true + } + if nodeTaintsChanged(newNode, oldNode) { + return true + } + if nodeConditionsChanged(newNode, oldNode) { + return true + } + + return false +} + +func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool { + return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) +} + +func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool { + return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) +} + +func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool { + return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) +} + +func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool { + strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus { + conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions)) + for i := range conditions { + conditionStatuses[conditions[i].Type] = conditions[i].Status + } + return conditionStatuses + } + return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions)) +} + +func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool { + return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false +} diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go new file mode 100644 index 0000000000..6afc56a768 --- /dev/null +++ b/pkg/scheduler/eventhandlers_test.go @@ -0,0 +1,265 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/factory" + + fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" +) + +func TestSkipPodUpdate(t *testing.T) { + table := []struct { + pod *v1.Pod + isAssumedPodFunc func(*v1.Pod) bool + getPodFunc func(*v1.Pod) *v1.Pod + expected bool + name string + }{ + { + name: "Non-assumed pod", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { return false }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + }, + } + }, + expected: false, + }, + { + name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Annotations: map[string]string{"a": "b"}, + ResourceVersion: "0", + }, + Spec: v1.PodSpec{ + NodeName: "node-0", + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { + return true + }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Annotations: map[string]string{"c": "d"}, + ResourceVersion: "1", + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + }, + } + }, + expected: true, + }, + { + name: "with changes on Labels", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Labels: map[string]string{"a": "b"}, + }, + }, + isAssumedPodFunc: func(*v1.Pod) bool { + return true + }, + getPodFunc: func(*v1.Pod) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-0", + Labels: map[string]string{"c": "d"}, + }, + } + }, + expected: false, + }, + } + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + c := NewFromConfig(&factory.Config{ + SchedulerCache: &fakecache.Cache{ + IsAssumedPodFunc: test.isAssumedPodFunc, + GetPodFunc: test.getPodFunc, + }, + }, + ) + got := c.skipPodUpdate(test.pod) + if got != test.expected { + t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) + } + }) + } +} + +func TestNodeAllocatableChanged(t *testing.T) { + newQuantity := func(value int64) resource.Quantity { + return *resource.NewQuantity(value, resource.BinarySI) + } + for _, c := range []struct { + Name string + Changed bool + OldAllocatable v1.ResourceList + NewAllocatable v1.ResourceList + }{ + { + Name: "no allocatable resources changed", + Changed: false, + OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, + NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, + }, + { + Name: "new node has more allocatable resources", + Changed: true, + OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, + NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)}, + }, + } { + oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}} + newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}} + changed := nodeAllocatableChanged(newNode, oldNode) + if changed != c.Changed { + t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed) + } + } +} + +func TestNodeLabelsChanged(t *testing.T) { + for _, c := range []struct { + Name string + Changed bool + OldLabels map[string]string + NewLabels map[string]string + }{ + { + Name: "no labels changed", + Changed: false, + OldLabels: map[string]string{"foo": "bar"}, + NewLabels: map[string]string{"foo": "bar"}, + }, + // Labels changed. + { + Name: "new node has more labels", + Changed: true, + OldLabels: map[string]string{"foo": "bar"}, + NewLabels: map[string]string{"foo": "bar", "test": "value"}, + }, + } { + oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}} + newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}} + changed := nodeLabelsChanged(newNode, oldNode) + if changed != c.Changed { + t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed) + } + } +} + +func TestNodeTaintsChanged(t *testing.T) { + for _, c := range []struct { + Name string + Changed bool + OldTaints []v1.Taint + NewTaints []v1.Taint + }{ + { + Name: "no taint changed", + Changed: false, + OldTaints: []v1.Taint{{Key: "key", Value: "value"}}, + NewTaints: []v1.Taint{{Key: "key", Value: "value"}}, + }, + { + Name: "taint value changed", + Changed: true, + OldTaints: []v1.Taint{{Key: "key", Value: "value1"}}, + NewTaints: []v1.Taint{{Key: "key", Value: "value2"}}, + }, + } { + oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}} + newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}} + changed := nodeTaintsChanged(newNode, oldNode) + if changed != c.Changed { + t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed) + } + } +} + +func TestNodeConditionsChanged(t *testing.T) { + nodeConditionType := reflect.TypeOf(v1.NodeCondition{}) + if nodeConditionType.NumField() != 6 { + t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.") + } + + for _, c := range []struct { + Name string + Changed bool + OldConditions []v1.NodeCondition + NewConditions []v1.NodeCondition + }{ + { + Name: "no condition changed", + Changed: false, + OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, + NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, + }, + { + Name: "only LastHeartbeatTime changed", + Changed: false, + OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}}, + NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}}, + }, + { + Name: "new node has more healthy conditions", + Changed: true, + OldConditions: []v1.NodeCondition{}, + NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}, + }, + { + Name: "new node has less unhealthy conditions", + Changed: true, + OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, + NewConditions: []v1.NodeCondition{}, + }, + { + Name: "condition status changed", + Changed: true, + OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}, + NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}, + }, + } { + oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}} + newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}} + changed := nodeConditionsChanged(newNode, oldNode) + if changed != c.Changed { + t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed) + } + } +} diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 0121dd4774..486ff1fdf4 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -10,7 +10,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/api/v1/pod:go_default_library", - "//pkg/kubelet/apis:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library", @@ -25,7 +24,6 @@ go_library( "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", @@ -65,12 +63,10 @@ go_test( "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api/latest:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", - "//pkg/scheduler/internal/cache/fake:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index b74a8aa9b6..61665cc59f 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -20,11 +20,9 @@ package factory import ( "fmt" - "reflect" "time" v1 "k8s.io/api/core/v1" - storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -47,7 +45,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" @@ -68,14 +65,6 @@ const ( maximalGetBackoff = time.Minute ) -var ( - serviceAffinitySet = sets.NewString(predicates.CheckServiceAffinityPred) - matchInterPodAffinitySet = sets.NewString(predicates.MatchInterPodAffinityPred) - generalPredicatesSets = sets.NewString(predicates.GeneralPred) - noDiskConflictSet = sets.NewString(predicates.NoDiskConflictPred) - maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred, predicates.MaxCinderVolumeCountPred} -) - // Binder knows how to write a binding. type Binder interface { Bind(binding *v1.Binding) error @@ -172,8 +161,6 @@ type Configurator interface { // configFactory is the default implementation of the scheduler.Configurator interface. type configFactory struct { client clientset.Interface - // queue for pods that need scheduling - podQueue internalqueue.SchedulingQueue // a means to list all known scheduled pods. scheduledPodLister corelisters.PodLister // a means to list all known scheduled pods and pods assumed to have been scheduled. @@ -226,6 +213,10 @@ type configFactory struct { // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle. percentageOfNodesToScore int32 + + bindTimeoutSeconds int64 + // queue for pods that need scheduling + podQueue internalqueue.SchedulingQueue } // ConfigFactoryArgs is a set arguments passed to NewConfigFactory. @@ -282,105 +273,15 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight, disablePreemption: args.DisablePreemption, percentageOfNodesToScore: args.PercentageOfNodesToScore, + bindTimeoutSeconds: args.BindTimeoutSeconds, } - + // Setup volume binder + c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced - // scheduled pod cache - args.PodInformer.Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return assignedPod(t) - case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1.Pod); ok { - return assignedPod(pod) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: c.addPodToCache, - UpdateFunc: c.updatePodInCache, - DeleteFunc: c.deletePodFromCache, - }, - }, - ) - // unscheduled pod queue - args.PodInformer.Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return !assignedPod(t) && responsibleForPod(t, args.SchedulerName) - case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1.Pod); ok { - return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: c.addPodToSchedulingQueue, - UpdateFunc: c.updatePodInSchedulingQueue, - DeleteFunc: c.deletePodFromSchedulingQueue, - }, - }, - ) // ScheduledPodLister is something we provide to plug-in functions that // they may need to call. c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()} - args.NodeInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.addNodeToCache, - UpdateFunc: c.updateNodeInCache, - DeleteFunc: c.deleteNodeFromCache, - }, - ) - - args.PvInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - // MaxPDVolumeCountPredicate: since it relies on the counts of PV. - AddFunc: c.onPvAdd, - UpdateFunc: c.onPvUpdate, - }, - ) - - // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound. - args.PvcInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.onPvcAdd, - UpdateFunc: c.onPvcUpdate, - }, - ) - - // This is for ServiceAffinity: affected by the selector of the service is updated. - args.ServiceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.onServiceAdd, - UpdateFunc: c.onServiceUpdate, - DeleteFunc: c.onServiceDelete, - }, - ) - - // Setup volume binder - c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) - - args.StorageClassInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.onStorageClassAdd, - }, - ) - // Setup cache debugger debugger := cachedebugger.New( args.NodeInformer.Lister(), @@ -394,116 +295,9 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { <-c.StopEverything c.podQueue.Close() }() - return c } -// skipPodUpdate checks whether the specified pod update should be ignored. -// This function will return true if -// - The pod has already been assumed, AND -// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations -// updated. -func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool { - // Non-assumed pods should never be skipped. - isAssumed, err := c.schedulerCache.IsAssumedPod(pod) - if err != nil { - runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) - return false - } - if !isAssumed { - return false - } - - // Gets the assumed pod from the cache. - assumedPod, err := c.schedulerCache.GetPod(pod) - if err != nil { - runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) - return false - } - - // Compares the assumed pod in the cache with the pod update. If they are - // equal (with certain fields excluded), this pod update will be skipped. - f := func(pod *v1.Pod) *v1.Pod { - p := pod.DeepCopy() - // ResourceVersion must be excluded because each object update will - // have a new resource version. - p.ResourceVersion = "" - // Spec.NodeName must be excluded because the pod assumed in the cache - // is expected to have a node assigned while the pod update may nor may - // not have this field set. - p.Spec.NodeName = "" - // Annotations must be excluded for the reasons described in - // https://github.com/kubernetes/kubernetes/issues/52914. - p.Annotations = nil - return p - } - assumedPodCopy, podCopy := f(assumedPod), f(pod) - if !reflect.DeepEqual(assumedPodCopy, podCopy) { - return false - } - klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name) - return true -} - -func (c *configFactory) onPvAdd(obj interface{}) { - // Pods created when there are no PVs available will be stuck in - // unschedulable queue. But unbound PVs created for static provisioning and - // delay binding storage class are skipped in PV controller dynamic - // provisioning and binding process, will not trigger events to schedule pod - // again. So we need to move pods to active queue on PV add for this - // scenario. - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) onPvUpdate(old, new interface{}) { - // Scheduler.bindVolumesWorker may fail to update assumed pod volume - // bindings due to conflicts if PVs are updated by PV controller or other - // parties, then scheduler will add pod back to unschedulable queue. We - // need to move pods to active queue on PV update for this scenario. - c.podQueue.MoveAllToActiveQueue() -} - -// isZoneRegionLabel check if given key of label is zone or region label. -func isZoneRegionLabel(k string) bool { - return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion -} - -func (c *configFactory) onPvcAdd(obj interface{}) { - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) onPvcUpdate(old, new interface{}) { - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) onStorageClassAdd(obj interface{}) { - sc, ok := obj.(*storagev1.StorageClass) - if !ok { - klog.Errorf("cannot convert to *storagev1.StorageClass for storageClassAdd: %v", obj) - return - } - - // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these - // PVCs have specified StorageClass name, creating StorageClass objects - // with late binding will cause predicates to pass, so we need to move pods - // to active queue. - if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - c.podQueue.MoveAllToActiveQueue() - } -} - -func (c *configFactory) onServiceAdd(obj interface{}) { - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) onServiceDelete(obj interface{}) { - c.podQueue.MoveAllToActiveQueue() -} - // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. func (c *configFactory) GetNodeLister() corelisters.NodeLister { return c.nodeLister @@ -527,213 +321,6 @@ func (c *configFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -func (c *configFactory) addPodToCache(obj interface{}) { - pod, ok := obj.(*v1.Pod) - if !ok { - klog.Errorf("cannot convert to *v1.Pod: %v", obj) - return - } - - if err := c.schedulerCache.AddPod(pod); err != nil { - klog.Errorf("scheduler cache AddPod failed: %v", err) - } - - c.podQueue.AssignedPodAdded(pod) -} - -func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { - oldPod, ok := oldObj.(*v1.Pod) - if !ok { - klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj) - return - } - newPod, ok := newObj.(*v1.Pod) - if !ok { - klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj) - return - } - - if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil { - klog.Errorf("scheduler cache UpdatePod failed: %v", err) - } - - c.podQueue.AssignedPodUpdated(newPod) -} - -func (c *configFactory) addPodToSchedulingQueue(obj interface{}) { - if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil { - runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) - } -} - -func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) { - pod := newObj.(*v1.Pod) - if c.skipPodUpdate(pod) { - return - } - if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil { - runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) - } -} - -func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) { - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = obj.(*v1.Pod) - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) - if !ok { - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) - return - } - default: - runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj)) - return - } - if err := c.podQueue.Delete(pod); err != nil { - runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) - } - if c.volumeBinder != nil { - // Volume binder only wants to keep unassigned pods - c.volumeBinder.DeletePodBindings(pod) - } -} - -func (c *configFactory) deletePodFromCache(obj interface{}) { - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) - if !ok { - klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Pod: %v", t.Obj) - return - } - default: - klog.Errorf("cannot convert to *v1.Pod: %v", t) - return - } - - if err := c.schedulerCache.RemovePod(pod); err != nil { - klog.Errorf("scheduler cache RemovePod failed: %v", err) - } - - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) addNodeToCache(obj interface{}) { - node, ok := obj.(*v1.Node) - if !ok { - klog.Errorf("cannot convert to *v1.Node: %v", obj) - return - } - - if err := c.schedulerCache.AddNode(node); err != nil { - klog.Errorf("scheduler cache AddNode failed: %v", err) - } - - c.podQueue.MoveAllToActiveQueue() -} - -func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { - oldNode, ok := oldObj.(*v1.Node) - if !ok { - klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj) - return - } - newNode, ok := newObj.(*v1.Node) - if !ok { - klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj) - return - } - - if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil { - klog.Errorf("scheduler cache UpdateNode failed: %v", err) - } - - // Only activate unschedulable pods if the node became more schedulable. - // We skip the node property comparison when there is no unschedulable pods in the queue - // to save processing cycles. We still trigger a move to active queue to cover the case - // that a pod being processed by the scheduler is determined unschedulable. We want this - // pod to be reevaluated when a change in the cluster happens. - if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) { - c.podQueue.MoveAllToActiveQueue() - } -} - -func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool { - if nodeSpecUnschedulableChanged(newNode, oldNode) { - return true - } - if nodeAllocatableChanged(newNode, oldNode) { - return true - } - if nodeLabelsChanged(newNode, oldNode) { - return true - } - if nodeTaintsChanged(newNode, oldNode) { - return true - } - if nodeConditionsChanged(newNode, oldNode) { - return true - } - - return false -} - -func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool { - return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) -} - -func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool { - return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) -} - -func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool { - return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) -} - -func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool { - strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus { - conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions)) - for i := range conditions { - conditionStatuses[conditions[i].Type] = conditions[i].Status - } - return conditionStatuses - } - return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions)) -} - -func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool { - return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false -} - -func (c *configFactory) deleteNodeFromCache(obj interface{}) { - var node *v1.Node - switch t := obj.(type) { - case *v1.Node: - node = t - case cache.DeletedFinalStateUnknown: - var ok bool - node, ok = t.Obj.(*v1.Node) - if !ok { - klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Node: %v", t.Obj) - return - } - default: - klog.Errorf("cannot convert to *v1.Node: %v", t) - return - } - - if err := c.schedulerCache.RemoveNode(node); err != nil { - klog.Errorf("scheduler cache RemoveNode failed: %v", err) - } -} - // Create creates a scheduler with the default algorithm provider. func (c *configFactory) Create() (*Config, error) { return c.CreateFromProvider(DefaultProvider) @@ -968,16 +555,6 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { }, nil } -// assignedPod selects pods that are assigned (scheduled and running). -func assignedPod(pod *v1.Pod) bool { - return len(pod.Spec.NodeName) != 0 -} - -// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler. -func responsibleForPod(pod *v1.Pod, schedulerName string) bool { - return schedulerName == pod.Spec.SchedulerName -} - // assignedPodLister filters the pods returned from a PodLister to // only include those that have a node name set. type assignedPodLister struct { diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 395e6bcf8c..be3dc11b0e 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -24,7 +24,6 @@ import ( "time" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -40,7 +39,6 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" - fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/util" @@ -425,98 +423,6 @@ func TestInvalidFactoryArgs(t *testing.T) { } -func TestSkipPodUpdate(t *testing.T) { - table := []struct { - pod *v1.Pod - isAssumedPodFunc func(*v1.Pod) bool - getPodFunc func(*v1.Pod) *v1.Pod - expected bool - name string - }{ - { - name: "Non-assumed pod", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { return false }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - }, - } - }, - expected: false, - }, - { - name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"a": "b"}, - ResourceVersion: "0", - }, - Spec: v1.PodSpec{ - NodeName: "node-0", - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Annotations: map[string]string{"c": "d"}, - ResourceVersion: "1", - }, - Spec: v1.PodSpec{ - NodeName: "node-1", - }, - } - }, - expected: true, - }, - { - name: "with changes on Labels", - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Labels: map[string]string{"a": "b"}, - }, - }, - isAssumedPodFunc: func(*v1.Pod) bool { - return true - }, - getPodFunc: func(*v1.Pod) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-0", - Labels: map[string]string{"c": "d"}, - }, - } - }, - expected: false, - }, - } - for _, test := range table { - t.Run(test.name, func(t *testing.T) { - c := &configFactory{ - schedulerCache: &fakecache.Cache{ - IsAssumedPodFunc: test.isAssumedPodFunc, - GetPodFunc: test.getPodFunc, - }, - } - got := c.skipPodUpdate(test.pod) - if got != test.expected { - t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected) - } - }) - } -} - func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator { informerFactory := informers.NewSharedInformerFactory(client, 0) return NewConfigFactory(&ConfigFactoryArgs{ @@ -654,146 +560,3 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType) } } - -func TestNodeAllocatableChanged(t *testing.T) { - newQuantity := func(value int64) resource.Quantity { - return *resource.NewQuantity(value, resource.BinarySI) - } - for _, c := range []struct { - Name string - Changed bool - OldAllocatable v1.ResourceList - NewAllocatable v1.ResourceList - }{ - { - Name: "no allocatable resources changed", - Changed: false, - OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, - NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, - }, - { - Name: "new node has more allocatable resources", - Changed: true, - OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)}, - NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)}, - }, - } { - oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}} - newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}} - changed := nodeAllocatableChanged(newNode, oldNode) - if changed != c.Changed { - t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed) - } - } -} - -func TestNodeLabelsChanged(t *testing.T) { - for _, c := range []struct { - Name string - Changed bool - OldLabels map[string]string - NewLabels map[string]string - }{ - { - Name: "no labels changed", - Changed: false, - OldLabels: map[string]string{"foo": "bar"}, - NewLabels: map[string]string{"foo": "bar"}, - }, - // Labels changed. - { - Name: "new node has more labels", - Changed: true, - OldLabels: map[string]string{"foo": "bar"}, - NewLabels: map[string]string{"foo": "bar", "test": "value"}, - }, - } { - oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}} - newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}} - changed := nodeLabelsChanged(newNode, oldNode) - if changed != c.Changed { - t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed) - } - } -} - -func TestNodeTaintsChanged(t *testing.T) { - for _, c := range []struct { - Name string - Changed bool - OldTaints []v1.Taint - NewTaints []v1.Taint - }{ - { - Name: "no taint changed", - Changed: false, - OldTaints: []v1.Taint{{Key: "key", Value: "value"}}, - NewTaints: []v1.Taint{{Key: "key", Value: "value"}}, - }, - { - Name: "taint value changed", - Changed: true, - OldTaints: []v1.Taint{{Key: "key", Value: "value1"}}, - NewTaints: []v1.Taint{{Key: "key", Value: "value2"}}, - }, - } { - oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}} - newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}} - changed := nodeTaintsChanged(newNode, oldNode) - if changed != c.Changed { - t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed) - } - } -} - -func TestNodeConditionsChanged(t *testing.T) { - nodeConditionType := reflect.TypeOf(v1.NodeCondition{}) - if nodeConditionType.NumField() != 6 { - t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.") - } - - for _, c := range []struct { - Name string - Changed bool - OldConditions []v1.NodeCondition - NewConditions []v1.NodeCondition - }{ - { - Name: "no condition changed", - Changed: false, - OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, - NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, - }, - { - Name: "only LastHeartbeatTime changed", - Changed: false, - OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}}, - NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}}, - }, - { - Name: "new node has more healthy conditions", - Changed: true, - OldConditions: []v1.NodeCondition{}, - NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}, - }, - { - Name: "new node has less unhealthy conditions", - Changed: true, - OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}, - NewConditions: []v1.NodeCondition{}, - }, - { - Name: "condition status changed", - Changed: true, - OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}, - NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}, - }, - } { - oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}} - newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}} - changed := nodeConditionsChanged(newNode, oldNode) - if changed != c.Changed { - t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed) - } - } -} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3d2bc3cb29..b78bab7d7a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -138,7 +138,6 @@ func New(client clientset.Interface, for _, opt := range opts { opt(&options) } - // Set up the configurator which can create schedulers from configs. configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{ SchedulerName: options.schedulerName, @@ -193,8 +192,11 @@ func New(client clientset.Interface, config.Recorder = recorder config.DisablePreemption = options.disablePreemption config.StopEverything = stopCh + // Create the scheduler. sched := NewFromConfig(config) + + AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, pdbInformer, storageClassInformer) return sched, nil } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 579c5eb0b9..e062a9ce60 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/factory" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" + internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -633,7 +634,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, - nil, + internalqueue.NewSchedulingQueue(nil), predicateMap, predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, @@ -645,7 +646,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, false, - api.DefaultPercentageOfNodesToScore) + api.DefaultPercentageOfNodesToScore, + ) bindingChan := make(chan *v1.Binding, 1) errChan := make(chan error, 1) @@ -684,7 +686,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { algo := core.NewGenericScheduler( scache, - nil, + internalqueue.NewSchedulingQueue(nil), predicateMap, predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, @@ -696,7 +698,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, false, - api.DefaultPercentageOfNodesToScore) + api.DefaultPercentageOfNodesToScore, + ) bindingChan := make(chan *v1.Binding, 2) sched := NewFromConfig(&factory.Config{ diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 993fa2d4f5..48f8dfe8b5 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -114,14 +114,29 @@ func setupScheduler( HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, DisablePreemption: false, PercentageOfNodesToScore: 100, + StopCh: stopCh, }) - schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) } - schedulerConfig.StopEverything = stopCh + // TODO: Replace NewFromConfig and AddAllEventHandlers with scheduler.New() in + // all test/integration tests. + sched := scheduler.NewFromConfig(schedulerConfig) + scheduler.AddAllEventHandlers(sched, + v1.DefaultSchedulerName, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Apps().V1().ReplicaSets(), + informerFactory.Apps().V1().StatefulSets(), + informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), + ) eventBroadcaster := record.NewBroadcaster() schedulerConfig.Recorder = eventBroadcaster.NewRecorder( @@ -132,8 +147,6 @@ func setupScheduler( Interface: cs.CoreV1().Events(""), }) - sched := scheduler.NewFromConfig(schedulerConfig) - algorithmprovider.ApplyFeatureGates() go sched.Run() @@ -513,12 +526,12 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - informers.Start(stopCh) - go dc.Run(5, stopCh) - // Start Scheduler setupScheduler(t, clientset, informers, stopCh) + informers.Start(stopCh) + go dc.Run(5, stopCh) + ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy _, err := dsClient.Create(ds) @@ -924,11 +937,11 @@ func TestTaintedNode(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - informers.Start(stopCh) - go dc.Run(5, stopCh) - // Start Scheduler setupScheduler(t, clientset, informers, stopCh) + informers.Start(stopCh) + + go dc.Run(5, stopCh) ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 8968b92f41..4970294264 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -539,10 +539,24 @@ func TestMultiScheduler(t *testing.T) { eventBroadcaster2 := record.NewBroadcaster() schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler}) eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")}) - go podInformer2.Informer().Run(stopCh) - informerFactory2.Start(stopCh) sched2 := scheduler.NewFromConfig(schedulerConfig2) + scheduler.AddAllEventHandlers(sched2, + fooScheduler, + context.informerFactory.Core().V1().Nodes(), + podInformer2, + context.informerFactory.Core().V1().PersistentVolumes(), + context.informerFactory.Core().V1().PersistentVolumeClaims(), + context.informerFactory.Core().V1().ReplicationControllers(), + context.informerFactory.Apps().V1().ReplicaSets(), + context.informerFactory.Apps().V1().StatefulSets(), + context.informerFactory.Core().V1().Services(), + context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + context.informerFactory.Storage().V1().StorageClasses(), + ) + + go podInformer2.Informer().Run(stopCh) + informerFactory2.Start(stopCh) sched2.Run() // 6. **check point-2**: diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 37a3ddfb5c..8ee661c2c8 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -189,6 +189,22 @@ func initTestSchedulerWithOptions( // set DisablePreemption option context.schedulerConfig.DisablePreemption = disablePreemption + context.scheduler = scheduler.NewFromConfig(context.schedulerConfig) + + scheduler.AddAllEventHandlers(context.scheduler, + v1.DefaultSchedulerName, + context.informerFactory.Core().V1().Nodes(), + podInformer, + context.informerFactory.Core().V1().PersistentVolumes(), + context.informerFactory.Core().V1().PersistentVolumeClaims(), + context.informerFactory.Core().V1().ReplicationControllers(), + context.informerFactory.Apps().V1().ReplicaSets(), + context.informerFactory.Apps().V1().StatefulSets(), + context.informerFactory.Core().V1().Services(), + context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + context.informerFactory.Storage().V1().StorageClasses(), + ) + // set setPodInformer if provided. if setPodInformer { go podInformer.Informer().Run(context.schedulerConfig.StopEverything) @@ -212,7 +228,6 @@ func initTestSchedulerWithOptions( context.informerFactory.Start(context.schedulerConfig.StopEverything) context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything) - context.scheduler = scheduler.NewFromConfig(context.schedulerConfig) context.scheduler.Run() return context } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index bf01127092..5d3132832a 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -74,6 +74,20 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}) sched := scheduler.NewFromConfig(config) + scheduler.AddAllEventHandlers(sched, + v1.DefaultSchedulerName, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Apps().V1().ReplicaSets(), + informerFactory.Apps().V1().StatefulSets(), + informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), + ) + informerFactory.Start(stopCh) sched.Run()