From 13ca10664fedb3415a5d48013ab8b29bd0b0acce Mon Sep 17 00:00:00 2001
From: Brad Davidson
Date: Thu, 21 Apr 2022 13:56:39 -0700
Subject: [PATCH] Use ListWatch helpers instead of bare List/Watch

Reduces code complexity a bit and ensures we don't have to handle closed
watch channels on our own

Signed-off-by: Brad Davidson
---
 pkg/agent/flannel/setup.go    | 36 +++++++++++++--------
 pkg/agent/run.go              | 46 +++++++++++++++++----------
 pkg/agent/tunnel/tunnel.go    | 60 +++++++++++++++++++----------------
 pkg/daemons/executor/embed.go | 54 ++++++++++++++-----------------
 4 files changed, 108 insertions(+), 88 deletions(-)

diff --git a/pkg/agent/flannel/setup.go b/pkg/agent/flannel/setup.go
index 802915e2ce..e83a650690 100644
--- a/pkg/agent/flannel/setup.go
+++ b/pkg/agent/flannel/setup.go
@@ -13,10 +13,14 @@ import (
 	"github.com/k3s-io/k3s/pkg/version"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	utilsnet "k8s.io/utils/net"
 )
 
@@ -117,21 +121,27 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInt
 
 // waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set.
 func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
 	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
-	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
-	if err != nil {
-		return err
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.Watch(ctx, options)
+		},
 	}
-	defer watch.Stop()
-
-	for ev := range watch.ResultChan() {
-		node, ok := ev.Object.(*corev1.Node)
-		if !ok {
-			return fmt.Errorf("could not convert event object to node: %v", ev)
-		}
-		if node.Spec.PodCIDR != "" {
-			break
+	condition := func(ev watch.Event) (bool, error) {
+		if n, ok := ev.Object.(*v1.Node); ok {
+			return n.Spec.PodCIDR != "", nil
 		}
+		return false, errors.New("event object not of type v1.Node")
 	}
+
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to wait for PodCIDR assignment")
+	}
+	logrus.Info("Flannel found PodCIDR assigned for node " + nodeName)
 	return nil
 }
diff --git a/pkg/agent/run.go b/pkg/agent/run.go
index d599ce3663..0233966c8f 100644
--- a/pkg/agent/run.go
+++ b/pkg/agent/run.go
@@ -2,7 +2,6 @@ package agent
 
 import (
 	"context"
-	"fmt"
 	"net"
 	"os"
 	"path/filepath"
@@ -30,14 +29,18 @@ import (
 	"github.com/k3s-io/k3s/pkg/util"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 	app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
 	kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
 	utilsnet "k8s.io/utils/net"
@@ -271,18 +274,25 @@ func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Pr
 	return proxy, nil
 }
 
+// configureNode waits for the node object to be created, and if/when it does,
+// ensures that the labels and annotations are up to date.
 func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
 	fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
-	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
-	if err != nil {
-		return err
-	}
-	defer watch.Stop()
-
-	for ev := range watch.ResultChan() {
-		node, ok := ev.Object.(*corev1.Node)
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.Watch(ctx, options)
+		},
+	}
+
+	condition := func(ev watch.Event) (bool, error) {
+		node, ok := ev.Object.(*v1.Node)
 		if !ok {
-			return fmt.Errorf("could not convert event object to node: %v", ev)
+			return false, errors.New("event object not of type v1.Node")
 		}
 
 		updateNode := false
@@ -304,7 +314,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t
 
 		// inject node config
 		if changed, err := nodeconfig.SetNodeConfigAnnotations(node); err != nil {
-			return err
+			return false, err
 		} else if changed {
 			updateNode = true
 		}
@@ -312,16 +322,18 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t
 		if updateNode {
 			if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
 				logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
-				continue
+				return false, nil
 			}
 			logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
-		} else {
-			logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
+			return true, nil
 		}
-
-		break
+		logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
+		return true, nil
 	}
 
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to configure node")
+	}
 	return nil
 }
 
diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go
index e49e747864..e3111e0907 100644
--- a/pkg/agent/tunnel/tunnel.go
+++ b/pkg/agent/tunnel/tunnel.go
@@ -7,7 +7,6 @@ import (
 	"net"
 	"reflect"
 	"sync"
-	"time"
 
 	"github.com/gorilla/websocket"
 	agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
@@ -20,10 +19,13 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
-	watchtypes "k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 )
 
 var (
@@ -86,42 +88,46 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
 	}
 
 	// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
-	// and go from the cluster. We go into a faster but noisier connect loop if the watch fails
-	// following a successful connection.
+	// and go from the cluster.
 	go func() {
 		if err := util.WaitForAPIServerReady(ctx, client, util.DefaultAPIServerReadyTimeout); err != nil {
 			logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err)
 		}
-	connect:
+
+		endpoints := client.CoreV1().Endpoints(metav1.NamespaceDefault)
+		fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
+		lw := &cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				options.FieldSelector = fieldSelector
+				return endpoints.List(ctx, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+				options.FieldSelector = fieldSelector
+				return endpoints.Watch(ctx, options)
+			},
+		}
+
+		_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{})
+
+		defer func() {
+			watch.Stop()
+			<-done
+		}()
+
 		for {
-			time.Sleep(5 * time.Second)
-			watch, err := client.CoreV1().Endpoints("default").Watch(ctx, metav1.ListOptions{
-				FieldSelector:   fields.Set{"metadata.name": "kubernetes"}.String(),
-				ResourceVersion: "0",
-			})
-			if err != nil {
-				logrus.Warnf("Unable to watch for tunnel endpoints: %v", err)
-				continue connect
-			}
-		watching:
-			for {
-				ev, ok := <-watch.ResultChan()
-				if !ok || ev.Type == watchtypes.Error {
-					if ok {
-						logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
-					}
-					watch.Stop()
-					continue connect
-				}
+			select {
+			case <-ctx.Done():
+				return
+			case ev, ok := <-watch.ResultChan():
 				endpoint, ok := ev.Object.(*v1.Endpoints)
 				if !ok {
-					logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
-					continue watching
+					logrus.Errorf("Tunnel watch failed: event object not of type v1.Endpoints")
+					continue
 				}
 
 				newAddresses := util.GetAddresses(endpoint)
 				if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
-					continue watching
+					continue
 				}
 				proxy.Update(newAddresses)
 
diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go
index eaca255033..e78e16de24 100644
--- a/pkg/daemons/executor/embed.go
+++ b/pkg/daemons/executor/embed.go
@@ -5,22 +5,24 @@ package executor
 
 import (
 	"context"
-	"errors"
-	"fmt"
 	"net/http"
 	"runtime"
 
 	"github.com/k3s-io/k3s/pkg/cli/cmds"
 	daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
 	"github.com/k3s-io/k3s/pkg/version"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/authentication/authenticator"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 	ccm "k8s.io/cloud-provider"
 	cloudprovider "k8s.io/cloud-provider"
 	cloudproviderapi "k8s.io/cloud-provider/api"
@@ -205,43 +207,33 @@ func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
 	if err != nil {
 		return err
 	}
-
-	// List first, to see if there's an existing node that will do
-	nodes, err := coreClient.Nodes().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for _, node := range nodes.Items {
-		if taint := getCloudTaint(node.Spec.Taints); taint == nil {
-			return nil
-		}
+	nodes := coreClient.Nodes()
+
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object k8sruntime.Object, e error) {
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			return nodes.Watch(ctx, options)
+		},
 	}
 
-	// List didn't give us an existing node, start watching at whatever ResourceVersion the list left off at.
-	watcher, err := coreClient.Nodes().Watch(ctx, metav1.ListOptions{ResourceVersion: nodes.ListMeta.ResourceVersion})
-	if err != nil {
-		return err
-	}
-	defer watcher.Stop()
-
-	for ev := range watcher.ResultChan() {
-		if ev.Type == watch.Added || ev.Type == watch.Modified {
-			node, ok := ev.Object.(*corev1.Node)
-			if !ok {
-				return fmt.Errorf("could not convert event object to node: %v", ev)
-			}
-			if taint := getCloudTaint(node.Spec.Taints); taint == nil {
-				return nil
-			}
+	condition := func(ev watch.Event) (bool, error) {
+		if node, ok := ev.Object.(*v1.Node); ok {
+			return getCloudTaint(node.Spec.Taints) == nil, nil
 		}
+		return false, errors.New("event object not of type v1.Node")
 	}
-	return errors.New("watch channel closed")
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to wait for untainted node")
+	}
+	return nil
 }
 
 // getCloudTaint returns the external cloud provider taint, if present.
 // Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
-func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
+func getCloudTaint(taints []v1.Taint) *v1.Taint {
 	for _, taint := range taints {
 		if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
 			return &taint
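
For reference, the pattern this patch converges on in all four call sites can be sketched on its own. The sketch below is illustrative and not part of the patch: the package name, the waitForNodeReady helper, and its node-Ready condition are hypothetical, but the cache.ListWatch and toolswatch.UntilWithSync calls are the same client-go APIs used in the hunks above. UntilWithSync drives an informer internally and re-lists/re-watches as needed, which is why the callers above no longer need to detect and recover from closed watch channels themselves.

package example

import (
	"context"

	"github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
)

// waitForNodeReady is a hypothetical helper (not in the patch) that blocks until
// the named node reports Ready, using the same ListWatch + UntilWithSync shape
// adopted above.
func waitForNodeReady(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
	// Scope both the list and the watch to a single node via a field selector.
	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return nodes.List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return nodes.Watch(ctx, options)
		},
	}

	// The condition is evaluated for every event delivered by the informer that
	// UntilWithSync runs internally; watch restarts never surface here.
	condition := func(ev watch.Event) (bool, error) {
		node, ok := ev.Object.(*v1.Node)
		if !ok {
			return false, errors.New("event object not of type v1.Node")
		}
		for _, c := range node.Status.Conditions {
			if c.Type == v1.NodeReady {
				return c.Status == v1.ConditionTrue, nil
			}
		}
		return false, nil
	}

	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
		return errors.Wrap(err, "failed to wait for node to become ready")
	}
	return nil
}

A caller would normally bound this with context.WithTimeout, since UntilWithSync returns only when the condition is met, an error occurs, or the context ends.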