Fall back to polling the supervisor for apiserver addresses when the watch fails

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11430/head
Brad Davidson 2024-11-22 01:47:17 +00:00 committed by Brad Davidson
parent 168b344d1d
commit c7ff957cae
2 changed files with 113 additions and 61 deletions

View File

@ -91,15 +91,24 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
return disabled return disabled
} }
// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations. // WaitForAPIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// This function will block until it can return a populated list of apiservers, or if the remote server returns // This function will block until it can return a populated list of apiservers, or if the remote server returns
// an error (indicating that it does not support this functionality). // an error (indicating that it does not support this functionality).
func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string { func WaitForAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
var addresses []string var addresses []string
var info *clientaccess.Info
var err error var err error
_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { _ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy) if info == nil {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
logrus.Warnf("Failed to validate server token: %v", err)
return false, nil
}
}
addresses, err = GetAPIServers(ctx, info)
if err != nil { if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err) logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
return false, err return false, err
@ -772,14 +781,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
return nodeConfig, nil return nodeConfig, nil
} }
// getAPIServers attempts to return a list of apiservers from the server. // GetAPIServers attempts to return a list of apiservers from the server.
func getAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) ([]string, error) { func GetAPIServers(ctx context.Context, info *clientaccess.Info) ([]string, error) {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
return nil, err
}
data, err := info.Get("/v1-" + version.Program + "/apiservers") data, err := info.Get("/v1-" + version.Program + "/apiservers")
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"reflect"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -16,6 +15,7 @@ import (
agentconfig "github.com/k3s-io/k3s/pkg/agent/config" agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/loadbalancer" "github.com/k3s-io/k3s/pkg/agent/loadbalancer"
"github.com/k3s-io/k3s/pkg/agent/proxy" "github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/clientaccess"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version" "github.com/k3s-io/k3s/pkg/version"
@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -138,17 +139,18 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
// connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This // connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This
// fallback requires that the server we're joining be running an apiserver, but is the only safe // fallback requires that the server we're joining be running an apiserver, but is the only safe
// thing to do if its supervisor is down-level and can't provide us with an endpoint list. // thing to do if its supervisor is down-level and can't provide us with an endpoint list.
addresses := agentconfig.APIServers(ctx, config, proxy) addresses := agentconfig.WaitForAPIServers(ctx, config, proxy)
logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)
if len(addresses) > 0 { if len(addresses) > 0 {
logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)
if localSupervisorDefault { if localSupervisorDefault {
proxy.SetSupervisorDefault(addresses[0]) proxy.SetSupervisorDefault(addresses[0])
} }
proxy.Update(addresses) proxy.Update(addresses)
} else { } else {
if endpoint, _ := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil { if endpoint, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil {
addresses = util.GetAddresses(endpoint) logrus.Errorf("Failed to get apiserver addresses from kubernetes endpoints: %v", err)
} else {
addresses := util.GetAddresses(endpoint)
logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses) logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses)
if len(addresses) > 0 { if len(addresses) > 0 {
proxy.Update(addresses) proxy.Update(addresses)
@ -159,7 +161,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, proxy) go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy)
wait := make(chan int, 1) wait := make(chan int, 1)
go func() { go func() {
@ -302,23 +304,21 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc
// WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the // WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
// apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come // apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster. // and go from the cluster.
func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) { func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
// Attempt to connect to supervisors, storing their cancellation function for later when we syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy)
// need to disconnect. refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses)
disconnect := map[string]context.CancelFunc{}
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
<-apiServerReady <-apiServerReady
endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault) endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String() fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
// if we're being called to re-list, then likely there was an
// interruption to the apiserver connection and the listwatch is retrying
// its connection. This is a good suggestion that it might be necessary
// to refresh the apiserver address from the supervisor.
go refreshFromSupervisor(ctx)
options.FieldSelector = fieldSelector options.FieldSelector = fieldSelector
return endpoints.List(ctx, options) return endpoints.List(ctx, options)
}, },
@ -364,38 +364,7 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
// goroutine that sleeps for a short period before checking for changes and updating // goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation // the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued. // will be cancelled and a new one queued.
go func() { go syncProxyAddresses(debounceCtx, util.GetAddresses(endpoint))
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}
newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
return
}
proxy.Update(newAddresses)
validEndpoint := map[string]bool{}
for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
} }
} }
} }
@ -507,3 +476,83 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string)
} }
return defaultDialer.DialContext(ctx, network, address) return defaultDialer.DialContext(ctx, network, address)
} }
// proxySyncer is a common signature for functions that sync the proxy address list with a context
type proxySyncer func(ctx context.Context, addresses []string)
// getProxySyncer returns a function that can be called to update the list of supervisors.
// This function is responsible for connecting to or disconnecting websocket tunnels,
// as well as updating the proxy loadbalancer server list.
func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer {
disconnect := map[string]context.CancelFunc{}
// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
// return a function that can be called to update the address list.
// servers will be connected to or disconnected from as necessary,
// and the proxy addresses updated.
return func(debounceCtx context.Context, addresses []string) {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}
newAddresses := sets.New(addresses...)
curAddresses := sets.New(proxy.SupervisorAddresses()...)
if newAddresses.Equal(curAddresses) {
return
}
proxy.Update(addresses)
// add new servers
for address := range newAddresses.Difference(curAddresses) {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
// remove old servers
for address := range curAddresses.Difference(newAddresses) {
if cancel, ok := disconnect[address]; ok {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}
}
// getAPIServersRequester returns a function that can be called to update the
// proxy apiserver endpoints with addresses retrieved from the supervisor.
func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProxyAddresses proxySyncer) func(ctx context.Context) {
var info *clientaccess.Info
return func(ctx context.Context) {
if info == nil {
var err error
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
logrus.Warnf("Failed to validate server token: %v", err)
return
}
}
if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil {
logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err)
} else {
syncProxyAddresses(ctx, addresses)
}
}
}