mirror of https://github.com/k3s-io/k3s
Merge pull request #65309 from tnozicka/add-ds-recreate-backoff
Automatic merge from submit-queue (batch tested with PRs 62441, 66702, 67254, 67421, 65309). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Add backoff for the DaemonSet controller's pod deletion to limit fighting with a kubelet that keeps failing the pod.

**What this PR does / why we need it**: Limits the consequences of the DaemonSet controller hot-looping and fighting with the kubelet.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes https://github.com/kubernetes/kubernetes/issues/65240

**Release note**:

```release-note
The DaemonSet controller now uses a backoff algorithm to avoid hot loops fighting with the kubelet on pod recreation when a particular DaemonSet is misconfigured.
```

TODO:
- [x] Export the backoff settings as args or constants
- [x] Add a test case

/cc @mfojtik (Will add more folks when it's ready, to avoid spamming them.)
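The change is built on client-go's `flowcontrol.Backoff`, which the diff below threads into the DaemonSet controller: before deleting a failed daemon pod again, the controller asks whether the per-DaemonSet/per-node key is still in backoff, requeues if so, and otherwise acts and bumps the entry so the next delay grows. A minimal standalone sketch of that pattern (the key string and the loop are illustrative, not the controller code):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Same constructor the controller uses below: 1s initial delay, capped at 15m.
	backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)

	key := "example-ds-uid/1/node-0" // illustrative key; the controller builds its own

	for i := 0; i < 3; i++ {
		now := backoff.Clock.Now()
		if backoff.IsInBackOffSinceUpdate(key, now) {
			// Still inside the backoff window: requeue for later instead of retrying now.
			fmt.Printf("in backoff, retry in %v\n", backoff.Get(key))
			continue
		}
		// Outside the window: act, and record the attempt so the next delay grows.
		backoff.Next(key, now)
		fmt.Println("would delete the failed daemon pod here")
	}
}
```

The same `IsInBackOffSinceUpdate`/`Next` pair appears verbatim in the `podsShouldBeOnNode` hunk further down.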
commit c1f7df2b0e
@@ -123,6 +123,7 @@ go_library(
        "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
        "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
        "//staging/src/k8s.io/client-go/util/cert:go_default_library",
        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
        "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library",
        "//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library",
        "//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library",
@@ -22,10 +22,11 @@ package app

import (
	"fmt"

	"net/http"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/deployment"
	"k8s.io/kubernetes/pkg/controller/replicaset"
@@ -42,6 +43,7 @@ func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error)
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Nodes(),
		ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
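`NewBackOff(1*time.Second, 15*time.Minute)` gives the controller a per-key delay that starts at one second and roughly doubles on every consecutive failure until it hits the fifteen-minute cap. A quick standalone sketch of the schedule those two arguments imply (the key is made up; the doubling comes from `flowcontrol.Backoff`'s `Next`):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	b := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)
	key := "some-ds-uid/1/node-0" // illustrative key

	// Each Next call models one more failed pod recreation for this key.
	for i := 1; i <= 12; i++ {
		b.Next(key, b.Clock.Now())
		fmt.Printf("after failure %2d: next delay %v\n", i, b.Get(key))
	}
	// Prints 1s, 2s, 4s, ... and settles at the 15m cap.
}
```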
@@ -50,6 +50,7 @@ go_library(
        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
        "//staging/src/k8s.io/client-go/util/integer:go_default_library",
        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
@@ -79,6 +80,7 @@ go_test(
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
@@ -88,6 +90,7 @@ go_test(
        "//staging/src/k8s.io/client-go/testing:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
    ],
)
@@ -46,6 +46,7 @@ import (
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/integer"
	"k8s.io/client-go/util/workqueue"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -67,6 +68,9 @@ const (

	// StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
	StatusUpdateRetries = 1

	// BackoffGCInterval is the time that has to pass before next iteration of backoff GC is run
	BackoffGCInterval = 1 * time.Minute
)

// Reasons for DaemonSet events
@@ -131,10 +135,19 @@ type DaemonSetsController struct {
	// is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
	suspendedDaemonPodsMutex sync.Mutex
	suspendedDaemonPods      map[string]sets.String

	failedPodsBackoff *flowcontrol.Backoff
}

// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) (*DaemonSetsController, error) {
func NewDaemonSetsController(
	daemonSetInformer appsinformers.DaemonSetInformer,
	historyInformer appsinformers.ControllerRevisionInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@@ -212,6 +225,9 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
	dsc.syncHandler = dsc.syncDaemonSet
	dsc.enqueueDaemonSet = dsc.enqueue
	dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited

	dsc.failedPodsBackoff = failedPodsBackoff

	return dsc, nil
}
@@ -261,6 +277,8 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
		go wait.Until(dsc.runWorker, time.Second, stopCh)
	}

	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

	<-stopCh
}
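`Run` also starts a goroutine that calls `failedPodsBackoff.GC` every `BackoffGCInterval` (one minute, per the const hunk above) so entries for pods that stopped failing are eventually pruned rather than kept forever. The same wiring in isolation, as a sketch with the interval shortened so the demo finishes quickly:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)
	stopCh := make(chan struct{})

	// Periodically prune backoff entries that have not been updated for a long time,
	// mirroring the GC goroutine started in Run above.
	go wait.Until(backoff.GC, 100*time.Millisecond, stopCh)

	time.Sleep(300 * time.Millisecond)
	close(stopCh)
	fmt.Println("stopped backoff GC loop")
}
```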
@@ -847,6 +865,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(

	daemonPods, exists := nodeToDaemonPods[node.Name]
	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)

	dsc.removeSuspendedDaemonPods(node.Name, dsKey)

	switch {
@@ -865,12 +884,29 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				failedPodsObserved++

				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					glog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				glog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
				failedPodsObserved++
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
@@ -1529,3 +1565,7 @@ func isControlledByDaemonSet(p *v1.Pod, uuid types.UID) bool {
		}
	}
	return false
}

func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
}
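The backoff entries are keyed by `failedPodsBackoffKey` above: DaemonSet UID, observed generation, and node name. One consequence is that a spec update which moves the observed generation forward lands on a fresh key and therefore starts again from the one-second delay, while stale keys are left to the periodic GC. A small sketch of what such a key looks like (the DaemonSet object here is fabricated for illustration):

```go
package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// Mirror of the key format in the diff above; not imported from the controller package.
func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
}

func main() {
	ds := &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: "example-ds", // hypothetical object
			UID:  types.UID("1b4f3a1e-0000-0000-0000-000000000000"),
		},
		Status: apps.DaemonSetStatus{ObservedGeneration: 3},
	}
	// Prints e.g. "1b4f3a1e-0000-0000-0000-000000000000/3/node-0"
	fmt.Println(failedPodsBackoffKey(ds, "node-0"))
}
```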
@@ -23,12 +23,14 @@ import (
	"strconv"
	"sync"
	"testing"
	"time"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apiserver/pkg/storage/names"
@@ -38,6 +40,7 @@ import (
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -320,6 +323,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().Nodes(),
		clientset,
		flowcontrol.NewFakeBackOff(50*time.Millisecond, 500*time.Millisecond, clock.NewFakeClock(time.Now())),
	)
	if err != nil {
		return nil, nil, nil, err
@@ -346,6 +350,13 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
	}, podControl, clientset, nil
}

func resetCounters(manager *daemonSetsController) {
	manager.podControl.(*fakePodControl).Clear()
	fakeRecorder := record.NewFakeRecorder(100)
	manager.eventRecorder = fakeRecorder
	manager.fakeRecorder = fakeRecorder
}

func validateSyncDaemonSets(t *testing.T, manager *daemonSetsController, fakePodControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
	if len(fakePodControl.Templates) != expectedCreates {
		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates))
@@ -1305,11 +1316,10 @@ func TestDaemonKillFailedPods(t *testing.T) {
		{numFailedPods: 0, numNormalPods: 0, expectedCreates: 1, expectedDeletes: 0, expectedEvents: 0, test: "no pods (create 1)"},
		{numFailedPods: 1, numNormalPods: 0, expectedCreates: 0, expectedDeletes: 1, expectedEvents: 1, test: "1 failed pod (kill 1), 0 normal pod (create 0; will create in the next sync)"},
		{numFailedPods: 1, numNormalPods: 3, expectedCreates: 0, expectedDeletes: 3, expectedEvents: 1, test: "1 failed pod (kill 1), 3 normal pods (kill 2)"},
		{numFailedPods: 2, numNormalPods: 1, expectedCreates: 0, expectedDeletes: 2, expectedEvents: 2, test: "2 failed pods (kill 2), 1 normal pod"},
	}

	for _, test := range tests {
		t.Logf("test case: %s\n", test.test)
		t.Run(test.test, func(t *testing.T) {
			for _, strategy := range updateStrategies() {
				ds := newDaemonSet("foo")
				ds.Spec.UpdateStrategy = *strategy
@@ -1323,6 +1333,73 @@ func TestDaemonKillFailedPods(t *testing.T) {
				addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods)
				syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes, test.expectedEvents)
			}
		})
	}
}

// DaemonSet controller needs to backoff when killing failed pods to avoid hot looping and fighting with kubelet.
func TestDaemonKillFailedPodsBackoff(t *testing.T) {
	for _, strategy := range updateStrategies() {
		t.Run(string(strategy.Type), func(t *testing.T) {
			ds := newDaemonSet("foo")
			ds.Spec.UpdateStrategy = *strategy

			manager, podControl, _, err := newTestController(ds)
			if err != nil {
				t.Fatalf("error creating DaemonSets controller: %v", err)
			}

			manager.dsStore.Add(ds)
			addNodes(manager.nodeStore, 0, 1, nil)

			nodeName := "node-0"
			pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, simpleDaemonSetLabel, ds)

			// Add a failed Pod
			pod.Status.Phase = v1.PodFailed
			err = manager.podStore.Add(pod)
			if err != nil {
				t.Fatal(err)
			}

			backoffKey := failedPodsBackoffKey(ds, nodeName)

			// First sync will delete the pod, initializing backoff
			syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 1, 1)
			initialDelay := manager.failedPodsBackoff.Get(backoffKey)
			if initialDelay <= 0 {
				t.Fatal("Initial delay is expected to be set.")
			}

			resetCounters(manager)

			// Immediate (second) sync gets limited by the backoff
			syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0, 0)
			delay := manager.failedPodsBackoff.Get(backoffKey)
			if delay != initialDelay {
				t.Fatal("Backoff delay shouldn't be raised while waiting.")
			}

			resetCounters(manager)

			// Sleep to wait out backoff
			fakeClock := manager.failedPodsBackoff.Clock

			// Move just before the backoff end time
			fakeClock.Sleep(delay - 1*time.Nanosecond)
			if !manager.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, fakeClock.Now()) {
				t.Errorf("Backoff delay didn't last the whole waitout period.")
			}

			// Move to the backoff end time
			fakeClock.Sleep(1 * time.Nanosecond)
			if manager.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, fakeClock.Now()) {
				t.Fatal("Backoff delay hasn't been reset after the period has passed.")
			}

			// After backoff time, it will delete the failed pod
			syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 1, 1)
		})
	}
}
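`TestDaemonKillFailedPodsBackoff` above leans on `flowcontrol.NewFakeBackOff` plus a fake clock so the backoff window can be crossed deterministically instead of sleeping in the test. The clock-and-backoff technique on its own, as a sketch against the client-go/apimachinery packages vendored at the time (key and durations chosen to mirror the test):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	fakeClock := clock.NewFakeClock(time.Now())
	// Same fake backoff the test constructs: 50ms initial delay, 500ms cap.
	backoff := flowcontrol.NewFakeBackOff(50*time.Millisecond, 500*time.Millisecond, fakeClock)

	key := "some-ds-uid/1/node-0" // illustrative key

	backoff.Next(key, fakeClock.Now()) // simulate the first failed-pod deletion
	delay := backoff.Get(key)

	fmt.Println(backoff.IsInBackOffSinceUpdate(key, fakeClock.Now())) // true: still inside the window

	fakeClock.Sleep(delay - time.Nanosecond) // advance to just before the window ends
	fmt.Println(backoff.IsInBackOffSinceUpdate(key, fakeClock.Now())) // still true

	fakeClock.Sleep(time.Nanosecond) // cross the boundary
	fmt.Println(backoff.IsInBackOffSinceUpdate(key, fakeClock.Now())) // false: the next deletion may proceed
}
```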
@@ -41,6 +41,7 @@ go_test(
        "//staging/src/k8s.io/client-go/rest:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
        "//staging/src/k8s.io/client-go/util/retry:go_default_library",
        "//test/integration/framework:go_default_library",
    ],
@@ -39,6 +39,7 @@ import (
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
@@ -75,6 +76,7 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
		informers.Core().V1().Pods(),
		informers.Core().V1().Nodes(),
		clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
		flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
	)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)