diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go
index 81d7a2b677..c55e7877cb 100644
--- a/pkg/agent/tunnel/tunnel.go
+++ b/pkg/agent/tunnel/tunnel.go
@@ -7,7 +7,9 @@ import (
 	"net"
 	"os"
 	"reflect"
+	"strconv"
 	"sync"
+	"time"
 
 	"github.com/gorilla/websocket"
 	agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
@@ -22,6 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -31,10 +34,11 @@ import (
 )
 
 type agentTunnel struct {
-	client kubernetes.Interface
-	cidrs  cidranger.Ranger
-	ports  map[string]bool
-	mode   string
+	client      kubernetes.Interface
+	cidrs       cidranger.Ranger
+	ports       map[string]bool
+	mode        string
+	kubeletPort string
 }
 
 // explicit interface check
@@ -85,6 +89,9 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
 		close(apiServerReady)
 	}()
 
+	// Allow the kubelet port, as published via our node object
+	go tunnel.setKubeletPort(ctx, apiServerReady)
+
 	switch tunnel.mode {
 	case daemonconfig.EgressSelectorModeCluster:
 		// In Cluster mode, we allow the cluster CIDRs, and any connections to the node's IPs for pods using host network.
@@ -135,6 +142,23 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
 	return nil
 }
 
+// setKubeletPort retrieves the configured kubelet port from our node object
+func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
+	<-apiServerReady
+
+	wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) {
+		nodeName := os.Getenv("NODE_NAME")
+		node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			logrus.Debugf("Tunnel authorizer failed to get Kubelet Port: %v", err)
+			return false, nil
+		}
+		a.kubeletPort = strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)
+		logrus.Infof("Tunnel authorizer set Kubelet Port %s", a.kubeletPort)
+		return true, nil
+	})
+}
+
 func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) {
 	// In Cluster mode, we add static entries for the Node IPs and Cluster CIDRs
 	for _, ip := range config.AgentConfig.NodeIPs {
@@ -304,7 +328,7 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
 	logrus.Debugf("Tunnel authorizer checking dial request for %s", address)
 	host, port, err := net.SplitHostPort(address)
 	if err == nil {
-		if proto == "tcp" && daemonconfig.KubeletReservedPorts[port] && (host == "127.0.0.1" || host == "::1") {
+		if a.isKubeletPort(proto, host, port) {
 			return true
 		}
 		if ip := net.ParseIP(host); ip != nil {
@@ -359,3 +383,8 @@ func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup
 
 	return cancel
 }
+
+// isKubeletPort returns true if the connection is to a reserved TCP port on a loopback address.
+func (a *agentTunnel) isKubeletPort(proto, host, port string) bool {
+	return proto == "tcp" && (host == "127.0.0.1" || host == "::1") && (port == a.kubeletPort || port == daemonconfig.StreamServerPort)
+}
diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go
index 7294a6455e..107bf455e8 100644
--- a/pkg/daemons/config/types.go
+++ b/pkg/daemons/config/types.go
@@ -32,16 +32,8 @@ const (
 	EgressSelectorModePod = "pod"
 	CertificateRenewDays  = 90
 	StreamServerPort      = "10010"
-	KubeletPort           = "10250"
 )
 
-// These ports can always be accessed via the tunnel server, at the loopback address.
-// Other addresses and ports are only accessible via the tunnel on newer agents, when used by a pod.
-var KubeletReservedPorts = map[string]bool{
-	StreamServerPort: true,
-	KubeletPort:      true,
-}
-
 type Node struct {
 	Docker                   bool
 	ContainerRuntimeEndpoint string
diff --git a/pkg/daemons/control/tunnel.go b/pkg/daemons/control/tunnel.go
index 9b149cdd95..d9065babdc 100644
--- a/pkg/daemons/control/tunnel.go
+++ b/pkg/daemons/control/tunnel.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -77,15 +78,21 @@ type TunnelServer struct {
 var _ cidranger.RangerEntry = &tunnelEntry{}
 
 type tunnelEntry struct {
-	cidr     net.IPNet
-	nodeName string
-	node     bool
+	kubeletPort string
+	nodeName    string
+	cidr        net.IPNet
 }
 
 func (n *tunnelEntry) Network() net.IPNet {
 	return n.cidr
 }
 
+// Some ports can always be accessed via the tunnel server, at the loopback address.
+// Other addresses and ports are only accessible via the tunnel on newer agents, when used by a pod.
+func (n *tunnelEntry) IsReservedPort(port string) bool {
+	return n.kubeletPort != "" && (port == n.kubeletPort || port == config.StreamServerPort)
+}
+
 // ServeHTTP handles either CONNECT requests, or websocket requests to the remotedialer server
 func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	logrus.Debugf("Tunnel server handing %s %s request for %s from %s", req.Proto, req.Method, req.URL, req.RemoteAddr)
@@ -134,7 +141,8 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
 			t.cidrs.Remove(*n)
 		} else {
 			logrus.Debugf("Tunnel server egress proxy updating Node %s IP %v", nodeName, n)
-			t.cidrs.Insert(&tunnelEntry{cidr: *n, nodeName: nodeName, node: true})
+			kubeletPort := strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)
+			t.cidrs.Insert(&tunnelEntry{cidr: *n, nodeName: nodeName, kubeletPort: kubeletPort})
 		}
 	}
 }
@@ -222,7 +230,7 @@ func (t *TunnelServer) dialBackend(ctx context.Context, addr string) (net.Conn,
 	if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
 		if n, ok := nets[0].(*tunnelEntry); ok {
 			nodeName = n.nodeName
-			if n.node && config.KubeletReservedPorts[port] {
+			if n.IsReservedPort(port) {
 				toKubelet = true
 				useTunnel = true
 			} else {
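Taken together, these changes replace the static KubeletReservedPorts map with a kubelet port read from each node object's Status.DaemonEndpoints.KubeletEndpoint: the agent-side authorizer now admits loopback dials only to that published port or the CRI stream server port (10010), and the server-side tunnelEntry carries the same per-node port for its reserved-port check. The following is a minimal, self-contained sketch of that predicate only; checkLoopbackPort and streamServerPort are illustrative stand-ins, not identifiers from the diff.

```go
// Sketch of the loopback-port authorization added in this diff, reduced to a
// standalone program. checkLoopbackPort mirrors the agent's isKubeletPort
// check; the name and the streamServerPort constant are assumptions here.
package main

import (
	"fmt"
	"net"
)

const streamServerPort = "10010" // corresponds to config.StreamServerPort

// checkLoopbackPort allows a dial only when it is TCP, targets a loopback
// address, and goes to either the kubelet port published on the node object
// or the stream server port.
func checkLoopbackPort(kubeletPort, proto, address string) bool {
	host, port, err := net.SplitHostPort(address)
	if err != nil {
		return false
	}
	return proto == "tcp" &&
		(host == "127.0.0.1" || host == "::1") &&
		(port == kubeletPort || port == streamServerPort)
}

func main() {
	// Assuming the node object publishes the default kubelet port 10250:
	fmt.Println(checkLoopbackPort("10250", "tcp", "127.0.0.1:10250")) // true
	fmt.Println(checkLoopbackPort("10250", "tcp", "[::1]:10010"))     // true
	fmt.Println(checkLoopbackPort("10250", "tcp", "127.0.0.1:22"))    // false
	fmt.Println(checkLoopbackPort("10250", "udp", "127.0.0.1:10250")) // false
}
```

The practical effect is that an agent no longer hard-codes 10250: if the kubelet is configured to listen on a non-default port, the tunnel authorizer and the server's tunnelEntry both pick up whatever port the node actually reports.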