Fix issue with long-running apiserver endpoints watch (#5478)

Use ListWatch helpers to retry when the watch channel is closed.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/5487/head v1.23.6-rc1+k3s1
Brad Davidson 3 years ago committed by GitHub
parent 93f9562272
commit f2ceeb01d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,23 +10,38 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
) )
func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error { func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) {
if etcd.config.DisableAPIServer { if etcd.config.DisableAPIServer {
return nil return
} }
endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints() endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints()
watch, err := endpoints.Watch(metav1.NamespaceDefault, metav1.ListOptions{ fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), lw := &cache.ListWatch{
ResourceVersion: "0", ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
}) options.FieldSelector = fieldSelector
if err != nil { return endpoints.List(metav1.NamespaceDefault, options)
return err },
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return endpoints.Watch(metav1.NamespaceDefault, options)
},
} }
_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{})
go func() {
<-ctx.Done()
watch.Stop()
<-done
}()
h := &handler{ h := &handler{
etcd: etcd, etcd: etcd,
watch: watch, watch: watch,
@ -34,8 +49,6 @@ func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error {
logrus.Infof("Starting managed etcd apiserver addresses controller") logrus.Infof("Starting managed etcd apiserver addresses controller")
go h.watchEndpoints(ctx) go h.watchEndpoints(ctx)
return nil
} }
type handler struct { type handler struct {
@ -53,8 +66,7 @@ func (h *handler) watchEndpoints(ctx context.Context) {
case ev, ok := <-h.watch.ResultChan(): case ev, ok := <-h.watch.ResultChan():
endpoint, ok := ev.Object.(*v1.Endpoints) endpoint, ok := ev.Object.(*v1.Endpoints)
if !ok { if !ok {
logrus.Errorf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev) logrus.Fatalf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev)
continue
} }
w := &bytes.Buffer{} w := &bytes.Buffer{}

Loading…
Cancel
Save