mirror of https://github.com/k3s-io/k3s
Fix issue with long-running apiserver endpoints watch (#5480)
Use ListWatch helpers to retry when the watch channel is closed. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/5496/head
parent
3b1ae9cd5f
commit
18098ca0d8
|
@ -10,23 +10,38 @@ import (
|
|||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
toolswatch "k8s.io/client-go/tools/watch"
|
||||
)
|
||||
|
||||
func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error {
|
||||
func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) {
|
||||
if etcd.config.DisableAPIServer {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
endpoints := etcd.config.Runtime.Core.Core().V1().Endpoints()
|
||||
watch, err := endpoints.Watch(metav1.NamespaceDefault, metav1.ListOptions{
|
||||
FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(),
|
||||
ResourceVersion: "0",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return endpoints.List(metav1.NamespaceDefault, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return endpoints.Watch(metav1.NamespaceDefault, options)
|
||||
},
|
||||
}
|
||||
|
||||
_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{})
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
watch.Stop()
|
||||
<-done
|
||||
}()
|
||||
|
||||
h := &handler{
|
||||
etcd: etcd,
|
||||
watch: watch,
|
||||
|
@ -34,8 +49,6 @@ func registerEndpointsHandlers(ctx context.Context, etcd *ETCD) error {
|
|||
|
||||
logrus.Infof("Starting managed etcd apiserver addresses controller")
|
||||
go h.watchEndpoints(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
|
@ -53,8 +66,7 @@ func (h *handler) watchEndpoints(ctx context.Context) {
|
|||
case ev, ok := <-h.watch.ResultChan():
|
||||
endpoint, ok := ev.Object.(*v1.Endpoints)
|
||||
if !ok {
|
||||
logrus.Errorf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev)
|
||||
continue
|
||||
logrus.Fatalf("Failed to watch apiserver addresses: could not convert event object to endpoint: %v", ev)
|
||||
}
|
||||
|
||||
w := &bytes.Buffer{}
|
||||
|
|
Loading…
Reference in New Issue