diff --git a/manifests/ccm.yaml b/manifests/ccm.yaml index f66a68ce1c..d72b7fded9 100644 --- a/manifests/ccm.yaml +++ b/manifests/ccm.yaml @@ -67,6 +67,14 @@ rules: - daemonsets verbs: - "*" +- apiGroups: + - "discovery.k8s.io" + resources: + - endpointslices + verbs: + - get + - list + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index a44e10b52c..236bc97ef8 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -12,6 +12,8 @@ import ( appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1" "github.com/rancher/wrangler/pkg/generated/controllers/core" coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + "github.com/rancher/wrangler/pkg/generated/controllers/discovery" + discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1" "github.com/rancher/wrangler/pkg/generic" "github.com/rancher/wrangler/pkg/start" "github.com/sirupsen/logrus" @@ -41,6 +43,7 @@ type k3s struct { processor apply.Apply daemonsetCache appsclient.DaemonSetCache + endpointsCache discoveryclient.EndpointSliceCache nodeCache coreclient.NodeCache podCache coreclient.PodCache workqueue workqueue.RateLimitingInterface @@ -89,6 +92,7 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace}) + lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config) processor, err := apply.NewForConfig(config) if err != nil { @@ -96,14 +100,15 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st } k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet()) k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache() + k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache() k.podCache = lbCoreFactory.Core().V1().Pod().Cache() k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil { + if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil { logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err) } - if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil { + if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil { logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err) } } else { diff --git a/pkg/cloudprovider/servicelb.go b/pkg/cloudprovider/servicelb.go index dfc70bcae5..89478f2069 100644 --- a/pkg/cloudprovider/servicelb.go +++ b/pkg/cloudprovider/servicelb.go @@ -12,11 +12,13 @@ import ( "github.com/k3s-io/k3s/pkg/version" "github.com/rancher/wrangler/pkg/condition" coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1" "github.com/rancher/wrangler/pkg/merr" "github.com/rancher/wrangler/pkg/objectset" "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -48,9 +50,11 @@ const ( func (k *k3s) Register(ctx context.Context, nodes coreclient.NodeController, pods coreclient.PodController, + endpointslices discoveryclient.EndpointSliceController, ) error { nodes.OnChange(ctx, controllerName, k.onChangeNode) pods.OnChange(ctx, controllerName, k.onChangePod) + endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice) if err := k.createServiceLBNamespace(ctx); err != nil { return err @@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) { return node, nil } +// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer +// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local. +func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) { + if eps == nil { + return nil, nil + } + + serviceName, ok := eps.Labels[discovery.LabelServiceName] + if !ok { + return eps, nil + } + + k.workqueue.Add(eps.Namespace + "/" + serviceName) + return eps, nil +} + // runWorker dequeues Service changes from the work queue // We run a lightweight work queue to handle service updates. We don't need the full overhead // of a wrangler service controller and shared informer cache, but we do want to run changes @@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { // getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods // matching the selected service. func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) { - pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{ + var readyNodes map[string]bool + + if servicehelper.RequestsOnlyLocalTraffic(svc) { + readyNodes = map[string]bool{} + eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{ + discovery.LabelServiceName: svc.Name, + })) + if err != nil { + return nil, err + } + + for _, ep := range eps { + for _, endpoint := range ep.Endpoints { + isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" + isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready + if isPod && isReady && endpoint.NodeName != nil { + readyNodes[*endpoint.NodeName] = true + } + } + } + } + + pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{ svcNameLabel: svc.Name, svcNamespaceLabel: svc.Namespace, })) - if err != nil { return nil, err } - expectedIPs, err := k.podIPs(pods, svc) + expectedIPs, err := k.podIPs(pods, svc, readyNodes) if err != nil { return nil, err } @@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa // podIPs returns a list of IPs for Nodes hosting ServiceLB Pods. // If at least one node has External IPs available, only external IPs are returned. // If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned. -func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { +func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) { // Go doesn't have sets so we stuff things into a map of bools and then get lists of keys // to determine the unique set of IPs in use by pods. extIPs := map[string]bool{} @@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) { if !Ready.IsTrue(pod) { continue } + if readyNodes != nil && !readyNodes[pod.Spec.NodeName] { + continue + } node, err := k.nodeCache.Get(pod.Spec.NodeName) if apierrors.IsNotFound(err) { @@ -405,17 +449,27 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error { func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { name := generateName(svc) oneInt := intstr.FromInt(1) - + localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc) sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc) if err != nil { return nil, err } + var sysctls []core.Sysctl + for _, ipFamily := range svc.Spec.IPFamilies { + switch ipFamily { + case core.IPv4Protocol: + sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"}) + case core.IPv6Protocol: + sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"}) + } + } + ds := &apps.DaemonSet{ ObjectMeta: meta.ObjectMeta{ Name: name, Namespace: k.LBNamespace, - Labels: map[string]string{ + Labels: labels.Set{ nodeSelectorLabel: "false", svcNameLabel: svc.Name, svcNamespaceLabel: svc.Namespace, @@ -427,13 +481,13 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { }, Spec: apps.DaemonSetSpec{ Selector: &meta.LabelSelector{ - MatchLabels: map[string]string{ + MatchLabels: labels.Set{ "app": name, }, }, Template: core.PodTemplateSpec{ ObjectMeta: meta.ObjectMeta{ - Labels: map[string]string{ + Labels: labels.Set{ "app": name, svcNameLabel: svc.Name, svcNamespaceLabel: svc.Namespace, @@ -442,6 +496,25 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { Spec: core.PodSpec{ ServiceAccountName: "svclb", AutomountServiceAccountToken: utilpointer.Bool(false), + SecurityContext: &core.PodSecurityContext{ + Sysctls: sysctls, + }, + Tolerations: []core.Toleration{ + { + Key: "node-role.kubernetes.io/master", + Operator: "Exists", + Effect: "NoSchedule", + }, + { + Key: "node-role.kubernetes.io/control-plane", + Operator: "Exists", + Effect: "NoSchedule", + }, + { + Key: "CriticalAddonsOnly", + Operator: "Exists", + }, + }, }, }, UpdateStrategy: apps.DaemonSetUpdateStrategy{ @@ -453,18 +526,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { }, } - var sysctls []core.Sysctl - for _, ipFamily := range svc.Spec.IPFamilies { - switch ipFamily { - case core.IPv4Protocol: - sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"}) - case core.IPv6Protocol: - sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"}) - } - } - - ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls} - for _, port := range svc.Spec.Ports { portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port) container := core.Container{ @@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { Name: "DEST_PROTO", Value: string(port.Protocol), }, - { - Name: "DEST_PORT", - Value: strconv.Itoa(int(port.Port)), - }, - { - Name: "DEST_IPS", - Value: strings.Join(svc.Spec.ClusterIPs, " "), - }, }, SecurityContext: &core.SecurityContext{ Capabilities: &core.Capabilities{ @@ -510,31 +563,36 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { }, } - ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container) - } - - // Add toleration to noderole.kubernetes.io/master=*:NoSchedule - masterToleration := core.Toleration{ - Key: "node-role.kubernetes.io/master", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration) - - // Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule - controlPlaneToleration := core.Toleration{ - Key: "node-role.kubernetes.io/control-plane", - Operator: "Exists", - Effect: "NoSchedule", - } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration) + if localTraffic { + container.Env = append(container.Env, + core.EnvVar{ + Name: "DEST_PORT", + Value: strconv.Itoa(int(port.NodePort)), + }, + core.EnvVar{ + Name: "DEST_IPS", + ValueFrom: &core.EnvVarSource{ + FieldRef: &core.ObjectFieldSelector{ + FieldPath: "status.hostIP", + }, + }, + }, + ) + } else { + container.Env = append(container.Env, + core.EnvVar{ + Name: "DEST_PORT", + Value: strconv.Itoa(int(port.Port)), + }, + core.EnvVar{ + Name: "DEST_IPS", + Value: strings.Join(svc.Spec.ClusterIPs, " "), + }, + ) + } - // Add toleration to CriticalAddonsOnly - criticalAddonsOnlyToleration := core.Toleration{ - Key: "CriticalAddonsOnly", - Operator: "Exists", + ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container) } - ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration) // Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes enableNodeSelector, err := k.nodeHasDaemonSetLabel() @@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { } ds.Labels[nodeSelectorLabel] = "true" } + return ds, nil } @@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error { return err } - nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)}) + nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)}) daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector) if err != nil { return err diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 97872a48c6..0104cfa02c 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -371,9 +371,9 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error { func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error { return util.WaitForRBACReady(ctx, runtime.KubeConfigAdmin, timeout, authorizationv1.ResourceAttributes{ Namespace: metav1.NamespaceSystem, - Verb: "*", - Resource: "daemonsets", - Group: "apps", + Verb: "watch", + Resource: "endpointslices", + Group: "discovery.k8s.io", }, version.Program+"-cloud-controller-manager") }