From 62c3ec969d7a59a69fe7c45039e4379096870891 Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Fri, 9 Nov 2018 11:55:24 -0800 Subject: [PATCH] Fix a race in the scheduler. Loop over priorityConfigs separately. The node loop can only safely modify result[i][index]. Before this change it sometimes modified result[i] concurrently with other loops. Fixes: 7164967662c5e99cf3095a00b97a49a976572723 ==================== Test output for //pkg/scheduler/core:go_default_test: ================== WARNING: DATA RACE Read at 0x00c0005e8ed0 by goroutine 22: k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes.func2() pkg/scheduler/core/generic_scheduler.go:667 +0x2ea k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil.func1() staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:65 +0x9e Previous write at 0x00c0005e8ed0 by goroutine 21: k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes.func2() pkg/scheduler/core/generic_scheduler.go:668 +0x450 k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil.func1() staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:65 +0x9e Goroutine 22 (running) created at: k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil() staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:57 +0x1a3 k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes() pkg/scheduler/core/generic_scheduler.go:682 +0x592 k8s.io/kubernetes/pkg/scheduler/core.(*genericScheduler).Schedule() pkg/scheduler/core/generic_scheduler.go:186 +0x77d k8s.io/kubernetes/pkg/scheduler/core.TestGenericScheduler.func1() pkg/scheduler/core/generic_scheduler_test.go:464 +0x91f testing.tRunner() GOROOT/src/testing/testing.go:827 +0x162 Goroutine 21 (running) created at: k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil() staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:57 +0x1a3 k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes() pkg/scheduler/core/generic_scheduler.go:682 +0x592 
k8s.io/kubernetes/pkg/scheduler/core.(*genericScheduler).Schedule() pkg/scheduler/core/generic_scheduler.go:186 +0x77d k8s.io/kubernetes/pkg/scheduler/core.TestGenericScheduler.func1() pkg/scheduler/core/generic_scheduler_test.go:464 +0x91f testing.tRunner() GOROOT/src/testing/testing.go:827 +0x162 ================== --- FAIL: TestGenericScheduler (0.01s) --- FAIL: TestGenericScheduler/test_6 (0.00s) testing.go:771: race detected during execution of test testing.go:771: race detected during execution of test FAIL --- pkg/scheduler/core/generic_scheduler.go | 37 ++++++++++++++----------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index feed37bb86..b60e3bf7bd 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -653,33 +653,38 @@ func PrioritizeNodes( results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs)) - for i := range priorityConfigs { - results[i] = make(schedulerapi.HostPriorityList, len(nodes)) - } + // DEPRECATED: we can remove this when all priorityConfigs implement the + // Map-Reduce pattern. + workqueue.ParallelizeUntil(context.TODO(), 16, len(priorityConfigs), func(i int) { + priorityConfig := priorityConfigs[i] + if priorityConfig.Function == nil { + results[i] = make(schedulerapi.HostPriorityList, len(nodes)) + return + } - processNode := func(index int) { - nodeInfo := nodeNameToInfo[nodes[index].Name] var err error + results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes) + if err != nil { + appendError(err) + } + }) + + workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) { + nodeInfo := nodeNameToInfo[nodes[index].Name] for i, priorityConfig := range priorityConfigs { - // DEPRECATED when ALL priorityConfigs have Map-Reduce pattern. 
- if priorityConfigs[i].Function != nil { - // Make sure that the old-style priority function only runs once. - if results[i][0].Host == "" { - results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes) - if err != nil { - appendError(err) - } - } + if priorityConfig.Function != nil { continue } + + var err error results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo) if err != nil { appendError(err) results[i][index].Host = nodes[index].Name } } - } - workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), processNode) + }) + for i, priorityConfig := range priorityConfigs { if priorityConfig.Reduce == nil { continue