Merge pull request #68403 from wgliang/master.deprecate-Parallelize

Replace Parallelize with function ParallelizeUntil and formally deprecate Parallelize
k8s-ci-robot 2018-10-06 09:40:07 -07:00 committed by GitHub
commit c00f19bd15
10 changed files with 25 additions and 15 deletions
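
For reference, the two workqueue entry points involved, as they can be read off the parallelizer.go hunk below (the ParallelizeUntil signature is inferred from the deprecated wrapper's forwarding call, not quoted from the file):

    // k8s.io/client-go/util/workqueue

    // DoWorkPieceFunc is invoked once per piece, with that piece's index.
    type DoWorkPieceFunc func(piece int)

    // Parallelize runs pieces items of work on up to workers goroutines.
    // Deprecated as of this PR: use ParallelizeUntil instead.
    func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)

    // ParallelizeUntil does the same, but stops handing out new pieces
    // once ctx is cancelled; a nil ctx is never cancelled.
    func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc)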

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package predicates
 import (
+	"context"
 	"fmt"
 	"sync"
@@ -487,7 +488,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
 			appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
 		}
 	}
-	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
 	return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
 }
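
Every call-site change in this PR is the same mechanical substitution, so one hedged sketch of the pattern covers them all (simplified names, not the scheduler's actual bookkeeping):

    package main

    import (
    	"context"
    	"fmt"

    	"k8s.io/client-go/util/workqueue"
    )

    func main() {
    	allNodeNames := []string{"node-a", "node-b", "node-c"}
    	results := make([]string, len(allNodeNames))

    	// Each invocation receives a distinct piece index, so writing to
    	// results[i] needs no lock; the real call sites guard shared
    	// aggregates (such as firstError) with a mutex instead.
    	processNode := func(i int) {
    		results[i] = "visited " + allNodeNames[i]
    	}

    	// context.TODO() is never cancelled, so this behaves exactly like
    	// the old workqueue.Parallelize(16, len(allNodeNames), processNode).
    	workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
    	fmt.Println(results)
    }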

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package priorities
 import (
+	"context"
 	"sync"
 	"k8s.io/api/core/v1"
@@ -210,7 +211,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
 			}
 		}
 	}
-	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
 	if pm.firstError != nil {
 		return nil, pm.firstError
 	}

View File

@@ -684,7 +684,7 @@ func PrioritizeNodes(
 			}
 		}
 	}
-	workqueue.Parallelize(16, len(nodes), processNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), processNode)
 	for i, priorityConfig := range priorityConfigs {
 		if priorityConfig.Reduce == nil {
 			continue
@@ -915,7 +915,7 @@ func selectNodesForPreemption(pod *v1.Pod,
 			resultLock.Unlock()
 		}
 	}
-	workqueue.Parallelize(16, len(potentialNodes), checkNode)
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
 	return nodeToVictims, nil
 }

View File

@@ -27,6 +27,8 @@ type DoWorkPieceFunc func(piece int)
 // Parallelize is a very simple framework that allows for parallelizing
 // N independent pieces of work.
+//
+// Deprecated: Use ParallelizeUntil instead.
 func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
 	ParallelizeUntil(nil, workers, pieces, doWorkPiece)
 }

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package apimachinery
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"reflect"
@@ -44,7 +45,7 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
 		c := f.ClientSet
 		client := c.CoreV1().PodTemplates(ns)
 		By("creating a large number of resources")
-		workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
+		workqueue.ParallelizeUntil(context.TODO(), 20, numberOfTotalResources, func(i int) {
 			for tries := 3; tries >= 0; tries-- {
 				_, err := client.Create(&v1.PodTemplate{
 					ObjectMeta: metav1.ObjectMeta{

View File

@@ -18,6 +18,7 @@ package apimachinery
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"text/tabwriter"
@@ -79,7 +80,7 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() {
 		client := c.CoreV1().PodTemplates(ns)
 		By("creating a large number of resources")
-		workqueue.Parallelize(5, 20, func(i int) {
+		workqueue.ParallelizeUntil(context.TODO(), 5, 20, func(i int) {
 			for tries := 3; tries >= 0; tries-- {
 				_, err := client.Create(&v1.PodTemplate{
 					ObjectMeta: metav1.ObjectMeta{

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package network
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"time"
@@ -66,7 +67,7 @@ var _ = SIGDescribe("[Feature:PerformanceDNS][Serial]", func() {
 			framework.ExpectNoError(testutils.CreateServiceWithRetries(f.ClientSet, services[i].Namespace, services[i]))
 		}
 		framework.Logf("Creating %v test services", maxServicesPerCluster)
-		workqueue.Parallelize(parallelCreateServiceWorkers, len(services), createService)
+		workqueue.ParallelizeUntil(context.TODO(), parallelCreateServiceWorkers, len(services), createService)
 		dnsTest := dnsTestCommon{
 			f: f,
 			c: f.ClientSet,

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package scalability
 import (
+	"context"
 	"fmt"
 	"math"
 	"os"
@@ -851,7 +852,7 @@ var _ = SIGDescribe("Density", func() {
 			name := additionalPodsPrefix + "-" + strconv.Itoa(podIndexOffset+i+1)
 			framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, rcNameToNsMap[name], name))
 		}
-		workqueue.Parallelize(25, nodeCount, deleteRC)
+		workqueue.ParallelizeUntil(context.TODO(), 25, nodeCount, deleteRC)
 		podDeletionPhase.End()
 	}
 	close(stopCh)

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package scalability
 import (
+	"context"
 	"fmt"
 	"math"
 	"math/rand"
@@ -37,11 +38,14 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	cacheddiscovery "k8s.io/client-go/discovery/cached"
+	"k8s.io/client-go/dynamic"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	scaleclient "k8s.io/client-go/scale"
 	"k8s.io/client-go/transport"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/apis/extensions"
@@ -52,9 +56,6 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/restmapper"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 )
 const (
@@ -240,7 +241,7 @@ var _ = SIGDescribe("Load capacity", func() {
 				defer GinkgoRecover()
 				framework.ExpectNoError(testutils.CreateServiceWithRetries(clientset, services[i].Namespace, services[i]))
 			}
 			workqueue.Parallelize(serviceOperationsParallelism, len(services), createService)
+			workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), createService)
 			framework.Logf("%v Services created.", len(services))
 			defer func(services []*v1.Service) {
 				serviceCleanupPhase := testPhaseDurations.StartPhase(800, "services deletion")
@@ -250,7 +251,7 @@ var _ = SIGDescribe("Load capacity", func() {
 					defer GinkgoRecover()
 					framework.ExpectNoError(testutils.DeleteResourceWithRetries(clientset, api.Kind("Service"), services[i].Namespace, services[i].Name, nil))
 				}
-				workqueue.Parallelize(serviceOperationsParallelism, len(services), deleteService)
+				workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), deleteService)
 				framework.Logf("Services deleted")
 			}(services)
 		} else {

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package utils
 import (
+	"context"
 	"fmt"
 	"math"
 	"os"
@@ -1061,9 +1062,9 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
 	}
 	if podCount < 30 {
-		workqueue.Parallelize(podCount, podCount, createPodFunc)
+		workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
 	} else {
-		workqueue.Parallelize(30, podCount, createPodFunc)
+		workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
 	}
 	return createError
 }
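
One note on this last hunk: the branch exists only so the test never starts more workers than there are pieces. If that pattern recurs, it could be factored into a helper; a hypothetical sketch, not part of this PR:

    // cappedWorkers bounds the worker count by both the number of pieces
    // and a fixed ceiling, mirroring the podCount < 30 branch above.
    // (Hypothetical helper, for illustration only.)
    func cappedWorkers(pieces, ceiling int) int {
    	if pieces < ceiling {
    		return pieces
    	}
    	return ceiling
    }

    // usage:
    //   workqueue.ParallelizeUntil(context.TODO(), cappedWorkers(podCount, 30), podCount, createPodFunc)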