Add support for configuring the EgressSelector mode

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 9d7230496d)
pull/5612/head
Brad Davidson 2022-05-17 12:25:43 -07:00 committed by Brad Davidson
parent 3fa5619d73
commit b330ce340a
7 changed files with 175 additions and 62 deletions

View File

@ -436,6 +436,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint, ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
FlannelBackend: controlConfig.FlannelBackend, FlannelBackend: controlConfig.FlannelBackend,
FlannelIPv6Masq: controlConfig.FlannelIPv6Masq, FlannelIPv6Masq: controlConfig.FlannelIPv6Masq,
EgressSelectorMode: controlConfig.EgressSelectorMode,
ServerHTTPSPort: controlConfig.HTTPSPort, ServerHTTPSPort: controlConfig.HTTPSPort,
Token: info.String(), Token: info.String(),
} }

View File

@ -12,9 +12,10 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
agentconfig "github.com/k3s-io/k3s/pkg/agent/config" agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/proxy" "github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/daemons/config" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version" "github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
"github.com/rancher/remotedialer" "github.com/rancher/remotedialer"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yl2chen/cidranger" "github.com/yl2chen/cidranger"
@ -37,7 +38,7 @@ var (
} }
) )
func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController) restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController)
if err != nil { if err != nil {
return err return err
@ -61,6 +62,14 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
tunnel := &agentTunnel{ tunnel := &agentTunnel{
client: client, client: client,
cidrs: cidranger.NewPCTrieRanger(), cidrs: cidranger.NewPCTrieRanger(),
mode: config.EgressSelectorMode,
}
if tunnel.mode == daemonconfig.EgressSelectorModeCluster {
for _, cidr := range config.AgentConfig.ClusterCIDRs {
logrus.Infof("Tunnel authorizer added Cluster CIDR %s", cidr)
tunnel.cidrs.Insert(cidranger.NewBasicRangerEntry(*cidr))
}
} }
// The loadbalancer is only disabled when there is a local apiserver. Servers without a local // The loadbalancer is only disabled when there is a local apiserver. Servers without a local
@ -175,8 +184,10 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
} }
type agentTunnel struct { type agentTunnel struct {
sync.Mutex
client kubernetes.Interface client kubernetes.Interface
cidrs cidranger.Ranger cidrs cidranger.Ranger
mode string
} }
// authorized determines whether or not a dial request is authorized. // authorized determines whether or not a dial request is authorized.
@ -191,19 +202,10 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
return true return true
} }
if ip := net.ParseIP(host); ip != nil { if ip := net.ParseIP(host); ip != nil {
// lazy populate the cidrs from the node object // lazy populate pod cidrs from the node object
if a.cidrs.Len() == 0 { if a.mode == daemonconfig.EgressSelectorModePod && a.cidrs.Len() == 0 {
logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", os.Getenv("NODE_NAME")) if err := a.updatePodCIDRs(ctx); err != nil {
node, err := a.client.CoreV1().Nodes().Get(ctx, os.Getenv("NODE_NAME"), metav1.GetOptions{}) logrus.Warnf("Tunnel authorizer failed to update CIDRs: %v", err)
if err != nil {
logrus.Warnf("Tunnel authorizer failed to get Pod CIDRs: %v", err)
return false
}
for _, cidr := range node.Spec.PodCIDRs {
if _, n, err := net.ParseCIDR(cidr); err == nil {
logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n))
}
} }
} }
if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 { if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
@ -214,6 +216,28 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) boo
return false return false
} }
// updatePodCIDRs populates the tunnel authorizer's CIDR set from the PodCIDRs of the
// Node object named by the NODE_NAME environment variable.
//
// The tunnel's mutex is held for the duration of the call. A caller that acquires the
// lock after the set has already been populated returns immediately without querying
// the API again. PodCIDR values that cannot be parsed or inserted are logged and
// skipped; an error is returned only when the Node object cannot be retrieved.
func (a *agentTunnel) updatePodCIDRs(ctx context.Context) error {
	a.Lock()
	defer a.Unlock()

	// Return early if another goroutine updated the CIDRs while we were waiting to lock
	if a.cidrs.Len() != 0 {
		return nil
	}

	nodeName := os.Getenv("NODE_NAME")
	logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", nodeName)
	node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrap(err, "failed to get Pod CIDRs")
	}

	for _, cidr := range node.Spec.PodCIDRs {
		_, n, err := net.ParseCIDR(cidr)
		if err != nil {
			// Surface malformed entries instead of dropping them without a trace.
			logrus.Warnf("Tunnel authorizer ignoring invalid Pod CIDR %q: %v", cidr, err)
			continue
		}
		if err := a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n)); err != nil {
			logrus.Warnf("Tunnel authorizer failed to add Pod CIDR %s: %v", cidr, err)
			continue
		}
		// Only report the CIDR as added once the insert has actually succeeded.
		logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
	}
	return nil
}
// connect initiates a connection to the remotedialer server. Incoming dial requests from // connect initiates a connection to the remotedialer server. Incoming dial requests from
// the server will be checked by the authorizer function prior to being fulfilled. // the server will be checked by the authorizer function prior to being fulfilled.
func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc { func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {

View File

@ -63,6 +63,7 @@ type Server struct {
ServerURL string ServerURL string
FlannelBackend string FlannelBackend string
FlannelIPv6Masq bool FlannelIPv6Masq bool
EgressSelectorMode string
DefaultLocalStoragePath string DefaultLocalStoragePath string
DisableCCM bool DisableCCM bool
DisableNPC bool DisableNPC bool
@ -213,6 +214,12 @@ var ServerFlags = []cli.Flag{
Usage: "(networking) Enable IPv6 masquerading for pod", Usage: "(networking) Enable IPv6 masquerading for pod",
Destination: &ServerConfig.FlannelIPv6Masq, Destination: &ServerConfig.FlannelIPv6Masq,
}, },
// egress-selector-mode: the chosen value is stored in ServerConfig.EgressSelectorMode
// and defaults to "pod".
cli.StringFlag{
	Name:        "egress-selector-mode",
	Usage:       "(networking) One of 'agent', 'cluster', 'pod', 'disabled'",
	Destination: &ServerConfig.EgressSelectorMode,
	Value:       "pod",
},
ServerToken, ServerToken,
cli.StringFlag{ cli.StringFlag{
Name: "token-file", Name: "token-file",

View File

@ -134,6 +134,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.AdvertisePort = cfg.AdvertisePort serverConfig.ControlConfig.AdvertisePort = cfg.AdvertisePort
serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend serverConfig.ControlConfig.FlannelBackend = cfg.FlannelBackend
serverConfig.ControlConfig.FlannelIPv6Masq = cfg.FlannelIPv6Masq serverConfig.ControlConfig.FlannelIPv6Masq = cfg.FlannelIPv6Masq
serverConfig.ControlConfig.EgressSelectorMode = cfg.EgressSelectorMode
serverConfig.ControlConfig.ExtraCloudControllerArgs = cfg.ExtraCloudControllerArgs serverConfig.ControlConfig.ExtraCloudControllerArgs = cfg.ExtraCloudControllerArgs
serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM
serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC
@ -535,6 +536,13 @@ func validateNetworkConfiguration(serverConfig server.Config) error {
} }
} }
switch serverConfig.ControlConfig.EgressSelectorMode {
case config.EgressSelectorModeAgent, config.EgressSelectorModeCluster,
config.EgressSelectorModeDisabled, config.EgressSelectorModePod:
default:
return fmt.Errorf("invalid egress-selector-mode %s", serverConfig.ControlConfig.EgressSelectorMode)
}
return nil return nil
} }

