mirror of https://github.com/k3s-io/k3s
Block scheduler startup on untainted node when using embedded CCM
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/4370/head
parent
52eb6cac1c
commit
3fe460d080
|
@ -103,7 +103,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
|
||||||
}
|
}
|
||||||
endpoint, ok := ev.Object.(*v1.Endpoints)
|
endpoint, ok := ev.Object.(*v1.Endpoints)
|
||||||
if !ok {
|
if !ok {
|
||||||
logrus.Errorf("Tunnel could not case event object to endpoint: %v", ev)
|
logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
|
||||||
continue watching
|
continue watching
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,16 +4,25 @@ package executor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/rancher/k3s/pkg/cli/cmds"
|
"github.com/rancher/k3s/pkg/cli/cmds"
|
||||||
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
|
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
|
||||||
"github.com/rancher/k3s/pkg/version"
|
"github.com/rancher/k3s/pkg/version"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||||
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
ccm "k8s.io/cloud-provider"
|
ccm "k8s.io/cloud-provider"
|
||||||
cloudprovider "k8s.io/cloud-provider"
|
cloudprovider "k8s.io/cloud-provider"
|
||||||
|
cloudproviderapi "k8s.io/cloud-provider/api"
|
||||||
ccmapp "k8s.io/cloud-provider/app"
|
ccmapp "k8s.io/cloud-provider/app"
|
||||||
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
|
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
|
||||||
ccmopt "k8s.io/cloud-provider/options"
|
ccmopt "k8s.io/cloud-provider/options"
|
||||||
|
@ -29,16 +38,19 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
executor = Embedded{}
|
executor = &Embedded{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Embedded struct{}
|
type Embedded struct {
|
||||||
|
nodeConfig *daemonconfig.Node
|
||||||
|
}
|
||||||
|
|
||||||
func (Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
|
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
|
||||||
|
e.nodeConfig = nodeConfig
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) Kubelet(ctx context.Context, args []string) error {
|
func (*Embedded) Kubelet(ctx context.Context, args []string) error {
|
||||||
command := kubelet.NewKubeletCommand(context.Background())
|
command := kubelet.NewKubeletCommand(context.Background())
|
||||||
command.SetArgs(args)
|
command.SetArgs(args)
|
||||||
|
|
||||||
|
@ -54,7 +66,7 @@ func (Embedded) Kubelet(ctx context.Context, args []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) KubeProxy(ctx context.Context, args []string) error {
|
func (*Embedded) KubeProxy(ctx context.Context, args []string) error {
|
||||||
command := proxy.NewProxyCommand()
|
command := proxy.NewProxyCommand()
|
||||||
command.SetArgs(args)
|
command.SetArgs(args)
|
||||||
|
|
||||||
|
@ -70,12 +82,12 @@ func (Embedded) KubeProxy(ctx context.Context, args []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
|
func (*Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
|
||||||
startupConfig := <-app.StartupConfig
|
startupConfig := <-app.StartupConfig
|
||||||
return startupConfig.Authenticator, startupConfig.Handler, nil
|
return startupConfig.Authenticator, startupConfig.Handler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
|
func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
|
||||||
command := app.NewAPIServerCommand(ctx.Done())
|
command := app.NewAPIServerCommand(ctx.Done())
|
||||||
command.SetArgs(args)
|
command.SetArgs(args)
|
||||||
|
|
||||||
|
@ -92,12 +104,24 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
|
func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
|
||||||
command := sapp.NewSchedulerCommand()
|
command := sapp.NewSchedulerCommand()
|
||||||
command.SetArgs(args)
|
command.SetArgs(args)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
<-apiReady
|
<-apiReady
|
||||||
|
// wait for Bootstrap to set nodeConfig
|
||||||
|
for e.nodeConfig == nil {
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
// If we're running the embedded cloud controller, wait for it to untaint at least one
|
||||||
|
// node (usually, the local node) before starting the scheduler to ensure that it
|
||||||
|
// finds a node that is ready to run pods during its initial scheduling loop.
|
||||||
|
if !e.nodeConfig.AgentConfig.DisableCCM {
|
||||||
|
if err := waitForUntaintedNode(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet); err != nil {
|
||||||
|
logrus.Fatalf("failed to wait for untained node: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
logrus.Fatalf("scheduler panic: %v", err)
|
logrus.Fatalf("scheduler panic: %v", err)
|
||||||
|
@ -109,7 +133,7 @@ func (Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
|
func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
|
||||||
command := cmapp.NewControllerManagerCommand()
|
command := cmapp.NewControllerManagerCommand()
|
||||||
command.SetArgs(args)
|
command.SetArgs(args)
|
||||||
|
|
||||||
|
@ -126,7 +150,7 @@ func (Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{},
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
|
func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
|
||||||
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
|
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Fatalf("unable to initialize command options: %v", err)
|
logrus.Fatalf("unable to initialize command options: %v", err)
|
||||||
|
@ -168,3 +192,59 @@ func (Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForUntaintedNode watches nodes, waiting to find one not tainted as
|
||||||
|
// uninitialized by the external cloud provider.
|
||||||
|
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
|
||||||
|
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
coreClient, err := typedcorev1.NewForConfig(restConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// List first, to see if there's an existing node that will do
|
||||||
|
nodes, err := coreClient.Nodes().List(ctx, metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, node := range nodes.Items {
|
||||||
|
if taint := getCloudTaint(node.Spec.Taints); taint == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// List didn't give us an existing node, start watching at whatever ResourceVersion the list left off at.
|
||||||
|
watcher, err := coreClient.Nodes().Watch(ctx, metav1.ListOptions{ResourceVersion: nodes.ListMeta.ResourceVersion})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer watcher.Stop()
|
||||||
|
|
||||||
|
for ev := range watcher.ResultChan() {
|
||||||
|
if ev.Type == watch.Added || ev.Type == watch.Modified {
|
||||||
|
node, ok := ev.Object.(*corev1.Node)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("could not convert event object to node: %v", ev)
|
||||||
|
}
|
||||||
|
if taint := getCloudTaint(node.Spec.Taints); taint == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.New("watch channel closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// getCloudTaint returns the external cloud provider taint, if present.
|
||||||
|
// Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
|
||||||
|
func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
|
||||||
|
for _, taint := range taints {
|
||||||
|
if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
|
||||||
|
return &taint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue