mirror of https://github.com/k3s-io/k3s

Refactor egress-selector pods mode to watch pods

Watching pods appears to be the most reliable way to ensure that the proxy routes and authorizes connections.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>

Branches/tags: pull/5663/head, v1.22.10-rc2+k3s1
parent 571e4812db
commit 28eb743905
@@ -15,7 +15,6 @@ import (
     daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
     "github.com/k3s-io/k3s/pkg/util"
     "github.com/k3s-io/k3s/pkg/version"
-    "github.com/pkg/errors"
     "github.com/rancher/remotedialer"
     "github.com/sirupsen/logrus"
     "github.com/yl2chen/cidranger"
@@ -29,14 +28,27 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/clientcmd"
     toolswatch "k8s.io/client-go/tools/watch"
+    "k8s.io/kubectl/pkg/util/podutils"
 )

-var (
-    ports = map[string]bool{
-        "10250": true,
-        "10010": true,
-    }
-)
+type agentTunnel struct {
+    client kubernetes.Interface
+    cidrs  cidranger.Ranger
+    ports  map[string]bool
+    mode   string
+}
+
+// explicit interface check
+var _ cidranger.RangerEntry = &podEntry{}
+
+type podEntry struct {
+    cidr    net.IPNet
+    hostNet bool
+}
+
+func (p *podEntry) Network() net.IPNet {
+    return p.cidr
+}

 func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
     restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController)
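Aside: the agent-side authorizer keeps its allowed networks in a cidranger prefix trie and answers each dial request with a containing-network lookup. Below is a minimal standalone sketch of that pattern; the entry type name and the sample CIDRs are illustrative, not part of the commit.

package example

import (
    "fmt"
    "net"

    "github.com/yl2chen/cidranger"
)

// entry mirrors the podEntry idea above: a network plus a flag describing it.
type entry struct {
    cidr    net.IPNet
    hostNet bool
}

func (e *entry) Network() net.IPNet { return e.cidr }

func Demo() {
    ranger := cidranger.NewPCTrieRanger()

    // Track a pod network and a single node address (host network).
    _, podCIDR, _ := net.ParseCIDR("10.42.0.0/24")
    _, nodeIP, _ := net.ParseCIDR("172.16.0.10/32")
    ranger.Insert(&entry{cidr: *podCIDR})
    ranger.Insert(&entry{cidr: *nodeIP, hostNet: true})

    // A dial target is allowed when some tracked network contains it.
    if nets, err := ranger.ContainingNetworks(net.ParseIP("10.42.0.17")); err == nil && len(nets) > 0 {
        e := nets[0].(*entry)
        fmt.Printf("allowed, hostNet=%v\n", e.hostNet)
    }

    // Entries can be dropped again when the pod goes away.
    ranger.Remove(*podCIDR)
}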
@@ -62,14 +74,25 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
     tunnel := &agentTunnel{
         client: client,
         cidrs:  cidranger.NewPCTrieRanger(),
+        ports:  map[string]bool{},
         mode:   config.EgressSelectorMode,
     }

-    if tunnel.mode == daemonconfig.EgressSelectorModeCluster {
-        for _, cidr := range config.AgentConfig.ClusterCIDRs {
-            logrus.Infof("Tunnel authorizer added Cluster CIDR %s", cidr)
-            tunnel.cidrs.Insert(cidranger.NewBasicRangerEntry(*cidr))
-        }
+    apiServerReady := make(chan struct{})
+    go func() {
+        if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
+            logrus.Fatalf("Tunnel watches failed to wait for apiserver ready: %v", err)
+        }
+        close(apiServerReady)
+    }()
+
+    switch tunnel.mode {
+    case daemonconfig.EgressSelectorModeCluster:
+        // In Cluster mode, we allow the cluster CIDRs, and any connections to the node's IPs for pods using host network.
+        tunnel.clusterAuth(config)
+    case daemonconfig.EgressSelectorModePod:
+        // In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network.
+        go tunnel.watchPods(ctx, apiServerReady, config)
     }

     // The loadbalancer is only disabled when there is a local apiserver. Servers without a local
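The apiServerReady channel above is the usual close-to-broadcast idiom: every watch goroutine blocks on a receive that unblocks for all of them once the channel is closed. A rough standalone sketch of the same pattern follows; the sleep stands in for util.WaitForAPIServerReady and the watcher names are illustrative.

package example

import (
    "fmt"
    "sync"
    "time"
)

func Broadcast() {
    apiServerReady := make(chan struct{})

    // Stand-in for the readiness probe; close() wakes every waiter at once.
    go func() {
        time.Sleep(100 * time.Millisecond)
        close(apiServerReady)
    }()

    var wg sync.WaitGroup
    for _, name := range []string{"pod watch", "endpoint watch"} {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            <-apiServerReady // all watchers resume together
            fmt.Println(name, "starting")
        }(name)
    }
    wg.Wait()
}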
@@ -93,24 +116,132 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
         }
     }

+    wg := &sync.WaitGroup{}
+
+    go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, proxy)
+
+    wait := make(chan int, 1)
+    go func() {
+        wg.Wait()
+        wait <- 0
+    }()
+
+    select {
+    case <-ctx.Done():
+        logrus.Error("Tunnel context canceled while waiting for connection")
+        return ctx.Err()
+    case <-wait:
+    }
+
+    return nil
+}
+
+func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) {
+    // In Cluster mode, we add static entries for the Node IPs and Cluster CIDRs
+    for _, ip := range config.AgentConfig.NodeIPs {
+        if cidr, err := util.IPToIPNet(ip); err == nil {
+            logrus.Infof("Tunnel authorizer adding Node IP %s", cidr)
+            a.cidrs.Insert(&podEntry{cidr: *cidr})
+        }
+    }
+    for _, cidr := range config.AgentConfig.ClusterCIDRs {
+        logrus.Infof("Tunnel authorizer adding Cluster CIDR %s", cidr)
+        a.cidrs.Insert(&podEntry{cidr: *cidr})
+    }
+}
+
+// watchPods watches for pods assigned to this node, adding their IPs to the CIDR list.
+// If the pod uses host network, we instead add the
+func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) {
+    for _, ip := range config.AgentConfig.NodeIPs {
+        if cidr, err := util.IPToIPNet(ip); err == nil {
+            logrus.Infof("Tunnel authorizer adding Node IP %s", cidr)
+            a.cidrs.Insert(&podEntry{cidr: *cidr, hostNet: true})
+        }
+    }
+
+    <-apiServerReady
+
+    nodeName := os.Getenv("NODE_NAME")
+    pods := a.client.CoreV1().Pods(metav1.NamespaceNone)
+    fieldSelector := fields.Set{"spec.nodeName": nodeName}.String()
+    lw := &cache.ListWatch{
+        ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+            options.FieldSelector = fieldSelector
+            return pods.List(ctx, options)
+        },
+        WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+            options.FieldSelector = fieldSelector
+            return pods.Watch(ctx, options)
+        },
+    }
+
+    logrus.Infof("Tunnnel authorizer watching Pods")
+    _, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Pod{})
+
+    defer func() {
+        watch.Stop()
+        <-done
+    }()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case ev, ok := <-watch.ResultChan():
+            pod, ok := ev.Object.(*v1.Pod)
+            if !ok {
+                logrus.Errorf("Tunnel watch failed: event object not of type v1.Pod")
+                continue
+            }
+            ready := podutils.IsPodReady(pod)
+            if pod.Spec.HostNetwork {
+                for _, container := range pod.Spec.Containers {
+                    for _, port := range container.Ports {
+                        if port.Protocol == v1.ProtocolTCP {
+                            containerPort := fmt.Sprint(port.ContainerPort)
+                            if ready {
+                                logrus.Debugf("Tunnel authorizer adding Node Port %s", containerPort)
+                                a.ports[containerPort] = true
+                            } else {
+                                logrus.Debugf("Tunnel authorizer removing Node Port %s", containerPort)
+                                delete(a.ports, containerPort)
+                            }
+                        }
+                    }
+                }
+            } else {
+                for _, ip := range pod.Status.PodIPs {
+                    if cidr, err := util.IPStringToIPNet(ip.IP); err == nil {
+                        if ready {
+                            logrus.Debugf("Tunnel authorizer adding Pod IP %s", cidr)
+                            a.cidrs.Insert(&podEntry{cidr: *cidr})
+                        } else {
+                            logrus.Debugf("Tunnel authorizer removing Pod IP %s", cidr)
+                            a.cidrs.Remove(*cidr)
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+// WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
+// apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
+// and go from the cluster.
+func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) {
     // Attempt to connect to supervisors, storing their cancellation function for later when we
     // need to disconnect.
     disconnect := map[string]context.CancelFunc{}
-    wg := &sync.WaitGroup{}
     for _, address := range proxy.SupervisorAddresses() {
         if _, ok := disconnect[address]; !ok {
-            disconnect[address] = tunnel.connect(ctx, wg, address, tlsConfig)
+            disconnect[address] = a.connect(ctx, wg, address, tlsConfig)
         }
     }

-    // Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
-    // and go from the cluster.
-    go func() {
-        if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
-            logrus.Warnf("Tunnel endpoint watch failed to wait for apiserver ready: %v", err)
-        }
-
-        endpoints := client.CoreV1().Endpoints(metav1.NamespaceDefault)
+    <-apiServerReady
+    endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
     fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
     lw := &cache.ListWatch{
         ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
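The heart of the new pod mode is the node-scoped watch above: a ListWatch constrained by a spec.nodeName field selector feeds an informer-backed watcher, and each event adds or removes the pod's addresses (or, for host-network pods, its container ports) according to pod readiness. A trimmed-down, self-contained sketch of that plumbing follows, assuming an existing clientset; the function name and the printing are illustrative only, and the port/CIDR bookkeeping is elided.

package example

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    toolswatch "k8s.io/client-go/tools/watch"
)

// watchNodePods streams pods scheduled to nodeName and prints their IPs.
// It mirrors the ListWatch + NewIndexerInformerWatcher wiring used above.
func watchNodePods(ctx context.Context, client kubernetes.Interface, nodeName string) {
    pods := client.CoreV1().Pods(metav1.NamespaceAll)
    fieldSelector := fields.Set{"spec.nodeName": nodeName}.String()

    lw := &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            options.FieldSelector = fieldSelector
            return pods.List(ctx, options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            options.FieldSelector = fieldSelector
            return pods.Watch(ctx, options)
        },
    }

    _, _, watcher, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Pod{})
    defer func() {
        watcher.Stop()
        <-done
    }()

    for {
        select {
        case <-ctx.Done():
            return
        case ev := <-watcher.ResultChan():
            pod, ok := ev.Object.(*v1.Pod)
            if !ok {
                continue
            }
            for _, ip := range pod.Status.PodIPs {
                fmt.Printf("%s/%s -> %s\n", pod.Namespace, pod.Name, ip.IP)
            }
        }
    }
}

In the commit itself, podutils.IsPodReady decides whether each address or port is inserted into or removed from the authorizer's state.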
@@ -152,7 +283,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
             for _, address := range proxy.SupervisorAddresses() {
                 validEndpoint[address] = true
                 if _, ok := disconnect[address]; !ok {
-                    disconnect[address] = tunnel.connect(ctx, nil, address, tlsConfig)
+                    disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
                 }
             }

@@ -165,29 +296,6 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
             }
         }
     }
-    }()
-
-    wait := make(chan int, 1)
-    go func() {
-        wg.Wait()
-        wait <- 0
-    }()
-
-    select {
-    case <-ctx.Done():
-        logrus.Error("Tunnel context canceled while waiting for connection")
-        return ctx.Err()
-    case <-wait:
-    }
-
-    return nil
-}
-
-type agentTunnel struct {
-    sync.Mutex
-    client kubernetes.Interface
-    cidrs  cidranger.Ranger
-    mode   string
 }

 // authorized determines whether or not a dial request is authorized.
@@ -198,46 +306,24 @@ func (a *agentTunnel) authorized(ctx context.Context, proto, address string) bool {
     logrus.Debugf("Tunnel authorizer checking dial request for %s", address)
     host, port, err := net.SplitHostPort(address)
     if err == nil {
-        if proto == "tcp" && ports[port] && (host == "127.0.0.1" || host == "::1") {
+        if proto == "tcp" && daemonconfig.KubeletReservedPorts[port] && (host == "127.0.0.1" || host == "::1") {
             return true
         }
         if ip := net.ParseIP(host); ip != nil {
-            // lazy populate pod cidrs from the node object
-            if a.mode == daemonconfig.EgressSelectorModePod && a.cidrs.Len() == 0 {
-                if err := a.updatePodCIDRs(ctx); err != nil {
-                    logrus.Warnf("Tunnel authorizer failed to update CIDRs: %v", err)
-                }
-            }
             if nets, err := a.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
+                if p, ok := nets[0].(*podEntry); ok {
+                    if p.hostNet {
+                        return proto == "tcp" && a.ports[port]
+                    }
                     return true
                 }
+                logrus.Debugf("Tunnel authorizer CIDR lookup returned unknown type for address %s", ip)
+            }
         }
     }
     return false
 }

-func (a *agentTunnel) updatePodCIDRs(ctx context.Context) error {
-    a.Lock()
-    defer a.Unlock()
-    // Return early if another goroutine updated the CIDRs while we were waiting to lock
-    if a.cidrs.Len() != 0 {
-        return nil
-    }
-    nodeName := os.Getenv("NODE_NAME")
-    logrus.Debugf("Tunnel authorizer getting Pod CIDRs for %s", nodeName)
-    node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
-    if err != nil {
-        return errors.Wrap(err, "failed to get Pod CIDRs")
-    }
-    for _, cidr := range node.Spec.PodCIDRs {
-        if _, n, err := net.ParseCIDR(cidr); err == nil {
-            logrus.Infof("Tunnel authorizer added Pod CIDR %s", cidr)
-            a.cidrs.Insert(cidranger.NewBasicRangerEntry(*n))
-        }
-    }
-    return nil
-}
-
 // connect initiates a connection to the remotedialer server. Incoming dial requests from
 // the server will be checked by the authorizer function prior to being fulfilled.
 func (a *agentTunnel) connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, tlsConfig *tls.Config) context.CancelFunc {
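Restating the CIDR half of that decision as a standalone helper may make the hostNet special case clearer. This is an illustrative sketch, not the commit's code: entries for ordinary pods authorize any port on that IP, while entries flagged as host network only authorize the container ports learned by the pod watch.

package example

import (
    "net"

    "github.com/yl2chen/cidranger"
)

type podEntry struct {
    cidr    net.IPNet
    hostNet bool
}

func (p *podEntry) Network() net.IPNet { return p.cidr }

// authorizeIP reports whether a tcp dial to ip:port should be proxied, given
// the tracked networks and the set of host-network container ports.
func authorizeIP(cidrs cidranger.Ranger, hostPorts map[string]bool, ip net.IP, port string) bool {
    nets, err := cidrs.ContainingNetworks(ip)
    if err != nil || len(nets) == 0 {
        return false
    }
    p, ok := nets[0].(*podEntry)
    if !ok {
        return false
    }
    if p.hostNet {
        // Host-network pods only expose the ports their containers declared.
        return hostPorts[port]
    }
    return true
}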
@@ -30,8 +30,17 @@ const (
     EgressSelectorModeDisabled = "disabled"
     EgressSelectorModePod      = "pod"
     CertificateRenewDays       = 90
+    StreamServerPort           = "10010"
+    KubeletPort                = "10250"
 )

+// These ports can always be accessed via the tunnel server, at the loopback address.
+// Other addresses and ports are only accessible via the tunnel on newer agents, when used by a pod.
+var KubeletReservedPorts = map[string]bool{
+    StreamServerPort: true,
+    KubeletPort:      true,
+}
+
 type Node struct {
     Docker                   bool
     ContainerRuntimeEndpoint string
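The hunk above is in the shared daemons/config package, so both sides agree on what counts as a kubelet port: the agent's authorizer accepts these ports on the loopback address, and the tunnel server's dialBackend (further down) only treats a node-IP target as a kubelet dial when the port is one of them. A small illustrative check in the same spirit; the helper itself is hypothetical, not part of the commit.

package example

import (
    "fmt"
    "net"

    "github.com/k3s-io/k3s/pkg/daemons/config"
)

// kubeletLoopbackAddr returns the loopback address to dial for a node-level
// request, or false if the port is not one of the reserved kubelet ports.
func kubeletLoopbackAddr(loopback, port string) (string, bool) {
    if !config.KubeletReservedPorts[port] {
        return "", false
    }
    return net.JoinHostPort(loopback, port), true
}

func demoReservedPorts() {
    if addr, ok := kubeletLoopbackAddr("127.0.0.1", config.KubeletPort); ok {
        fmt.Println(addr) // 127.0.0.1:10250
    }
}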
@@ -11,12 +11,18 @@ import (

     "github.com/k3s-io/k3s/pkg/daemons/config"
     "github.com/k3s-io/k3s/pkg/daemons/control/proxy"
+    "github.com/k3s-io/k3s/pkg/generated/clientset/versioned/scheme"
     "github.com/k3s-io/k3s/pkg/nodeconfig"
+    "github.com/k3s-io/k3s/pkg/util"
     "github.com/k3s-io/k3s/pkg/version"
+    "github.com/pkg/errors"
     "github.com/rancher/remotedialer"
     "github.com/sirupsen/logrus"
     "github.com/yl2chen/cidranger"
     v1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
     "k8s.io/apiserver/pkg/endpoints/request"
     "k8s.io/client-go/kubernetes"
 )
@@ -68,8 +74,8 @@ var _ cidranger.RangerEntry = &tunnelEntry{}

 type tunnelEntry struct {
     cidr     net.IPNet
-    node     string
-    kubelet  bool
+    nodeName string
+    node     bool
 }

 func (n *tunnelEntry) Network() net.IPNet {
@@ -98,10 +104,9 @@ func (t *TunnelServer) watch(ctx context.Context) {
     for {
         if t.config.Runtime.Core != nil {
             t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
-            if t.config.EgressSelectorMode == config.EgressSelectorModeCluster {
-                // Cluster mode watches Endpoints to find what Node is hosting an Endpoint address, as the CNI
-                // may be using its own IPAM that does not repsect the Node's PodCIDR.
-                t.config.Runtime.Core.Core().V1().Endpoints().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeEndpoints)
+            switch t.config.EgressSelectorMode {
+            case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
+                t.config.Runtime.Core.Core().V1().Pod().OnChange(ctx, version.Program+"-tunnel-server", t.onChangePod)
             }
             return
         }
@@ -110,41 +115,22 @@ func (t *TunnelServer) watch(ctx context.Context) {
     }
 }

-// onChangeNode updates the node address/CIDR mappings by observing changes to nodes.
-// Node addresses are updated in Agent, Cluster, and Pod mode.
-// Pod CIDRs are updated only in Pod mode
+// onChangeNode updates the node address mappings by observing changes to nodes.
 func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) {
     if node != nil {
         t.Lock()
         defer t.Unlock()
-        logrus.Debugf("Tunnel server egress proxy updating node %s", nodeName)
         _, t.egress[nodeName] = node.Labels[nodeconfig.ClusterEgressLabel]
         // Add all node IP addresses
         for _, addr := range node.Status.Addresses {
             if addr.Type == v1.NodeInternalIP || addr.Type == v1.NodeExternalIP {
-                address := addr.Address
-                if strings.Contains(address, ":") {
-                    address += "/128"
-                } else {
-                    address += "/32"
-                }
-                if _, n, err := net.ParseCIDR(address); err == nil {
+                if n, err := util.IPStringToIPNet(addr.Address); err == nil {
                     if node.DeletionTimestamp != nil {
+                        logrus.Debugf("Tunnel server egress proxy removing Node %s IP %v", nodeName, n)
                         t.cidrs.Remove(*n)
                     } else {
-                        t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName, kubelet: true})
-                    }
-                }
-            }
-        }
-        // Add all Node PodCIDRs, if in pod mode
-        if t.config.EgressSelectorMode == config.EgressSelectorModePod {
-            for _, cidr := range node.Spec.PodCIDRs {
-                if _, n, err := net.ParseCIDR(cidr); err == nil {
-                    if node.DeletionTimestamp != nil {
-                        t.cidrs.Remove(*n)
-                    } else {
-                        t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName})
+                        logrus.Debugf("Tunnel server egress proxy updating Node %s IP %v", nodeName, n)
+                        t.cidrs.Insert(&tunnelEntry{cidr: *n, nodeName: nodeName, node: true})
                     }
                 }
             }
@@ -153,45 +139,29 @@ func (t *TunnelServer) onChangeNode(nodeName string, node *v1.Node) (*v1.Node, error) {
     return node, nil
 }

-// onChangeEndpoits updates the pod address mappings by observing changes to endpoints.
-// Only Pod endpoints with a defined NodeName are used, and only in Cluster mode.
-func (t *TunnelServer) onChangeEndpoints(endpointsName string, endpoints *v1.Endpoints) (*v1.Endpoints, error) {
-    if endpoints != nil {
+// onChangePod updates the pod address mappings by observing changes to pods.
+func (t *TunnelServer) onChangePod(podName string, pod *v1.Pod) (*v1.Pod, error) {
+    if pod != nil {
         t.Lock()
         defer t.Unlock()
-        logrus.Debugf("Tunnel server egress proxy updating endpoints %s", endpointsName)
-        // Add all Pod endpoints
-        for _, subset := range endpoints.Subsets {
-            for _, addr := range subset.Addresses {
-                if addr.NodeName != nil && addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
-                    nodeName := *addr.NodeName
-                    address := addr.IP
-                    if strings.Contains(address, ":") {
-                        address += "/128"
-                    } else {
-                        address += "/32"
-                    }
-                    if _, n, err := net.ParseCIDR(address); err == nil {
-                        t.cidrs.Insert(&tunnelEntry{cidr: *n, node: nodeName})
-                    }
-                }
-            }
-            for _, addr := range subset.NotReadyAddresses {
-                if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
-                    address := addr.IP
-                    if strings.Contains(address, ":") {
-                        address += "/128"
-                    } else {
-                        address += "/32"
-                    }
-                    if _, n, err := net.ParseCIDR(address); err == nil {
-                        t.cidrs.Remove(*n)
+        // Add all pod IPs, unless the pod uses host network
+        if !pod.Spec.HostNetwork {
+            nodeName := pod.Spec.NodeName
+            for _, ip := range pod.Status.PodIPs {
+                if cidr, err := util.IPStringToIPNet(ip.IP); err == nil {
+                    if pod.DeletionTimestamp != nil {
+                        logrus.Debugf("Tunnel server egress proxy removing Node %s Pod IP %v", nodeName, cidr)
+                        t.cidrs.Remove(*cidr)
+                    } else {
+                        logrus.Debugf("Tunnel server egress proxy updating Node %s Pod IP %s", nodeName, cidr)
+                        t.cidrs.Insert(&tunnelEntry{cidr: *cidr, nodeName: nodeName})
                     }
                 }
             }
         }
     }
-    return endpoints, nil
+    return pod, nil
 }

 // serveConnect attempts to handle the HTTP CONNECT request by dialing
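On the server side the same conversion keeps recurring: each pod IP becomes a single-address network keyed to the node hosting the pod. Factoring that step out as a pure function makes it easy to exercise in isolation; this is an illustrative sketch built on the util.IPStringToIPNet helper added by this commit, not code from the commit itself.

package example

import (
    "net"

    v1 "k8s.io/api/core/v1"

    "github.com/k3s-io/k3s/pkg/util"
)

// podNetworks returns one /32 or /128 network per pod IP, skipping pods that
// use the host network (their node address is already tracked separately).
func podNetworks(pod *v1.Pod) []net.IPNet {
    if pod.Spec.HostNetwork {
        return nil
    }
    var nets []net.IPNet
    for _, ip := range pod.Status.PodIPs {
        if cidr, err := util.IPStringToIPNet(ip.IP); err == nil {
            nets = append(nets, *cidr)
        }
    }
    return nets
}

onChangePod above then inserts or removes each such network in the server's cidranger trie, tagged with pod.Spec.NodeName, depending on whether the pod is being deleted.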
@@ -199,20 +169,29 @@ func (t *TunnelServer) onChangeEndpoints(endpointsName string, endpoints *v1.Endpoints) (*v1.Endpoints, error) {
 func (t *TunnelServer) serveConnect(resp http.ResponseWriter, req *http.Request) {
     bconn, err := t.dialBackend(req.Host)
     if err != nil {
-        http.Error(resp, fmt.Sprintf("no tunnels available: %v", err), http.StatusInternalServerError)
+        responsewriters.ErrorNegotiated(
+            apierrors.NewInternalError(errors.Wrap(err, "no tunnels available")),
+            scheme.Codecs.WithoutConversion(), schema.GroupVersion{}, resp, req,
+        )
         return
     }

     hijacker, ok := resp.(http.Hijacker)
     if !ok {
-        http.Error(resp, "hijacking not supported", http.StatusInternalServerError)
+        responsewriters.ErrorNegotiated(
+            apierrors.NewInternalError(errors.New("hijacking not supported")),
+            scheme.Codecs.WithoutConversion(), schema.GroupVersion{}, resp, req,
+        )
         return
     }
     resp.WriteHeader(http.StatusOK)

     rconn, _, err := hijacker.Hijack()
     if err != nil {
-        http.Error(resp, err.Error(), http.StatusInternalServerError)
+        responsewriters.ErrorNegotiated(
+            apierrors.NewInternalError(err),
+            scheme.Codecs.WithoutConversion(), schema.GroupVersion{}, resp, req,
+        )
         return
     }

@@ -231,46 +210,46 @@ func (t *TunnelServer) dialBackend(addr string) (net.Conn, error) {
     }
     loopback := t.config.Loopback()

-    var node string
+    var nodeName string
     var toKubelet, useTunnel bool
     if ip := net.ParseIP(host); ip != nil {
-        // Destination is an IP address, check to see if the target is a kubelet or pod address.
+        // Destination is an IP address, which could be either a pod, or node by IP.
         // We can only use the tunnel for egress to pods if the agent supports it.
         if nets, err := t.cidrs.ContainingNetworks(ip); err == nil && len(nets) > 0 {
             if n, ok := nets[0].(*tunnelEntry); ok {
-                node = n.node
-                if n.kubelet {
+                nodeName = n.nodeName
+                if n.node && config.KubeletReservedPorts[port] {
                     toKubelet = true
                     useTunnel = true
                 } else {
-                    useTunnel = t.egress[node]
+                    useTunnel = t.egress[nodeName]
                 }
             } else {
                 logrus.Debugf("Tunnel server egress proxy CIDR lookup returned unknown type for address %s", ip)
             }
         }
     } else {
-        // Destination is a kubelet by name, it is safe to use the tunnel.
-        node = host
+        // Destination is a node by name, it is safe to use the tunnel.
+        nodeName = host
         toKubelet = true
         useTunnel = true
     }

-    // Always dial kubelets via the loopback address.
+    // Always dial kubelet via the loopback address.
     if toKubelet {
         addr = fmt.Sprintf("%s:%s", loopback, port)
     }

     // If connecting to something hosted by the local node, don't tunnel
-    if node == t.config.ServerNodeName {
+    if nodeName == t.config.ServerNodeName {
         useTunnel = false
     }

-    if t.server.HasSession(node) {
+    if t.server.HasSession(nodeName) {
         if useTunnel {
             // Have a session and it is safe to use for this destination, do so.
-            logrus.Debugf("Tunnel server egress proxy dialing %s via session to %s", addr, node)
-            return t.server.Dial(node, 15*time.Second, "tcp", addr)
+            logrus.Debugf("Tunnel server egress proxy dialing %s via session to %s", addr, nodeName)
+            return t.server.Dial(nodeName, 15*time.Second, "tcp", addr)
         }
         // Have a session but the agent doesn't support tunneling to this destination or
         // the destination is local; fall back to direct connection.

@@ -256,3 +256,26 @@ func IsIPv6OnlyCIDRs(cidrs []*net.IPNet) (bool, error) {

     return !v4Found && v6Found, nil
 }
+
+// IPToIPNet converts an IP to an IPNet, using a fully filled mask appropriate for the address family.
+func IPToIPNet(ip net.IP) (*net.IPNet, error) {
+    address := ip.String()
+    if strings.Contains(address, ":") {
+        address += "/128"
+    } else {
+        address += "/32"
+    }
+    _, cidr, err := net.ParseCIDR(address)
+    return cidr, err
+}
+
+// IPStringToIPNet converts an IP string to an IPNet, using a fully filled mask appropriate for the address family.
+func IPStringToIPNet(address string) (*net.IPNet, error) {
+    if strings.Contains(address, ":") {
+        address += "/128"
+    } else {
+        address += "/32"
+    }
+    _, cidr, err := net.ParseCIDR(address)
+    return cidr, err
+}
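A quick usage sketch of the two helpers added above; the demo function and sample addresses are illustrative.

package example

import (
    "fmt"
    "net"

    "github.com/k3s-io/k3s/pkg/util"
)

func demoIPNetHelpers() {
    // Strings get a full-length mask for their address family.
    for _, addr := range []string{"10.42.0.17", "2001:db8::1"} {
        if cidr, err := util.IPStringToIPNet(addr); err == nil {
            ones, bits := cidr.Mask.Size()
            fmt.Printf("%s -> %s (/%d of %d)\n", addr, cidr, ones, bits)
        }
    }

    // IPToIPNet accepts a net.IP instead of a string.
    if cidr, err := util.IPToIPNet(net.ParseIP("172.16.0.10")); err == nil {
        fmt.Println(cidr) // 172.16.0.10/32
    }
}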