2022-09-29 20:37:50 +00:00
package cloudprovider
import (
"context"
"fmt"
"sort"
2024-10-15 14:27:56 +00:00
"strconv"
2022-09-29 20:37:50 +00:00
"strings"
"time"
2024-10-15 14:27:56 +00:00
"encoding/json"
2024-08-09 21:57:34 +00:00
"sigs.k8s.io/yaml"
2022-09-29 20:37:50 +00:00
2023-08-01 00:51:46 +00:00
"github.com/k3s-io/k3s/pkg/util"
2022-09-29 20:37:50 +00:00
"github.com/k3s-io/k3s/pkg/version"
2024-05-06 16:42:27 +00:00
"github.com/rancher/wrangler/v3/pkg/condition"
coreclient "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
discoveryclient "github.com/rancher/wrangler/v3/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/rancher/wrangler/v3/pkg/objectset"
2022-09-29 20:37:50 +00:00
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
2023-01-26 23:48:12 +00:00
discovery "k8s.io/api/discovery/v1"
2022-09-29 20:37:50 +00:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
2024-04-17 17:19:32 +00:00
"k8s.io/apimachinery/pkg/util/sets"
2022-09-29 20:37:50 +00:00
"k8s.io/apimachinery/pkg/util/wait"
2023-11-21 00:27:29 +00:00
utilfeature "k8s.io/apiserver/pkg/util/feature"
2022-09-29 20:37:50 +00:00
"k8s.io/client-go/util/retry"
2024-07-16 07:51:15 +00:00
"k8s.io/cloud-provider/names"
2022-09-29 20:37:50 +00:00
servicehelper "k8s.io/cloud-provider/service/helpers"
2023-11-21 00:27:29 +00:00
"k8s.io/kubernetes/pkg/features"
2022-09-29 20:37:50 +00:00
utilsnet "k8s.io/utils/net"
2024-03-28 17:07:02 +00:00
utilsptr "k8s.io/utils/ptr"
2022-09-29 20:37:50 +00:00
)
var (
finalizerName = "svccontroller." + version . Program + ".cattle.io/daemonset"
svcNameLabel = "svccontroller." + version . Program + ".cattle.io/svcname"
svcNamespaceLabel = "svccontroller." + version . Program + ".cattle.io/svcnamespace"
daemonsetNodeLabel = "svccontroller." + version . Program + ".cattle.io/enablelb"
daemonsetNodePoolLabel = "svccontroller." + version . Program + ".cattle.io/lbpool"
nodeSelectorLabel = "svccontroller." + version . Program + ".cattle.io/nodeselector"
2024-04-29 19:39:35 +00:00
priorityAnnotation = "svccontroller." + version . Program + ".cattle.io/priorityclassname"
2024-08-09 21:57:34 +00:00
tolerationsAnnotation = "svccontroller." + version . Program + ".cattle.io/tolerations"
2024-07-16 07:51:15 +00:00
controllerName = names . ServiceLBController
2022-09-29 20:37:50 +00:00
)
const (
2024-04-29 19:39:35 +00:00
Ready = condition . Cond ( "Ready" )
DefaultLBNS = meta . NamespaceSystem
DefaultLBPriorityClassName = "system-node-critical"
2023-05-30 07:07:24 +00:00
)
var (
2024-10-15 14:27:56 +00:00
DefaultLBImage = "rancher/klipper-lb:v0.4.9"
2022-09-29 20:37:50 +00:00
)
func ( k * k3s ) Register ( ctx context . Context ,
nodes coreclient . NodeController ,
pods coreclient . PodController ,
2023-01-26 23:48:12 +00:00
endpointslices discoveryclient . EndpointSliceController ,
2022-09-29 20:37:50 +00:00
) error {
nodes . OnChange ( ctx , controllerName , k . onChangeNode )
pods . OnChange ( ctx , controllerName , k . onChangePod )
2023-01-26 23:48:12 +00:00
endpointslices . OnChange ( ctx , controllerName , k . onChangeEndpointSlice )
2022-09-29 20:37:50 +00:00
2023-02-07 01:34:02 +00:00
if err := k . ensureServiceLBNamespace ( ctx ) ; err != nil {
2022-09-29 20:37:50 +00:00
return err
}
2023-02-07 01:34:02 +00:00
if err := k . ensureServiceLBServiceAccount ( ctx ) ; err != nil {
2022-10-10 18:16:57 +00:00
return err
}
2022-09-29 20:37:50 +00:00
go wait . Until ( k . runWorker , time . Second , ctx . Done ( ) )
return k . removeServiceFinalizers ( ctx )
}
2023-02-07 01:34:02 +00:00
// ensureServiceLBNamespace ensures that the configured namespace exists.
func ( k * k3s ) ensureServiceLBNamespace ( ctx context . Context ) error {
ns := k . client . CoreV1 ( ) . Namespaces ( )
if _ , err := ns . Get ( ctx , k . LBNamespace , meta . GetOptions { } ) ; err == nil || ! apierrors . IsNotFound ( err ) {
return err
}
_ , err := ns . Create ( ctx , & core . Namespace {
2022-09-29 20:37:50 +00:00
ObjectMeta : meta . ObjectMeta {
Name : k . LBNamespace ,
} ,
} , meta . CreateOptions { } )
if apierrors . IsAlreadyExists ( err ) {
return nil
}
return err
}
2023-02-07 01:34:02 +00:00
// ensureServiceLBServiceAccount ensures that the ServiceAccount used by pods exists.
func ( k * k3s ) ensureServiceLBServiceAccount ( ctx context . Context ) error {
sa := k . client . CoreV1 ( ) . ServiceAccounts ( k . LBNamespace )
if _ , err := sa . Get ( ctx , "svclb" , meta . GetOptions { } ) ; err == nil || ! apierrors . IsNotFound ( err ) {
return err
}
_ , err := sa . Create ( ctx , & core . ServiceAccount {
2022-10-10 18:16:57 +00:00
ObjectMeta : meta . ObjectMeta {
Name : "svclb" ,
Namespace : k . LBNamespace ,
} ,
} , meta . CreateOptions { } )
if apierrors . IsAlreadyExists ( err ) {
return nil
}
return err
}
2022-09-29 20:37:50 +00:00
// onChangePod handles changes to Pods.
// If the pod has labels that tie it to a service, and the pod has an IP assigned,
// enqueue an update to the service's status.
func ( k * k3s ) onChangePod ( key string , pod * core . Pod ) ( * core . Pod , error ) {
if pod == nil {
return nil , nil
}
serviceName := pod . Labels [ svcNameLabel ]
if serviceName == "" {
return pod , nil
}
serviceNamespace := pod . Labels [ svcNamespaceLabel ]
if serviceNamespace == "" {
return pod , nil
}
if pod . Status . PodIP == "" {
return pod , nil
}
k . workqueue . Add ( serviceNamespace + "/" + serviceName )
return pod , nil
}
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func ( k * k3s ) onChangeNode ( key string , node * core . Node ) ( * core . Node , error ) {
if node == nil {
return nil , nil
}
if _ , ok := node . Labels [ daemonsetNodeLabel ] ; ! ok {
return node , nil
}
if err := k . updateDaemonSets ( ) ; err != nil {
return node , err
}
return node , nil
}
2023-01-26 23:48:12 +00:00
// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
func ( k * k3s ) onChangeEndpointSlice ( key string , eps * discovery . EndpointSlice ) ( * discovery . EndpointSlice , error ) {
if eps == nil {
return nil , nil
}
serviceName , ok := eps . Labels [ discovery . LabelServiceName ]
if ! ok {
return eps , nil
}
k . workqueue . Add ( eps . Namespace + "/" + serviceName )
return eps , nil
}
2022-09-29 20:37:50 +00:00
// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
// through a keyed queue to reduce thrashing when pods are updated. Much of this is cribbed from
// https://github.com/rancher/lasso/blob/release/v2.5/pkg/controller/controller.go#L173-L215
func ( k * k3s ) runWorker ( ) {
for k . processNextWorkItem ( ) {
}
}
// processNextWorkItem does work for a single item in the queue,
// returning a boolean that indicates if the queue should continue
// to be serviced.
func ( k * k3s ) processNextWorkItem ( ) bool {
obj , shutdown := k . workqueue . Get ( )
if shutdown {
return false
}
if err := k . processSingleItem ( obj ) ; err != nil && ! apierrors . IsConflict ( err ) {
logrus . Errorf ( "%s: %v" , controllerName , err )
}
return true
}
// processSingleItem processes a single item from the work queue,
// requeueing it if the handler fails.
func ( k * k3s ) processSingleItem ( obj interface { } ) error {
var (
key string
ok bool
)
defer k . workqueue . Done ( obj )
if key , ok = obj . ( string ) ; ! ok {
logrus . Errorf ( "expected string in workqueue but got %#v" , obj )
k . workqueue . Forget ( obj )
return nil
}
keyParts := strings . SplitN ( key , "/" , 2 )
if err := k . updateStatus ( keyParts [ 0 ] , keyParts [ 1 ] ) ; err != nil {
k . workqueue . AddRateLimited ( key )
return fmt . Errorf ( "error updating LoadBalancer Status for %s: %v, requeueing" , key , err )
}
k . workqueue . Forget ( obj )
return nil
}
// updateServiceStatus updates the load balancer status for the matching service, if it exists and is a
// LoadBalancer service. The patchStatus function handles checking to see if status needs updating.
func ( k * k3s ) updateStatus ( namespace , name string ) error {
svc , err := k . client . CoreV1 ( ) . Services ( namespace ) . Get ( context . TODO ( ) , name , meta . GetOptions { } )
if err != nil {
if apierrors . IsNotFound ( err ) {
return nil
}
return err
}
if svc . Spec . Type != core . ServiceTypeLoadBalancer {
return nil
}
previousStatus := svc . Status . LoadBalancer . DeepCopy ( )
newStatus , err := k . getStatus ( svc )
if err != nil {
return err
}
return k . patchStatus ( svc , previousStatus , newStatus )
}
// getDaemonSet returns the DaemonSet that should exist for the Service.
func ( k * k3s ) getDaemonSet ( svc * core . Service ) ( * apps . DaemonSet , error ) {
return k . daemonsetCache . Get ( k . LBNamespace , generateName ( svc ) )
}
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func ( k * k3s ) getStatus ( svc * core . Service ) ( * core . LoadBalancerStatus , error ) {
2023-01-26 23:48:12 +00:00
var readyNodes map [ string ] bool
if servicehelper . RequestsOnlyLocalTraffic ( svc ) {
readyNodes = map [ string ] bool { }
eps , err := k . endpointsCache . List ( svc . Namespace , labels . SelectorFromSet ( labels . Set {
discovery . LabelServiceName : svc . Name ,
} ) )
if err != nil {
return nil , err
}
for _ , ep := range eps {
for _ , endpoint := range ep . Endpoints {
isPod := endpoint . TargetRef != nil && endpoint . TargetRef . Kind == "Pod"
isReady := endpoint . Conditions . Ready != nil && * endpoint . Conditions . Ready
if isPod && isReady && endpoint . NodeName != nil {
readyNodes [ * endpoint . NodeName ] = true
}
}
}
}
pods , err := k . podCache . List ( k . LBNamespace , labels . SelectorFromSet ( labels . Set {
2022-09-29 20:37:50 +00:00
svcNameLabel : svc . Name ,
svcNamespaceLabel : svc . Namespace ,
} ) )
if err != nil {
return nil , err
}
2023-01-26 23:48:12 +00:00
expectedIPs , err := k . podIPs ( pods , svc , readyNodes )
2022-09-29 20:37:50 +00:00
if err != nil {
return nil , err
}
loadbalancer := & core . LoadBalancerStatus { }
for _ , ip := range expectedIPs {
loadbalancer . Ingress = append ( loadbalancer . Ingress , core . LoadBalancerIngress {
IP : ip ,
} )
}
return loadbalancer , nil
}
// patchStatus patches the service status. If the status has not changed, this function is a no-op.
func ( k * k3s ) patchStatus ( svc * core . Service , previousStatus , newStatus * core . LoadBalancerStatus ) error {
if servicehelper . LoadBalancerStatusEqual ( previousStatus , newStatus ) {
return nil
}
updated := svc . DeepCopy ( )
updated . Status . LoadBalancer = * newStatus
_ , err := servicehelper . PatchService ( k . client . CoreV1 ( ) , svc , updated )
if err == nil {
if len ( newStatus . Ingress ) == 0 {
k . recorder . Event ( svc , core . EventTypeWarning , "UnAvailableLoadBalancer" , "There are no available nodes for LoadBalancer" )
} else {
k . recorder . Eventf ( svc , core . EventTypeNormal , "UpdatedLoadBalancer" , "Updated LoadBalancer with new IPs: %v -> %v" , ingressToString ( previousStatus . Ingress ) , ingressToString ( newStatus . Ingress ) )
}
}
return err
}
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
2023-01-26 23:48:12 +00:00
func ( k * k3s ) podIPs ( pods [ ] * core . Pod , svc * core . Service , readyNodes map [ string ] bool ) ( [ ] string , error ) {
2024-04-17 17:19:32 +00:00
extIPs := sets . Set [ string ] { }
intIPs := sets . Set [ string ] { }
2022-09-29 20:37:50 +00:00
for _ , pod := range pods {
if pod . Spec . NodeName == "" || pod . Status . PodIP == "" {
continue
}
if ! Ready . IsTrue ( pod ) {
continue
}
2023-01-26 23:48:12 +00:00
if readyNodes != nil && ! readyNodes [ pod . Spec . NodeName ] {
continue
}
2022-09-29 20:37:50 +00:00
node , err := k . nodeCache . Get ( pod . Spec . NodeName )
if apierrors . IsNotFound ( err ) {
continue
} else if err != nil {
return nil , err
}
for _ , addr := range node . Status . Addresses {
if addr . Type == core . NodeExternalIP {
2024-04-17 17:19:32 +00:00
extIPs . Insert ( addr . Address )
2022-09-29 20:37:50 +00:00
} else if addr . Type == core . NodeInternalIP {
2024-04-17 17:19:32 +00:00
intIPs . Insert ( addr . Address )
2022-09-29 20:37:50 +00:00
}
}
}
var ips [ ] string
2024-04-17 17:19:32 +00:00
if extIPs . Len ( ) > 0 {
ips = extIPs . UnsortedList ( )
2022-09-29 20:37:50 +00:00
} else {
2024-04-17 17:19:32 +00:00
ips = intIPs . UnsortedList ( )
2022-09-29 20:37:50 +00:00
}
ips , err := filterByIPFamily ( ips , svc )
if err != nil {
return nil , err
}
if len ( ips ) > 0 && k . Rootless {
return [ ] string { "127.0.0.1" } , nil
}
return ips , nil
}
2023-02-17 00:53:37 +00:00
// filterByIPFamily filters node IPs based on dual-stack parameters of the service
2022-09-29 20:37:50 +00:00
func filterByIPFamily ( ips [ ] string , svc * core . Service ) ( [ ] string , error ) {
var ipv4Addresses [ ] string
var ipv6Addresses [ ] string
2023-02-17 00:53:37 +00:00
var allAddresses [ ] string
2022-09-29 20:37:50 +00:00
for _ , ip := range ips {
if utilsnet . IsIPv4String ( ip ) {
ipv4Addresses = append ( ipv4Addresses , ip )
}
if utilsnet . IsIPv6String ( ip ) {
ipv6Addresses = append ( ipv6Addresses , ip )
}
}
2023-11-15 22:33:31 +00:00
sort . Strings ( ipv4Addresses )
sort . Strings ( ipv6Addresses )
2023-02-17 00:53:37 +00:00
for _ , ipFamily := range svc . Spec . IPFamilies {
switch ipFamily {
case core . IPv4Protocol :
allAddresses = append ( allAddresses , ipv4Addresses ... )
case core . IPv6Protocol :
allAddresses = append ( allAddresses , ipv6Addresses ... )
2022-09-29 20:37:50 +00:00
}
}
2023-02-17 00:53:37 +00:00
return allAddresses , nil
2022-09-29 20:37:50 +00:00
}
// deployDaemonSet ensures that there is a DaemonSet for the service.
func ( k * k3s ) deployDaemonSet ( ctx context . Context , svc * core . Service ) error {
ds , err := k . newDaemonSet ( svc )
if err != nil {
return err
}
defer k . recorder . Eventf ( svc , core . EventTypeNormal , "AppliedDaemonSet" , "Applied LoadBalancer DaemonSet %s/%s" , ds . Namespace , ds . Name )
return k . processor . WithContext ( ctx ) . WithOwner ( svc ) . Apply ( objectset . NewObjectSet ( ds ) )
}
// deleteDaemonSet ensures that there are no DaemonSets for the given service.
func ( k * k3s ) deleteDaemonSet ( ctx context . Context , svc * core . Service ) error {
name := generateName ( svc )
if err := k . client . AppsV1 ( ) . DaemonSets ( k . LBNamespace ) . Delete ( ctx , name , meta . DeleteOptions { } ) ; err != nil {
if apierrors . IsNotFound ( err ) {
return nil
}
return err
}
defer k . recorder . Eventf ( svc , core . EventTypeNormal , "DeletedDaemonSet" , "Deleted LoadBalancer DaemonSet %s/%s" , k . LBNamespace , name )
return nil
}
// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func ( k * k3s ) newDaemonSet ( svc * core . Service ) ( * apps . DaemonSet , error ) {
name := generateName ( svc )
oneInt := intstr . FromInt ( 1 )
2024-04-29 19:39:35 +00:00
priorityClassName := k . getPriorityClassName ( svc )
2023-01-26 23:48:12 +00:00
localTraffic := servicehelper . RequestsOnlyLocalTraffic ( svc )
2024-10-15 14:27:56 +00:00
sourceRangesSet , err := servicehelper . GetLoadBalancerSourceRanges ( svc )
if err != nil {
return nil , err
}
sourceRanges := strings . Join ( sourceRangesSet . StringSlice ( ) , "," )
2024-07-22 23:20:49 +00:00
securityContext := & core . PodSecurityContext { }
2022-09-29 20:37:50 +00:00
2024-10-15 14:27:56 +00:00
for _ , ipFamily := range svc . Spec . IPFamilies {
switch ipFamily {
case core . IPv4Protocol :
securityContext . Sysctls = append ( securityContext . Sysctls , core . Sysctl { Name : "net.ipv4.ip_forward" , Value : "1" } )
case core . IPv6Protocol :
securityContext . Sysctls = append ( securityContext . Sysctls , core . Sysctl { Name : "net.ipv6.conf.all.forwarding" , Value : "1" } )
if sourceRanges == "0.0.0.0/0" {
// The upstream default load-balancer source range only includes IPv4, even if the service is IPv6-only or dual-stack.
// If using the default range, and IPv6 is enabled, also allow IPv6.
sourceRanges += ",::/0"
}
}
}
2022-09-29 20:37:50 +00:00
ds := & apps . DaemonSet {
ObjectMeta : meta . ObjectMeta {
Name : name ,
Namespace : k . LBNamespace ,
2023-01-26 23:48:12 +00:00
Labels : labels . Set {
2024-10-15 14:27:56 +00:00
nodeSelectorLabel : "false" ,
svcNameLabel : svc . Name ,
svcNamespaceLabel : svc . Namespace ,
2022-09-29 20:37:50 +00:00
} ,
} ,
TypeMeta : meta . TypeMeta {
Kind : "DaemonSet" ,
APIVersion : "apps/v1" ,
} ,
Spec : apps . DaemonSetSpec {
Selector : & meta . LabelSelector {
2023-01-26 23:48:12 +00:00
MatchLabels : labels . Set {
2022-09-29 20:37:50 +00:00
"app" : name ,
} ,
} ,
Template : core . PodTemplateSpec {
ObjectMeta : meta . ObjectMeta {
2023-01-26 23:48:12 +00:00
Labels : labels . Set {
2022-09-29 20:37:50 +00:00
"app" : name ,
svcNameLabel : svc . Name ,
svcNamespaceLabel : svc . Namespace ,
} ,
} ,
Spec : core . PodSpec {
2024-04-29 19:39:35 +00:00
PriorityClassName : priorityClassName ,
2022-10-10 18:16:57 +00:00
ServiceAccountName : "svclb" ,
2024-03-28 17:07:02 +00:00
AutomountServiceAccountToken : utilsptr . To ( false ) ,
2024-07-22 23:20:49 +00:00
SecurityContext : securityContext ,
2023-01-26 23:48:12 +00:00
Tolerations : [ ] core . Toleration {
{
2023-08-01 00:51:46 +00:00
Key : util . MasterRoleLabelKey ,
2023-01-26 23:48:12 +00:00
Operator : "Exists" ,
Effect : "NoSchedule" ,
} ,
{
2023-08-01 00:51:46 +00:00
Key : util . ControlPlaneRoleLabelKey ,
2023-01-26 23:48:12 +00:00
Operator : "Exists" ,
Effect : "NoSchedule" ,
} ,
{
Key : "CriticalAddonsOnly" ,
Operator : "Exists" ,
} ,
} ,
2022-09-29 20:37:50 +00:00
} ,
} ,
UpdateStrategy : apps . DaemonSetUpdateStrategy {
Type : apps . RollingUpdateDaemonSetStrategyType ,
RollingUpdate : & apps . RollingUpdateDaemonSet {
MaxUnavailable : & oneInt ,
} ,
} ,
} ,
}
for _ , port := range svc . Spec . Ports {
portName := fmt . Sprintf ( "lb-%s-%d" , strings . ToLower ( string ( port . Protocol ) ) , port . Port )
container := core . Container {
Name : portName ,
Image : k . LBImage ,
ImagePullPolicy : core . PullIfNotPresent ,
Ports : [ ] core . ContainerPort {
{
Name : portName ,
ContainerPort : port . Port ,
HostPort : port . Port ,
Protocol : port . Protocol ,
} ,
} ,
2024-10-15 14:27:56 +00:00
Env : [ ] core . EnvVar {
{
Name : "SRC_PORT" ,
Value : strconv . Itoa ( int ( port . Port ) ) ,
} ,
{
Name : "SRC_RANGES" ,
Value : sourceRanges ,
} ,
{
Name : "DEST_PROTO" ,
Value : string ( port . Protocol ) ,
} ,
} ,
SecurityContext : & core . SecurityContext {
Capabilities : & core . Capabilities {
Add : [ ] core . Capability {
"NET_ADMIN" ,
} ,
} ,
} ,
}
if localTraffic {
container . Env = append ( container . Env ,
core . EnvVar {
Name : "DEST_PORT" ,
Value : strconv . Itoa ( int ( port . NodePort ) ) ,
} ,
core . EnvVar {
Name : "DEST_IPS" ,
ValueFrom : & core . EnvVarSource {
FieldRef : & core . ObjectFieldSelector {
FieldPath : getHostIPsFieldPath ( ) ,
} ,
} ,
} ,
)
} else {
container . Env = append ( container . Env ,
core . EnvVar {
Name : "DEST_PORT" ,
Value : strconv . Itoa ( int ( port . Port ) ) ,
} ,
core . EnvVar {
Name : "DEST_IPS" ,
Value : strings . Join ( svc . Spec . ClusterIPs , "," ) ,
} ,
)
2023-01-26 23:48:12 +00:00
}
2024-10-15 14:27:56 +00:00
2023-01-26 23:48:12 +00:00
ds . Spec . Template . Spec . Containers = append ( ds . Spec . Template . Spec . Containers , container )
2022-09-29 20:37:50 +00:00
}
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector , err := k . nodeHasDaemonSetLabel ( )
if err != nil {
return nil , err
}
if enableNodeSelector {
ds . Spec . Template . Spec . NodeSelector = map [ string ] string {
daemonsetNodeLabel : "true" ,
}
// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if service has lbpool label
if svc . Labels [ daemonsetNodePoolLabel ] != "" {
ds . Spec . Template . Spec . NodeSelector [ daemonsetNodePoolLabel ] = svc . Labels [ daemonsetNodePoolLabel ]
}
ds . Labels [ nodeSelectorLabel ] = "true"
}
2023-01-26 23:48:12 +00:00
2024-08-09 21:57:34 +00:00
// Fetch tolerations from the "svccontroller.k3s.cattle.io/tolerations" annotation on the service
// and append them to the DaemonSet's pod tolerations.
tolerations , err := k . getTolerations ( svc )
if err != nil {
return nil , err
}
ds . Spec . Template . Spec . Tolerations = append ( ds . Spec . Template . Spec . Tolerations , tolerations ... )
2022-09-29 20:37:50 +00:00
return ds , nil
}
// updateDaemonSets ensures that our DaemonSets have a NodeSelector present if one is enabled,
// and do not have one if it is not. Nodes are checked for this label when the DaemonSet is generated,
// but node labels may change between Service updates and the NodeSelector needs to be updated appropriately.
func ( k * k3s ) updateDaemonSets ( ) error {
enableNodeSelector , err := k . nodeHasDaemonSetLabel ( )
if err != nil {
return err
}
2023-01-26 23:48:12 +00:00
nodeSelector := labels . SelectorFromSet ( labels . Set { nodeSelectorLabel : fmt . Sprintf ( "%t" , ! enableNodeSelector ) } )
2022-09-29 20:37:50 +00:00
daemonsets , err := k . daemonsetCache . List ( k . LBNamespace , nodeSelector )
if err != nil {
return err
}
for _ , ds := range daemonsets {
ds . Labels [ nodeSelectorLabel ] = fmt . Sprintf ( "%t" , enableNodeSelector )
ds . Spec . Template . Spec . NodeSelector = map [ string ] string { }
if enableNodeSelector {
ds . Spec . Template . Spec . NodeSelector [ daemonsetNodeLabel ] = "true"
}
if _ , err := k . client . AppsV1 ( ) . DaemonSets ( ds . Namespace ) . Update ( context . TODO ( ) , ds , meta . UpdateOptions { } ) ; err != nil {
return err
}
}
return nil
}
// nodeHasDaemonSetLabel returns true if any node is labeled for inclusion or exclusion
// from use by ServiceLB. If any node is labeled, only nodes with a label value of "true"
// will be used.
func ( k * k3s ) nodeHasDaemonSetLabel ( ) ( bool , error ) {
selector , err := labels . Parse ( daemonsetNodeLabel )
if err != nil {
return false , err
}
nodesWithLabel , err := k . nodeCache . List ( selector )
return len ( nodesWithLabel ) > 0 , err
}
// deleteAllDaemonsets deletes all daemonsets created by this controller
func ( k * k3s ) deleteAllDaemonsets ( ctx context . Context ) error {
return k . client . AppsV1 ( ) . DaemonSets ( k . LBNamespace ) . DeleteCollection ( ctx , meta . DeleteOptions { } , meta . ListOptions { LabelSelector : nodeSelectorLabel } )
}
// removeServiceFinalizers ensures that there are no finalizers left on any services.
// Previous implementations of the servicelb controller manually added finalizers to services it managed;
// these need to be removed in order to release ownership to the cloud provider implementation.
func ( k * k3s ) removeServiceFinalizers ( ctx context . Context ) error {
services , err := k . client . CoreV1 ( ) . Services ( meta . NamespaceAll ) . List ( ctx , meta . ListOptions { } )
if err != nil {
return err
}
var errs merr . Errors
for _ , svc := range services . Items {
if err := retry . RetryOnConflict ( retry . DefaultRetry , func ( ) error {
s , err := k . removeFinalizer ( ctx , & svc )
svc = * s
return err
} ) ; err != nil {
errs = append ( errs , err )
}
}
if len ( errs ) > 0 {
return errs
}
return nil
}
// removeFinalizer ensures that there is not a finalizer for this controller on the Service
func ( k * k3s ) removeFinalizer ( ctx context . Context , svc * core . Service ) ( * core . Service , error ) {
var found bool
for k , v := range svc . Finalizers {
if v != finalizerName {
continue
}
found = true
svc . Finalizers = append ( svc . Finalizers [ : k ] , svc . Finalizers [ k + 1 : ] ... )
}
if found {
return k . client . CoreV1 ( ) . Services ( svc . Namespace ) . Update ( ctx , svc , meta . UpdateOptions { } )
}
return svc , nil
}
2024-04-29 19:39:35 +00:00
// getPriorityClassName returns the value of the priority class name annotation on the service,
// or the system default priority class name.
func ( k * k3s ) getPriorityClassName ( svc * core . Service ) string {
if svc != nil {
if v , ok := svc . Annotations [ priorityAnnotation ] ; ok {
return v
}
}
return k . LBDefaultPriorityClassName
}
2024-10-15 14:27:56 +00:00
// getTolerations retrieves the tolerations from a service's annotations.
// It parses the tolerations from a JSON or YAML string stored in the annotations.
2024-08-09 21:57:34 +00:00
func ( k * k3s ) getTolerations ( svc * core . Service ) ( [ ] core . Toleration , error ) {
tolerationsStr , ok := svc . Annotations [ tolerationsAnnotation ]
if ! ok {
return [ ] core . Toleration { } , nil
}
var tolerations [ ] core . Toleration
if err := json . Unmarshal ( [ ] byte ( tolerationsStr ) , & tolerations ) ; err != nil {
if err := yaml . Unmarshal ( [ ] byte ( tolerationsStr ) , & tolerations ) ; err != nil {
return nil , fmt . Errorf ( "failed to parse tolerations from annotation %s: %v" , tolerationsAnnotation , err )
}
}
for i := range tolerations {
if err := validateToleration ( & tolerations [ i ] ) ; err != nil {
return nil , fmt . Errorf ( "validation failed for toleration %d: %v" , i , err )
}
}
return tolerations , nil
}
// validateToleration ensures a toleration has valid fields according to its operator.
func validateToleration ( toleration * core . Toleration ) error {
if toleration . Operator == "" {
toleration . Operator = core . TolerationOpEqual
}
if toleration . Key == "" && toleration . Operator != core . TolerationOpExists {
return fmt . Errorf ( "toleration with empty key must have operator 'Exists'" )
}
if toleration . Operator == core . TolerationOpExists && toleration . Value != "" {
return fmt . Errorf ( "toleration with operator 'Exists' must have an empty value" )
}
return nil
}
2022-09-29 20:37:50 +00:00
// generateName generates a distinct name for the DaemonSet based on the service name and UID
func generateName ( svc * core . Service ) string {
2024-07-22 22:44:27 +00:00
name := svc . Name
// ensure that the service name plus prefix and uuid aren't overly long, but
// don't cut the service name at a trailing hyphen.
if len ( name ) > 48 {
trimlen := 48
for name [ trimlen - 1 ] == '-' {
trimlen --
}
name = name [ 0 : trimlen ]
}
return fmt . Sprintf ( "svclb-%s-%s" , name , svc . UID [ : 8 ] )
2022-09-29 20:37:50 +00:00
}
// ingressToString converts a list of LoadBalancerIngress entries to strings
func ingressToString ( ingresses [ ] core . LoadBalancerIngress ) [ ] string {
parts := make ( [ ] string , len ( ingresses ) )
for i , ingress := range ingresses {
if ingress . IP != "" {
parts [ i ] = ingress . IP
} else {
parts [ i ] = ingress . Hostname
}
}
return parts
}
2023-11-21 00:27:29 +00:00
func getHostIPsFieldPath ( ) string {
if utilfeature . DefaultFeatureGate . Enabled ( features . PodHostIPs ) {
return "status.hostIPs"
}
return "status.hostIP"
}