From f2ceeb01d9c392e7d9497b6ba3181510620f5102 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 21 Apr 2022 09:24:34 -0700 Subject: [PATCH] Fix issue with long-running apiserver endpoints watch (#5478) Use ListWatch helpers to retry when the watch channel is closed. Signed-off-by: Brad Davidson --- pkg/etcd/apiaddresses_controller.go | 36 +++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/etcd/apiaddresses_controller.go b/pkg/etcd/apiaddresses_controller.go index af181a8171..50483192fd 100644 --- a/pkg/etcd/apiaddresses_controller.go +++ b/pkg/etcd/apiaddresses_controller.go @@ -10,23 +10,38 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + toolswatch "k8s.io/client-go/tools/watch" ) -func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error { +func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) { if etcd.config.DisableAPIServer { - return nil + return } endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints() - watch, err := endpoints.Watch(metav1.NamespaceDefault, metav1.ListOptions{ - FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), - ResourceVersion: "0", - }) - if err != nil { - return err + fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { + options.FieldSelector = fieldSelector + return endpoints.List(metav1.NamespaceDefault, options) + }, + WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { + options.FieldSelector = fieldSelector + return endpoints.Watch(metav1.NamespaceDefault, options) + }, } + _, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{}) + + go func() { + <-ctx.Done() + watch.Stop() + <-done + }() + h := &handler{ etcd: etcd, watch: watch, @@ -34,8 +49,6 @@ func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error { logrus.Infof("Starting managed etcd apiserver addresses controller") go h.watchEndpoints(ctx) - - return nil } type handler struct { @@ -53,8 +66,7 @@ func (h *handler) watchEndpoints(ctx context.Context) { case ev, ok := <-h.watch.ResultChan(): endpoint, ok := ev.Object.(*v1.Endpoints) if !ok { - logrus.Errorf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev) - continue + logrus.Fatalf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev) } w := &bytes.Buffer{}