mirror of https://github.com/k3s-io/k3s
Merge pull request #46223 from smarterclayton/scheduler_max
Automatic merge from submit-queue (batch tested with PRs 45766, 46223) Scheduler should use a shared informer, and fix broken watch behavior for cached watches Can be used either from a true shared informer or a local shared informer created just for the scheduler. Fixes a bug in the cache watcher where we were returning the "current" object from a watch event, not the historic event. This means that we broke behavior when introducing the watch cache. This may have API implications for filtering watch consumers - but on the other hand, it prevents clients filtering from seeing objects outside of their watch correctly, which can lead to other subtle bugs. ```release-note The behavior of some watch calls to the server when filtering on fields was incorrect. If watching objects with a filter, when an update was made that no longer matched the filter a DELETE event was correctly sent. However, the object that was returned by that delete was not the (correct) version before the update, but instead, the newer version. That meant the new object was not matched by the filter. This was a regression from behavior between cached watches on the server side and uncached watches, and thus broke downstream API clients. ```pull/6/head
commit
8e07e61a43
|
@ -24,6 +24,7 @@ go_library(
|
|||
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
|
||||
"//pkg/client/leaderelection:go_default_library",
|
||||
"//pkg/client/leaderelection/resourcelock:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/util/configz:go_default_library",
|
||||
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
|
||||
"//plugin/pkg/scheduler:go_default_library",
|
||||
|
|
|
@ -77,6 +77,7 @@ func CreateScheduler(
|
|||
s *options.SchedulerServer,
|
||||
kubecli *clientset.Clientset,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
podInformer coreinformers.PodInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
replicationControllerInformer coreinformers.ReplicationControllerInformer,
|
||||
|
@ -89,6 +90,7 @@ func CreateScheduler(
|
|||
s.SchedulerName,
|
||||
kubecli,
|
||||
nodeInformer,
|
||||
podInformer,
|
||||
pvInformer,
|
||||
pvcInformer,
|
||||
replicationControllerInformer,
|
||||
|
|
|
@ -31,9 +31,11 @@ import (
|
|||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -71,11 +73,14 @@ func Run(s *options.SchedulerServer) error {
|
|||
recorder := createRecorder(kubecli, s)
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(kubecli, 0)
|
||||
// cache only non-terminal pods
|
||||
podInformer := factory.NewPodInformer(kubecli, 0)
|
||||
|
||||
sched, err := CreateScheduler(
|
||||
s,
|
||||
kubecli,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
podInformer,
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -92,9 +97,11 @@ func Run(s *options.SchedulerServer) error {
|
|||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
go podInformer.Informer().Run(stop)
|
||||
informerFactory.Start(stop)
|
||||
// Waiting for all cache to sync before scheduling.
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
controller.WaitForCacheSync("scheduler", stop, podInformer.Informer().HasSynced)
|
||||
|
||||
run := func(_ <-chan struct{}) {
|
||||
sched.Run()
|
||||
|
|
|
@ -41,6 +41,7 @@ go_library(
|
|||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/listers/core/v1:go_default_library",
|
||||
|
@ -52,6 +53,7 @@ go_library(
|
|||
"//plugin/pkg/scheduler/util:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
|
|
|
@ -352,6 +352,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
|||
"some-scheduler-name",
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
|
|
@ -38,6 +38,8 @@ go_library(
|
|||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
@ -82,7 +84,7 @@ type ConfigFactory struct {
|
|||
// Close this to stop all reflectors
|
||||
StopEverything chan struct{}
|
||||
|
||||
scheduledPodPopulator cache.Controller
|
||||
scheduledPodsHasSynced cache.InformerSynced
|
||||
|
||||
schedulerCache schedulercache.Cache
|
||||
|
||||
|
@ -105,6 +107,7 @@ func NewConfigFactory(
|
|||
schedulerName string,
|
||||
client clientset.Interface,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
podInformer coreinformers.PodInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
replicationControllerInformer coreinformers.ReplicationControllerInformer,
|
||||
|
@ -132,23 +135,60 @@ func NewConfigFactory(
|
|||
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
|
||||
}
|
||||
|
||||
// On add/delete to the scheduled pods, remove from the assumed pods.
|
||||
// We construct this here instead of in CreateFromKeys because
|
||||
// ScheduledPodLister is something we provide to plug in functions that
|
||||
// they may need to call.
|
||||
var scheduledPodIndexer cache.Indexer
|
||||
scheduledPodIndexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
|
||||
c.createAssignedNonTerminatedPodLW(),
|
||||
&v1.Pod{},
|
||||
0,
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addPodToCache,
|
||||
UpdateFunc: c.updatePodInCache,
|
||||
DeleteFunc: c.deletePodFromCache,
|
||||
c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
|
||||
// scheduled pod cache
|
||||
podInformer.Informer().AddEventHandler(
|
||||
cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj interface{}) bool {
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
return assignedNonTerminatedPod(t)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
|
||||
return false
|
||||
}
|
||||
},
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addPodToCache,
|
||||
UpdateFunc: c.updatePodInCache,
|
||||
DeleteFunc: c.deletePodFromCache,
|
||||
},
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.scheduledPodLister = corelisters.NewPodLister(scheduledPodIndexer)
|
||||
// unscheduled pod queue
|
||||
podInformer.Informer().AddEventHandler(
|
||||
cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj interface{}) bool {
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
return unassignedNonTerminatedPod(t)
|
||||
default:
|
||||
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
|
||||
return false
|
||||
}
|
||||
},
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
if err := c.podQueue.Add(obj); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
if err := c.podQueue.Update(newObj); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
if err := c.podQueue.Delete(obj); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
// ScheduledPodLister is something we provide to plug-in functions that
|
||||
// they may need to call.
|
||||
c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
|
||||
|
||||
// Only nodes in the "Ready" condition with status == "True" are schedulable
|
||||
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
|
@ -369,7 +409,6 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
f.Run()
|
||||
// TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
|
||||
algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
podBackoff := util.CreateDefaultPodBackoff()
|
||||
|
@ -381,7 +420,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
Binder: &binder{f.client},
|
||||
PodConditionUpdater: &podConditionUpdater{f.client},
|
||||
WaitForCacheSync: func() bool {
|
||||
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodPopulator.HasSynced)
|
||||
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced)
|
||||
},
|
||||
NextPod: func() *v1.Pod {
|
||||
return f.getNextPod()
|
||||
|
@ -450,14 +489,6 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) Run() {
|
||||
// Watch and queue pods that need scheduling.
|
||||
cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.podQueue, 0).RunUntil(f.StopEverything)
|
||||
|
||||
// Begin populating scheduled pods.
|
||||
go f.scheduledPodPopulator.Run(f.StopEverything)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) getNextPod() *v1.Pod {
|
||||
for {
|
||||
pod := cache.Pop(f.podQueue).(*v1.Pod)
|
||||
|
@ -500,19 +531,106 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
|
|||
}
|
||||
}
|
||||
|
||||
// Returns a cache.ListWatch that finds all pods that need to be
|
||||
// scheduled.
|
||||
func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
|
||||
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
|
||||
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector)
|
||||
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
|
||||
func unassignedNonTerminatedPod(pod *v1.Pod) bool {
|
||||
if len(pod.Spec.NodeName) != 0 {
|
||||
return false
|
||||
}
|
||||
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Returns a cache.ListWatch that finds all pods that are
|
||||
// already scheduled.
|
||||
// TODO: return a ListerWatcher interface instead?
|
||||
func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch {
|
||||
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
|
||||
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector)
|
||||
// assignedNonTerminatedPod selects pods that are assigned and non-terminal (scheduled and running).
|
||||
func assignedNonTerminatedPod(pod *v1.Pod) bool {
|
||||
if len(pod.Spec.NodeName) == 0 {
|
||||
return false
|
||||
}
|
||||
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// assignedPodLister filters the pods returned from a PodLister to
|
||||
// only include those that have a node name set.
|
||||
type assignedPodLister struct {
|
||||
corelisters.PodLister
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
|
||||
list, err := l.PodLister.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filtered := make([]*v1.Pod, 0, len(list))
|
||||
for _, pod := range list {
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
filtered = append(filtered, pod)
|
||||
}
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
|
||||
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
|
||||
}
|
||||
|
||||
// assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to
|
||||
// only include those that have a node name set.
|
||||
type assignedPodNamespaceLister struct {
|
||||
corelisters.PodNamespaceLister
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
|
||||
list, err := l.PodNamespaceLister.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filtered := make([]*v1.Pod, 0, len(list))
|
||||
for _, pod := range list {
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
filtered = append(filtered, pod)
|
||||
}
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// Get retrieves the Pod from the indexer for a given namespace and name.
|
||||
func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) {
|
||||
pod, err := l.PodNamespaceLister.Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
return pod, nil
|
||||
}
|
||||
return nil, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
|
||||
}
|
||||
|
||||
type podInformer struct {
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
func (i *podInformer) Informer() cache.SharedIndexInformer {
|
||||
return i.informer
|
||||
}
|
||||
|
||||
func (i *podInformer) Lister() corelisters.PodLister {
|
||||
return corelisters.NewPodLister(i.informer.GetIndexer())
|
||||
}
|
||||
|
||||
// NewPodInformer creates a shared index informer that returns only non-terminal pods.
|
||||
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
|
||||
selector := fields.ParseSelectorOrDie("status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
|
||||
lw := cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector)
|
||||
return &podInformer{
|
||||
informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||
}
|
||||
}
|
||||
|
||||
func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
|
||||
|
|
|
@ -55,6 +55,7 @@ func TestCreate(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -85,6 +86,7 @@ func TestCreateFromConfig(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -138,6 +140,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -193,6 +196,7 @@ func TestDefaultErrorFunc(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -304,6 +308,7 @@ func TestResponsibleForPod(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -317,6 +322,7 @@ func TestResponsibleForPod(t *testing.T) {
|
|||
"foo-scheduler",
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -385,6 +391,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -429,6 +436,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
|
|
@ -20,10 +20,12 @@ import (
|
|||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||
|
@ -35,6 +37,7 @@ import (
|
|||
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
|
@ -80,7 +83,6 @@ type Configurator interface {
|
|||
GetNodeLister() corelisters.NodeLister
|
||||
GetClient() clientset.Interface
|
||||
GetScheduledPodLister() corelisters.PodLister
|
||||
Run()
|
||||
|
||||
Create() (*Config, error)
|
||||
CreateFromProvider(providerName string) (*Config, error)
|
||||
|
@ -164,6 +166,12 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
|
|||
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
|
||||
if err != nil {
|
||||
glog.V(1).Infof("Failed to schedule pod: %v/%v", pod.Namespace, pod.Name)
|
||||
copied, cerr := api.Scheme.Copy(pod)
|
||||
if cerr != nil {
|
||||
runtime.HandleError(err)
|
||||
return "", err
|
||||
}
|
||||
pod = copied.(*v1.Pod)
|
||||
sched.config.Error(pod, err)
|
||||
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
|
||||
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
|
||||
|
|
|
@ -28,6 +28,7 @@ go_test(
|
|||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
|
|
|
@ -850,18 +850,28 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
|||
return
|
||||
}
|
||||
|
||||
object, err := c.copier.Copy(event.Object)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected copy error: %v", err)
|
||||
return
|
||||
}
|
||||
var watchEvent watch.Event
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
object, err := c.copier.Copy(event.Object)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
|
||||
return
|
||||
}
|
||||
watchEvent = watch.Event{Type: watch.Added, Object: object}
|
||||
case curObjPasses && oldObjPasses:
|
||||
object, err := c.copier.Copy(event.Object)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
|
||||
return
|
||||
}
|
||||
watchEvent = watch.Event{Type: watch.Modified, Object: object}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
object, err := c.copier.Copy(event.PrevObject)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
|
||||
return
|
||||
}
|
||||
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,17 @@ limitations under the License.
|
|||
package storage
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
)
|
||||
|
@ -55,3 +59,120 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheWatcherHandlesFiltering(t *testing.T) {
|
||||
filter := func(_ string, _ labels.Set, field fields.Set) bool {
|
||||
return field["spec.nodeName"] == "host"
|
||||
}
|
||||
forget := func(bool) {}
|
||||
|
||||
testCases := []struct {
|
||||
events []*watchCacheEvent
|
||||
expected []watch.Event
|
||||
}{
|
||||
// properly handle starting with the filter, then being deleted, then re-added
|
||||
{
|
||||
events: []*watchCacheEvent{
|
||||
{
|
||||
Type: watch.Added,
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
},
|
||||
},
|
||||
expected: []watch.Event{
|
||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
|
||||
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
|
||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
||||
},
|
||||
},
|
||||
// properly handle ignoring changes prior to the filter, then getting added, then deleted
|
||||
{
|
||||
events: []*watchCacheEvent{
|
||||
{
|
||||
Type: watch.Added,
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": "host"},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
|
||||
PrevObjFields: fields.Set{"spec.nodeName": ""},
|
||||
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
|
||||
ObjFields: fields.Set{"spec.nodeName": ""},
|
||||
},
|
||||
},
|
||||
expected: []watch.Event{
|
||||
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
|
||||
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
|
||||
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
TestCase:
|
||||
for i, testCase := range testCases {
|
||||
// set the size of the buffer of w.result to 0, so that the writes to
|
||||
// w.result is blocked.
|
||||
for j := range testCase.events {
|
||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||
}
|
||||
w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget)
|
||||
ch := w.ResultChan()
|
||||
for j, event := range testCase.expected {
|
||||
e := <-ch
|
||||
if !reflect.DeepEqual(event, e) {
|
||||
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
|
||||
break TestCase
|
||||
}
|
||||
}
|
||||
select {
|
||||
case obj, ok := <-ch:
|
||||
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
|
||||
break TestCase
|
||||
default:
|
||||
}
|
||||
w.Stop()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -326,6 +326,7 @@ func TestSchedulerExtender(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
|
|
@ -122,6 +122,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
|
|||
ss.PolicyConfigMapName = configPolicyName
|
||||
sched, err := app.CreateScheduler(ss, clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -174,6 +175,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
|
|||
|
||||
_, err := app.CreateScheduler(ss, clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -211,6 +213,7 @@ func TestSchedulerCreationInLegacyMode(t *testing.T) {
|
|||
|
||||
sched, err := app.CreateScheduler(ss, clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -245,6 +248,7 @@ func TestUnschedulableNodes(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -527,6 +531,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -612,6 +617,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||
"foo-scheduler",
|
||||
clientSet2,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
@ -721,6 +727,7 @@ func TestAllocatable(t *testing.T) {
|
|||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
|
|
@ -65,6 +65,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
|
|||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
|
|
Loading…
Reference in New Issue