You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
k3s/pkg/agent/run.go

452 lines
14 KiB

package agent
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"
systemd "github.com/coreos/go-systemd/daemon"
"github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/containerd"
"github.com/k3s-io/k3s/pkg/agent/flannel"
"github.com/k3s-io/k3s/pkg/agent/netpol"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/agent/syssetup"
"github.com/k3s-io/k3s/pkg/agent/tunnel"
"github.com/k3s-io/k3s/pkg/cgroups"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
cp "github.com/k3s-io/k3s/pkg/cloudprovider"
"github.com/k3s-io/k3s/pkg/daemons/agent"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/nodeconfig"
"github.com/k3s-io/k3s/pkg/rootless"
"github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
utilsnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
nodeConfig := config.Get(ctx, cfg, proxy)
dualCluster, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ClusterCIDRs)
if err != nil {
return errors.Wrap(err, "failed to validate cluster-cidr")
}
dualService, err := utilsnet.IsDualStackCIDRs(nodeConfig.AgentConfig.ServiceCIDRs)
if err != nil {
return errors.Wrap(err, "failed to validate service-cidr")
}
dualNode, err := utilsnet.IsDualStackIPs(nodeConfig.AgentConfig.NodeIPs)
if err != nil {
return errors.Wrap(err, "failed to validate node-ip")
}
serviceIPv4 := utilsnet.IsIPv4CIDR(nodeConfig.AgentConfig.ServiceCIDR)
clusterIPv4 := utilsnet.IsIPv4CIDR(nodeConfig.AgentConfig.ClusterCIDR)
serviceIPv6 := utilsnet.IsIPv6CIDR(nodeConfig.AgentConfig.ServiceCIDR)
clusterIPv6 := utilsnet.IsIPv6CIDR(nodeConfig.AgentConfig.ClusterCIDR)
enableIPv4 := dualCluster || dualService || dualNode || serviceIPv4 || clusterIPv4
enableIPv6 := dualCluster || dualService || dualNode || serviceIPv6 || clusterIPv6
conntrackConfig, err := getConntrackConfig(nodeConfig)
if err != nil {
return errors.Wrap(err, "failed to validate kube-proxy conntrack configuration")
}
syssetup.Configure(enableIPv6, conntrackConfig)
nodeConfig.AgentConfig.EnableIPv4 = enableIPv4
nodeConfig.AgentConfig.EnableIPv6 = enableIPv6
if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
return err
}
if err := executor.Bootstrap(ctx, nodeConfig, cfg); err != nil {
return err
}
if !nodeConfig.NoFlannel {
if err := flannel.Prepare(ctx, nodeConfig); err != nil {
return err
}
}
if !nodeConfig.Docker && nodeConfig.ContainerRuntimeEndpoint == "" {
if err := containerd.Run(ctx, nodeConfig); err != nil {
return err
}
}
// the agent runtime is ready to host workloads when containerd is up and the airgap
// images have finished loading, as that portion of startup may block for an arbitrary
// amount of time depending on how long it takes to import whatever the user has placed
// in the images directory.
if cfg.AgentReady != nil {
close(cfg.AgentReady)
}
notifySocket := os.Getenv("NOTIFY_SOCKET")
os.Unsetenv("NOTIFY_SOCKET")
if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
coreClient, err := coreClient(nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil {
return err
}
if err := util.WaitForAPIServerReady(ctx, coreClient, util.DefaultAPIServerReadyTimeout); err != nil {
return errors.Wrap(err, "failed to wait for apiserver ready")
}
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}
if !nodeConfig.NoFlannel {
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}
}
if !nodeConfig.AgentConfig.DisableNPC {
if err := netpol.Run(ctx, nodeConfig); err != nil {
return err
}
}
os.Setenv("NOTIFY_SOCKET", notifySocket)
systemd.SdNotify(true, "READY=1\n")
<-ctx.Done()
return ctx.Err()
}
// getConntrackConfig uses the kube-proxy code to parse the user-provided kube-proxy-arg values, and
// extract the conntrack settings so that K3s can set them itself. This allows us to soft-fail when
// running K3s in Docker, where kube-proxy is no longer allowed to set conntrack sysctls on newer kernels.
// When running rootless, we do not attempt to set conntrack sysctls - this behavior is copied from kubeadm.
func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubeProxyConntrackConfiguration, error) {
ctConfig := &kubeproxyconfig.KubeProxyConntrackConfiguration{
MaxPerCore: utilpointer.Int32Ptr(0),
Min: utilpointer.Int32Ptr(0),
TCPEstablishedTimeout: &metav1.Duration{},
TCPCloseWaitTimeout: &metav1.Duration{},
}
if nodeConfig.AgentConfig.Rootless {
return ctConfig, nil
}
cmd := app2.NewProxyCommand()
if err := cmd.ParseFlags(daemonconfig.GetArgs(map[string]string{}, nodeConfig.AgentConfig.ExtraKubeProxyArgs)); err != nil {
return nil, err
}
maxPerCore, err := cmd.Flags().GetInt32("conntrack-max-per-core")
if err != nil {
return nil, err
}
ctConfig.MaxPerCore = &maxPerCore
min, err := cmd.Flags().GetInt32("conntrack-min")
if err != nil {
return nil, err
}
ctConfig.Min = &min
establishedTimeout, err := cmd.Flags().GetDuration("conntrack-tcp-timeout-established")
if err != nil {
return nil, err
}
ctConfig.TCPEstablishedTimeout.Duration = establishedTimeout
closeWaitTimeout, err := cmd.Flags().GetDuration("conntrack-tcp-timeout-close-wait")
if err != nil {
return nil, err
}
ctConfig.TCPCloseWaitTimeout.Duration = closeWaitTimeout
return ctConfig, nil
}
func coreClient(cfg string) (kubernetes.Interface, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(restConfig)
}
// RunStandalone bootstraps the executor, but does not run the kubelet or containerd.
// This allows other bits of code that expect the executor to be set up properly to function
// even when the agent is disabled. It will only return in case of error or context
// cancellation.
func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
proxy, err := createProxyAndValidateToken(ctx, &cfg)
if err != nil {
return err
}
nodeConfig := config.Get(ctx, cfg, proxy)
if err := executor.Bootstrap(ctx, nodeConfig, cfg); err != nil {
return err
}
if cfg.AgentReady != nil {
close(cfg.AgentReady)
}
if err := tunnelSetup(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
<-ctx.Done()
return ctx.Err()
}
// Run sets up cgroups, configures the LB proxy, and triggers startup
// of containerd and kubelet. It will only return in case of error or context
// cancellation.
func Run(ctx context.Context, cfg cmds.Agent) error {
if err := cgroups.Validate(); err != nil {
return err
}
if cfg.Rootless && !cfg.RootlessAlreadyUnshared {
if err := rootless.Rootless(cfg.DataDir); err != nil {
return err
}
}
proxy, err := createProxyAndValidateToken(ctx, &cfg)
if err != nil {
return err
}
return run(ctx, cfg, proxy)
}
func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Proxy, error) {
agentDir := filepath.Join(cfg.DataDir, "agent")
if err := os.MkdirAll(agentDir, 0700); err != nil {
return nil, err
}
_, isIPv6, _ := util.GetFirstString([]string{cfg.NodeIP.String()})
proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort, isIPv6)
5 years ago
if err != nil {
return nil, err
5 years ago
}
for {
newToken, err := clientaccess.ParseAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
if err != nil {
logrus.Error(err)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(2 * time.Second):
}
continue
}
cfg.Token = newToken.String()
break
}
return proxy, nil
}
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return err
}
defer watch.Stop()
for ev := range watch.ResultChan() {
node, ok := ev.Object.(*corev1.Node)
if !ok {
return fmt.Errorf("could not convert event object to node: %v", ev)
}
updateNode := false
if labels, changed := updateMutableLabels(agentConfig, node.Labels); changed {
node.Labels = labels
updateNode = true
}
if !agentConfig.DisableCCM {
if annotations, changed := updateAddressAnnotations(agentConfig, node.Annotations); changed {
node.Annotations = annotations
updateNode = true
}
if labels, changed := updateLegacyAddressLabels(agentConfig, node.Labels); changed {
node.Labels = labels
updateNode = true
}
}
// inject node config
if changed, err := nodeconfig.SetNodeConfigAnnotations(node); err != nil {
return err
} else if changed {
updateNode = true
}
if updateNode {
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
continue
}
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
} else {
logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
}
break
}
return nil
}
func updateMutableLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
result := map[string]string{}
for _, m := range agentConfig.NodeLabels {
var (
v string
p = strings.SplitN(m, `=`, 2)
k = p[0]
)
if len(p) > 1 {
v = p[1]
}
result[k] = v
}
result = labels.Merge(nodeLabels, result)
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
}
func updateLegacyAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]string) (map[string]string, bool) {
ls := labels.Set(nodeLabels)
if ls.Has(cp.InternalIPKey) || ls.Has(cp.HostnameKey) {
result := map[string]string{
cp.InternalIPKey: agentConfig.NodeIP,
cp.HostnameKey: agentConfig.NodeName,
}
if agentConfig.NodeExternalIP != "" {
result[cp.ExternalIPKey] = agentConfig.NodeExternalIP
}
result = labels.Merge(nodeLabels, result)
return result, !equality.Semantic.DeepEqual(nodeLabels, result)
}
return nil, false
}
func updateAddressAnnotations(agentConfig *daemonconfig.Agent, nodeAnnotations map[string]string) (map[string]string, bool) {
result := map[string]string{
cp.InternalIPKey: util.JoinIPs(agentConfig.NodeIPs),
cp.HostnameKey: agentConfig.NodeName,
}
if agentConfig.NodeExternalIP != "" {
result[cp.ExternalIPKey] = util.JoinIPs(agentConfig.NodeExternalIPs)
}
result = labels.Merge(nodeAnnotations, result)
return result, !equality.Semantic.DeepEqual(nodeAnnotations, result)
}
// setupTunnelAndRunAgent should start the setup tunnel before starting kubelet and kubeproxy
// there are special case for etcd agents, it will wait until it can find the apiaddress from
// the address channel and update the proxy with the servers addresses, if in rke2 we need to
// start the agent before the tunnel is setup to allow kubelet to start first and start the pods
func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
var agentRan bool
// IsAPIServerLBEnabled is used as a shortcut for detecting RKE2, where the kubelet needs to
// be run earlier in order to manage static pods. This should probably instead query a
// flag on the executor or something.
if !cfg.ClusterReset && cfg.ETCDAgent {
// ETCDAgent is only set to true on servers that are started with --disable-apiserver.
// In this case, we may be running without an apiserver available in the cluster, and need
// to wait for one to register and post it's address into APIAddressCh so that we can update
// the LB proxy with its address.
if proxy.IsAPIServerLBEnabled() {
// On RKE2, the agent needs to be started early to run the etcd static pod.
if err := agent.Agent(ctx, nodeConfig, proxy); err != nil {
return err
}
agentRan = true
}
if err := waitForAPIServerAddresses(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
} else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() {
// If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean
// up static pods.
if err := agent.Agent(ctx, nodeConfig, proxy); err != nil {
return err
}
agentRan = true
}
if err := tunnelSetup(ctx, nodeConfig, cfg, proxy); err != nil {
return err
}
if !agentRan {
return agent.Agent(ctx, nodeConfig, proxy)
}
return nil
}
func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
for {
select {
case <-time.After(5 * time.Second):
logrus.Info("Waiting for apiserver addresses")
case addresses := <-cfg.APIAddressCh:
for i, a := range addresses {
host, _, err := net.SplitHostPort(a)
if err == nil {
addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort))
if i == 0 {
proxy.SetSupervisorDefault(addresses[i])
}
}
}
proxy.Update(addresses)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
// tunnelSetup calls tunnel setup, unless the embedded etc cluster is being reset/restored, in which case
// this is unnecessary as the kubelet is only needed to manage static pods and does not need to establish
// tunneled connections to other cluster members.
func tunnelSetup(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
if cfg.ClusterReset {
return nil
}
return tunnel.Setup(ctx, nodeConfig, proxy)
}