Merge pull request #70265 from jingyih/use_chunking_when_list_in_cronjob_controller

Retrieve list in chunks in cronjob controller
pull/564/head
Kubernetes Prow Robot 2019-03-06 04:45:59 -08:00 committed by GitHub
commit bcbbb98f78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 2 deletions

View File

@ -153,6 +153,7 @@
"k8s.io/client-go/testing",
"k8s.io/client-go/tools/cache",
"k8s.io/client-go/tools/leaderelection/resourcelock",
"k8s.io/client-go/tools/pager",
"k8s.io/client-go/tools/record",
"k8s.io/client-go/tools/reference",
"k8s.io/client-go/tools/watch",

View File

@ -30,6 +30,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/pager:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
"//vendor/github.com/robfig/cron:go_default_library",

View File

@ -29,6 +29,7 @@ Just periodically list jobs and SJs, and then reconcile them.
*/
import (
"context"
"fmt"
"sort"
"time"
@ -46,6 +47,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/util/metrics"
@ -102,19 +104,35 @@ func (jm *CronJobController) syncAll() {
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
// Note that this only works because we are NOT using any caches here.
jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(opts)
}
jlTmp, err := pager.New(pager.SimplePageFunc(jobListFunc)).List(context.Background(), metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
return
}
jl, ok := jlTmp.(*batchv1.JobList)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected type *batchv1.JobList, got type %T", jlTmp))
return
}
js := jl.Items
klog.V(4).Infof("Found %d jobs", len(js))
sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(opts)
}
sjlTmp, err := pager.New(pager.SimplePageFunc(cronJobListFunc)).List(context.Background(), metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
return
}
sjl, ok := sjlTmp.(*batchv1beta1.CronJobList)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected type *batchv1beta1.CronJobList, got type %T", sjlTmp))
return
}
sjs := sjl.Items
klog.V(4).Infof("Found %d cronjobs", len(sjs))