k3s/pkg/daemons/control/server.go

492 lines
17 KiB
Go

package control
import (
"context"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/cluster"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/daemons/control/deps"
"github.com/rancher/k3s/pkg/daemons/executor"
util2 "github.com/rancher/k3s/pkg/util"
"github.com/rancher/k3s/pkg/version"
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
ccm "k8s.io/cloud-provider"
cloudprovider "k8s.io/cloud-provider"
ccmapp "k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
ccmopt "k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
app2 "k8s.io/controller-manager/app"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/proxy/util"
// registering k3s cloud provider
_ "github.com/rancher/k3s/pkg/cloudprovider"
// for client metric registration
_ "k8s.io/component-base/metrics/prometheus/restclient"
)
var localhostIP = net.ParseIP("127.0.0.1")
func Server(ctx context.Context, cfg *config.Control) error {
rand.Seed(time.Now().UTC().UnixNano())
runtime := &config.ControlRuntime{}
cfg.Runtime = runtime
if err := prepare(ctx, cfg, runtime); err != nil {
return errors.Wrap(err, "preparing server")
}
cfg.Runtime.Tunnel = setupTunnel()
util.DisableProxyHostnameCheck = true
var auth authenticator.Request
var handler http.Handler
var err error
if !cfg.DisableAPIServer {
auth, handler, err = apiServer(ctx, cfg, runtime)
if err != nil {
return err
}
if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
return err
}
}
basicAuth, err := basicAuthenticator(runtime.PasswdFile)
if err != nil {
return err
}
runtime.Authenticator = combineAuthenticators(basicAuth, auth)
runtime.Handler = handler
if !cfg.DisableScheduler {
if err := scheduler(cfg, runtime); err != nil {
return err
}
}
if !cfg.DisableControllerManager {
if err := controllerManager(cfg, runtime); err != nil {
return err
}
}
if !cfg.DisableCCM {
cloudControllerManager(ctx, cfg, runtime)
}
return nil
}
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigController,
"service-account-private-key-file": runtime.ServiceKey,
"allocate-node-cidrs": "true",
"cluster-cidr": util2.JoinIPNets(cfg.ClusterIPRanges),
"root-ca-file": runtime.ServerCA,
"port": "10252",
"profiling": "false",
"address": localhostIP.String(),
"bind-address": localhostIP.String(),
"secure-port": "0",
"use-service-account-credentials": "true",
"cluster-signing-kube-apiserver-client-cert-file": runtime.ClientCA,
"cluster-signing-kube-apiserver-client-key-file": runtime.ClientCAKey,
"cluster-signing-kubelet-client-cert-file": runtime.ClientCA,
"cluster-signing-kubelet-client-key-file": runtime.ClientCAKey,
"cluster-signing-kubelet-serving-cert-file": runtime.ServerCA,
"cluster-signing-kubelet-serving-key-file": runtime.ServerCAKey,
"cluster-signing-legacy-unknown-cert-file": runtime.ClientCA,
"cluster-signing-legacy-unknown-key-file": runtime.ClientCAKey,
}
if cfg.NoLeaderElect {
argsMap["leader-elect"] = "false"
}
if !cfg.DisableCCM {
argsMap["configure-cloud-routes"] = "false"
argsMap["controllers"] = "*,-service,-route,-cloud-node-lifecycle"
}
args := config.GetArgsList(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
return executor.ControllerManager(runtime.APIServerReady, args)
}
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigScheduler,
"port": "10251",
"address": "127.0.0.1",
"bind-address": "127.0.0.1",
"secure-port": "0",
"profiling": "false",
}
if cfg.NoLeaderElect {
argsMap["leader-elect"] = "false"
}
args := config.GetArgsList(argsMap, cfg.ExtraSchedulerAPIArgs)
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(runtime.APIServerReady, args)
}
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) {
argsMap := make(map[string]string)
setupStorageBackend(argsMap, cfg)
certDir := filepath.Join(cfg.DataDir, "tls", "temporary-certs")
os.MkdirAll(certDir, 0700)
argsMap["cert-dir"] = certDir
argsMap["allow-privileged"] = "true"
argsMap["authorization-mode"] = strings.Join([]string{modes.ModeNode, modes.ModeRBAC}, ",")
argsMap["service-account-signing-key-file"] = runtime.ServiceKey
argsMap["service-cluster-ip-range"] = util2.JoinIPNets(cfg.ServiceIPRanges)
argsMap["service-node-port-range"] = cfg.ServiceNodePortRange.String()
argsMap["advertise-port"] = strconv.Itoa(cfg.AdvertisePort)
if cfg.AdvertiseIP != "" {
argsMap["advertise-address"] = cfg.AdvertiseIP
}
argsMap["insecure-port"] = "0"
argsMap["secure-port"] = strconv.Itoa(cfg.APIServerPort)
if cfg.APIServerBindAddress == "" {
argsMap["bind-address"] = localhostIP.String()
} else {
argsMap["bind-address"] = cfg.APIServerBindAddress
}
argsMap["tls-cert-file"] = runtime.ServingKubeAPICert
argsMap["tls-private-key-file"] = runtime.ServingKubeAPIKey
argsMap["service-account-key-file"] = runtime.ServiceKey
argsMap["service-account-issuer"] = "https://kubernetes.default.svc." + cfg.ClusterDomain
argsMap["api-audiences"] = "https://kubernetes.default.svc." + cfg.ClusterDomain + "," + version.Program
argsMap["kubelet-certificate-authority"] = runtime.ServerCA
argsMap["kubelet-client-certificate"] = runtime.ClientKubeAPICert
argsMap["kubelet-client-key"] = runtime.ClientKubeAPIKey
argsMap["requestheader-client-ca-file"] = runtime.RequestHeaderCA
argsMap["requestheader-allowed-names"] = deps.RequestHeaderCN
argsMap["proxy-client-cert-file"] = runtime.ClientAuthProxyCert
argsMap["proxy-client-key-file"] = runtime.ClientAuthProxyKey
argsMap["requestheader-extra-headers-prefix"] = "X-Remote-Extra-"
argsMap["requestheader-group-headers"] = "X-Remote-Group"
argsMap["requestheader-username-headers"] = "X-Remote-User"
argsMap["client-ca-file"] = runtime.ClientCA
argsMap["enable-admission-plugins"] = "NodeRestriction"
argsMap["anonymous-auth"] = "false"
argsMap["profiling"] = "false"
if cfg.EncryptSecrets {
argsMap["encryption-provider-config"] = runtime.EncryptionConfig
}
args := config.GetArgsList(argsMap, cfg.ExtraAPIArgs)
logrus.Infof("Running kube-apiserver %s", config.ArgString(args))
return executor.APIServer(ctx, runtime.ETCDReady, args)
}
func defaults(config *config.Control) {
if config.ClusterIPRange == nil {
_, clusterIPNet, _ := net.ParseCIDR("10.42.0.0/16")
config.ClusterIPRange = clusterIPNet
}
if config.ServiceIPRange == nil {
_, serviceIPNet, _ := net.ParseCIDR("10.43.0.0/16")
config.ServiceIPRange = serviceIPNet
}
if len(config.ClusterDNS) == 0 {
config.ClusterDNS = net.ParseIP("10.43.0.10")
}
if config.AdvertisePort == 0 {
config.AdvertisePort = config.HTTPSPort
}
if config.APIServerPort == 0 {
if config.HTTPSPort != 0 {
config.APIServerPort = config.HTTPSPort + 1
} else {
config.APIServerPort = 6444
}
}
if config.DataDir == "" {
config.DataDir = "./management-state"
}
}
func prepare(ctx context.Context, config *config.Control, runtime *config.ControlRuntime) error {
var err error
defaults(config)
if err := os.MkdirAll(config.DataDir, 0700); err != nil {
return err
}
config.DataDir, err = filepath.Abs(config.DataDir)
if err != nil {
return err
}
os.MkdirAll(filepath.Join(config.DataDir, "tls"), 0700)
os.MkdirAll(filepath.Join(config.DataDir, "cred"), 0700)
runtime.ClientCA = filepath.Join(config.DataDir, "tls", "client-ca.crt")
runtime.ClientCAKey = filepath.Join(config.DataDir, "tls", "client-ca.key")
runtime.ServerCA = filepath.Join(config.DataDir, "tls", "server-ca.crt")
runtime.ServerCAKey = filepath.Join(config.DataDir, "tls", "server-ca.key")
runtime.RequestHeaderCA = filepath.Join(config.DataDir, "tls", "request-header-ca.crt")
runtime.RequestHeaderCAKey = filepath.Join(config.DataDir, "tls", "request-header-ca.key")
runtime.IPSECKey = filepath.Join(config.DataDir, "cred", "ipsec.psk")
runtime.ServiceKey = filepath.Join(config.DataDir, "tls", "service.key")
runtime.PasswdFile = filepath.Join(config.DataDir, "cred", "passwd")
runtime.NodePasswdFile = filepath.Join(config.DataDir, "cred", "node-passwd")
runtime.KubeConfigAdmin = filepath.Join(config.DataDir, "cred", "admin.kubeconfig")
runtime.KubeConfigController = filepath.Join(config.DataDir, "cred", "controller.kubeconfig")
runtime.KubeConfigScheduler = filepath.Join(config.DataDir, "cred", "scheduler.kubeconfig")
runtime.KubeConfigAPIServer = filepath.Join(config.DataDir, "cred", "api-server.kubeconfig")
runtime.KubeConfigCloudController = filepath.Join(config.DataDir, "cred", "cloud-controller.kubeconfig")
runtime.ClientAdminCert = filepath.Join(config.DataDir, "tls", "client-admin.crt")
runtime.ClientAdminKey = filepath.Join(config.DataDir, "tls", "client-admin.key")
runtime.ClientControllerCert = filepath.Join(config.DataDir, "tls", "client-controller.crt")
runtime.ClientControllerKey = filepath.Join(config.DataDir, "tls", "client-controller.key")
runtime.ClientCloudControllerCert = filepath.Join(config.DataDir, "tls", "client-cloud-controller.crt")
runtime.ClientCloudControllerKey = filepath.Join(config.DataDir, "tls", "client-cloud-controller.key")
runtime.ClientSchedulerCert = filepath.Join(config.DataDir, "tls", "client-scheduler.crt")
runtime.ClientSchedulerKey = filepath.Join(config.DataDir, "tls", "client-scheduler.key")
runtime.ClientKubeAPICert = filepath.Join(config.DataDir, "tls", "client-kube-apiserver.crt")
runtime.ClientKubeAPIKey = filepath.Join(config.DataDir, "tls", "client-kube-apiserver.key")
runtime.ClientKubeProxyCert = filepath.Join(config.DataDir, "tls", "client-kube-proxy.crt")
runtime.ClientKubeProxyKey = filepath.Join(config.DataDir, "tls", "client-kube-proxy.key")
runtime.ClientK3sControllerCert = filepath.Join(config.DataDir, "tls", "client-"+version.Program+"-controller.crt")
runtime.ClientK3sControllerKey = filepath.Join(config.DataDir, "tls", "client-"+version.Program+"-controller.key")
runtime.ServingKubeAPICert = filepath.Join(config.DataDir, "tls", "serving-kube-apiserver.crt")
runtime.ServingKubeAPIKey = filepath.Join(config.DataDir, "tls", "serving-kube-apiserver.key")
runtime.ClientKubeletKey = filepath.Join(config.DataDir, "tls", "client-kubelet.key")
runtime.ServingKubeletKey = filepath.Join(config.DataDir, "tls", "serving-kubelet.key")
runtime.ClientAuthProxyCert = filepath.Join(config.DataDir, "tls", "client-auth-proxy.crt")
runtime.ClientAuthProxyKey = filepath.Join(config.DataDir, "tls", "client-auth-proxy.key")
runtime.ETCDServerCA = filepath.Join(config.DataDir, "tls", "etcd", "server-ca.crt")
runtime.ETCDServerCAKey = filepath.Join(config.DataDir, "tls", "etcd", "server-ca.key")
runtime.ETCDPeerCA = filepath.Join(config.DataDir, "tls", "etcd", "peer-ca.crt")
runtime.ETCDPeerCAKey = filepath.Join(config.DataDir, "tls", "etcd", "peer-ca.key")
runtime.ServerETCDCert = filepath.Join(config.DataDir, "tls", "etcd", "server-client.crt")
runtime.ServerETCDKey = filepath.Join(config.DataDir, "tls", "etcd", "server-client.key")
runtime.PeerServerClientETCDCert = filepath.Join(config.DataDir, "tls", "etcd", "peer-server-client.crt")
runtime.PeerServerClientETCDKey = filepath.Join(config.DataDir, "tls", "etcd", "peer-server-client.key")
runtime.ClientETCDCert = filepath.Join(config.DataDir, "tls", "etcd", "client.crt")
runtime.ClientETCDKey = filepath.Join(config.DataDir, "tls", "etcd", "client.key")
if config.EncryptSecrets {
runtime.EncryptionConfig = filepath.Join(config.DataDir, "cred", "encryption-config.json")
}
cluster := cluster.New(config)
if err := cluster.Bootstrap(ctx); err != nil {
return err
}
if err := deps.GenServerDeps(config, runtime); err != nil {
return err
}
ready, err := cluster.Start(ctx)
if err != nil {
return err
}
runtime.ETCDReady = ready
return nil
}
func setupStorageBackend(argsMap map[string]string, cfg *config.Control) {
argsMap["storage-backend"] = "etcd3"
// specify the endpoints
if len(cfg.Datastore.Endpoint) > 0 {
argsMap["etcd-servers"] = cfg.Datastore.Endpoint
}
// storage backend tls configuration
if len(cfg.Datastore.CAFile) > 0 {
argsMap["etcd-cafile"] = cfg.Datastore.CAFile
}
if len(cfg.Datastore.CertFile) > 0 {
argsMap["etcd-certfile"] = cfg.Datastore.CertFile
}
if len(cfg.Datastore.KeyFile) > 0 {
argsMap["etcd-keyfile"] = cfg.Datastore.KeyFile
}
}
func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) {
// Reference: https://github.com/kubernetes/kubernetes/pull/93764
// The above-linked change made the in-tree cloud-controller-manager command much more flexible
// but much more complicated to wrap. It now validates some of the configuration very early on, before
// the CLI args are parsed, so some of the configuration needs to be hardcoded instead of set via flags.
// Reference: https://github.com/kubernetes/kubernetes/pull/98210
// The above-linked change further clarifies the intent of the example cloud-controller-manager.
argsMap := map[string]string{
"profiling": "false",
}
args := config.GetArgsList(argsMap, cfg.ExtraCloudControllerArgs)
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
if err != nil {
logrus.Fatalf("Unable to initialize cloudcontroller options: %v", err)
}
ccmOptions.KubeCloudShared.AllocateNodeCIDRs = true
ccmOptions.KubeCloudShared.CloudProvider.Name = version.Program
ccmOptions.KubeCloudShared.ClusterCIDR = util2.JoinIPNets(cfg.ClusterIPRanges)
ccmOptions.KubeCloudShared.ConfigureCloudRoutes = false
ccmOptions.Kubeconfig = runtime.KubeConfigCloudController
ccmOptions.NodeStatusUpdateFrequency = metav1.Duration{Duration: 1 * time.Minute}
ccmOptions.SecureServing.BindAddress = localhostIP
ccmOptions.SecureServing.BindPort = 0
if cfg.NoLeaderElect {
ccmOptions.Generic.LeaderElection.LeaderElect = false
}
controllerInitializers := ccmapp.DefaultInitFuncConstructors
delete(controllerInitializers, "service")
delete(controllerInitializers, "route")
cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
cloud, err := ccm.InitCloudProvider(version.Program, runtime.KubeConfigCloudController)
if err != nil {
logrus.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
logrus.Fatalf("Cloud provider is nil")
}
cloud.Initialize(config.ClientBuilder, make(chan struct{}))
if informerUserCloud, ok := cloud.(ccm.InformerUser); ok {
informerUserCloud.SetInformers(config.SharedInformers)
}
return cloud
}
command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, cliflag.NamedFlagSets{}, wait.NeverStop)
command.SetArgs(args)
go func() {
for {
// check for the cloud controller rbac binding
if err := checkForCloudControllerPrivileges(runtime); err != nil {
logrus.Infof("Waiting for cloudcontroller rbac role to be created")
select {
case <-ctx.Done():
logrus.Fatalf("cloud-controller-manager context canceled: %v", ctx.Err())
case <-time.After(5 * time.Second):
continue
}
}
break
}
logrus.Infof("Running cloud-controller-manager with args %v", config.ArgString(args))
logrus.Fatalf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx))
}()
}
func checkForCloudControllerPrivileges(runtime *config.ControlRuntime) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil {
return err
}
crb := rbac.NewFactoryFromConfigOrDie(restConfig).Rbac().V1().ClusterRoleBinding()
_, err = crb.Get(version.Program+"-cloud-controller-manager", metav1.GetOptions{})
if err != nil {
return err
}
return nil
}
func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil {
return err
}
k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
done := make(chan struct{})
runtime.APIServerReady = done
go func() {
defer close(done)
etcdLoop:
for {
select {
case <-ctx.Done():
return
case <-runtime.ETCDReady:
break etcdLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for etcd server to become available")
}
}
logrus.Infof("Waiting for API server to become available")
for {
select {
case <-ctx.Done():
return
case err := <-promise(func() error { return app2.WaitForAPIServer(k8sClient, 30*time.Second) }):
if err != nil {
logrus.Infof("Waiting for API server to become available")
continue
}
return
}
}
}()
return nil
}
func promise(f func() error) <-chan error {
c := make(chan error, 1)
go func() {
c <- f()
close(c)
}()
return c
}