Use ListWatch helpers instead of bare List/Watch

Reduces code complexity a bit and ensures we don't have to handle closed watch channels on our own

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/5518/head
Author: Brad Davidson, committed by Brad Davidson
parent 5f2a4d4209
commit 13ca10664f
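
For readers unfamiliar with the client-go helpers this commit adopts, the sketch below shows the general pattern in isolation: a cache.ListWatch paired with toolswatch.UntilWithSync, which re-establishes the watch internally so callers never deal with a closed result channel. It mirrors the waitForPodCIDR change in the diff but is a standalone illustration rather than code from the k3s tree; the package name, the clientset parameter, and the function name are placeholders.

package example

import (
	"context"
	"errors"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
)

// waitForPodCIDRSketch blocks until the named Node reports a PodCIDR.
// clientset and nodeName are assumed to be supplied by the caller.
func waitForPodCIDRSketch(ctx context.Context, clientset kubernetes.Interface, nodeName string) error {
	nodes := clientset.CoreV1().Nodes()
	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()

	// The ListWatch narrows both the initial List and the Watch to this one node.
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return nodes.List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return nodes.Watch(ctx, options)
		},
	}

	// The condition is evaluated for every event until it returns true or an error.
	condition := func(ev watch.Event) (bool, error) {
		if n, ok := ev.Object.(*v1.Node); ok {
			return n.Spec.PodCIDR != "", nil
		}
		return false, errors.New("event object not of type v1.Node")
	}

	// UntilWithSync lists, then watches, and transparently restarts the watch
	// if it drops, so there is no closed-channel handling in caller code.
	_, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition)
	return err
}

Callers typically bound the wait by passing a context with a deadline or cancellation, since UntilWithSync only returns when the condition is met, the condition errors, or the context ends.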

@@ -13,10 +13,14 @@ import (
 	"github.com/k3s-io/k3s/pkg/version"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	utilsnet "k8s.io/utils/net"
 )
@@ -117,21 +121,27 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInt
 // waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set.
 func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
 	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
-	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
-	if err != nil {
-		return err
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.Watch(ctx, options)
+		},
 	}
-	defer watch.Stop()
-	for ev := range watch.ResultChan() {
-		node, ok := ev.Object.(*corev1.Node)
-		if !ok {
-			return fmt.Errorf("could not convert event object to node: %v", ev)
-		}
-		if node.Spec.PodCIDR != "" {
-			break
+	condition := func(ev watch.Event) (bool, error) {
+		if n, ok := ev.Object.(*v1.Node); ok {
+			return n.Spec.PodCIDR != "", nil
 		}
+		return false, errors.New("event object not of type v1.Node")
 	}
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to wait for PodCIDR assignment")
+	}
 	logrus.Info("Flannel found PodCIDR assigned for node " + nodeName)
 	return nil
 }

@@ -2,7 +2,6 @@ package agent
 import (
 	"context"
-	"fmt"
 	"net"
 	"os"
 	"path/filepath"
@@ -30,14 +29,18 @@ import (
 	"github.com/k3s-io/k3s/pkg/util"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 	app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
 	kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
 	utilsnet "k8s.io/utils/net"
@@ -271,18 +274,25 @@ func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Pr
 	return proxy, nil
 }
+// configureNode waits for the node object to be created, and if/when it does,
+// ensures that the labels and annotations are up to date.
 func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
 	fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
-	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
-	if err != nil {
-		return err
-	}
-	defer watch.Stop()
-	for ev := range watch.ResultChan() {
-		node, ok := ev.Object.(*corev1.Node)
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			options.FieldSelector = fieldSelector
+			return nodes.Watch(ctx, options)
+		},
+	}
+	condition := func(ev watch.Event) (bool, error) {
+		node, ok := ev.Object.(*v1.Node)
 		if !ok {
-			return fmt.Errorf("could not convert event object to node: %v", ev)
+			return false, errors.New("event object not of type v1.Node")
 		}
 		updateNode := false
@@ -304,7 +314,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t
 		// inject node config
 		if changed, err := nodeconfig.SetNodeConfigAnnotations(node); err != nil {
-			return err
+			return false, err
 		} else if changed {
 			updateNode = true
 		}
@@ -312,16 +322,18 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes t
 		if updateNode {
 			if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
 				logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
-				continue
+				return false, nil
 			}
 			logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
-		} else {
-			logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
+			return true, nil
 		}
-		break
+		logrus.Infof("labels have already set on node: %s", agentConfig.NodeName)
+		return true, nil
 	}
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to configure node")
+	}
 	return nil
 }

@@ -7,7 +7,6 @@ import (
 	"net"
 	"reflect"
 	"sync"
-	"time"
 	"github.com/gorilla/websocket"
 	agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
@@ -20,10 +19,13 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
-	watchtypes "k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 )
 var (
@@ -86,42 +88,46 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
 	}
 	// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
-	// and go from the cluster. We go into a faster but noisier connect loop if the watch fails
-	// following a successful connection.
+	// and go from the cluster.
 	go func() {
 		if err := util.WaitForAPIServerReady(ctx, client, util.DefaultAPIServerReadyTimeout); err != nil {
 			logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err)
 		}
-	connect:
+		endpoints := client.CoreV1().Endpoints(metav1.NamespaceDefault)
+		fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
+		lw := &cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				options.FieldSelector = fieldSelector
+				return endpoints.List(ctx, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+				options.FieldSelector = fieldSelector
+				return endpoints.Watch(ctx, options)
+			},
+		}
+		_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{})
+		defer func() {
+			watch.Stop()
+			<-done
+		}()
 		for {
-			time.Sleep(5 * time.Second)
-			watch, err := client.CoreV1().Endpoints("default").Watch(ctx, metav1.ListOptions{
-				FieldSelector:   fields.Set{"metadata.name": "kubernetes"}.String(),
-				ResourceVersion: "0",
-			})
-			if err != nil {
-				logrus.Warnf("Unable to watch for tunnel endpoints: %v", err)
-				continue connect
-			}
-		watching:
-			for {
-				ev, ok := <-watch.ResultChan()
-				if !ok || ev.Type == watchtypes.Error {
-					if ok {
-						logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
-					}
-					watch.Stop()
-					continue connect
-				}
+			select {
+			case <-ctx.Done():
+				return
+			case ev, ok := <-watch.ResultChan():
 				endpoint, ok := ev.Object.(*v1.Endpoints)
 				if !ok {
-					logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
-					continue watching
+					logrus.Errorf("Tunnel watch failed: event object not of type v1.Endpoints")
+					continue
 				}
 				newAddresses := util.GetAddresses(endpoint)
 				if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
-					continue watching
+					continue
 				}
 				proxy.Update(newAddresses)

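The tunnel change above uses a different helper, toolswatch.NewIndexerInformerWatcher, because it needs a long-lived event stream rather than a one-shot wait. The sketch below illustrates that pattern on its own; it approximates the diff above but is not the k3s source, and the package name, the client parameter, and the handleEndpoints callback are placeholders.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
)

// watchKubernetesEndpoints streams events for the default/kubernetes Endpoints
// object until the context is cancelled. handleEndpoints is a caller-supplied
// callback standing in for whatever should react to address changes.
func watchKubernetesEndpoints(ctx context.Context, client kubernetes.Interface, handleEndpoints func(*v1.Endpoints)) {
	endpoints := client.CoreV1().Endpoints(metav1.NamespaceDefault)
	fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()

	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return endpoints.List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return endpoints.Watch(ctx, options)
		},
	}

	// The informer behind the returned watch.Interface re-lists and re-watches
	// on failure, so the result channel stays open until Stop() is called.
	_, _, w, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Endpoints{})
	defer func() {
		w.Stop()
		<-done // wait for the informer goroutine to exit cleanly
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-w.ResultChan():
			if !ok {
				return
			}
			if ep, ok := ev.Object.(*v1.Endpoints); ok {
				handleEndpoints(ep)
			}
		}
	}
}

Compared with UntilWithSync, this trades the one-shot condition for an informer-backed stream, which suits a reconcile loop that should keep running for the lifetime of the agent.
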
@@ -5,22 +5,24 @@ package executor
 import (
 	"context"
-	"errors"
-	"fmt"
 	"net/http"
 	"runtime"
 	"github.com/k3s-io/k3s/pkg/cli/cmds"
 	daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
 	"github.com/k3s-io/k3s/pkg/version"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/authentication/authenticator"
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	toolswatch "k8s.io/client-go/tools/watch"
 	ccm "k8s.io/cloud-provider"
 	cloudprovider "k8s.io/cloud-provider"
 	cloudproviderapi "k8s.io/cloud-provider/api"
@@ -205,43 +207,33 @@ func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
 	if err != nil {
 		return err
 	}
+	nodes := coreClient.Nodes()
-	// List first, to see if there's an existing node that will do
-	nodes, err := coreClient.Nodes().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for _, node := range nodes.Items {
-		if taint := getCloudTaint(node.Spec.Taints); taint == nil {
-			return nil
-		}
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (object k8sruntime.Object, e error) {
+			return nodes.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+			return nodes.Watch(ctx, options)
+		},
 	}
-	// List didn't give us an existing node, start watching at whatever ResourceVersion the list left off at.
-	watcher, err := coreClient.Nodes().Watch(ctx, metav1.ListOptions{ResourceVersion: nodes.ListMeta.ResourceVersion})
-	if err != nil {
-		return err
-	}
-	defer watcher.Stop()
-	for ev := range watcher.ResultChan() {
-		if ev.Type == watch.Added || ev.Type == watch.Modified {
-			node, ok := ev.Object.(*corev1.Node)
-			if !ok {
-				return fmt.Errorf("could not convert event object to node: %v", ev)
-			}
-			if taint := getCloudTaint(node.Spec.Taints); taint == nil {
-				return nil
-			}
+	condition := func(ev watch.Event) (bool, error) {
+		if node, ok := ev.Object.(*v1.Node); ok {
+			return getCloudTaint(node.Spec.Taints) == nil, nil
 		}
+		return false, errors.New("event object not of type v1.Node")
 	}
-	return errors.New("watch channel closed")
+	if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
+		return errors.Wrap(err, "failed to wait for untainted node")
+	}
+	return nil
 }
 // getCloudTaint returns the external cloud provider taint, if present.
 // Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
-func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
+func getCloudTaint(taints []v1.Taint) *v1.Taint {
 	for _, taint := range taints {
 		if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
 			return &taint
