Merge pull request #63003 from bsalamat/fix_scheduler_informer
Automatic merge from submit-queue (batch tested with PRs 62495, 63003, 62829, 62151, 62002). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Fix scheduler informers to receive events for all the pods in the cluster

**What this PR does / why we need it**: This PR fixes the scheduler informers so that the scheduler receives events for all pods in the cluster, not only the pods that request this scheduler. More information in #63002.

**Which issue(s) this PR fixes**: Fixes #63002

**Special notes for your reviewer**: This should be back-ported to 1.10 and 1.9.

**Release note**:
```release-note
Fix scheduler informers to receive events for all the pods in the cluster.
```
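The gist of the change: the scheduler's pod informer used to list/watch only pods whose `spec.schedulerName` matched this scheduler, so it never learned about pods bound by other schedulers. After this PR the informer watches all non-terminal pods, and the scheduler-name check happens client-side in the event handler's filter. Below is a minimal sketch of the selector change, assuming the `k8s.io/api` and `k8s.io/apimachinery` packages; the scheduler name value is only illustrative.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	schedulerName := "default-scheduler" // illustrative value, not taken from the PR

	// Before: the informer only watched pods that named this scheduler,
	// so bindings made by other schedulers never reached the scheduler cache.
	before := fields.ParseSelectorOrDie(
		"spec.schedulerName=" + schedulerName +
			",status.phase!=" + string(v1.PodSucceeded) +
			",status.phase!=" + string(v1.PodFailed))

	// After: watch every non-terminal pod; per-scheduler filtering moves
	// client-side into the unscheduled-pod FilterFunc (responsibleForPod).
	after := fields.ParseSelectorOrDie(
		"status.phase!=" + string(v1.PodSucceeded) +
			",status.phase!=" + string(v1.PodFailed))

	fmt.Println("before:", before.String())
	fmt.Println("after: ", after.String())
}
```

The client-side check that replaces the `spec.schedulerName` field selector is the `responsibleForPod` helper added in the diff below.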
commit b0fb272494
```diff
@@ -476,7 +476,7 @@ func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, mast
 		SchedulerName:                  config.SchedulerName,
 		Client:                         client,
 		InformerFactory:                informers.NewSharedInformerFactory(client, 0),
-		PodInformer:                    factory.NewPodInformer(client, 0, config.SchedulerName),
+		PodInformer:                    factory.NewPodInformer(client, 0),
 		AlgorithmSource:                config.AlgorithmSource,
 		HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight,
 		EventClient:                    eventClient,
```
```diff
@@ -218,10 +218,10 @@ func NewConfigFactory(
 			FilterFunc: func(obj interface{}) bool {
 				switch t := obj.(type) {
 				case *v1.Pod:
-					return unassignedNonTerminatedPod(t)
+					return unassignedNonTerminatedPod(t) && responsibleForPod(t, schedulerName)
 				case cache.DeletedFinalStateUnknown:
 					if pod, ok := t.Obj.(*v1.Pod); ok {
-						return unassignedNonTerminatedPod(pod)
+						return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, schedulerName)
 					}
 					runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
 					return false
```
```diff
@@ -1198,6 +1198,11 @@ func assignedNonTerminatedPod(pod *v1.Pod) bool {
 	return true
 }
 
+// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
+func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
+	return schedulerName == pod.Spec.SchedulerName
+}
+
 // assignedPodLister filters the pods returned from a PodLister to
 // only include those that have a node name set.
 type assignedPodLister struct {
```
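A standalone sketch of how this client-side split behaves, assuming `k8s.io/api`; `unassignedNonTerminatedPod` here is a simplified stand-in for the factory helper of the same name, not its exact implementation:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// responsibleForPod mirrors the helper added in the diff above.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
	return schedulerName == pod.Spec.SchedulerName
}

// unassignedNonTerminatedPod is a simplified stand-in: true for pods that
// have no node assigned and are not in a terminal phase.
func unassignedNonTerminatedPod(pod *v1.Pod) bool {
	if len(pod.Spec.NodeName) != 0 {
		return false
	}
	return pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed
}

func main() {
	pod := &v1.Pod{}
	pod.Spec.SchedulerName = "foo-scheduler"

	// Only "foo-scheduler" should queue this pod; the default scheduler still
	// receives the informer event (and can cache it) but filters it out here.
	fmt.Println(unassignedNonTerminatedPod(pod) && responsibleForPod(pod, "default-scheduler")) // false
	fmt.Println(unassignedNonTerminatedPod(pod) && responsibleForPod(pod, "foo-scheduler"))     // true
}
```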
```diff
@@ -1270,10 +1275,9 @@ func (i *podInformer) Lister() corelisters.PodLister {
 }
 
 // NewPodInformer creates a shared index informer that returns only non-terminal pods.
-func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, schedulerName string) coreinformers.PodInformer {
+func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
 	selector := fields.ParseSelectorOrDie(
-		"spec.schedulerName=" + schedulerName +
-			",status.phase!=" + string(v1.PodSucceeded) +
+		"status.phase!=" + string(v1.PodSucceeded) +
 			",status.phase!=" + string(v1.PodFailed))
 	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
 	return &podInformer{
```
```diff
@@ -195,7 +195,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
 		HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
 		Client:                         clientSet,
 		InformerFactory:                informerFactory,
-		PodInformer:                    factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName),
+		PodInformer:                    factory.NewPodInformer(clientSet, 0),
 		EventClient:                    clientSet.CoreV1(),
 		Recorder:                       eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
 		Broadcaster:                    eventBroadcaster,
```
```diff
@@ -254,7 +254,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
 		HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
 		Client:                         clientSet,
 		InformerFactory:                informerFactory,
-		PodInformer:                    factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName),
+		PodInformer:                    factory.NewPodInformer(clientSet, 0),
 		EventClient:                    clientSet.CoreV1(),
 		Recorder:                       eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
 		Broadcaster:                    eventBroadcaster,
```
```diff
@@ -516,7 +516,7 @@ func TestMultiScheduler(t *testing.T) {
 	// 5. create and start a scheduler with name "foo-scheduler"
 	clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
 	informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
-	podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler)
+	podInformer2 := factory.NewPodInformer(context.clientSet, 0)
 
 	schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2)
 	schedulerConfig2, err := schedulerConfigFactory2.Create()
```
```diff
@@ -749,3 +749,96 @@ func TestPDBCache(t *testing.T) {
 		t.Errorf("No PDB was deleted from the cache: %v", err)
 	}
 }
+
+// TestSchedulerInformers tests that scheduler receives informer events and updates its cache when
+// pods are scheduled by other schedulers.
+func TestSchedulerInformers(t *testing.T) {
+	// Initialize scheduler.
+	context := initTest(t, "scheduler-informer")
+	defer cleanupTest(t, context)
+	cs := context.clientSet
+
+	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+	}
+	defaultNodeRes := &v1.ResourceList{
+		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
+		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+	}
+
+	type nodeConfig struct {
+		name string
+		res  *v1.ResourceList
+	}
+
+	tests := []struct {
+		description         string
+		nodes               []*nodeConfig
+		existingPods        []*v1.Pod
+		pod                 *v1.Pod
+		preemptedPodIndexes map[int]struct{}
+	}{
+		{
+			description: "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers",
+			nodes:       []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
+			existingPods: []*v1.Pod{
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name:          "pod1",
+					Namespace:     context.ns.Name,
+					Resources:     defaultPodRes,
+					Labels:        map[string]string{"foo": "bar"},
+					NodeName:      "node-1",
+					SchedulerName: "foo-scheduler",
+				}),
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name:          "pod2",
+					Namespace:     context.ns.Name,
+					Resources:     defaultPodRes,
+					Labels:        map[string]string{"foo": "bar"},
+					NodeName:      "node-1",
+					SchedulerName: "bar-scheduler",
+				}),
+			},
+			pod: initPausePod(cs, &pausePodConfig{
+				Name:      "unschedulable-pod",
+				Namespace: context.ns.Name,
+				Resources: defaultPodRes,
+			}),
+			preemptedPodIndexes: map[int]struct{}{2: {}},
+		},
+	}
+
+	for _, test := range tests {
+		for _, nodeConf := range test.nodes {
+			_, err := createNode(cs, nodeConf.name, nodeConf.res)
+			if err != nil {
+				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
+			}
+		}
+
+		pods := make([]*v1.Pod, len(test.existingPods))
+		var err error
+		// Create and run existingPods.
+		for i, p := range test.existingPods {
+			if pods[i], err = runPausePod(cs, p); err != nil {
+				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
+			}
+		}
+		// Create the new "pod".
+		unschedulable, err := createPausePod(cs, test.pod)
+		if err != nil {
+			t.Errorf("Error while creating new pod: %v", err)
+		}
+		if err := waitForPodUnschedulable(cs, unschedulable); err != nil {
+			t.Errorf("Pod %v got scheduled: %v", unschedulable.Name, err)
+		}
+
+		// Cleanup
+		pods = append(pods, unschedulable)
+		cleanupPods(cs, t, pods)
+		cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{})
+		cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
+	}
+}
```
```diff
@@ -164,7 +164,7 @@ func initTestSchedulerWithOptions(
 
 	// create independent pod informer if required
 	if setPodInformer {
-		podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour, v1.DefaultSchedulerName)
+		podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour)
 	} else {
 		podInformer = context.informerFactory.Core().V1().Pods()
 	}
```