package util

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/pkg/errors"
	"github.com/rancher/wrangler/pkg/merr"
	"github.com/rancher/wrangler/pkg/schemes"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	clientset "k8s.io/client-go/kubernetes"
	coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
)

// DefaultAPIServerReadyTimeout is the default duration to wait for the apiserver to become ready.
// It is primarily used to block startup of agent supervisor controllers until the apiserver is
// ready to serve requests, in the same way that the apiReady channel is used in the server
// packages, so it can be fairly long. It must be at least long enough for downstream projects
// like RKE2 to start the apiserver in the background.
const DefaultAPIServerReadyTimeout = 15 * time.Minute

// GetAddresses returns a list of host:port addresses for the ready addresses in the provided
// Endpoints resource, defaulting the port to 443 when a subset does not list one.
func GetAddresses(endpoint *v1.Endpoints) []string {
	serverAddresses := []string{}
	if endpoint == nil {
		return serverAddresses
	}
	for _, subset := range endpoint.Subsets {
		var port string
		if len(subset.Ports) > 0 {
			port = strconv.Itoa(int(subset.Ports[0].Port))
		}
		if port == "" {
			port = "443"
		}
		for _, address := range subset.Addresses {
			serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port))
		}
	}
	return serverAddresses
}
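
// exampleLogEndpointAddresses is an illustrative sketch, not part of the upstream file: it shows
// how a caller might use GetAddresses to turn the "kubernetes" service Endpoints into dialable
// host:port strings. The endpoint argument and the log message are assumptions made for the example.
func exampleLogEndpointAddresses(endpoint *v1.Endpoints) {
	for _, address := range GetAddresses(endpoint) {
		logrus.Infof("apiserver endpoint address: %s", address)
	}
}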

// WaitForAPIServerReady waits for the API Server's /readyz endpoint to report "ok" with timeout.
// This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the
// readyz endpoint instead of the deprecated healthz endpoint, and supports context.
func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
	var lastErr error
	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return err
	}

	// By default, idle connections to the apiserver are returned to a global pool
	// between requests. Explicitly flag this client's request for closure so that
	// we re-dial through the loadbalancer in case the endpoints have changed.
	restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
		return roundTripFunc(func(req *http.Request) (*http.Response, error) {
			req.Close = true
			return rt.RoundTrip(req)
		})
	})

	restConfig = dynamic.ConfigFor(restConfig)
	restConfig.GroupVersion = &schema.GroupVersion{}
	restClient, err := rest.RESTClientFor(restConfig)
	if err != nil {
		return err
	}

	err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
		healthStatus := 0
		result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
		if rerr := result.Error(); rerr != nil {
			lastErr = errors.Wrap(rerr, "failed to get apiserver /readyz status")
			return false, nil
		}
		if healthStatus != http.StatusOK {
			content, _ := result.Raw()
			lastErr = fmt.Errorf("APIServer isn't ready: %v", string(content))
			logrus.Warnf("APIServer isn't ready yet: %v. Waiting a little while.", string(content))
			return false, nil
		}

		return true, nil
	})

	if err != nil {
		return merr.NewErrors(err, lastErr)
	}

	return nil
}
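
// exampleWaitForAPIServer is an illustrative sketch, not part of the upstream file: it shows how a
// startup path might block on WaitForAPIServerReady using DefaultAPIServerReadyTimeout before
// starting its controllers. The kubeconfig path is an assumption made for the example.
func exampleWaitForAPIServer(ctx context.Context) error {
	kubeconfigPath := "/etc/rancher/k3s/k3s.yaml" // hypothetical path, for illustration only
	if err := WaitForAPIServerReady(ctx, kubeconfigPath, DefaultAPIServerReadyTimeout); err != nil {
		return errors.Wrap(err, "apiserver never became ready")
	}
	logrus.Info("apiserver is ready; starting controllers")
	return nil
}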

// BuildControllerEventRecorder returns an EventRecorder that logs events via logrus and records
// them to the given namespace, attributed to the named controller on the node identified by the
// NODE_NAME environment variable.
func BuildControllerEventRecorder(k8s clientset.Interface, controllerName, namespace string) record.EventRecorder {
	logrus.Infof("Creating %s event broadcaster", controllerName)
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logrus.Infof)
	eventBroadcaster.StartRecordingToSink(&coregetter.EventSinkImpl{Interface: k8s.CoreV1().Events(namespace)})
	nodeName := os.Getenv("NODE_NAME")
	return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName})
}
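
// exampleRecordNodeEvent is an illustrative sketch, not part of the upstream file: it shows how the
// recorder returned by BuildControllerEventRecorder might emit a Normal event against a Node. The
// controller name, namespace, reason, and message are assumptions made for the example.
func exampleRecordNodeEvent(k8s clientset.Interface, node *v1.Node) {
	recorder := BuildControllerEventRecorder(k8s, "example-controller", "default")
	recorder.Event(node, v1.EventTypeNormal, "ExampleReady", "illustrative event from the example recorder")
}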

// roundTripFunc adapts an ordinary function to the http.RoundTripper interface, in the same way
// that http.HandlerFunc adapts a function to http.Handler.
type roundTripFunc func(req *http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by calling the wrapped function.
func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return w(req)
}
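
// exampleCloseConnectionWrapper is an illustrative sketch, not part of the upstream file: it shows
// how roundTripFunc turns a plain closure into an http.RoundTripper, mirroring the restConfig.Wrap
// call in WaitForAPIServerReady that forces each request onto a fresh connection.
func exampleCloseConnectionWrapper(rt http.RoundTripper) http.RoundTripper {
	return roundTripFunc(func(req *http.Request) (*http.Response, error) {
		req.Close = true // re-dial for every request instead of reusing pooled connections
		return rt.RoundTrip(req)
	})
}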