diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD
index 4e78fe7f7d..6b3d03b6ec 100644
--- a/cmd/kube-controller-manager/app/BUILD
+++ b/cmd/kube-controller-manager/app/BUILD
@@ -123,6 +123,7 @@ go_library(
         "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
         "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
         "//staging/src/k8s.io/client-go/util/cert:go_default_library",
+        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
         "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library",
         "//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library",
         "//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library",
diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go
index 719363c398..d59c34a05f 100644
--- a/cmd/kube-controller-manager/app/apps.go
+++ b/cmd/kube-controller-manager/app/apps.go
@@ -22,10 +22,12 @@ package app
 
 import (
     "fmt"
     "net/http"
+    "time"
 
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/client-go/util/flowcontrol"
     "k8s.io/kubernetes/pkg/controller/daemon"
     "k8s.io/kubernetes/pkg/controller/deployment"
     "k8s.io/kubernetes/pkg/controller/replicaset"
@@ -42,6 +44,7 @@ func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error)
         ctx.InformerFactory.Core().V1().Pods(),
         ctx.InformerFactory.Core().V1().Nodes(),
         ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
+        flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
     )
     if err != nil {
         return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD
index 1be65f07c9..c73872ce3a 100644
--- a/pkg/controller/daemon/BUILD
+++ b/pkg/controller/daemon/BUILD
@@ -50,6 +50,7 @@ go_library(
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
+        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
         "//staging/src/k8s.io/client-go/util/integer:go_default_library",
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
@@ -79,6 +80,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
@@ -88,6 +90,7 @@ go_test(
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
+        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
     ],
 )
diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index fde69a1f39..8b38271d51 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -46,6 +46,7 @@ import (
     corelisters "k8s.io/client-go/listers/core/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+    "k8s.io/client-go/util/flowcontrol"
     "k8s.io/client-go/util/integer"
     "k8s.io/client-go/util/workqueue"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -67,6 +68,9 @@ const (
 
     // StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
     StatusUpdateRetries = 1
+
+    // BackoffGCInterval is the time that has to pass before the next iteration of backoff GC is run
+    BackoffGCInterval = 1 * time.Minute
 )
 
 // Reasons for DaemonSet events
@@ -131,10 +135,19 @@ type DaemonSetsController struct {
     // is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
     suspendedDaemonPodsMutex sync.Mutex
     suspendedDaemonPods      map[string]sets.String
+
+    failedPodsBackoff *flowcontrol.Backoff
 }
 
 // NewDaemonSetsController creates a new DaemonSetsController
-func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) (*DaemonSetsController, error) {
+func NewDaemonSetsController(
+    daemonSetInformer appsinformers.DaemonSetInformer,
+    historyInformer appsinformers.ControllerRevisionInformer,
+    podInformer coreinformers.PodInformer,
+    nodeInformer coreinformers.NodeInformer,
+    kubeClient clientset.Interface,
+    failedPodsBackoff *flowcontrol.Backoff,
+) (*DaemonSetsController, error) {
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartLogging(glog.Infof)
     eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
@@ -212,6 +225,9 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
     dsc.syncHandler = dsc.syncDaemonSet
     dsc.enqueueDaemonSet = dsc.enqueue
     dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
+
+    dsc.failedPodsBackoff = failedPodsBackoff
+
     return dsc, nil
 }
 
@@ -261,6 +277,8 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
         go wait.Until(dsc.runWorker, time.Second, stopCh)
     }
 
+    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
+
     <-stopCh
 }
 
@@ -847,6 +865,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
 
     daemonPods, exists := nodeToDaemonPods[node.Name]
     dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
+
     dsc.removeSuspendedDaemonPods(node.Name, dsKey)
 
     switch {
@@ -865,12 +884,29 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
             continue
         }
         if pod.Status.Phase == v1.PodFailed {
+            failedPodsObserved++
+
+            // This is a critical place where DS is often fighting with kubelet that rejects pods.
+            // We need to avoid hot looping and back off.
+            backoffKey := failedPodsBackoffKey(ds, node.Name)
+
+            now := dsc.failedPodsBackoff.Clock.Now()
+            inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
+            if inBackoff {
+                delay := dsc.failedPodsBackoff.Get(backoffKey)
+                glog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
+                    pod.Namespace, pod.Name, node.Name, delay)
+                dsc.enqueueDaemonSetAfter(ds, delay)
+                continue
+            }
+
+            dsc.failedPodsBackoff.Next(backoffKey, now)
+
             msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
             glog.V(2).Infof(msg)
             // Emit an event so that it's discoverable to users.
             dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
             podsToDelete = append(podsToDelete, pod.Name)
-            failedPodsObserved++
         } else {
             daemonPodsRunning = append(daemonPodsRunning, pod)
         }
@@ -1529,3 +1565,7 @@ func isControlledByDaemonSet(p *v1.Pod, uuid types.UID) bool {
     }
     return false
 }
+
+func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
+    return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
+}
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index ff09e7ae94..ebceb4e7d3 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -23,12 +23,14 @@ import (
     "strconv"
     "sync"
     "testing"
+    "time"
 
     apps "k8s.io/api/apps/v1"
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/apiserver/pkg/storage/names"
@@ -38,6 +40,7 @@ import (
     core "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+    "k8s.io/client-go/util/flowcontrol"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/kubernetes/pkg/api/legacyscheme"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -320,6 +323,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
         informerFactory.Core().V1().Pods(),
         informerFactory.Core().V1().Nodes(),
         clientset,
+        flowcontrol.NewFakeBackOff(50*time.Millisecond, 500*time.Millisecond, clock.NewFakeClock(time.Now())),
     )
     if err != nil {
         return nil, nil, nil, err
@@ -346,6 +350,13 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
     }, podControl, clientset, nil
 }
 
+func resetCounters(manager *daemonSetsController) {
+    manager.podControl.(*fakePodControl).Clear()
+    fakeRecorder := record.NewFakeRecorder(100)
+    manager.eventRecorder = fakeRecorder
+    manager.fakeRecorder = fakeRecorder
+}
+
 func validateSyncDaemonSets(t *testing.T, manager *daemonSetsController, fakePodControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
     if len(fakePodControl.Templates) != expectedCreates {
         t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates))
@@ -1305,24 +1316,90 @@ func TestDaemonKillFailedPods(t *testing.T) {
         {numFailedPods: 0, numNormalPods: 0, expectedCreates: 1, expectedDeletes: 0, expectedEvents: 0, test: "no pods (create 1)"},
         {numFailedPods: 1, numNormalPods: 0, expectedCreates: 0, expectedDeletes: 1, expectedEvents: 1, test: "1 failed pod (kill 1), 0 normal pod (create 0; will create in the next sync)"},
         {numFailedPods: 1, numNormalPods: 3, expectedCreates: 0, expectedDeletes: 3, expectedEvents: 1, test: "1 failed pod (kill 1), 3 normal pods (kill 2)"},
-        {numFailedPods: 2, numNormalPods: 1, expectedCreates: 0, expectedDeletes: 2, expectedEvents: 2, test: "2 failed pods (kill 2), 1 normal pod"},
     }
 
     for _, test := range tests {
-        t.Logf("test case: %s\n", test.test)
-        for _, strategy := range updateStrategies() {
+        t.Run(test.test, func(t *testing.T) {
+            for _, strategy := range updateStrategies() {
+                ds := newDaemonSet("foo")
+                ds.Spec.UpdateStrategy = *strategy
+                manager, podControl, _, err := newTestController(ds)
+                if err != nil {
+                    t.Fatalf("error creating DaemonSets controller: %v", err)
+                }
+                manager.dsStore.Add(ds)
+                addNodes(manager.nodeStore, 0, 1, nil)
+                addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods)
+                addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods)
+                syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes, test.expectedEvents)
+            }
+        })
+    }
+}
+
+// DaemonSet controller needs to backoff when killing failed pods to avoid hot looping and fighting with kubelet.
+func TestDaemonKillFailedPodsBackoff(t *testing.T) {
+    for _, strategy := range updateStrategies() {
+        t.Run(string(strategy.Type), func(t *testing.T) {
             ds := newDaemonSet("foo")
             ds.Spec.UpdateStrategy = *strategy
+
             manager, podControl, _, err := newTestController(ds)
             if err != nil {
                 t.Fatalf("error creating DaemonSets controller: %v", err)
             }
+
             manager.dsStore.Add(ds)
             addNodes(manager.nodeStore, 0, 1, nil)
-            addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods)
-            addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods)
-            syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes, test.expectedEvents)
-        }
+
+            nodeName := "node-0"
+            pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, simpleDaemonSetLabel, ds)
+
+            // Add a failed Pod
+            pod.Status.Phase = v1.PodFailed
+            err = manager.podStore.Add(pod)
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            backoffKey := failedPodsBackoffKey(ds, nodeName)
+
+            // First sync will delete the pod, initializing backoff
+            syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 1, 1)
+            initialDelay := manager.failedPodsBackoff.Get(backoffKey)
+            if initialDelay <= 0 {
+                t.Fatal("Initial delay is expected to be set.")
+            }
+
+            resetCounters(manager)
+
+            // Immediate (second) sync gets limited by the backoff
+            syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+            delay := manager.failedPodsBackoff.Get(backoffKey)
+            if delay != initialDelay {
+                t.Fatal("Backoff delay shouldn't be raised while waiting.")
+            }
+
+            resetCounters(manager)
+
+            // Sleep to wait out backoff
+            fakeClock := manager.failedPodsBackoff.Clock
+
+            // Move just before the backoff end time
+            fakeClock.Sleep(delay - 1*time.Nanosecond)
+            if !manager.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, fakeClock.Now()) {
+                t.Errorf("Backoff delay didn't last the whole waitout period.")
+            }
+
+            // Move to the backoff end time
+            fakeClock.Sleep(1 * time.Nanosecond)
+            if manager.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, fakeClock.Now()) {
+                t.Fatal("Backoff delay hasn't been reset after the period has passed.")
+            }
+
+            // After backoff time, it will delete the failed pod
+            syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 1, 1)
+        })
     }
 }
 
diff --git a/test/integration/daemonset/BUILD b/test/integration/daemonset/BUILD
index 86042e2796..0223e24b01 100644
--- a/test/integration/daemonset/BUILD
+++ b/test/integration/daemonset/BUILD
@@ -41,6 +41,7 @@ go_test(
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
+        "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
         "//staging/src/k8s.io/client-go/util/retry:go_default_library",
         "//test/integration/framework:go_default_library",
     ],
 )
diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go
index 4b5614a9c4..6b6f023f56 100644
--- a/test/integration/daemonset/daemonset_test.go
+++ b/test/integration/daemonset/daemonset_test.go
@@ -39,6 +39,7 @@ import (
     restclient "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+    "k8s.io/client-go/util/flowcontrol"
     "k8s.io/kubernetes/pkg/api/legacyscheme"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     "k8s.io/kubernetes/pkg/controller"
@@ -75,6 +76,7 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
         informers.Core().V1().Pods(),
         informers.Core().V1().Nodes(),
         clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
+        flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
     )
     if err != nil {
         t.Fatalf("error creating DaemonSets controller: %v", err)
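
For reference, a minimal standalone sketch (not part of the patch) of how the k8s.io/client-go/util/flowcontrol Backoff used above behaves: Next records a failure and grows a per-key delay up to the configured cap, IsInBackOffSinceUpdate reports whether a key is still inside its current delay window, Get returns the key's current delay, and GC prunes stale entries. The key string and the printed messages below are illustrative assumptions, not code taken from the patch.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Same knobs the controller is wired with above: 1s initial delay, 15m cap.
    backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)

    // Illustrative key only; the controller builds its key from the DaemonSet UID,
    // observed generation and node name (see failedPodsBackoffKey).
    key := "example-uid/1/node-0"

    for i := 0; i < 3; i++ {
        now := backoff.Clock.Now()
        if backoff.IsInBackOffSinceUpdate(key, now) {
            // Still inside the delay window; the controller requeues the DaemonSet
            // after Get(key) instead of deleting the failed pod again right away.
            fmt.Printf("in backoff, current delay %v\n", backoff.Get(key))
            continue
        }
        // Record another failure: the per-key delay starts at the initial duration
        // and doubles on each subsequent call, capped at the maximum.
        backoff.Next(key, now)
        fmt.Printf("acted on %s, next delay %v\n", key, backoff.Get(key))
    }

    // Prune stale entries; the controller runs this periodically via wait.Until.
    backoff.GC()
}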