diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index 67877dea14..79dbe90178 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -153,6 +153,7 @@ "k8s.io/client-go/testing", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/leaderelection/resourcelock", + "k8s.io/client-go/tools/pager", "k8s.io/client-go/tools/record", "k8s.io/client-go/tools/reference", "k8s.io/client-go/tools/watch", diff --git a/pkg/controller/cronjob/BUILD b/pkg/controller/cronjob/BUILD index 15b4f652d4..f91982bacf 100644 --- a/pkg/controller/cronjob/BUILD +++ b/pkg/controller/cronjob/BUILD @@ -30,6 +30,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/pager:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/reference:go_default_library", "//vendor/github.com/robfig/cron:go_default_library", diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go index 8e797b4171..e2cb7d9ae3 100644 --- a/pkg/controller/cronjob/cronjob_controller.go +++ b/pkg/controller/cronjob/cronjob_controller.go @@ -29,6 +29,7 @@ Just periodically list jobs and SJs, and then reconcile them. */ import ( + "context" "fmt" "sort" "time" @@ -46,6 +47,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" ref "k8s.io/client-go/tools/reference" "k8s.io/kubernetes/pkg/util/metrics" @@ -102,19 +104,35 @@ func (jm *CronJobController) syncAll() { // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer, // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639). // Note that this only works because we are NOT using any caches here. - jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{}) + jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) { + return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(opts) + } + jlTmp, err := pager.New(pager.SimplePageFunc(jobListFunc)).List(context.Background(), metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err)) return } + jl, ok := jlTmp.(*batchv1.JobList) + if !ok { + utilruntime.HandleError(fmt.Errorf("expected type *batchv1.JobList, got type %T", jlTmp)) + return + } js := jl.Items klog.V(4).Infof("Found %d jobs", len(js)) - sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) + cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) { + return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(opts) + } + sjlTmp, err := pager.New(pager.SimplePageFunc(cronJobListFunc)).List(context.Background(), metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err)) return } + sjl, ok := sjlTmp.(*batchv1beta1.CronJobList) + if !ok { + utilruntime.HandleError(fmt.Errorf("expected type *batchv1beta1.CronJobList, got type %T", sjlTmp)) + return + } sjs := sjl.Items klog.V(4).Infof("Found %d cronjobs", len(sjs))