mirror of https://github.com/k3s-io/k3s
daemonset: differentiate between cases in nodeShouldRun
secifically we need to differentiate between wanting to run, should run and should continue running. This is required to support all taint effects and will improve reporting and end user debuggability.pull/6/head
parent
330c922706
commit
c518e89042
|
@ -387,8 +387,11 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
|||
node := obj.(*v1.Node)
|
||||
for i := range dsList.Items {
|
||||
ds := &dsList.Items[i]
|
||||
shouldEnqueue := dsc.nodeShouldRunDaemonPod(node, ds)
|
||||
if shouldEnqueue {
|
||||
_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if shouldSchedule {
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
}
|
||||
|
@ -406,14 +409,21 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
|||
glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
|
||||
return
|
||||
}
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
for i := range dsList.Items {
|
||||
ds := &dsList.Items[i]
|
||||
shouldEnqueue := (dsc.nodeShouldRunDaemonPod(oldNode, ds) != dsc.nodeShouldRunDaemonPod(curNode, ds))
|
||||
if shouldEnqueue {
|
||||
_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
}
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
}
|
||||
|
||||
// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
|
||||
|
@ -451,22 +461,25 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
|
|||
}
|
||||
var nodesNeedingDaemonPods, podsToDelete []string
|
||||
for _, node := range nodeList.Items {
|
||||
shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
|
||||
_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(&node, ds)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
daemonPods, isRunning := nodeToDaemonPods[node.Name]
|
||||
|
||||
switch {
|
||||
case shouldRun && !isRunning:
|
||||
case shouldSchedule && !isRunning:
|
||||
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
|
||||
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
|
||||
case shouldRun && len(daemonPods) > 1:
|
||||
case shouldContinueRunning && len(daemonPods) > 1:
|
||||
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
|
||||
// Sort the daemon pods by creation time, so the the oldest is preserved.
|
||||
sort.Sort(podByCreationTimestamp(daemonPods))
|
||||
for i := 1; i < len(daemonPods); i++ {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
}
|
||||
case !shouldRun && isRunning:
|
||||
case !shouldContinueRunning && isRunning:
|
||||
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
|
||||
for i := range daemonPods {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
|
@ -588,11 +601,14 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
|
|||
|
||||
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady int
|
||||
for _, node := range nodeList.Items {
|
||||
shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
|
||||
wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(&node, ds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scheduled := len(nodeToDaemonPods[node.Name]) > 0
|
||||
|
||||
if shouldRun {
|
||||
if wantToRun {
|
||||
desiredNumberScheduled++
|
||||
if scheduled {
|
||||
currentNumberScheduled++
|
||||
|
@ -658,16 +674,35 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
|
|||
return dsc.updateDaemonSetStatus(ds)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) bool {
|
||||
// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
|
||||
// summary. Returned booleans are:
|
||||
// * wantToRun:
|
||||
// Returns true when a user would expect a pod to run on this node and ignores conditions
|
||||
// such as OutOfDisk or insufficent resource that would cause a daemonset pod not to schedule.
|
||||
// This is primarily used to populate daemonset status.
|
||||
// * shouldSchedule:
|
||||
// Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
|
||||
// running on that node.
|
||||
// * shouldContinueRunning:
|
||||
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
|
||||
// running on that node.
|
||||
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
|
||||
// Because these bools require an && of all their required conditions, we start
|
||||
// with all bools set to true and set a bool to false if a condition is not met.
|
||||
// A bool should probably not be set to true after this line.
|
||||
wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
|
||||
// If the daemon set specifies a node name, check that it matches with node.Name.
|
||||
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
|
||||
return false
|
||||
return false, false, false, nil
|
||||
}
|
||||
|
||||
// TODO: Move it to the predicates
|
||||
for _, c := range node.Status.Conditions {
|
||||
if c.Type == v1.NodeOutOfDisk && c.Status == v1.ConditionTrue {
|
||||
return false
|
||||
// the kubelet will evict this pod if it needs to. Let kubelet
|
||||
// decide whether to continue running this pod so leave shouldContinueRunning
|
||||
// set to true
|
||||
shouldSchedule = false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -695,22 +730,59 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
|
|||
|
||||
nodeInfo := schedulercache.NewNodeInfo(pods...)
|
||||
nodeInfo.SetNode(node)
|
||||
fit, reasons, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
|
||||
_, reasons, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
|
||||
if err != nil {
|
||||
glog.Warningf("GeneralPredicates failed on ds '%s/%s' due to unexpected error: %v", ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
|
||||
return false, false, false, err
|
||||
}
|
||||
for _, r := range reasons {
|
||||
glog.V(4).Infof("GeneralPredicates failed on ds '%s/%s' for reason: %v", ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
|
||||
switch reason := r.(type) {
|
||||
case *predicates.InsufficientResourceError:
|
||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.Error())
|
||||
shouldSchedule = false
|
||||
case *predicates.PredicateFailureError:
|
||||
if reason == predicates.ErrPodNotFitsHostPorts {
|
||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: host port conflict", node.ObjectMeta.Name)
|
||||
var emitEvent bool
|
||||
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
|
||||
switch reason {
|
||||
// intentional
|
||||
case
|
||||
predicates.ErrNodeSelectorNotMatch,
|
||||
predicates.ErrPodNotMatchHostName,
|
||||
predicates.ErrNodeLabelPresenceViolated,
|
||||
// this one is probably intentional since it's a workaround for not having
|
||||
// pod hard anti affinity.
|
||||
predicates.ErrPodNotFitsHostPorts:
|
||||
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
|
||||
// unintentional
|
||||
case
|
||||
predicates.ErrDiskConflict,
|
||||
predicates.ErrVolumeZoneConflict,
|
||||
predicates.ErrMaxVolumeCountExceeded,
|
||||
predicates.ErrNodeUnderMemoryPressure,
|
||||
predicates.ErrNodeUnderDiskPressure:
|
||||
// wantToRun and shouldContinueRunning are likely true here. They are
|
||||
// absolutely true at the time of writing the comment. See first comment
|
||||
// of this method.
|
||||
shouldSchedule = false
|
||||
emitEvent = true
|
||||
// unexpected
|
||||
case
|
||||
predicates.ErrPodAffinityNotMatch,
|
||||
predicates.ErrServiceAffinityViolated,
|
||||
predicates.ErrTaintsTolerationsNotMatch:
|
||||
return false, false, false, fmt.Errorf("unexpected reason: GeneralPredicates should not return reason %s", reason.GetReason())
|
||||
default:
|
||||
glog.V(4).Infof("unknownd predicate failure reason: %s", reason.GetReason())
|
||||
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
|
||||
emitEvent = true
|
||||
}
|
||||
if emitEvent {
|
||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
|
||||
}
|
||||
}
|
||||
}
|
||||
return fit
|
||||
return
|
||||
}
|
||||
|
||||
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
|
||||
|
|
|
@ -267,6 +267,23 @@ func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
|
|||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// DaemonSets should not unschedule a daemonset pod from a node with insufficient free resource
|
||||
func TestInsufficentCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T) {
|
||||
podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
|
||||
podSpec.NodeName = "too-much-mem"
|
||||
manager, podControl, _ := newTestController()
|
||||
node := newNode("too-much-mem", nil)
|
||||
node.Status.Allocatable = allocatableResources("100M", "200m")
|
||||
manager.nodeStore.Add(node)
|
||||
manager.podStore.Indexer.Add(&v1.Pod{
|
||||
Spec: podSpec,
|
||||
})
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec = podSpec
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
||||
|
||||
func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
|
||||
podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
|
||||
manager, podControl, _ := newTestController()
|
||||
|
|
Loading…
Reference in New Issue