Enqueue DaemonSet only it has new or updated nodes in it.

When add node or update node func is called, all DaemonSets are enqueued to workqueue
of DaemonSet Controller. The consumer of the workqueue does an iteration of node.List
for every DaemonSet in the workqueue.

This change examinate the daemonSet before we enqueue it into DaemonSetController's work queue.
It will enqueue the DaemonSet only when it is related to the changed node.
pull/6/head
Weixu Zhuang 2015-11-17 17:39:55 -08:00
parent 8752165fb6
commit c7756ce8d5
1 changed files with 26 additions and 4 deletions

View File

@ -307,7 +307,19 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
func (dsc *DaemonSetsController) addNode(obj interface{}) {
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
dsc.enqueueAllDaemonSets()
dsList, err := dsc.dsStore.List()
if err != nil {
glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
return
}
node := obj.(*api.Node)
for i := range dsList.Items {
ds := &dsList.Items[i]
shouldEnqueue := nodeShouldRunDaemonPod(node, ds)
if shouldEnqueue {
dsc.enqueueDaemonSet(ds)
}
}
}
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
@ -317,8 +329,19 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
// A periodic relist will send update events for all known pods.
return
}
dsList, err := dsc.dsStore.List()
if err != nil {
glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
return
}
for i := range dsList.Items {
ds := &dsList.Items[i]
shouldEnqueue := (nodeShouldRunDaemonPod(oldNode, ds) != nodeShouldRunDaemonPod(curNode, ds))
if shouldEnqueue {
dsc.enqueueDaemonSet(ds)
}
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
dsc.enqueueAllDaemonSets()
}
// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
@ -355,6 +378,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList.Items {
shouldRun := nodeShouldRunDaemonPod(&node, ds)
daemonPods, isRunning := nodeToDaemonPods[node.Name]
if shouldRun && !isRunning {
@ -513,7 +537,6 @@ func nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
shouldRun := nodeSelector.Matches(labels.Set(node.Labels))
// If the daemon set specifies a node name, check that it matches with node.Name.
shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name)
// If the node is not ready, don't run on it.
// TODO(mikedanese): remove this once daemonpods forgive nodes
shouldRun = shouldRun && api.IsNodeReady(node)
@ -522,7 +545,6 @@ func nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
// TODO(mikedanese): remove this once we have the right node admitance levels.
// See https://github.com/kubernetes/kubernetes/issues/17297#issuecomment-156857375.
shouldRun = shouldRun && !node.Spec.Unschedulable
return shouldRun
}