diff --git a/pkg/server/router.go b/pkg/server/router.go index b4a2dc57cb..fca5540278 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -19,6 +19,7 @@ import ( "github.com/k3s-io/k3s/pkg/bootstrap" "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/etcd" "github.com/k3s-io/k3s/pkg/nodepassword" "github.com/k3s-io/k3s/pkg/server/auth" "github.com/k3s-io/k3s/pkg/util" @@ -31,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" @@ -305,22 +307,15 @@ func fileHandler(fileName ...string) http.Handler { }) } +// apiserversHandler returns a list of apiserver addresses. +// It attempts to merge results from both the apiserver and directly from etcd, +// in case we are recovering from an apiserver outage that rendered the endpoint list unavailable. func apiserversHandler(server *config.Control) http.Handler { - var endpointsClient typedcorev1.EndpointsInterface + collectAddresses := getAddressCollector(server) return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - ctx := req.Context() - var endpoints []string - if endpointsClient == nil { - if server.Runtime.Core != nil { - endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault) - } - } - if endpointsClient != nil { - if endpoint, _ := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil { - endpoints = util.GetAddresses(endpoint) - } - } - + ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second) + defer cancel() + endpoints := collectAddresses(ctx) resp.Header().Set("content-type", "application/json") if err := json.NewEncoder(resp).Encode(endpoints); err != nil { util.SendError(errors.Wrap(err, "failed to encode apiserver endpoints"), resp, req, http.StatusInternalServerError) @@ -526,3 +521,75 @@ func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) { return false, nil }) } + +// addressGetter is a common signature for functions that return an address channel +type addressGetter func(ctx context.Context) <-chan []string + +// kubernetesGetter returns a function that returns a channel that can be read to get apiserver addresses from kubernetes endpoints +func kubernetesGetter(server *config.Control) addressGetter { + var endpointsClient typedcorev1.EndpointsInterface + return func(ctx context.Context) <-chan []string { + ch := make(chan []string, 1) + go func() { + if endpointsClient == nil { + if server.Runtime.K8s != nil { + endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault) + } + } + if endpointsClient != nil { + if endpoint, err := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil { + logrus.Debugf("Failed to get apiserver addresses from kubernetes: %v", err) + } else { + ch <- util.GetAddresses(endpoint) + } + } + close(ch) + }() + return ch + } +} + +// etcdGetter returns a function that returns a channel that can be read to get apiserver addresses from etcd +func etcdGetter(server *config.Control) addressGetter { + return func(ctx context.Context) <-chan []string { + ch := make(chan []string, 1) + go func() { + if addresses, err := etcd.GetAPIServerURLsFromETCD(ctx, server); err != nil { + logrus.Debugf("Failed to get apiserver addresses from etcd: %v", err) + } else { + ch <- addresses + } + close(ch) + }() + return ch + } +} + +// getAddressCollector returns a function that can be called to return +// apiserver addresses from both kubernetes and etcd +func getAddressCollector(server *config.Control) func(ctx context.Context) []string { + getFromKubernetes := kubernetesGetter(server) + getFromEtcd := etcdGetter(server) + + // read from both kubernetes and etcd in parallel, returning the collected results + return func(ctx context.Context) []string { + a := sets.Set[string]{} + r := []string{} + k8sCh := getFromKubernetes(ctx) + k8sOk := true + etcdCh := getFromEtcd(ctx) + etcdOk := true + + for k8sOk || etcdOk { + select { + case r, k8sOk = <-k8sCh: + a.Insert(r...) + case r, etcdOk = <-etcdCh: + a.Insert(r...) + case <-ctx.Done(): + return a.UnsortedList() + } + } + return a.UnsortedList() + } +}