diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index cdff5cb543..2e96adb7ee 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -91,15 +91,24 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy return disabled } -// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations. +// WaitForAPIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations. // This function will block until it can return a populated list of apiservers, or if the remote server returns // an error (indicating that it does not support this functionality). -func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string { +func WaitForAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string { var addresses []string + var info *clientaccess.Info var err error _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { - addresses, err = getAPIServers(ctx, node, proxy) + if info == nil { + withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey) + info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert) + if err != nil { + logrus.Warnf("Failed to validate server token: %v", err) + return false, nil + } + } + addresses, err = GetAPIServers(ctx, info) if err != nil { logrus.Infof("Failed to retrieve list of apiservers from server: %v", err) return false, err @@ -772,14 +781,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N return nodeConfig, nil } -// getAPIServers attempts to return a list of apiservers from the server. -func getAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) ([]string, error) { - withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey) - info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert) - if err != nil { - return nil, err - } - +// GetAPIServers attempts to return a list of apiservers from the server. +func GetAPIServers(ctx context.Context, info *clientaccess.Info) ([]string, error) { data, err := info.Get("/v1-" + version.Program + "/apiservers") if err != nil { return nil, err diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index e1eb566cc6..2fe031ce62 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "os" - "reflect" "strconv" "sync" "time" @@ -16,6 +15,7 @@ import ( agentconfig "github.com/k3s-io/k3s/pkg/agent/config" "github.com/k3s-io/k3s/pkg/agent/loadbalancer" "github.com/k3s-io/k3s/pkg/agent/proxy" + "github.com/k3s-io/k3s/pkg/clientaccess" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -138,17 +139,18 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er // connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This // fallback requires that the server we're joining be running an apiserver, but is the only safe // thing to do if its supervisor is down-level and can't provide us with an endpoint list. - addresses := agentconfig.APIServers(ctx, config, proxy) - logrus.Infof("Got apiserver addresses from supervisor: %v", addresses) - + addresses := agentconfig.WaitForAPIServers(ctx, config, proxy) if len(addresses) > 0 { + logrus.Infof("Got apiserver addresses from supervisor: %v", addresses) if localSupervisorDefault { proxy.SetSupervisorDefault(addresses[0]) } proxy.Update(addresses) } else { - if endpoint, _ := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil { - addresses = util.GetAddresses(endpoint) + if endpoint, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil { + logrus.Errorf("Failed to get apiserver addresses from kubernetes endpoints: %v", err) + } else { + addresses := util.GetAddresses(endpoint) logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses) if len(addresses) > 0 { proxy.Update(addresses) @@ -159,7 +161,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er wg := &sync.WaitGroup{} - go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, proxy) + go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy) wait := make(chan int, 1) go func() { @@ -302,23 +304,21 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc // WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the // apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come // and go from the cluster. -func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) { - // Attempt to connect to supervisors, storing their cancellation function for later when we - // need to disconnect. - disconnect := map[string]context.CancelFunc{} - for _, address := range proxy.SupervisorAddresses() { - if _, ok := disconnect[address]; !ok { - conn := a.connect(ctx, wg, address, tlsConfig) - disconnect[address] = conn.cancel - proxy.SetHealthCheck(address, conn.healthCheck) - } - } +func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) { + syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy) + refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses) <-apiServerReady + endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault) fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String() lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { + // if we're being called to re-list, then likely there was an + // interruption to the apiserver connection and the listwatch is retrying + // its connection. This is a good suggestion that it might be necessary + // to refresh the apiserver address from the supervisor. + go refreshFromSupervisor(ctx) options.FieldSelector = fieldSelector return endpoints.List(ctx, options) }, @@ -364,38 +364,7 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan // goroutine that sleeps for a short period before checking for changes and updating // the proxy addresses. If another update occurs, the previous update operation // will be cancelled and a new one queued. - go func() { - select { - case <-time.After(endpointDebounceDelay): - case <-debounceCtx.Done(): - return - } - - newAddresses := util.GetAddresses(endpoint) - if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) { - return - } - proxy.Update(newAddresses) - - validEndpoint := map[string]bool{} - - for _, address := range proxy.SupervisorAddresses() { - validEndpoint[address] = true - if _, ok := disconnect[address]; !ok { - conn := a.connect(ctx, nil, address, tlsConfig) - disconnect[address] = conn.cancel - proxy.SetHealthCheck(address, conn.healthCheck) - } - } - - for address, cancel := range disconnect { - if !validEndpoint[address] { - cancel() - delete(disconnect, address) - logrus.Infof("Stopped tunnel to %s", address) - } - } - }() + go syncProxyAddresses(debounceCtx, util.GetAddresses(endpoint)) } } } @@ -507,3 +476,83 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string) } return defaultDialer.DialContext(ctx, network, address) } + +// proxySyncer is a common signature for functions that sync the proxy address list with a context +type proxySyncer func(ctx context.Context, addresses []string) + +// getProxySyncer returns a function that can be called to update the list of supervisors. +// This function is responsible for connecting to or disconnecting websocket tunnels, +// as well as updating the proxy loadbalancer server list. +func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer { + disconnect := map[string]context.CancelFunc{} + // Attempt to connect to supervisors, storing their cancellation function for later when we + // need to disconnect. + for _, address := range proxy.SupervisorAddresses() { + if _, ok := disconnect[address]; !ok { + conn := a.connect(ctx, wg, address, tlsConfig) + disconnect[address] = conn.cancel + proxy.SetHealthCheck(address, conn.healthCheck) + } + } + + // return a function that can be called to update the address list. + // servers will be connected to or disconnected from as necessary, + // and the proxy addresses updated. + return func(debounceCtx context.Context, addresses []string) { + select { + case <-time.After(endpointDebounceDelay): + case <-debounceCtx.Done(): + return + } + + newAddresses := sets.New(addresses...) + curAddresses := sets.New(proxy.SupervisorAddresses()...) + if newAddresses.Equal(curAddresses) { + return + } + + proxy.Update(addresses) + + // add new servers + for address := range newAddresses.Difference(curAddresses) { + if _, ok := disconnect[address]; !ok { + conn := a.connect(ctx, nil, address, tlsConfig) + logrus.Infof("Started tunnel to %s", address) + disconnect[address] = conn.cancel + proxy.SetHealthCheck(address, conn.healthCheck) + } + } + + // remove old servers + for address := range curAddresses.Difference(newAddresses) { + if cancel, ok := disconnect[address]; ok { + cancel() + delete(disconnect, address) + logrus.Infof("Stopped tunnel to %s", address) + } + } + } +} + +// getAPIServersRequester returns a function that can be called to update the +// proxy apiserver endpoints with addresses retrieved from the supervisor. +func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProxyAddresses proxySyncer) func(ctx context.Context) { + var info *clientaccess.Info + return func(ctx context.Context) { + if info == nil { + var err error + withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey) + info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert) + if err != nil { + logrus.Warnf("Failed to validate server token: %v", err) + return + } + } + + if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil { + logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err) + } else { + syncProxyAddresses(ctx, addresses) + } + } +}