Handle custom kubelet port in agent tunnel

The kubelet port can be overridden by users; we shouldn't assume its always 10250

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/6250/head
Brad Davidson 2022-10-05 21:11:01 +00:00 committed by Brad Davidson
parent 3f5c88e4a3
commit 3a829ae860
3 changed files with 47 additions and 18 deletions

View File

@ -7,7 +7,9 @@ import (
"net"
"os"
"reflect"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
@ -22,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@ -31,10 +34,11 @@ import (
)
type agentTunnel struct {
client kubernetes.Interface
cidrs cidranger.Ranger
ports map[string]bool
mode string
client kubernetes.Interface
cidrs cidranger.Ranger
ports map[string]bool
mode string
kubeletPort string
}
// explicit interface check
@ -85,6 +89,9 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
close(apiServerReady)
}()
// Allow the kubelet port, as published via our node object
go tunnel.setKubeletPort(ctx, apiServerReady)
switch tunnel.mode {
case daemonconfig.EgressSelectorModeCluster:
// In Cluster mode, we allow the cluster CIDRs, and any connections to the node's IPs for pods using host network.
@ -135,6 +142,23 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
return nil
}
// setKubeletPort retrieves the configured kubelet port from our node object
func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
<-apiServerReady
wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Debugf("Tunnel authorizer failed to get Kubelet Port: %v", err)
return false, nil
}
a.kubeletPort = strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)
logrus.Infof("Tunnel authorizer set Kubelet Port %s", a.kubeletPort)
return true, nil
})
}
func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) {
// In Cluster mode, we add static entries for the Node IPs and Cluster CIDRs
for _, ip := range config.AgentConfig.NodeIPs {
@ -304,7 +328,7 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
logrus.Debugf("Tunnel authorizer checking dial request for %s", address)
host, port, err := net.SplitHostPort(address)
if err == nil {
if proto == "tcp" && daemonconfig.KubeletReservedPorts[port] && (host == "127.0.0.1" || host == "::1") {
if a.isKubeletPort(proto, host, port) {
return true
}
if ip := net.ParseIP(host); ip != nil {
@ -359,3 +383,8 @@ func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup
return cancel
}
// isKubeletPort returns true if the connection is to a reserved TCP port on a loopback address.
func (a *agentTunnel) isKubeletPort(proto, host, port string) bool {
return proto == "tcp" && (host == "127.0.0.1" || host == "::1") && (port == a.kubeletPort || port == daemonconfig.StreamServerPort)
}

View File

@ -32,16 +32,8 @@ const (
EgressSelectorModePod = "pod"
CertificateRenewDays = 90
StreamServerPort = "10010"
KubeletPort = "10250"
)
// These ports can always be accessed via the tunnel server, at the loopback address.
// Other addresses and ports are only accessible via the tunnel on newer agents, when used by a pod.
var KubeletReservedPorts = map[string]bool{
StreamServerPort: true,
KubeletPort: true,
}
type Node struct {
Docker bool
ContainerRuntimeEndpoint string

View File

@ -7,6 +7,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -77,15 +78,21 @@ type TunnelServer struct {
var _ cidranger.RangerEntry = &tunnelEntry{}
type tunnelEntry struct {
cidr net.IPNet
nodeName string
node bool
kubeletPort string
nodeName string
cidr net.IPNet
}
func (n *tunnelEntry) Network() net.IPNet {
return n.cidr
}
// Some ports can always be accessed via the tunnel server, at the loopback address.
// Other addresses and ports are only accessible via the tunnel on newer agents, when used by a pod.
func (n *tunnelEntry) IsReservedPort(port string) bool {
return n.kubeletPort != "" && (port == n.kubeletPort || port == config.StreamServerPort)
}
// ServeHTTP handles either CONNECT requests, or websocket requests to the remotedialer server
func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
logrus.Debugf("Tunnel server handing %s %s request for %s from %s", req.Proto, req.Method, req.URL, req.RemoteAddr)
@ -134,7 +141,8 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
t.cidrs.Remove(*n)
} else {
logrus.Debugf("Tunnel server egress proxy updating Node %s IP %v", nodeName, n)
t.cidrs.Insert(&tunnelEntry{cidr: *n, nodeName: nodeName, node: true})
kubeletPort := strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)
t.cidrs.Insert(&tunnelEntry{cidr: *n, nodeName: nodeName, kubeletPort: kubeletPort})
}
}
}
@ -222,7 +230,7 @@ func (t *TunnelServer) dialBackend(ctx context.Context, addr string) (net.Conn,
if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
if n, ok := nets[0].(*tunnelEntry); ok {
nodeName = n.nodeName
if n.node && config.KubeletReservedPorts[port] {
if n.IsReservedPort(port) {
toKubelet = true
useTunnel = true
} else {