Merge pull request #68408 from k82cn/k8s_67823_2

Using node name to improve node controller performance.
commit 7bfd0d358c by k8s-ci-robot, 2018-09-11 09:18:50 -07:00 (committed via GitHub)
2 changed files with 31 additions and 38 deletions
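The core of the change: the controller now enqueues node names (plain strings) instead of `*v1.Node` pointers, letting the workqueue deduplicate repeated updates to the same node and letting workers read the node's current state at processing time. A minimal sketch of the deduplication behavior, assuming client-go's `k8s.io/client-go/util/workqueue` package (the node names below are illustrative, not from the PR):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Enqueue node *names*, not node objects. Two updates for the same
	// node collapse into a single pending item.
	queue := workqueue.New()
	queue.Add("node-1")
	queue.Add("node-1") // deduplicated while still pending
	queue.Add("node-2")
	fmt.Println(queue.Len()) // 2

	for queue.Len() > 0 {
		item, _ := queue.Get()
		fmt.Println("processing", item.(string))
		queue.Done(item)
	}
	queue.ShutDown()
}
```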


@@ -222,7 +222,6 @@ type Controller struct {
 	// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
 	taintNodeByCondition bool
-	nodeUpdateChannels []chan *v1.Node
 	nodeUpdateQueue workqueue.Interface
 }
@@ -350,11 +349,11 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
 		glog.Infof("Controller will taint node by condition.")
 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
-				nc.nodeUpdateQueue.Add(node)
+				nc.nodeUpdateQueue.Add(node.Name)
				return nil
 			}),
 			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-				nc.nodeUpdateQueue.Add(newNode)
+				nc.nodeUpdateQueue.Add(newNode.Name)
				return nil
 			}),
 		})
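Because only the name is queued, the worker later reads the node's current state from the informer cache rather than acting on a snapshot captured at event time. Node is cluster-scoped, so its cache key is simply its name; the sketch below demonstrates this with client-go's generic key helper (the PR itself enqueues node.Name directly, which is equivalent here):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// For cluster-scoped objects such as Node, the informer cache key
	// is just the object name, so enqueueing node.Name is equivalent.
	key, err := cache.MetaNamespaceKeyFunc(node)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints "node-1"
}
```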
@@ -396,36 +395,16 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 	}
 	if nc.taintNodeByCondition {
-		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
-			nc.nodeUpdateChannels = append(nc.nodeUpdateChannels, make(chan *v1.Node, scheduler.NodeUpdateChannelSize))
-		}
-		// Dispatcher
-		go func(stopCh <-chan struct{}) {
-			for {
-				obj, shutdown := nc.nodeUpdateQueue.Get()
-				if shutdown {
-					break
-				}
-				node := obj.(*v1.Node)
-				hash := hash(node.Name, scheduler.UpdateWorkerSize)
-				select {
-				case <-stopCh:
-					nc.nodeUpdateQueue.Done(node)
-					return
-				case nc.nodeUpdateChannels[hash] <- node:
-				}
-				nc.nodeUpdateQueue.Done(node)
-			}
-		}(stopCh)
+		// Close the node update queue to clean up the worker goroutines.
+		defer nc.nodeUpdateQueue.ShutDown()
 		// Start workers to update NoSchedule taint for nodes.
 		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
-			go nc.doNoScheduleTaintingPassWorker(i, stopCh)
+			// Thanks to "workqueue", each worker just needs to pull items from the queue:
+			// an item is marked in-flight once fetched, so if a new event arrives, the item
+			// is re-queued until "Done". Hence no more than one worker handles the same
+			// item, and no event is missed.
+			go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
 		}
 	}
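The comment above leans on workqueue's in-flight tracking: an item handed to a worker via Get is parked if it is re-added before Done, then re-queued once Done is called. A small sketch of that guarantee, again assuming client-go's workqueue (illustrative only):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	q.Add("node-1")

	item, _ := q.Get()   // "node-1" is now marked in-flight
	q.Add("node-1")      // event arrives mid-processing: parked, not handed to another worker
	fmt.Println(q.Len()) // 0: the re-add stays parked until Done

	q.Done(item)         // processing finished; the parked add is re-queued
	fmt.Println(q.Len()) // 1: "node-1" will be handed out again
	q.ShutDown()
}
```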
@@ -488,20 +467,34 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
 	return nil
 }
-func (nc *Controller) doNoScheduleTaintingPassWorker(i int, stopCh <-chan struct{}) {
+func (nc *Controller) doNoScheduleTaintingPassWorker() {
 	for {
-		select {
-		case <-stopCh:
+		obj, shutdown := nc.nodeUpdateQueue.Get()
+		// "nodeUpdateQueue" will be shut down when "stopCh" is closed;
+		// there is no need to re-check "stopCh" here.
+		if shutdown {
 			return
-		case node := <-nc.nodeUpdateChannels[i]:
-			if err := nc.doNoScheduleTaintingPass(node); err != nil {
-				glog.Errorf("Failed to taint NoSchedule on node <%s>: %v", node.Name, err)
-			}
 		}
+		nodeName := obj.(string)
+		if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
+			// TODO (k82cn): Add nodeName back to the queue.
+			glog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
+		}
+		nc.nodeUpdateQueue.Done(nodeName)
 	}
 }
-func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
+func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
+	node, err := nc.nodeLister.Get(nodeName)
+	if err != nil {
+		// If the node is not found, just ignore it.
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
 	// Map node's condition to Taints.
 	var taints []v1.Taint
 	for _, condition := range node.Status.Conditions {
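Resolving the queued name back to an object happens through the node lister, and a node deleted between enqueue and processing surfaces as a NotFound error, which the tainting pass treats as success. A sketch of that error-handling shape, where processNode is a hypothetical stand-in for the lister-backed lookup:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// processNode mimics the lookup pattern: resolve the queued name to the
// current object, and treat "not found" as success, since the node was
// deleted before the worker got to it.
func processNode(get func(name string) error, name string) error {
	if err := get(name); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // node is gone; nothing to taint
		}
		return err
	}
	// ... map conditions to taints here ...
	return nil
}

func main() {
	gr := schema.GroupResource{Resource: "nodes"}
	notFound := func(name string) error { return apierrors.NewNotFound(gr, name) }
	fmt.Println(processNode(notFound, "node-1")) // <nil>: deletion is tolerated
}
```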


@@ -2334,7 +2334,7 @@ func TestTaintsNodeByCondition(t *testing.T) {
 		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
-		nodeController.doNoScheduleTaintingPass(test.Node)
+		nodeController.doNoScheduleTaintingPass(test.Node.Name)
 		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}