Ensure that WaitForAPIServerReady always re-dials through the loadbalancer

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit e763fadbba)
pull/5612/head
Brad Davidson 2022-04-29 12:57:38 -07:00 committed by Brad Davidson
parent b158411687
commit ea09106737
5 changed files with 40 additions and 48 deletions

View File

@ -108,15 +108,15 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
return err return err
} }
if err := util.WaitForAPIServerReady(ctx, nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
return errors.Wrap(err, "failed to wait for apiserver ready")
}
coreClient, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet) coreClient, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil { if err != nil {
return err return err
} }
if err := util.WaitForAPIServerReady(ctx, coreClient, util.DefaultAPIServerReadyTimeout); err != nil {
return errors.Wrap(err, "failed to wait for apiserver ready")
}
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil { if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err return err
} }

View File

@ -90,7 +90,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come // Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster. // and go from the cluster.
go func() { go func() {
if err := util.WaitForAPIServerReady(ctx, client, util.DefaultAPIServerReadyTimeout); err != nil { if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err) logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err)
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"math/rand" "math/rand"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -23,7 +22,6 @@ import (
authorizationv1 "k8s.io/api/authorization/v1" authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1" authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
@ -41,12 +39,6 @@ func getLocalhostIP(serviceCIDR []*net.IPNet) net.IP {
return net.ParseIP("127.0.0.1") return net.ParseIP("127.0.0.1")
} }
type roundTripFunc func(req *http.Request) (*http.Response, error)
func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return w(req)
}
func Server(ctx context.Context, cfg *config.Control) error { func Server(ctx context.Context, cfg *config.Control) error {
rand.Seed(time.Now().UTC().UnixNano()) rand.Seed(time.Now().UTC().UnixNano())
@ -409,26 +401,6 @@ func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntim
} }
func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error { func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil {
return err
}
// By default, idle connections to the apiserver are returned to a global pool
// between requests. Explicitly flag this client's request for closure so that
// we re-dial through the loadbalancer in case the endpoints have changed.
restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
req.Close = true
return rt.RoundTrip(req)
})
})
k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
done := make(chan struct{}) done := make(chan struct{})
runtime.APIServerReady = done runtime.APIServerReady = done
@ -452,7 +424,7 @@ func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRu
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case err := <-promise(func() error { return util.WaitForAPIServerReady(ctx, k8sClient, 30*time.Second) }): case err := <-promise(func() error { return util.WaitForAPIServerReady(ctx, runtime.KubeConfigAdmin, 30*time.Second) }):
if err != nil { if err != nil {
logrus.Infof("Waiting for API server to become available") logrus.Infof("Waiting for API server to become available")
continue continue

View File

@ -20,7 +20,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
@ -52,15 +51,6 @@ func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node,
} }
func (e *Embedded) Kubelet(ctx context.Context, args []string) error { func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", e.nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil {
return err
}
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
command := kubelet.NewKubeletCommand(context.Background()) command := kubelet.NewKubeletCommand(context.Background())
command.SetArgs(args) command.SetArgs(args)
@ -73,7 +63,7 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
// The embedded executor doesn't need the kubelet to come up to host any components, and // The embedded executor doesn't need the kubelet to come up to host any components, and
// having it come up on servers before the apiserver is available causes a lot of log spew. // having it come up on servers before the apiserver is available causes a lot of log spew.
// Agents don't have access to the server's apiReady channel, so just wait directly. // Agents don't have access to the server's apiReady channel, so just wait directly.
if err := util.WaitForAPIServerReady(ctx, client, util.DefaultAPIServerReadyTimeout); err != nil { if err := util.WaitForAPIServerReady(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err) logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err)
} }
logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx)) logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx))

View File

@ -14,9 +14,13 @@ import (
"github.com/rancher/wrangler/pkg/schemes" "github.com/rancher/wrangler/pkg/schemes"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1" coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
) )
@ -49,11 +53,31 @@ func GetAddresses(endpoint *v1.Endpoints) []string {
// WaitForAPIServerReady waits for the API Server's /readyz endpoint to report "ok" with timeout. // WaitForAPIServerReady waits for the API Server's /readyz endpoint to report "ok" with timeout.
// This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the // This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the
// readyz endpoint instead of the deprecated healthz endpoint, and supports context. // readyz endpoint instead of the deprecated healthz endpoint, and supports context.
func WaitForAPIServerReady(ctx context.Context, client clientset.Interface, timeout time.Duration) error { func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
var lastErr error var lastErr error
restClient := client.Discovery().RESTClient() restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return err
}
err := wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) { // By default, idle connections to the apiserver are returned to a global pool
// between requests. Explicitly flag this client's request for closure so that
// we re-dial through the loadbalancer in case the endpoints have changed.
restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
req.Close = true
return rt.RoundTrip(req)
})
})
restConfig = dynamic.ConfigFor(restConfig)
restConfig.GroupVersion = &schema.GroupVersion{}
restClient, err := rest.RESTClientFor(restConfig)
if err != nil {
return err
}
err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
healthStatus := 0 healthStatus := 0
result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus) result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
if rerr := result.Error(); rerr != nil { if rerr := result.Error(); rerr != nil {
@ -85,3 +109,9 @@ func BuildControllerEventRecorder(k8s clientset.Interface, controllerName, names
nodeName := os.Getenv("NODE_NAME") nodeName := os.Getenv("NODE_NAME")
return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName}) return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName})
} }
type roundTripFunc func(req *http.Request) (*http.Response, error)
func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return w(req)
}