package cloudprovider

import (
	"context"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/rancher/wrangler/pkg/condition"
	coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
	discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
	"github.com/rancher/wrangler/pkg/merr"
	"github.com/rancher/wrangler/pkg/objectset"
	"github.com/sirupsen/logrus"
	apps "k8s.io/api/apps/v1"
	core "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	ccmapp "k8s.io/cloud-provider/app"
	servicehelper "k8s.io/cloud-provider/service/helpers"
	utilsnet "k8s.io/utils/net"
	utilpointer "k8s.io/utils/pointer"
)

var (
	finalizerName          = "svccontroller." + version.Program + ".cattle.io/daemonset"
	svcNameLabel           = "svccontroller." + version.Program + ".cattle.io/svcname"
	svcNamespaceLabel      = "svccontroller." + version.Program + ".cattle.io/svcnamespace"
	daemonsetNodeLabel     = "svccontroller." + version.Program + ".cattle.io/enablelb"
	daemonsetNodePoolLabel = "svccontroller." + version.Program + ".cattle.io/lbpool"
	nodeSelectorLabel      = "svccontroller." + version.Program + ".cattle.io/nodeselector"
	controllerName         = ccmapp.DefaultInitFuncConstructors["service"].InitContext.ClientName
)

const (
	Ready       = condition.Cond("Ready")
	DefaultLBNS = meta.NamespaceSystem
)

var (
	DefaultLBImage = "rancher/klipper-lb:v0.4.4"
)

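// With the program name "k3s" (assumed here; a rebranded build substitutes its own
// version.Program), the keys above resolve to values such as
// "svccontroller.k3s.cattle.io/svcname" and "svccontroller.k3s.cattle.io/enablelb".
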
// Register wires up the ServiceLB event handlers, ensures that the namespace and
// ServiceAccount used by ServiceLB pods exist, starts the status worker, and removes
// any finalizers left behind by older servicelb implementations.
func (k *k3s) Register(ctx context.Context,
	nodes coreclient.NodeController,
	pods coreclient.PodController,
	endpointslices discoveryclient.EndpointSliceController,
) error {
	nodes.OnChange(ctx, controllerName, k.onChangeNode)
	pods.OnChange(ctx, controllerName, k.onChangePod)
	endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)

	if err := k.ensureServiceLBNamespace(ctx); err != nil {
		return err
	}

	if err := k.ensureServiceLBServiceAccount(ctx); err != nil {
		return err
	}

	go wait.Until(k.runWorker, time.Second, ctx.Done())

	return k.removeServiceFinalizers(ctx)
}

// ensureServiceLBNamespace ensures that the configured namespace exists.
func (k *k3s) ensureServiceLBNamespace(ctx context.Context) error {
	ns := k.client.CoreV1().Namespaces()
	if _, err := ns.Get(ctx, k.LBNamespace, meta.GetOptions{}); err == nil || !apierrors.IsNotFound(err) {
		return err
	}
	_, err := ns.Create(ctx, &core.Namespace{
		ObjectMeta: meta.ObjectMeta{
			Name: k.LBNamespace,
		},
	}, meta.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		return nil
	}
	return err
}

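// Both ensure helpers use the same get-then-create idiom: a successful Get (or any
// error other than NotFound) returns immediately, and a Create that loses a race to a
// concurrent creator treats IsAlreadyExists as success rather than an error.
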
// ensureServiceLBServiceAccount ensures that the ServiceAccount used by pods exists.
func (k *k3s) ensureServiceLBServiceAccount(ctx context.Context) error {
	sa := k.client.CoreV1().ServiceAccounts(k.LBNamespace)
	if _, err := sa.Get(ctx, "svclb", meta.GetOptions{}); err == nil || !apierrors.IsNotFound(err) {
		return err
	}
	_, err := sa.Create(ctx, &core.ServiceAccount{
		ObjectMeta: meta.ObjectMeta{
			Name:      "svclb",
			Namespace: k.LBNamespace,
		},
	}, meta.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		return nil
	}
	return err
}

// onChangePod handles changes to Pods.
// If the pod has labels that tie it to a service, and the pod has an IP assigned,
// enqueue an update to the service's status.
func (k *k3s) onChangePod(key string, pod *core.Pod) (*core.Pod, error) {
	if pod == nil {
		return nil, nil
	}

	serviceName := pod.Labels[svcNameLabel]
	if serviceName == "" {
		return pod, nil
	}

	serviceNamespace := pod.Labels[svcNamespaceLabel]
	if serviceNamespace == "" {
		return pod, nil
	}

	if pod.Status.PodIP == "" {
		return pod, nil
	}

	k.workqueue.Add(serviceNamespace + "/" + serviceName)
	return pod, nil
}

// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
	if node == nil {
		return nil, nil
	}
	if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
		return node, nil
	}

	if err := k.updateDaemonSets(); err != nil {
		return node, err
	}

	return node, nil
}

// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
	if eps == nil {
		return nil, nil
	}

	serviceName, ok := eps.Labels[discovery.LabelServiceName]
	if !ok {
		return eps, nil
	}

	k.workqueue.Add(eps.Namespace + "/" + serviceName)
	return eps, nil
}

// runWorker dequeues Service changes from the work queue.
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
// through a keyed queue to reduce thrashing when pods are updated. Much of this is cribbed from
// https://github.com/rancher/lasso/blob/release/v2.5/pkg/controller/controller.go#L173-L215
func (k *k3s) runWorker() {
	for k.processNextWorkItem() {
	}
}

// processNextWorkItem does work for a single item in the queue,
// returning a boolean that indicates if the queue should continue
// to be serviced.
func (k *k3s) processNextWorkItem() bool {
	obj, shutdown := k.workqueue.Get()

	if shutdown {
		return false
	}

	if err := k.processSingleItem(obj); err != nil && !apierrors.IsConflict(err) {
		logrus.Errorf("%s: %v", controllerName, err)
	}
	return true
}

// processSingleItem processes a single item from the work queue,
// requeueing it if the handler fails.
func (k *k3s) processSingleItem(obj interface{}) error {
	var (
		key string
		ok  bool
	)

	defer k.workqueue.Done(obj)

	if key, ok = obj.(string); !ok {
		logrus.Errorf("expected string in workqueue but got %#v", obj)
		k.workqueue.Forget(obj)
		return nil
	}
	keyParts := strings.SplitN(key, "/", 2)
	if err := k.updateStatus(keyParts[0], keyParts[1]); err != nil {
		k.workqueue.AddRateLimited(key)
		return fmt.Errorf("error updating LoadBalancer Status for %s: %v, requeueing", key, err)
	}
	k.workqueue.Forget(obj)
	return nil
}

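// Queue keys are "namespace/name" strings: processSingleItem splits a key such as
// "default/my-lb" into namespace "default" and service name "my-lb" before calling
// updateStatus (the example key is illustrative only).
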
// updateStatus updates the load balancer status for the matching service, if it exists and is a
// LoadBalancer service. The patchStatus function handles checking to see if status needs updating.
func (k *k3s) updateStatus(namespace, name string) error {
	svc, err := k.client.CoreV1().Services(namespace).Get(context.TODO(), name, meta.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}
	if svc.Spec.Type != core.ServiceTypeLoadBalancer {
		return nil
	}

	previousStatus := svc.Status.LoadBalancer.DeepCopy()
	newStatus, err := k.getStatus(svc)
	if err != nil {
		return err
	}

	return k.patchStatus(svc, previousStatus, newStatus)
}

// getDaemonSet returns the DaemonSet that should exist for the Service.
func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
	return k.daemonsetCache.Get(k.LBNamespace, generateName(svc))
}

// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
	var readyNodes map[string]bool

	if servicehelper.RequestsOnlyLocalTraffic(svc) {
		readyNodes = map[string]bool{}
		eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{
			discovery.LabelServiceName: svc.Name,
		}))
		if err != nil {
			return nil, err
		}

		for _, ep := range eps {
			for _, endpoint := range ep.Endpoints {
				isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
				isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
				if isPod && isReady && endpoint.NodeName != nil {
					readyNodes[*endpoint.NodeName] = true
				}
			}
		}
	}

	pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
		svcNameLabel:      svc.Name,
		svcNamespaceLabel: svc.Namespace,
	}))
	if err != nil {
		return nil, err
	}

	expectedIPs, err := k.podIPs(pods, svc, readyNodes)
	if err != nil {
		return nil, err
	}

	sort.Strings(expectedIPs)

	loadbalancer := &core.LoadBalancerStatus{}
	for _, ip := range expectedIPs {
		loadbalancer.Ingress = append(loadbalancer.Ingress, core.LoadBalancerIngress{
			IP: ip,
		})
	}

	return loadbalancer, nil
}

// patchStatus patches the service status. If the status has not changed, this function is a no-op.
func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.LoadBalancerStatus) error {
	if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
		return nil
	}

	updated := svc.DeepCopy()
	updated.Status.LoadBalancer = *newStatus
	_, err := servicehelper.PatchService(k.client.CoreV1(), svc, updated)
	if err == nil {
		if len(newStatus.Ingress) == 0 {
			k.recorder.Event(svc, core.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
		} else {
			k.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedLoadBalancer", "Updated LoadBalancer with new IPs: %v -> %v", ingressToString(previousStatus.Ingress), ingressToString(newStatus.Ingress))
		}
	}
	return err
}

// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) {
	// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
	// to determine the unique set of IPs in use by pods.
	extIPs := map[string]bool{}
	intIPs := map[string]bool{}

	for _, pod := range pods {
		if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
			continue
		}
		if !Ready.IsTrue(pod) {
			continue
		}
		if readyNodes != nil && !readyNodes[pod.Spec.NodeName] {
			continue
		}

		node, err := k.nodeCache.Get(pod.Spec.NodeName)
		if apierrors.IsNotFound(err) {
			continue
		} else if err != nil {
			return nil, err
		}

		for _, addr := range node.Status.Addresses {
			if addr.Type == core.NodeExternalIP {
				extIPs[addr.Address] = true
			} else if addr.Type == core.NodeInternalIP {
				intIPs[addr.Address] = true
			}
		}
	}

	keys := func(addrs map[string]bool) (ips []string) {
		for k := range addrs {
			ips = append(ips, k)
		}
		return ips
	}

	var ips []string
	if len(extIPs) > 0 {
		ips = keys(extIPs)
	} else {
		ips = keys(intIPs)
	}

	ips, err := filterByIPFamily(ips, svc)
	if err != nil {
		return nil, err
	}

	if len(ips) > 0 && k.Rootless {
		return []string{"127.0.0.1"}, nil
	}

	return ips, nil
}

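// To illustrate podIPs' precedence with hypothetical addresses: if any hosting node
// advertises an ExternalIP (say 203.0.113.10), only external addresses are published
// and InternalIPs such as 10.42.0.5 are ignored; InternalIPs are used only when no
// hosting node has an ExternalIP at all.
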
// filterByIPFamily filters node IPs based on dual-stack parameters of the service
func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
	var ipv4Addresses []string
	var ipv6Addresses []string
	var allAddresses []string

	for _, ip := range ips {
		if utilsnet.IsIPv4String(ip) {
			ipv4Addresses = append(ipv4Addresses, ip)
		}
		if utilsnet.IsIPv6String(ip) {
			ipv6Addresses = append(ipv6Addresses, ip)
		}
	}

	for _, ipFamily := range svc.Spec.IPFamilies {
		switch ipFamily {
		case core.IPv4Protocol:
			allAddresses = append(allAddresses, ipv4Addresses...)
		case core.IPv6Protocol:
			allAddresses = append(allAddresses, ipv6Addresses...)
		}
	}
	return allAddresses, nil
}

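// A hypothetical, uncalled illustration of filterByIPFamily: results follow the order
// of the service's IPFamilies, so a dual-stack service listing IPv6 first yields the
// IPv6 addresses first.
//
//	svc := &core.Service{Spec: core.ServiceSpec{
//		IPFamilies: []core.IPFamily{core.IPv6Protocol, core.IPv4Protocol},
//	}}
//	ips, _ := filterByIPFamily([]string{"10.0.0.1", "2001:db8::1"}, svc)
//	// ips == []string{"2001:db8::1", "10.0.0.1"}
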
// deployDaemonSet ensures that there is a DaemonSet for the service.
func (k *k3s) deployDaemonSet(ctx context.Context, svc *core.Service) error {
	ds, err := k.newDaemonSet(svc)
	if err != nil {
		return err
	}

	defer k.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
	return k.processor.WithContext(ctx).WithOwner(svc).Apply(objectset.NewObjectSet(ds))
}

// deleteDaemonSet ensures that there are no DaemonSets for the given service.
func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
	name := generateName(svc)
	if err := k.client.AppsV1().DaemonSets(k.LBNamespace).Delete(ctx, name, meta.DeleteOptions{}); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	defer k.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", k.LBNamespace, name)
	return nil
}

// newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on
// each eligible node.
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
	name := generateName(svc)
	oneInt := intstr.FromInt(1)
	localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
	sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
	if err != nil {
		return nil, err
	}

	var sysctls []core.Sysctl
	for _, ipFamily := range svc.Spec.IPFamilies {
		switch ipFamily {
		case core.IPv4Protocol:
			sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
		case core.IPv6Protocol:
			sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
		}
	}

	ds := &apps.DaemonSet{
		ObjectMeta: meta.ObjectMeta{
			Name:      name,
			Namespace: k.LBNamespace,
			Labels: labels.Set{
				nodeSelectorLabel: "false",
				svcNameLabel:      svc.Name,
				svcNamespaceLabel: svc.Namespace,
			},
		},
		TypeMeta: meta.TypeMeta{
			Kind:       "DaemonSet",
			APIVersion: "apps/v1",
		},
		Spec: apps.DaemonSetSpec{
			Selector: &meta.LabelSelector{
				MatchLabels: labels.Set{
					"app": name,
				},
			},
			Template: core.PodTemplateSpec{
				ObjectMeta: meta.ObjectMeta{
					Labels: labels.Set{
						"app":             name,
						svcNameLabel:      svc.Name,
						svcNamespaceLabel: svc.Namespace,
					},
				},
				Spec: core.PodSpec{
					ServiceAccountName:           "svclb",
					AutomountServiceAccountToken: utilpointer.Bool(false),
					SecurityContext: &core.PodSecurityContext{
						Sysctls: sysctls,
					},
					Tolerations: []core.Toleration{
						{
							Key:      util.MasterRoleLabelKey,
							Operator: "Exists",
							Effect:   "NoSchedule",
						},
						{
							Key:      util.ControlPlaneRoleLabelKey,
							Operator: "Exists",
							Effect:   "NoSchedule",
						},
						{
							Key:      "CriticalAddonsOnly",
							Operator: "Exists",
						},
					},
				},
			},
			UpdateStrategy: apps.DaemonSetUpdateStrategy{
				Type: apps.RollingUpdateDaemonSetStrategyType,
				RollingUpdate: &apps.RollingUpdateDaemonSet{
					MaxUnavailable: &oneInt,
				},
			},
		},
	}

	for _, port := range svc.Spec.Ports {
		portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
		container := core.Container{
			Name:            portName,
			Image:           k.LBImage,
			ImagePullPolicy: core.PullIfNotPresent,
			Ports: []core.ContainerPort{
				{
					Name:          portName,
					ContainerPort: port.Port,
					HostPort:      port.Port,
					Protocol:      port.Protocol,
				},
			},
			Env: []core.EnvVar{
				{
					Name:  "SRC_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				{
					Name:  "SRC_RANGES",
					Value: strings.Join(sourceRanges.StringSlice(), " "),
				},
				{
					Name:  "DEST_PROTO",
					Value: string(port.Protocol),
				},
			},
			SecurityContext: &core.SecurityContext{
				Capabilities: &core.Capabilities{
					Add: []core.Capability{
						"NET_ADMIN",
					},
				},
			},
		}

		if localTraffic {
			container.Env = append(container.Env,
				core.EnvVar{
					Name:  "DEST_PORT",
					Value: strconv.Itoa(int(port.NodePort)),
				},
				core.EnvVar{
					Name: "DEST_IPS",
					ValueFrom: &core.EnvVarSource{
						FieldRef: &core.ObjectFieldSelector{
							FieldPath: "status.hostIP",
						},
					},
				},
			)
		} else {
			container.Env = append(container.Env,
				core.EnvVar{
					Name:  "DEST_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				core.EnvVar{
					Name:  "DEST_IPS",
					Value: strings.Join(svc.Spec.ClusterIPs, " "),
				},
			)
		}

		ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
	}

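	// For illustration: a Service exposing TCP port 443 gets a container (and container
	// port) named "lb-tcp-443" with SRC_PORT=443. With ExternalTrafficPolicy: Local,
	// traffic is forwarded to the service's NodePort on the host's own IP; otherwise it
	// is forwarded to the service's ClusterIPs on the service port.
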
	// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
	enableNodeSelector, err := k.nodeHasDaemonSetLabel()
	if err != nil {
		return nil, err
	}
	if enableNodeSelector {
		ds.Spec.Template.Spec.NodeSelector = map[string]string{
			daemonsetNodeLabel: "true",
		}
		// Add node selector for "svccontroller.k3s.cattle.io/lbpool=<pool>" if service has lbpool label
		if svc.Labels[daemonsetNodePoolLabel] != "" {
			ds.Spec.Template.Spec.NodeSelector[daemonsetNodePoolLabel] = svc.Labels[daemonsetNodePoolLabel]
		}
		ds.Labels[nodeSelectorLabel] = "true"
	}
	return ds, nil
}

// updateDaemonSets ensures that our DaemonSets have a NodeSelector present if one is enabled,
// and do not have one if it is not. Nodes are checked for this label when the DaemonSet is generated,
// but node labels may change between Service updates and the NodeSelector needs to be updated appropriately.
func (k *k3s) updateDaemonSets() error {
	enableNodeSelector, err := k.nodeHasDaemonSetLabel()
	if err != nil {
		return err
	}

	// List only DaemonSets whose nodeSelectorLabel is the opposite of the desired state;
	// these are exactly the ones that need updating.
	nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
	daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
	if err != nil {
		return err
	}

	for _, ds := range daemonsets {
		ds.Labels[nodeSelectorLabel] = fmt.Sprintf("%t", enableNodeSelector)
		ds.Spec.Template.Spec.NodeSelector = map[string]string{}
		if enableNodeSelector {
			ds.Spec.Template.Spec.NodeSelector[daemonsetNodeLabel] = "true"
		}
		if _, err := k.client.AppsV1().DaemonSets(ds.Namespace).Update(context.TODO(), ds, meta.UpdateOptions{}); err != nil {
			return err
		}
	}
	return nil
}

// nodeHasDaemonSetLabel returns true if any node is labeled for inclusion or exclusion
// from use by ServiceLB. If any node is labeled, only nodes with a label value of "true"
// will be used.
func (k *k3s) nodeHasDaemonSetLabel() (bool, error) {
	selector, err := labels.Parse(daemonsetNodeLabel)
	if err != nil {
		return false, err
	}
	nodesWithLabel, err := k.nodeCache.List(selector)
	return len(nodesWithLabel) > 0, err
}

// deleteAllDaemonsets deletes all DaemonSets created by this controller
func (k *k3s) deleteAllDaemonsets(ctx context.Context) error {
	return k.client.AppsV1().DaemonSets(k.LBNamespace).DeleteCollection(ctx, meta.DeleteOptions{}, meta.ListOptions{LabelSelector: nodeSelectorLabel})
}

// removeServiceFinalizers ensures that there are no finalizers left on any services.
// Previous implementations of the servicelb controller manually added finalizers to services it managed;
// these need to be removed in order to release ownership to the cloud provider implementation.
func (k *k3s) removeServiceFinalizers(ctx context.Context) error {
	services, err := k.client.CoreV1().Services(meta.NamespaceAll).List(ctx, meta.ListOptions{})
	if err != nil {
		return err
	}

	var errs merr.Errors
	for _, svc := range services.Items {
		if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
			s, err := k.removeFinalizer(ctx, &svc)
			svc = *s
			return err
		}); err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		return errs
	}

	return nil
}

// removeFinalizer ensures that there is not a finalizer for this controller on the Service
func (k *k3s) removeFinalizer(ctx context.Context, svc *core.Service) (*core.Service, error) {
	var found bool
	for i, v := range svc.Finalizers {
		if v != finalizerName {
			continue
		}
		found = true
		svc.Finalizers = append(svc.Finalizers[:i], svc.Finalizers[i+1:]...)
	}

	if found {
		return k.client.CoreV1().Services(svc.Namespace).Update(ctx, svc, meta.UpdateOptions{})
	}
	return svc, nil
}

// generateName generates a distinct name for the DaemonSet based on the service name and UID
func generateName(svc *core.Service) string {
	return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8])
}

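// For example (hypothetical UID): a Service named "traefik" with UID
// "b5f3a6d2-0c8e-4f11-9a3b-2d1e0c9f8a7b" yields the DaemonSet name "svclb-traefik-b5f3a6d2".
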
// ingressToString converts a list of LoadBalancerIngress entries to strings
func ingressToString(ingresses []core.LoadBalancerIngress) []string {
	parts := make([]string, len(ingresses))
	for i, ingress := range ingresses {
		if ingress.IP != "" {
			parts[i] = ingress.IP
		} else {
			parts[i] = ingress.Hostname
		}
	}
	return parts
}