diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go
index 94a0e72546..7d63c45060 100644
--- a/pkg/scheduler/algorithm/predicates/metadata.go
+++ b/pkg/scheduler/algorithm/predicates/metadata.go
@@ -17,6 +17,7 @@ limitations under the License.
 package predicates
 
 import (
+	"context"
 	"fmt"
 	"sync"
 
@@ -487,7 +488,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
 			appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
 		}
 	}
-	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
 	return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
 }
 
diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go
index 6aa5a3896d..7e640566c8 100644
--- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go
+++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go
@@ -17,6 +17,7 @@ limitations under the License.
 package priorities
 
 import (
+	"context"
 	"sync"
 
 	"k8s.io/api/core/v1"
@@ -210,7 +211,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
 			}
 		}
 	}
-	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
 	if pm.firstError != nil {
 		return nil, pm.firstError
 	}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index dbadf0147b..092b9026af 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -684,7 +684,7 @@ func PrioritizeNodes(
 			}
 		}
 	}
-	workqueue.Parallelize(16, len(nodes), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), processNode)
 	for i, priorityConfig := range priorityConfigs {
 		if priorityConfig.Reduce == nil {
 			continue
@@ -915,7 +915,7 @@ func selectNodesForPreemption(pod *v1.Pod,
 			resultLock.Unlock()
 		}
 	}
-	workqueue.Parallelize(16, len(potentialNodes), checkNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
 	return nodeToVictims, nil
 }
 
diff --git a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go
index 526bd244e6..ad25350182 100644
--- a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go
+++ b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go
@@ -27,6 +27,8 @@ type DoWorkPieceFunc func(piece int)
 
 // Parallelize is a very simple framework that allows for parallelizing
 // N independent pieces of work.
+//
+// Deprecated: Use ParallelizeUntil instead.
 func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
 	ParallelizeUntil(nil, workers, pieces, doWorkPiece)
 }
diff --git a/test/e2e/apimachinery/chunking.go b/test/e2e/apimachinery/chunking.go
index d3efb27146..7969424180 100644
--- a/test/e2e/apimachinery/chunking.go
+++ b/test/e2e/apimachinery/chunking.go
@@ -17,6 +17,7 @@ limitations under the License.
 package apimachinery
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"reflect"
@@ -44,7 +45,7 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
 		c := f.ClientSet
 		client := c.CoreV1().PodTemplates(ns)
 		By("creating a large number of resources")
-		workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
+		workqueue.ParallelizeUntil(context.TODO(), 20, numberOfTotalResources, func(i int) {
 			for tries := 3; tries >= 0; tries-- {
 				_, err := client.Create(&v1.PodTemplate{
 					ObjectMeta: metav1.ObjectMeta{
diff --git a/test/e2e/apimachinery/table_conversion.go b/test/e2e/apimachinery/table_conversion.go
index 2a8b9893be..9f020e03d0 100644
--- a/test/e2e/apimachinery/table_conversion.go
+++ b/test/e2e/apimachinery/table_conversion.go
@@ -18,6 +18,7 @@ package apimachinery
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"text/tabwriter"
 
@@ -79,7 +80,7 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() {
 		client := c.CoreV1().PodTemplates(ns)
 
 		By("creating a large number of resources")
-		workqueue.Parallelize(5, 20, func(i int) {
+		workqueue.ParallelizeUntil(context.TODO(), 5, 20, func(i int) {
 			for tries := 3; tries >= 0; tries-- {
 				_, err := client.Create(&v1.PodTemplate{
 					ObjectMeta: metav1.ObjectMeta{
diff --git a/test/e2e/network/dns_scale_records.go b/test/e2e/network/dns_scale_records.go
index ed64e02161..2d9aca03a4 100644
--- a/test/e2e/network/dns_scale_records.go
+++ b/test/e2e/network/dns_scale_records.go
@@ -17,6 +17,7 @@ limitations under the License.
 package network
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"time"
@@ -66,7 +67,7 @@ var _ = SIGDescribe("[Feature:PerformanceDNS][Serial]", func() {
 			framework.ExpectNoError(testutils.CreateServiceWithRetries(f.ClientSet, services[i].Namespace, services[i]))
 		}
 		framework.Logf("Creating %v test services", maxServicesPerCluster)
-		workqueue.Parallelize(parallelCreateServiceWorkers, len(services), createService)
+		workqueue.ParallelizeUntil(context.TODO(), parallelCreateServiceWorkers, len(services), createService)
 		dnsTest := dnsTestCommon{
 			f:  f,
 			c:  f.ClientSet,
diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index a06f43c2da..69ab8d2e50 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scalability
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"os"
@@ -851,7 +852,7 @@ var _ = SIGDescribe("Density", func() {
 				name := additionalPodsPrefix + "-" + strconv.Itoa(podIndexOffset+i+1)
 				framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, rcNameToNsMap[name], name))
 			}
-			workqueue.Parallelize(25, nodeCount, deleteRC)
+			workqueue.ParallelizeUntil(context.TODO(), 25, nodeCount, deleteRC)
 			podDeletionPhase.End()
 		}
 		close(stopCh)
diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go
index 644c86f306..3f528c229c 100644
--- a/test/e2e/scalability/load.go
+++ b/test/e2e/scalability/load.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scalability
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"math/rand"
@@ -37,11 +38,14 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	cacheddiscovery "k8s.io/client-go/discovery/cached"
+	"k8s.io/client-go/dynamic"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/client-go/transport"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/apis/extensions"
@@ -52,9 +56,6 @@ import (
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/restmapper"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 )
 
 const (
@@ -240,7 +241,7 @@ var _ = SIGDescribe("Load capacity", func() {
 				defer GinkgoRecover()
 				framework.ExpectNoError(testutils.CreateServiceWithRetries(clientset, services[i].Namespace, services[i]))
 			}
-			workqueue.Parallelize(serviceOperationsParallelism, len(services), createService)
+			workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), createService)
 			framework.Logf("%v Services created.", len(services))
 			defer func(services []*v1.Service) {
 				serviceCleanupPhase := testPhaseDurations.StartPhase(800, "services deletion")
@@ -250,7 +251,7 @@ var _ = SIGDescribe("Load capacity", func() {
 					defer GinkgoRecover()
 					framework.ExpectNoError(testutils.DeleteResourceWithRetries(clientset, api.Kind("Service"), services[i].Namespace, services[i].Name, nil))
 				}
-				workqueue.Parallelize(serviceOperationsParallelism, len(services), deleteService)
+				workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), deleteService)
 				framework.Logf("Services deleted")
 			}(services)
 		} else {
diff --git a/test/utils/runners.go b/test/utils/runners.go
index 01fb6a0d24..51f7603851 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -17,6 +17,7 @@ limitations under the License.
 package utils
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"os"
@@ -1061,9 +1062,9 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
 	}
 
 	if podCount < 30 {
-		workqueue.Parallelize(podCount, podCount, createPodFunc)
+		workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
 	} else {
-		workqueue.Parallelize(30, podCount, createPodFunc)
+		workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
 	}
 	return createError
 }
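
Note (not part of the patch above): every call site here passes context.TODO(), which preserves the old Parallelize behavior because that context is never cancelled. As a minimal illustrative sketch, assuming only the k8s.io/client-go/util/workqueue package touched by this diff, a caller that wants cancellation could instead pass a cancellable context:

	// Sketch only, not from this patch: stop scheduling new pieces once the
	// context expires; pieces already running are allowed to finish.
	package main

	import (
		"context"
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer cancel()

		results := make([]int, 100)
		workqueue.ParallelizeUntil(ctx, 16, len(results), func(i int) {
			time.Sleep(10 * time.Millisecond) // stand-in for one independent piece of work
			results[i] = i * i
		})
		fmt.Println("done (remaining pieces skipped if the context was cancelled)")
	}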