diff --git a/pkg/controller/nodelifecycle/BUILD b/pkg/controller/nodelifecycle/BUILD
index 6791848b65..63a77fa254 100644
--- a/pkg/controller/nodelifecycle/BUILD
+++ b/pkg/controller/nodelifecycle/BUILD
@@ -36,6 +36,7 @@ go_library(
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
         "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
     ],
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index edab2d9cf0..3802645974 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -24,6 +24,8 @@ package nodelifecycle
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
+	"io"
 	"sync"
 	"time"
 
@@ -46,6 +48,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/flowcontrol"
+	"k8s.io/client-go/util/workqueue"
 	v1node "k8s.io/kubernetes/pkg/api/v1/node"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
@@ -218,6 +221,9 @@ type Controller struct {
 	// if set to true, NodeController will taint Nodes based on its condition for 'NetworkUnavailable',
 	// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
 	taintNodeByCondition bool
+
+	nodeUpdateChannels []chan *v1.Node
+	nodeUpdateQueue    workqueue.Interface
 }
 
 // NewNodeLifecycleController returns a new taint controller.
@@ -276,6 +282,7 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
 		runTaintManager:        runTaintManager,
 		useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
 		taintNodeByCondition:   taintNodeByCondition,
+		nodeUpdateQueue:        workqueue.New(),
 	}
 	if useTaintBasedEvictions {
 		glog.Infof("Controller is using taint based evictions.")
@@ -343,10 +350,12 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
 		glog.Infof("Controller will taint node by condition.")
 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
-				return nc.doNoScheduleTaintingPass(node)
+				nc.nodeUpdateQueue.Add(node)
+				return nil
 			}),
 			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-				return nc.doNoScheduleTaintingPass(newNode)
+				nc.nodeUpdateQueue.Add(newNode)
+				return nil
 			}),
 		})
 	}
@@ -383,18 +392,52 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 	}
 
 	if nc.runTaintManager {
-		go nc.taintManager.Run(wait.NeverStop)
+		go nc.taintManager.Run(stopCh)
+	}
+
+	if nc.taintNodeByCondition {
+		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
+			nc.nodeUpdateChannels = append(nc.nodeUpdateChannels, make(chan *v1.Node, scheduler.NodeUpdateChannelSize))
+		}
+
+		// Dispatcher
+		go func(stopCh <-chan struct{}) {
+			for {
+				obj, shutdown := nc.nodeUpdateQueue.Get()
+				if shutdown {
+					break
+				}
+
+				node := obj.(*v1.Node)
+				hash := hash(node.Name, scheduler.UpdateWorkerSize)
+
+				select {
+				case <-stopCh:
+					nc.nodeUpdateQueue.Done(node)
+					return
+				case nc.nodeUpdateChannels[hash] <- node:
+				}
+				nc.nodeUpdateQueue.Done(node)
+			}
+		}(stopCh)
+		// Close node update queue to cleanup go routine.
+		defer nc.nodeUpdateQueue.ShutDown()
+
+		// Start workers to update NoSchedule taint for nodes.
+		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
+			go nc.doNoScheduleTaintingPassWorker(i, stopCh)
+		}
 	}
 
 	if nc.useTaintBasedEvictions {
 		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
 		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
-		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
+		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
 	} else {
 		// Managing eviction of nodes:
 		// When we delete pods off a node, if the node was not empty at the time we then
 		// queue an eviction watcher. If we hit an error, retry deletion.
-		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
+		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
 	}
 
 	// Incorporate the results of node status pushed from kubelet to master.
@@ -402,7 +445,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 		if err := nc.monitorNodeStatus(); err != nil {
 			glog.Errorf("Error monitoring node status: %v", err)
 		}
-	}, nc.nodeMonitorPeriod, wait.NeverStop)
+	}, nc.nodeMonitorPeriod, stopCh)
 
 	<-stopCh
 }
@@ -445,6 +488,19 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
 	return nil
 }
 
+func (nc *Controller) doNoScheduleTaintingPassWorker(i int, stopCh <-chan struct{}) {
+	for {
+		select {
+		case <-stopCh:
+			return
+		case node := <-nc.nodeUpdateChannels[i]:
+			if err := nc.doNoScheduleTaintingPass(node); err != nil {
+				glog.Errorf("Failed to taint NoSchedule on node <%s>: %v", node.Name, err)
+			}
+		}
+	}
+}
+
 func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
 	// Map node's condition to Taints.
 	var taints []v1.Taint
@@ -1197,3 +1253,9 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition)
 		return notReadyNodes, stateNormal
 	}
 }
+
+func hash(val string, max int) int {
+	hasher := fnv.New32a()
+	io.WriteString(hasher, val)
+	return int(hasher.Sum32()) % max
+}
diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
index d90d8c1b95..b46450a756 100644
--- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go
+++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -40,9 +40,15 @@ import (
 )
 
 const (
-	nodeUpdateChannelSize = 10
-	podUpdateChannelSize  = 1
-	retries               = 5
+	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
+	// the number of workers up making it a parameter of Run() function.
+
+	// NodeUpdateChannelSize defines the size of channel for node update events.
+	NodeUpdateChannelSize = 10
+	// UpdateWorkerSize defines the size of workers for node update and/or pod update.
+	UpdateWorkerSize = 8
+	podUpdateChannelSize = 1
+	retries              = 5
 )
 
 // Needed to make workqueue work
@@ -204,11 +210,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	glog.V(0).Infof("Starting NoExecuteTaintManager")
 
-	// TODO: Figure out a reasonable number of workers and propagate the
-	// number of workers up making it a paramater of Run() function.
-	workers := 8
-	for i := 0; i < workers; i++ {
-		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, nodeUpdateChannelSize))
+	for i := 0; i < UpdateWorkerSize; i++ {
+		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize))
 		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize))
 	}
 
@@ -221,11 +224,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			nodeUpdate := item.(*nodeUpdateItem)
-			hash := hash(nodeUpdate.name(), workers)
+			hash := hash(nodeUpdate.name(), UpdateWorkerSize)
 			select {
 			case <-stopCh:
 				tc.nodeUpdateQueue.Done(item)
-				break
+				return
 			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
 			}
 			tc.nodeUpdateQueue.Done(item)
@@ -239,11 +242,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			podUpdate := item.(*podUpdateItem)
-			hash := hash(podUpdate.nodeName(), workers)
+			hash := hash(podUpdate.nodeName(), UpdateWorkerSize)
 			select {
 			case <-stopCh:
 				tc.podUpdateQueue.Done(item)
-				break
+				return
 			case tc.podUpdateChannels[hash] <- podUpdate:
 			}
 			tc.podUpdateQueue.Done(item)
@@ -251,8 +254,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	}(stopCh)
 
 	wg := sync.WaitGroup{}
-	wg.Add(workers)
-	for i := 0; i < workers; i++ {
+	wg.Add(UpdateWorkerSize)
+	for i := 0; i < UpdateWorkerSize; i++ {
 		go tc.worker(i, wg.Done, stopCh)
 	}
 	wg.Wait()
diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go
index 9392f37db3..15e04ca5e2 100644
--- a/test/integration/scheduler/taint_test.go
+++ b/test/integration/scheduler/taint_test.go
@@ -639,7 +639,7 @@ func TestTaintNodeByCondition(t *testing.T) {
 			t.Errorf("Failed to create node, err: %v", err)
 		}
 		if err := waitForNodeTaints(cs, node, test.expectedTaints); err != nil {
-			t.Errorf("Failed to taint node, err: %v", err)
+			t.Errorf("Failed to taint node <%s>, err: %v", node.Name, err)
 		}
 
 		var pods []*v1.Pod
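
Note for reviewers: the core pattern in this patch is hash-based dispatch. Node updates are pulled off a single workqueue, the node name is FNV-32a hashed and reduced modulo scheduler.UpdateWorkerSize, and the update is routed to that worker's channel, so updates for the same node always land on the same worker while different nodes are handled in parallel. Below is a minimal standalone sketch of that pattern under those assumptions; identifiers such as updateWorkerSize, channels, and the demo node names are illustrative only and are not part of the patch.

// sketch.go: hash-based dispatch of per-node work to a fixed pool of workers.
package main

import (
	"fmt"
	"hash/fnv"
	"io"
	"sync"
)

const updateWorkerSize = 8 // illustrative; mirrors scheduler.UpdateWorkerSize

// hash mirrors the helper added to node_lifecycle_controller.go: FNV-32a of the
// name, reduced modulo the worker count, selects a stable worker index.
func hash(val string, max int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, val)
	return int(hasher.Sum32()) % max
}

func main() {
	// One buffered channel per worker, as the controller does for nodeUpdateChannels.
	channels := make([]chan string, updateWorkerSize)
	for i := range channels {
		channels[i] = make(chan string, 10)
	}

	var wg sync.WaitGroup
	wg.Add(updateWorkerSize)
	for i := 0; i < updateWorkerSize; i++ {
		go func(i int) {
			defer wg.Done()
			// Each worker drains only its own channel, so updates for a given
			// node are processed in the order they were dispatched.
			for name := range channels[i] {
				fmt.Printf("worker %d handles node %q\n", i, name)
			}
		}(i)
	}

	// Dispatcher: route each update to the worker that owns the node's hash bucket.
	for _, node := range []string{"node-1", "node-2", "node-3", "node-1"} {
		channels[hash(node, updateWorkerSize)] <- node
	}

	for i := range channels {
		close(channels[i])
	}
	wg.Wait()
}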