mirror of https://github.com/k3s-io/k3s
Honor Service ExternalTrafficPolicy
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 369b81b45e
@ -67,6 +67,14 @@ rules:
- daemonsets
- "*"
- apiGroups:
- "discovery.k8s.io"
- endpointslices
- get
- list
- watch
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
@ -12,6 +12,8 @@ import (
appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
@ -41,6 +43,7 @@ type k3s struct {
processor apply.Apply
daemonsetCache appsclient.DaemonSetCache
endpointsCache discoveryclient.EndpointSliceCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
workqueue workqueue.RateLimitingInterface
@ -89,6 +92,7 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st
lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config)
processor, err := apply.NewForConfig(config)
if err != nil {
@ -96,14 +100,15 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st
k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache()
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil {
logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil {
logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
} else {
@ -12,11 +12,13 @@ import (
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -48,9 +50,11 @@ const (
func (k *k3s) Register(ctx context.Context,
nodes coreclient.NodeController,
pods coreclient.PodController,
endpointslices discoveryclient.EndpointSliceController,
) error {
nodes.OnChange(ctx, controllerName, k.onChangeNode)
pods.OnChange(ctx, controllerName, k.onChangePod)
endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)
if err := k.createServiceLBNamespace(ctx); err != nil {
return err
@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
return node, nil
// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
if eps == nil {
return nil, nil
serviceName, ok := eps.Labels[discovery.LabelServiceName]
if !ok {
return eps, nil
k.workqueue.Add(eps.Namespace + "/" + serviceName)
return eps, nil
// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{
var readyNodes map[string]bool
if servicehelper.RequestsOnlyLocalTraffic(svc) {
readyNodes = map[string]bool{}
eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{
discovery.LabelServiceName: svc.Name,
if err != nil {
return nil, err
for _, ep := range eps {
for _, endpoint := range ep.Endpoints {
isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
if isPod && isReady && endpoint.NodeName != nil {
readyNodes[*endpoint.NodeName] = true
pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
if err != nil {
return nil, err
expectedIPs, err := k.podIPs(pods, svc)
expectedIPs, err := k.podIPs(pods, svc, readyNodes)
if err != nil {
return nil, err
@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
if !Ready.IsTrue(pod) {
if readyNodes != nil && !readyNodes[pod.Spec.NodeName] {
node, err := k.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
@ -405,54 +449,12 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
if err != nil {
return nil, err
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: k.LBNamespace,
Labels: map[string]string{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
TypeMeta: meta.TypeMeta{
Kind: "DaemonSet",
APIVersion: "apps/v1",
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: map[string]string{
"app": name,
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
Spec: core.PodSpec{
ServiceAccountName: "svclb",
AutomountServiceAccountToken: utilpointer.Bool(false),
UpdateStrategy: apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneInt,
var sysctls []core.Sysctl
for _, ipFamily := range svc.Spec.IPFamilies {
switch ipFamily {
@ -463,7 +465,66 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: k.LBNamespace,
Labels: labels.Set{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
TypeMeta: meta.TypeMeta{
Kind: "DaemonSet",
APIVersion: "apps/v1",
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
MatchLabels: labels.Set{
"app": name,
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: labels.Set{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
Spec: core.PodSpec{
ServiceAccountName: "svclb",
AutomountServiceAccountToken: utilpointer.Bool(false),
SecurityContext: &core.PodSecurityContext{
Sysctls: sysctls,
Tolerations: []core.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: "Exists",
Effect: "NoSchedule",
Key: "node-role.kubernetes.io/control-plane",
Operator: "Exists",
Effect: "NoSchedule",
Key: "CriticalAddonsOnly",
Operator: "Exists",
UpdateStrategy: apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneInt,
for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
Value: string(port.Protocol),
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, " "),
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
@ -510,32 +563,37 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
if localTraffic {
container.Env = append(container.Env,
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.NodePort)),
Name: "DEST_IPS",
ValueFrom: &core.EnvVarSource{
FieldRef: &core.ObjectFieldSelector{
FieldPath: "status.hostIP",
} else {
container.Env = append(container.Env,
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, " "),
ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
// Add toleration to noderole.kubernetes.io/master=*:NoSchedule
masterToleration := core.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: "Exists",
Effect: "NoSchedule",
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
// Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule
controlPlaneToleration := core.Toleration{
Key: "node-role.kubernetes.io/control-plane",
Operator: "Exists",
Effect: "NoSchedule",
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
// Add toleration to CriticalAddonsOnly
criticalAddonsOnlyToleration := core.Toleration{
Key: "CriticalAddonsOnly",
Operator: "Exists",
ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
if err != nil {
@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
ds.Labels[nodeSelectorLabel] = "true"
return ds, nil
@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error {
return err
nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
if err != nil {
return err
@ -371,9 +371,9 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error {
return util.WaitForRBACReady(ctx, runtime.KubeConfigAdmin, timeout, authorizationv1.ResourceAttributes{
Namespace: metav1.NamespaceSystem,
Verb: "*",
Resource: "daemonsets",
Group: "apps",
Verb: "watch",
Resource: "endpointslices",
Group: "discovery.k8s.io",
}, version.Program+"-cloud-controller-manager")
Reference in New Issue