mirror of https://github.com/k3s-io/k3s
Return apiserver addresses from both etcd and endpoints
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 168b344d1d
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11460/head
parent
fba0f092d4
commit
1a16be41e4
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/k3s-io/k3s/pkg/bootstrap"
|
||||
"github.com/k3s-io/k3s/pkg/cli/cmds"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/etcd"
|
||||
"github.com/k3s-io/k3s/pkg/nodepassword"
|
||||
"github.com/k3s-io/k3s/pkg/server/auth"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
|
@ -305,22 +307,15 @@ func fileHandler(fileName ...string) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
// apiserversHandler returns a list of apiserver addresses.
|
||||
// It attempts to merge results from both the apiserver and directly from etcd,
|
||||
// in case we are recovering from an apiserver outage that rendered the endpoint list unavailable.
|
||||
func apiserversHandler(server *config.Control) http.Handler {
|
||||
var endpointsClient typedcorev1.EndpointsInterface
|
||||
collectAddresses := getAddressCollector(server)
|
||||
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
|
||||
ctx := req.Context()
|
||||
var endpoints []string
|
||||
if endpointsClient == nil {
|
||||
if server.Runtime.Core != nil {
|
||||
endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault)
|
||||
}
|
||||
}
|
||||
if endpointsClient != nil {
|
||||
if endpoint, _ := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil {
|
||||
endpoints = util.GetAddresses(endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
|
||||
defer cancel()
|
||||
endpoints := collectAddresses(ctx)
|
||||
resp.Header().Set("content-type", "application/json")
|
||||
if err := json.NewEncoder(resp).Encode(endpoints); err != nil {
|
||||
util.SendError(errors.Wrap(err, "failed to encode apiserver endpoints"), resp, req, http.StatusInternalServerError)
|
||||
|
@ -526,3 +521,75 @@ func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) {
|
|||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// addressGetter is a common signature for functions that return an address channel
|
||||
type addressGetter func(ctx context.Context) <-chan []string
|
||||
|
||||
// kubernetesGetter returns a function that returns a channel that can be read to get apiserver addresses from kubernetes endpoints
|
||||
func kubernetesGetter(server *config.Control) addressGetter {
|
||||
var endpointsClient typedcorev1.EndpointsInterface
|
||||
return func(ctx context.Context) <-chan []string {
|
||||
ch := make(chan []string, 1)
|
||||
go func() {
|
||||
if endpointsClient == nil {
|
||||
if server.Runtime.K8s != nil {
|
||||
endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault)
|
||||
}
|
||||
}
|
||||
if endpointsClient != nil {
|
||||
if endpoint, err := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil {
|
||||
logrus.Debugf("Failed to get apiserver addresses from kubernetes: %v", err)
|
||||
} else {
|
||||
ch <- util.GetAddresses(endpoint)
|
||||
}
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
}
|
||||
|
||||
// etcdGetter returns a function that returns a channel that can be read to get apiserver addresses from etcd
|
||||
func etcdGetter(server *config.Control) addressGetter {
|
||||
return func(ctx context.Context) <-chan []string {
|
||||
ch := make(chan []string, 1)
|
||||
go func() {
|
||||
if addresses, err := etcd.GetAPIServerURLsFromETCD(ctx, server); err != nil {
|
||||
logrus.Debugf("Failed to get apiserver addresses from etcd: %v", err)
|
||||
} else {
|
||||
ch <- addresses
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
}
|
||||
|
||||
// getAddressCollector returns a function that can be called to return
|
||||
// apiserver addresses from both kubernetes and etcd
|
||||
func getAddressCollector(server *config.Control) func(ctx context.Context) []string {
|
||||
getFromKubernetes := kubernetesGetter(server)
|
||||
getFromEtcd := etcdGetter(server)
|
||||
|
||||
// read from both kubernetes and etcd in parallel, returning the collected results
|
||||
return func(ctx context.Context) []string {
|
||||
a := sets.Set[string]{}
|
||||
r := []string{}
|
||||
k8sCh := getFromKubernetes(ctx)
|
||||
k8sOk := true
|
||||
etcdCh := getFromEtcd(ctx)
|
||||
etcdOk := true
|
||||
|
||||
for k8sOk || etcdOk {
|
||||
select {
|
||||
case r, k8sOk = <-k8sCh:
|
||||
a.Insert(r...)
|
||||
case r, etcdOk = <-etcdCh:
|
||||
a.Insert(r...)
|
||||
case <-ctx.Done():
|
||||
return a.UnsortedList()
|
||||
}
|
||||
}
|
||||
return a.UnsortedList()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue