Watch the local Node object instead of get/sleep looping

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 5acd0b9008)
pull/4375/head
Brad Davidson 2021-10-29 01:59:03 -07:00 committed by Brad Davidson
parent 44a5978135
commit c73aaf839b
2 changed files with 49 additions and 39 deletions

View File

@ -6,14 +6,16 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/util" "github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/version" "github.com/rancher/k3s/pkg/version"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/apimachinery/pkg/fields"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
) )
const ( const (
@ -77,22 +79,10 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
return createFlannelConf(nodeConfig) return createFlannelConf(nodeConfig)
} }
func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) error { func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error {
nodeName := nodeConfig.AgentConfig.NodeName if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil {
return errors.Wrap(err, "failed to wait for PodCIDR assignment")
for {
node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{})
if err == nil && node.Spec.PodCIDR != "" {
break
} }
if err == nil {
logrus.Info("Waiting for node " + nodeName + " CIDR not assigned yet")
} else {
logrus.Infof("Waiting for node %s: %v", nodeName, err)
}
time.Sleep(2 * time.Second)
}
logrus.Info("Node CIDR assigned for: " + nodeName)
go func() { go func() {
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet) err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet)
@ -102,6 +92,28 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) e
return nil return nil
} }
// waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set.
func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return err
}
defer watch.Stop()
for ev := range watch.ResultChan() {
node, ok := ev.Object.(*corev1.Node)
if !ok {
return fmt.Errorf("could not convert event object to node: %v", ev)
}
if node.Spec.PodCIDR != "" {
break
}
}
logrus.Info("PodCIDR assigned for node " + nodeName)
return nil
}
func createCNIConf(dir string) error { func createCNIConf(dir string) error {
if dir == "" { if dir == "" {
return nil return nil

View File

@ -29,11 +29,13 @@ import (
"github.com/rancher/k3s/pkg/rootless" "github.com/rancher/k3s/pkg/rootless"
"github.com/rancher/k3s/pkg/util" "github.com/rancher/k3s/pkg/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
app2 "k8s.io/kubernetes/cmd/kube-proxy/app" app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
@ -106,16 +108,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
util.WaitForAPIServerReady(coreClient, 30*time.Second) util.WaitForAPIServerReady(coreClient, 30*time.Second)
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}
if !nodeConfig.NoFlannel { if !nodeConfig.NoFlannel {
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil { if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
return err return err
} }
} }
if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}
if !nodeConfig.AgentConfig.DisableNPC { if !nodeConfig.AgentConfig.DisableNPC {
if err := netpol.Run(ctx, nodeConfig); err != nil { if err := netpol.Run(ctx, nodeConfig); err != nil {
return err return err
@ -220,17 +222,18 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
return run(ctx, cfg, proxy) return run(ctx, cfg, proxy)
} }
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error { func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
count := 0 fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
for { watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
if err != nil { if err != nil {
if count%30 == 0 { return err
logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
} }
count++ defer watch.Stop()
time.Sleep(1 * time.Second)
continue for ev := range watch.ResultChan() {
node, ok := ev.Object.(*corev1.Node)
if !ok {
return fmt.Errorf("could not convert event object to node: %v", ev)
} }
updateNode := false updateNode := false
@ -260,13 +263,8 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v
if updateNode { if updateNode {
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil { if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err) logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
continue continue
} }
}
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName) logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
} else { } else {
logrus.Infof("labels have already set on node: %s", agentConfig.NodeName) logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)