k3s/pkg/server/server.go

646 lines
19 KiB
Go

package server
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
helmchart "github.com/k3s-io/helm-controller/pkg/controllers/chart"
helmcommon "github.com/k3s-io/helm-controller/pkg/controllers/common"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control"
"github.com/k3s-io/k3s/pkg/datadir"
"github.com/k3s-io/k3s/pkg/deploy"
"github.com/k3s-io/k3s/pkg/node"
"github.com/k3s-io/k3s/pkg/nodepassword"
"github.com/k3s-io/k3s/pkg/rootlessports"
"github.com/k3s-io/k3s/pkg/secretsencrypt"
"github.com/k3s-io/k3s/pkg/static"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/apply"
v1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/leader"
"github.com/rancher/wrangler/pkg/resolvehome"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func ResolveDataDir(dataDir string) (string, error) {
dataDir, err := datadir.Resolve(dataDir)
return filepath.Join(dataDir, "server"), err
}
func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
if err := setupDataDirAndChdir(&config.ControlConfig); err != nil {
return err
}
if err := setNoProxyEnv(&config.ControlConfig); err != nil {
return err
}
if err := control.Server(ctx, &config.ControlConfig); err != nil {
return errors.Wrap(err, "starting kubernetes")
}
wg := &sync.WaitGroup{}
wg.Add(len(config.StartupHooks))
config.ControlConfig.Runtime.Handler = router(ctx, config, cfg)
config.ControlConfig.Runtime.StartupHooksWg = wg
shArgs := cmds.StartupHookArgs{
APIServerReady: config.ControlConfig.Runtime.APIServerReady,
KubeConfigSupervisor: config.ControlConfig.Runtime.KubeConfigSupervisor,
Skips: config.ControlConfig.Skips,
Disables: config.ControlConfig.Disables,
}
for _, hook := range config.StartupHooks {
if err := hook(ctx, wg, shArgs); err != nil {
return errors.Wrap(err, "startup hook")
}
}
go startOnAPIServerReady(ctx, config)
if err := printTokens(&config.ControlConfig); err != nil {
return err
}
return writeKubeConfig(config.ControlConfig.Runtime.ServerCA, config)
}
func startOnAPIServerReady(ctx context.Context, config *Config) {
select {
case <-ctx.Done():
return
case <-config.ControlConfig.Runtime.APIServerReady:
if err := runControllers(ctx, config); err != nil {
logrus.Fatalf("failed to start controllers: %v", err)
}
}
}
func runControllers(ctx context.Context, config *Config) error {
controlConfig := &config.ControlConfig
sc, err := NewContext(ctx, config, true)
if err != nil {
return errors.Wrap(err, "failed to create new server context")
}
controlConfig.Runtime.StartupHooksWg.Wait()
if err := stageFiles(ctx, sc, controlConfig); err != nil {
return errors.Wrap(err, "failed to stage files")
}
// run migration before we set controlConfig.Runtime.Core
if err := nodepassword.MigrateFile(
sc.Core.Core().V1().Secret(),
sc.Core.Core().V1().Node(),
controlConfig.Runtime.NodePasswdFile); err != nil {
logrus.Warn(errors.Wrap(err, "error migrating node-password file"))
}
controlConfig.Runtime.K3s = sc.K3s
controlConfig.Runtime.Event = sc.Event
controlConfig.Runtime.Core = sc.Core
for name, cb := range controlConfig.Runtime.ClusterControllerStarts {
go runOrDie(ctx, name, cb)
}
for _, controller := range config.Controllers {
if err := controller(ctx, sc); err != nil {
return errors.Wrapf(err, "failed to start %s controller", util.GetFunctionName(controller))
}
}
if err := sc.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start wranger controllers")
}
if !controlConfig.DisableAPIServer {
controlConfig.Runtime.LeaderElectedClusterControllerStarts[version.Program] = func(ctx context.Context) {
apiserverControllers(ctx, sc, config)
}
}
go setNodeLabelsAndAnnotations(ctx, sc.Core.Core().V1().Node(), config)
go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap())
if controlConfig.NoLeaderElect {
for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts {
go runOrDie(ctx, name, cb)
}
} else {
for name, cb := range controlConfig.Runtime.LeaderElectedClusterControllerStarts {
go leader.RunOrDie(ctx, "", name, sc.K8s, cb)
}
}
return nil
}
// apiServerControllers starts the core controllers, as well as the leader-elected controllers
// that should only run on a control-plane node.
func apiserverControllers(ctx context.Context, sc *Context, config *Config) {
if err := coreControllers(ctx, sc, config); err != nil {
panic(err)
}
for _, controller := range config.LeaderControllers {
if err := controller(ctx, sc); err != nil {
panic(errors.Wrapf(err, "failed to start %s leader controller", util.GetFunctionName(controller)))
}
}
// Re-run context startup after core and leader-elected controllers have started. Additional
// informer caches may need to start for the newly added OnChange callbacks.
if err := sc.Start(ctx); err != nil {
panic(errors.Wrap(err, "failed to start wranger controllers"))
}
}
// runOrDie is similar to leader.RunOrDie, except that it runs the callback
// immediately, without performing leader election.
func runOrDie(ctx context.Context, name string, cb leader.Callback) {
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", string(debug.Stack())).Fatalf("%s controller panic: %v", name, err)
}
}()
cb(ctx)
<-ctx.Done()
}
// coreControllers starts the following controllers, if they are enabled:
// * Node controller (manages nodes passwords and coredns hosts file)
// * Helm controller
// * Secrets encryption
// * Rootless ports
// These controllers should only be run on nodes with a local apiserver
func coreControllers(ctx context.Context, sc *Context, config *Config) error {
if err := node.Register(ctx,
!config.ControlConfig.Skips["coredns"],
sc.Core.Core().V1().Secret(),
sc.Core.Core().V1().ConfigMap(),
sc.Core.Core().V1().Node()); err != nil {
return err
}
// apply SystemDefaultRegistry setting to Helm before starting controllers
if config.ControlConfig.HelmJobImage != "" {
helmchart.DefaultJobImage = config.ControlConfig.HelmJobImage
} else if config.ControlConfig.SystemDefaultRegistry != "" {
helmchart.DefaultJobImage = config.ControlConfig.SystemDefaultRegistry + "/" + helmchart.DefaultJobImage
}
if !config.ControlConfig.DisableHelmController {
restConfig, err := clientcmd.BuildConfigFromFlags("", config.ControlConfig.Runtime.KubeConfigSupervisor)
if err != nil {
return err
}
restConfig.UserAgent = util.GetUserAgent(helmcommon.Name)
k8s, err := clientset.NewForConfig(restConfig)
if err != nil {
return err
}
apply := apply.New(k8s, apply.NewClientFactory(restConfig)).WithDynamicLookup()
helm := sc.Helm.WithAgent(restConfig.UserAgent)
batch := sc.Batch.WithAgent(restConfig.UserAgent)
auth := sc.Auth.WithAgent(restConfig.UserAgent)
core := sc.Core.WithAgent(restConfig.UserAgent)
helmchart.Register(ctx,
metav1.NamespaceAll,
helmcommon.Name,
"cluster-admin",
strconv.Itoa(config.ControlConfig.APIServerPort),
k8s,
apply,
util.BuildControllerEventRecorder(k8s, helmcommon.Name, metav1.NamespaceAll),
helm.V1().HelmChart(),
helm.V1().HelmChart().Cache(),
helm.V1().HelmChartConfig(),
helm.V1().HelmChartConfig().Cache(),
batch.V1().Job(),
batch.V1().Job().Cache(),
auth.V1().ClusterRoleBinding(),
core.V1().ServiceAccount(),
core.V1().ConfigMap(),
core.V1().Secret())
}
if config.ControlConfig.Rootless {
return rootlessports.Register(ctx,
sc.Core.Core().V1().Service(),
!config.ControlConfig.DisableServiceLB,
config.ControlConfig.HTTPSPort)
}
return nil
}
func stageFiles(ctx context.Context, sc *Context, controlConfig *config.Control) error {
if controlConfig.DisableAPIServer {
return nil
}
dataDir := filepath.Join(controlConfig.DataDir, "static")
if err := static.Stage(dataDir); err != nil {
return err
}
dataDir = filepath.Join(controlConfig.DataDir, "manifests")
dnsIPFamilyPolicy := "SingleStack"
if len(controlConfig.ClusterDNSs) > 1 {
dnsIPFamilyPolicy = "RequireDualStack"
}
templateVars := map[string]string{
"%{CLUSTER_DNS}%": controlConfig.ClusterDNS.String(),
"%{CLUSTER_DNS_LIST}%": fmt.Sprintf("[%s]", util.JoinIPs(controlConfig.ClusterDNSs)),
"%{CLUSTER_DNS_IPFAMILYPOLICY}%": dnsIPFamilyPolicy,
"%{CLUSTER_DOMAIN}%": controlConfig.ClusterDomain,
"%{DEFAULT_LOCAL_STORAGE_PATH}%": controlConfig.DefaultLocalStoragePath,
"%{SYSTEM_DEFAULT_REGISTRY}%": registryTemplate(controlConfig.SystemDefaultRegistry),
"%{SYSTEM_DEFAULT_REGISTRY_RAW}%": controlConfig.SystemDefaultRegistry,
"%{PREFERRED_ADDRESS_TYPES}%": addrTypesPrioTemplate(controlConfig.FlannelExternalIP),
}
skip := controlConfig.Skips
if !skip["traefik"] && isHelmChartTraefikV1(sc) {
logrus.Warn("Skipping Traefik v2 deployment due to existing Traefik v1 installation")
skip["traefik"] = true
}
if err := deploy.Stage(dataDir, templateVars, skip); err != nil {
return err
}
restConfig, err := clientcmd.BuildConfigFromFlags("", controlConfig.Runtime.KubeConfigSupervisor)
if err != nil {
return err
}
restConfig.UserAgent = util.GetUserAgent("deploy")
k8s, err := clientset.NewForConfig(restConfig)
if err != nil {
return err
}
apply := apply.New(k8s, apply.NewClientFactory(restConfig)).WithDynamicLookup()
k3s := sc.K3s.WithAgent(restConfig.UserAgent)
return deploy.WatchFiles(ctx,
k8s,
apply,
k3s.V1().Addon(),
controlConfig.Disables,
dataDir)
}
// registryTemplate behaves like the system_default_registry template in Rancher helm charts,
// and returns the registry value with a trailing forward slash if the registry string is not empty.
// If it is empty, it is passed through as a no-op.
func registryTemplate(registry string) string {
if registry == "" {
return registry
}
return registry + "/"
}
// addressTypesTemplate prioritizes ExternalIP addresses if we are in the multi-cloud env where
// cluster traffic flows over the external IPs only
func addrTypesPrioTemplate(flannelExternal bool) string {
if flannelExternal {
return "ExternalIP,InternalIP,Hostname"
}
return "InternalIP,ExternalIP,Hostname"
}
// isHelmChartTraefikV1 checks for an existing HelmChart resource with spec.chart containing traefik-1,
// as deployed by the legacy chart (https://%{KUBERNETES_API}%/static/charts/traefik-1.81.0.tgz)
func isHelmChartTraefikV1(sc *Context) bool {
prefix := "traefik-1."
helmChart, err := sc.Helm.Helm().V1().HelmChart().Get(metav1.NamespaceSystem, "traefik", metav1.GetOptions{})
if err != nil {
logrus.WithError(err).Info("Failed to get existing traefik HelmChart")
return false
}
chart := path.Base(helmChart.Spec.Chart)
if strings.HasPrefix(chart, prefix) {
logrus.WithField("chart", chart).Info("Found existing traefik v1 HelmChart")
return true
}
return false
}
func HomeKubeConfig(write, rootless bool) (string, error) {
if write {
if os.Getuid() == 0 && !rootless {
return datadir.GlobalConfig, nil
}
return resolvehome.Resolve(datadir.HomeConfig)
}
if _, err := os.Stat(datadir.GlobalConfig); err == nil {
return datadir.GlobalConfig, nil
}
return resolvehome.Resolve(datadir.HomeConfig)
}
func printTokens(config *config.Control) error {
var serverTokenFile string
if config.Runtime.ServerToken != "" {
serverTokenFile = filepath.Join(config.DataDir, "token")
if err := writeToken(config.Runtime.ServerToken, serverTokenFile, config.Runtime.ServerCA); err != nil {
return err
}
// backwards compatibility
np := filepath.Join(config.DataDir, "node-token")
if !isSymlink(np) {
if err := os.RemoveAll(np); err != nil {
return err
}
if err := os.Symlink(serverTokenFile, np); err != nil {
return err
}
}
logrus.Infof("Server node token is available at %s", serverTokenFile)
printToken(config.SupervisorPort, config.BindAddressOrLoopback(true, true), "To join server node to cluster:", "server", "SERVER_NODE_TOKEN")
}
var agentTokenFile string
if config.Runtime.AgentToken != "" {
if config.AgentToken != "" {
agentTokenFile = filepath.Join(config.DataDir, "agent-token")
if isSymlink(agentTokenFile) {
if err := os.RemoveAll(agentTokenFile); err != nil {
return err
}
}
if err := writeToken(config.Runtime.AgentToken, agentTokenFile, config.Runtime.ServerCA); err != nil {
return err
}
} else if serverTokenFile != "" {
agentTokenFile = filepath.Join(config.DataDir, "agent-token")
if !isSymlink(agentTokenFile) {
if err := os.RemoveAll(agentTokenFile); err != nil {
return err
}
if err := os.Symlink(serverTokenFile, agentTokenFile); err != nil {
return err
}
}
}
}
if agentTokenFile != "" {
logrus.Infof("Agent node token is available at %s", agentTokenFile)
printToken(config.SupervisorPort, config.BindAddressOrLoopback(true, true), "To join agent node to cluster:", "agent", "AGENT_NODE_TOKEN")
}
return nil
}
func writeKubeConfig(certs string, config *Config) error {
ip := config.ControlConfig.BindAddressOrLoopback(false, true)
port := config.ControlConfig.HTTPSPort
// on servers without a local apiserver, tunnel access via the loadbalancer
if config.ControlConfig.DisableAPIServer {
ip = config.ControlConfig.Loopback(true)
port = config.ControlConfig.APIServerPort
}
url := fmt.Sprintf("https://%s:%d", ip, port)
kubeConfig, err := HomeKubeConfig(true, config.ControlConfig.Rootless)
def := true
if err != nil {
kubeConfig = filepath.Join(config.ControlConfig.DataDir, "kubeconfig-"+version.Program+".yaml")
def = false
}
kubeConfigSymlink := kubeConfig
if config.ControlConfig.KubeConfigOutput != "" {
kubeConfig = config.ControlConfig.KubeConfigOutput
}
if isSymlink(kubeConfigSymlink) {
if err := os.Remove(kubeConfigSymlink); err != nil {
logrus.Errorf("Failed to remove kubeconfig symlink")
}
}
if err = clientaccess.WriteClientKubeConfig(kubeConfig, url, config.ControlConfig.Runtime.ServerCA, config.ControlConfig.Runtime.ClientAdminCert,
config.ControlConfig.Runtime.ClientAdminKey); err == nil {
logrus.Infof("Wrote kubeconfig %s", kubeConfig)
} else {
logrus.Errorf("Failed to generate kubeconfig: %v", err)
return err
}
if config.ControlConfig.KubeConfigMode != "" {
mode, err := strconv.ParseInt(config.ControlConfig.KubeConfigMode, 8, 0)
if err == nil {
util.SetFileModeForPath(kubeConfig, os.FileMode(mode))
} else {
logrus.Errorf("Failed to set %s to mode %s: %v", kubeConfig, os.FileMode(mode), err)
}
} else {
util.SetFileModeForPath(kubeConfig, os.FileMode(0600))
}
if kubeConfigSymlink != kubeConfig {
if err := writeConfigSymlink(kubeConfig, kubeConfigSymlink); err != nil {
logrus.Errorf("Failed to write kubeconfig symlink: %v", err)
}
}
if def {
logrus.Infof("Run: %s kubectl", filepath.Base(os.Args[0]))
}
return nil
}
func setupDataDirAndChdir(config *config.Control) error {
var (
err error
)
config.DataDir, err = ResolveDataDir(config.DataDir)
if err != nil {
return err
}
dataDir := config.DataDir
if err := os.MkdirAll(dataDir, 0700); err != nil {
return errors.Wrapf(err, "can not mkdir %s", dataDir)
}
if err := os.Chdir(dataDir); err != nil {
return errors.Wrapf(err, "can not chdir %s", dataDir)
}
return nil
}
func printToken(httpsPort int, advertiseIP, prefix, cmd, varName string) {
logrus.Infof("%s %s %s -s https://%s:%d -t ${%s}", prefix, version.Program, cmd, advertiseIP, httpsPort, varName)
}
func writeToken(token, file, certs string) error {
if len(token) == 0 {
return nil
}
token, err := clientaccess.FormatToken(token, certs)
if err != nil {
return err
}
return os.WriteFile(file, []byte(token+"\n"), 0600)
}
func setNoProxyEnv(config *config.Control) error {
splitter := func(c rune) bool {
return c == ','
}
envList := []string{}
envList = append(envList, strings.FieldsFunc(os.Getenv("NO_PROXY"), splitter)...)
envList = append(envList, strings.FieldsFunc(os.Getenv("no_proxy"), splitter)...)
envList = append(envList,
".svc",
"."+config.ClusterDomain,
util.JoinIPNets(config.ClusterIPRanges),
util.JoinIPNets(config.ServiceIPRanges),
)
os.Unsetenv("no_proxy")
return os.Setenv("NO_PROXY", strings.Join(envList, ","))
}
func writeConfigSymlink(kubeconfig, kubeconfigSymlink string) error {
if err := os.Remove(kubeconfigSymlink); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove %s file: %v", kubeconfigSymlink, err)
}
if err := os.MkdirAll(filepath.Dir(kubeconfigSymlink), 0755); err != nil {
return fmt.Errorf("failed to create path for symlink: %v", err)
}
if err := os.Symlink(kubeconfig, kubeconfigSymlink); err != nil {
return fmt.Errorf("failed to create symlink: %v", err)
}
return nil
}
func isSymlink(config string) bool {
if fi, err := os.Lstat(config); err == nil && (fi.Mode()&os.ModeSymlink == os.ModeSymlink) {
return true
}
return false
}
func setNodeLabelsAndAnnotations(ctx context.Context, nodes v1.NodeClient, config *Config) error {
if config.DisableAgent || config.ControlConfig.DisableAPIServer {
return nil
}
for {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
logrus.Info("Waiting for control-plane node agent startup")
time.Sleep(1 * time.Second)
continue
}
node, err := nodes.Get(nodeName, metav1.GetOptions{})
if err != nil {
logrus.Infof("Waiting for control-plane node %s startup: %v", nodeName, err)
time.Sleep(1 * time.Second)
continue
}
if node.Labels == nil {
node.Labels = make(map[string]string)
}
v, ok := node.Labels[util.ControlPlaneRoleLabelKey]
if !ok || v != "true" {
node.Labels[util.ControlPlaneRoleLabelKey] = "true"
node.Labels[util.MasterRoleLabelKey] = "true"
}
if config.ControlConfig.EncryptSecrets {
if err = secretsencrypt.BootstrapEncryptionHashAnnotation(node, config.ControlConfig.Runtime); err != nil {
logrus.Infof("Unable to set encryption hash annotation %s", err.Error())
break
}
}
_, err = nodes.Update(node)
if err == nil {
logrus.Infof("Labels and annotations have been set successfully on node: %s", nodeName)
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
}
return nil
}
func setClusterDNSConfig(ctx context.Context, config *Config, configMap v1.ConfigMapClient) error {
if config.ControlConfig.DisableAPIServer {
return nil
}
// check if configmap already exists
_, err := configMap.Get("kube-system", "cluster-dns", metav1.GetOptions{})
if err == nil {
logrus.Infof("Cluster dns configmap already exists")
return nil
}
clusterDNS := config.ControlConfig.ClusterDNS
clusterDomain := config.ControlConfig.ClusterDomain
c := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-dns",
Namespace: "kube-system",
},
Data: map[string]string{
"clusterDNS": clusterDNS.String(),
"clusterDomain": clusterDomain,
},
}
for {
_, err = configMap.Create(c)
if err == nil {
logrus.Infof("Cluster dns configmap has been set successfully")
break
}
logrus.Infof("Waiting for control-plane dns startup: %v", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
}
return nil
}