From 1a16be41e41051c48537916c0f4a7f9cb7d8b46d Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Fri, 22 Nov 2024 01:27:18 +0000 Subject: [PATCH] Return apiserver addresses from both etcd and endpoints Signed-off-by: Brad Davidson (cherry picked from commit 168b344d1d40d3d9c86452cd0b1b38fca7ab7196) Signed-off-by: Brad Davidson --- pkg/server/router.go | 95 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/pkg/server/router.go b/pkg/server/router.go index 93666f38c3..794a1c94a3 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -19,6 +19,7 @@ import ( "github.com/k3s-io/k3s/pkg/bootstrap" "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/etcd" "github.com/k3s-io/k3s/pkg/nodepassword" "github.com/k3s-io/k3s/pkg/server/auth" "github.com/k3s-io/k3s/pkg/util" @@ -31,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" @@ -305,22 +307,15 @@ func fileHandler(fileName ...string) http.Handler { }) } +// apiserversHandler returns a list of apiserver addresses. +// It attempts to merge results from both the apiserver and directly from etcd, +// in case we are recovering from an apiserver outage that rendered the endpoint list unavailable. func apiserversHandler(server *config.Control) http.Handler { - var endpointsClient typedcorev1.EndpointsInterface + collectAddresses := getAddressCollector(server) return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - ctx := req.Context() - var endpoints []string - if endpointsClient == nil { - if server.Runtime.Core != nil { - endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault) - } - } - if endpointsClient != nil { - if endpoint, _ := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil { - endpoints = util.GetAddresses(endpoint) - } - } - + ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second) + defer cancel() + endpoints := collectAddresses(ctx) resp.Header().Set("content-type", "application/json") if err := json.NewEncoder(resp).Encode(endpoints); err != nil { util.SendError(errors.Wrap(err, "failed to encode apiserver endpoints"), resp, req, http.StatusInternalServerError) @@ -526,3 +521,75 @@ func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) { return false, nil }) } + +// addressGetter is a common signature for functions that return an address channel +type addressGetter func(ctx context.Context) <-chan []string + +// kubernetesGetter returns a function that returns a channel that can be read to get apiserver addresses from kubernetes endpoints +func kubernetesGetter(server *config.Control) addressGetter { + var endpointsClient typedcorev1.EndpointsInterface + return func(ctx context.Context) <-chan []string { + ch := make(chan []string, 1) + go func() { + if endpointsClient == nil { + if server.Runtime.K8s != nil { + endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault) + } + } + if endpointsClient != nil { + if endpoint, err := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil { + logrus.Debugf("Failed to get apiserver addresses from kubernetes: %v", err) + } else { + ch <- util.GetAddresses(endpoint) + } + } + close(ch) + }() + return ch + } +} + +// etcdGetter returns a function that returns a channel that can be read to get apiserver addresses from etcd +func etcdGetter(server *config.Control) addressGetter { + return func(ctx context.Context) <-chan []string { + ch := make(chan []string, 1) + go func() { + if addresses, err := etcd.GetAPIServerURLsFromETCD(ctx, server); err != nil { + logrus.Debugf("Failed to get apiserver addresses from etcd: %v", err) + } else { + ch <- addresses + } + close(ch) + }() + return ch + } +} + +// getAddressCollector returns a function that can be called to return +// apiserver addresses from both kubernetes and etcd +func getAddressCollector(server *config.Control) func(ctx context.Context) []string { + getFromKubernetes := kubernetesGetter(server) + getFromEtcd := etcdGetter(server) + + // read from both kubernetes and etcd in parallel, returning the collected results + return func(ctx context.Context) []string { + a := sets.Set[string]{} + r := []string{} + k8sCh := getFromKubernetes(ctx) + k8sOk := true + etcdCh := getFromEtcd(ctx) + etcdOk := true + + for k8sOk || etcdOk { + select { + case r, k8sOk = <-k8sCh: + a.Insert(r...) + case r, etcdOk = <-etcdCh: + a.Insert(r...) + case <-ctx.Done(): + return a.UnsortedList() + } + } + return a.UnsortedList() + } +}