View File

@ -25,6 +25,10 @@ const (
FlannelBackendIPSEC = "ipsec" FlannelBackendIPSEC = "ipsec"
FlannelBackendWireguard = "wireguard" FlannelBackendWireguard = "wireguard"
FlannelBackendWireguardNative = "wireguard-native" FlannelBackendWireguardNative = "wireguard-native"
EgressSelectorModeAgent = "agent"
EgressSelectorModeCluster = "cluster"
EgressSelectorModeDisabled = "disabled"
EgressSelectorModePod = "pod"
CertificateRenewDays = 90 CertificateRenewDays = 90
) )
@ -38,6 +42,7 @@ type Node struct {
FlannelConfOverride bool FlannelConfOverride bool
FlannelIface *net.Interface FlannelIface *net.Interface
FlannelIPv6Masq bool FlannelIPv6Masq bool
EgressSelectorMode string
Containerd Containerd Containerd Containerd
Images string Images string
AgentConfig Agent AgentConfig Agent
@ -122,6 +127,7 @@ type CriticalControlArgs struct {
DisableServiceLB bool DisableServiceLB bool
FlannelBackend string FlannelBackend string
FlannelIPv6Masq bool FlannelIPv6Masq bool
EgressSelectorMode string
NoCoreDNS bool NoCoreDNS bool
ServiceIPRange *net.IPNet ServiceIPRange *net.IPNet
ServiceIPRanges []*net.IPNet ServiceIPRanges []*net.IPNet

View File

@ -723,7 +723,11 @@ func genEncryptionConfigAndState(controlConfig *config.Control) error {
} }
func genEgressSelectorConfig(controlConfig *config.Control) error { func genEgressSelectorConfig(controlConfig *config.Control) error {
connection := apiserver.Connection{ direct := apiserver.Connection{
ProxyProtocol: apiserver.ProtocolDirect,
}
proxy := apiserver.Connection{
ProxyProtocol: apiserver.ProtocolHTTPConnect, ProxyProtocol: apiserver.ProtocolHTTPConnect,
Transport: &apiserver.Transport{ Transport: &apiserver.Transport{
TCP: &apiserver.TCPTransport{ TCP: &apiserver.TCPTransport{
@ -737,6 +741,17 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
}, },
} }
clusterConn := direct
controlConn := direct
switch controlConfig.EgressSelectorMode {
case config.EgressSelectorModeAgent:
controlConn = proxy
case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
clusterConn = proxy
controlConn = proxy
}
egressConfig := apiserver.EgressSelectorConfiguration{ egressConfig := apiserver.EgressSelectorConfiguration{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "EgressSelectorConfiguration", Kind: "EgressSelectorConfiguration",
@ -745,11 +760,11 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
EgressSelections: []apiserver.EgressSelection{ EgressSelections: []apiserver.EgressSelection{
{ {
Name: "cluster", Name: "cluster",
Connection: connection, Connection: clusterConn,
}, },
{ {
Name: "controlplane", Name: "controlplane",
Connection: connection, Connection: controlConn,
}, },
}, },
} }

View File

@ -6,6 +6,7 @@ import (
"net" "net"
"net/http" "net/http"
"strings" "strings"
"sync"
"time" "time"
"github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/config"
@ -31,8 +32,9 @@ func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error)
cidrs: cidranger.NewPCTrieRanger(), cidrs: cidranger.NewPCTrieRanger(),
config: cfg, config: cfg,
server: remotedialer.New(authorizer, loggingErrorWriter), server: remotedialer.New(authorizer, loggingErrorWriter),
egress: map[string]bool{},
} }
go tunnel.watchNodes(ctx) go tunnel.watch(ctx)
return tunnel, nil return tunnel, nil
} }
@ -53,32 +55,24 @@ func authorizer(req *http.Request) (clientKey string, authed bool, err error) {
var _ http.Handler = &TunnelServer{} var _ http.Handler = &TunnelServer{}
type TunnelServer struct { type TunnelServer struct {
sync.Mutex
cidrs cidranger.Ranger cidrs cidranger.Ranger
client kubernetes.Interface client kubernetes.Interface
config *config.Control config *config.Control
server *remotedialer.Server server *remotedialer.Server
egress map[string]bool
} }
// explicit interface check // explicit interface check
var _ cidranger.RangerEntry = &nodeAddress{} var _ cidranger.RangerEntry = &tunnelEntry{}
var _ cidranger.RangerEntry = &nodeCIDR{}
type nodeAddress struct { type tunnelEntry struct {
cidr net.IPNet cidr net.IPNet
node string node string
kubelet bool
} }
func (n *nodeAddress) Network() net.IPNet { func (n *tunnelEntry) Network() net.IPNet {
return n.cidr
}
type nodeCIDR struct {
cidr net.IPNet
clusterEgress bool
node string
}
func (n *nodeCIDR) Network() net.IPNet {
return n.cidr return n.cidr
} }
@ -92,24 +86,39 @@ func (t *TunnelServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
} }
} }
// watchNodes waits for the runtime core to become available, // watch waits for the runtime core to become available,
// and registers an OnChange handler to observe PodCIDR changes. // and registers OnChange handlers to observe changes to Nodes (and Endpoints if necessary).
func (t *TunnelServer) watchNodes(ctx context.Context) { func (t *TunnelServer) watch(ctx context.Context) {
logrus.Infof("Tunnel server egress proxy mode: %s", t.config.EgressSelectorMode)
if t.config.EgressSelectorMode == config.EgressSelectorModeDisabled {
return
}
for { for {
if t.config.Runtime.Core != nil { if t.config.Runtime.Core != nil {
t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode) t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
if t.config.EgressSelectorMode == config.EgressSelectorModeCluster {
// Cluster mode watches Endpoints to find what Node is hosting an Endpoint address, as the CNI
// may be using its own IPAM that does not respect the Node's PodCIDR.
t.config.Runtime.Core.Core().V1().Endpoints().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeEndpoints)
}
return return
} }
logrus.Infof("Tunnel server waiting for runtime core to become available") logrus.Infof("Tunnel server egress proxy waiting for runtime core to become available")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }
// onChangeNode updates the node address/CIDR mappings by observing changes to nodes. // onChangeNode updates the node address/CIDR mappings by observing changes to nodes.
// Node addresses are updated in Agent, Cluster, and Pod mode.
// Pod CIDRs are updated only in Pod mode
func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) { func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) {
if node != nil { if node != nil {
logrus.Debugf("Tunnel server updating node %s", nodeName) t.Lock()
_, clusterEgress := node.Labels[nodeconfig.ClusterEgressLabel] defer t.Unlock()
logrus.Debugf("Tunnel server egress proxy updating node %s", nodeName)
_, t.egress[nodeName] = node.Labels[nodeconfig.ClusterEgressLabel]
// Add all node IP addresses // Add all node IP addresses
for _, addr := range node.Status.Addresses { for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP { if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP {
@ -123,18 +132,20 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
if node.DeletionTimestamp != nil { if node.DeletionTimestamp != nil {
t.cidrs.Remove(*n) t.cidrs.Remove(*n)
} else { } else {
t.cidrs.Insert(&nodeAddress{cidr: *n, node: nodeName}) t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName, kubelet: true})
} }
} }
} }
} }
// Add all node PodCIDRs // Add all Node PodCIDRs, if in pod mode
for _, cidr := range node.Spec.PodCIDRs { if t.config.EgressSelectorMode == config.EgressSelectorModePod {
if _, n, err := net.ParseCIDR(cidr); err == nil { for _, cidr := range node.Spec.PodCIDRs {
if node.DeletionTimestamp != nil { if _, n, err := net.ParseCIDR(cidr); err == nil {
t.cidrs.Remove(*n) if node.DeletionTimestamp != nil {
} else { t.cidrs.Remove(*n)
t.cidrs.Insert(&nodeCIDR{cidr: *n, clusterEgress: clusterEgress, node: nodeName}) } else {
t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName})
}
} }
} }
} }
@ -142,6 +153,47 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, e
return node, nil return node, nil
} }
// onChangeEndpoints keeps the pod address mappings in sync with observed Endpoints changes.
// Ready addresses that reference a Pod and carry a NodeName are inserted as single-host
// entries attributed to that node; not-ready addresses that reference a Pod are removed.
// The endpoints object is always returned unmodified, with a nil error.
func (t *TunnelServer) onChangeEndpoints(endpointsName string, endpoints *v1.Endpoints) (*v1.Endpoints, error) {
	if endpoints == nil {
		return endpoints, nil
	}

	t.Lock()
	defer t.Unlock()

	logrus.Debugf("Tunnel server egress proxy updating endpoints %s", endpointsName)

	// hostNetwork turns a bare IP string into a single-host network: addresses containing
	// a colon get a /128 mask, everything else gets /32. ok is false if parsing fails.
	hostNetwork := func(ip string) (network *net.IPNet, ok bool) {
		suffix := "/32"
		if strings.Contains(ip, ":") {
			suffix = "/128"
		}
		_, network, err := net.ParseCIDR(ip + suffix)
		return network, err == nil
	}

	for _, subset := range endpoints.Subsets {
		// Add all ready Pod endpoints that are bound to a node
		for _, ready := range subset.Addresses {
			if ready.NodeName == nil || ready.TargetRef == nil || ready.TargetRef.Kind != "Pod" {
				continue
			}
			if network, ok := hostNetwork(ready.IP); ok {
				t.cidrs.Insert(&tunnelEntry{cidr: *network, node: *ready.NodeName})
			}
		}
		// Drop Pod endpoints that are not ready
		for _, notReady := range subset.NotReadyAddresses {
			if notReady.TargetRef == nil || notReady.TargetRef.Kind != "Pod" {
				continue
			}
			if network, ok := hostNetwork(notReady.IP); ok {
				t.cidrs.Remove(*network)
			}
		}
	}
	return endpoints, nil
}
// serveConnect attempts to handle the HTTP CONNECT request by dialing // serveConnect attempts to handle the HTTP CONNECT request by dialing
// a connection, either locally or via the remotedialer tunnel. // a connection, either locally or via the remotedialer tunnel.
func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) { func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) {
@ -182,19 +234,19 @@ func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
var node string var node string
var toKubelet, useTunnel bool var toKubelet, useTunnel bool
if ip := net.ParseIP(host); ip != nil { if ip := net.ParseIP(host); ip != nil {
// Destination is an IP address, check to see if the target is a CIDR or node address. // Destination is an IP address, check to see if the target is a kubelet or pod address.
// We can only use the tunnel for egress to pods if the agent supports it. // We can only use the tunnel for egress to pods if the agent supports it.
if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 { if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
switch n := nets[0].(type) { if n, ok := nets[0].(*tunnelEntry); ok {
case *nodeAddress:
node = n.node node = n.node
toKubelet = true if n.kubelet {
useTunnel = true toKubelet = true
case *nodeCIDR: useTunnel = true
node = n.node } else {
useTunnel = n.clusterEgress useTunnel = t.egress[node]
default: }
logrus.Debugf("Tunnel server CIDR lookup returned unknown type %T for address %s", nets[0], ip) } else {
logrus.Debugf("Tunnel server egress proxy CIDR lookup returned unknown type for address %s", ip)
} }
} }
} else { } else {
@ -217,16 +269,16 @@ func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
if t.server.HasSession(node) { if t.server.HasSession(node) {
if useTunnel { if useTunnel {
// Have a session and it is safe to use for this destination, do so. // Have a session and it is safe to use for this destination, do so.
logrus.Debugf("Tunnel server dialing %s via session to %s", addr, node) logrus.Debugf("Tunnel server egress proxy dialing %s via session to %s", addr, node)
return t.server.Dial(node, 15*time.Second, "tcp", addr) return t.server.Dial(node, 15*time.Second, "tcp", addr)
} }
// Have a session but the agent doesn't support tunneling to this destination or // Have a session but the agent doesn't support tunneling to this destination or
// the destination is local; fall back to direct connection. // the destination is local; fall back to direct connection.
logrus.Debugf("Tunnel server dialing %s directly", addr) logrus.Debugf("Tunnel server egress proxy dialing %s directly", addr)
return net.Dial("tcp", addr) return net.Dial("tcp", addr)
} }
// don't provide a proxy connection for anything else // don't provide a proxy connection for anything else
logrus.Debugf("Tunnel server rejecting connection to %s", addr) logrus.Debugf("Tunnel server egress proxy rejecting connection to %s", addr)
return nil, fmt.Errorf("no sessions available for host %s", host) return nil, fmt.Errorf("no sessions available for host %s", host)
} }