mirror of https://github.com/k3s-io/k3s
Honor Service ExternalTrafficPolicy
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
parent 94d1a87509
commit 369b81b45e
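This commit teaches the ServiceLB controller to honor a Service's externalTrafficPolicy: when it is Local, the LoadBalancer ingress addresses should list only nodes that host a ready pod for the Service (derived from its EndpointSlices), and the svclb DaemonSet forwards to the host IP and NodePort instead of the ClusterIPs. For context, a minimal Go sketch of a Service that requests only local traffic; the name, namespace, selector, and port are illustrative and not part of this change.

package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A LoadBalancer Service that requests only node-local traffic. With this
	// change, ServiceLB advertises only nodes hosting a ready pod for it.
	svc := &core.Service{
		ObjectMeta: meta.ObjectMeta{Name: "web", Namespace: "default"}, // illustrative names
		Spec: core.ServiceSpec{
			Type:                  core.ServiceTypeLoadBalancer,
			ExternalTrafficPolicy: core.ServiceExternalTrafficPolicyTypeLocal,
			Selector:              map[string]string{"app": "web"},
			Ports:                 []core.ServicePort{{Port: 80}},
		},
	}
	fmt.Println(svc.Spec.ExternalTrafficPolicy) // Local
}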
@@ -67,6 +67,14 @@ rules:
  - daemonsets
  verbs:
  - "*"
- apiGroups:
  - "discovery.k8s.io"
  resources:
  - endpointslices
  verbs:
  - get
  - list
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
@@ -12,6 +12,8 @@ import (
	appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
	"github.com/rancher/wrangler/pkg/generated/controllers/core"
	coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
	"github.com/rancher/wrangler/pkg/generated/controllers/discovery"
	discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
	"github.com/rancher/wrangler/pkg/generic"
	"github.com/rancher/wrangler/pkg/start"
	"github.com/sirupsen/logrus"
@@ -41,6 +43,7 @@ type k3s struct {

	processor apply.Apply
	daemonsetCache appsclient.DaemonSetCache
	endpointsCache discoveryclient.EndpointSliceCache
	nodeCache coreclient.NodeCache
	podCache coreclient.PodCache
	workqueue workqueue.RateLimitingInterface
@@ -89,6 +92,7 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st

		lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
		lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
		lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config)

		processor, err := apply.NewForConfig(config)
		if err != nil {
@@ -96,14 +100,15 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st
		}
		k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
		k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
		k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache()
		k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
		k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

		if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
		if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil {
			logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
		}

		if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
		if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil {
			logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
		}
	} else {
@@ -12,11 +12,13 @@ import (
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/rancher/wrangler/pkg/condition"
	coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
	discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
	"github.com/rancher/wrangler/pkg/merr"
	"github.com/rancher/wrangler/pkg/objectset"
	"github.com/sirupsen/logrus"
	apps "k8s.io/api/apps/v1"
	core "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
@@ -48,9 +50,11 @@ const (
func (k *k3s) Register(ctx context.Context,
	nodes coreclient.NodeController,
	pods coreclient.PodController,
	endpointslices discoveryclient.EndpointSliceController,
) error {
	nodes.OnChange(ctx, controllerName, k.onChangeNode)
	pods.OnChange(ctx, controllerName, k.onChangePod)
	endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)

	if err := k.createServiceLBNamespace(ctx); err != nil {
		return err
@@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
	return node, nil
}

// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
	if eps == nil {
		return nil, nil
	}

	serviceName, ok := eps.Labels[discovery.LabelServiceName]
	if !ok {
		return eps, nil
	}

	k.workqueue.Add(eps.Namespace + "/" + serviceName)
	return eps, nil
}

// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
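The trailing comment above refers to runWorker, whose body is outside this diff: onChangeEndpointSlice only enqueues the owning Service's namespace/name key, and a separate lightweight worker drains the rate-limited queue created in Initialize. A rough sketch of such a consume loop, using the same client-go workqueue package the controller wires up; updateService is a hypothetical stand-in for the controller's real per-Service handler, not a function from this commit.

package sketch

import "k8s.io/client-go/util/workqueue"

// runWorkerSketch drains a rate-limited queue of "namespace/name" Service keys,
// retrying failed updates with backoff and clearing the backoff on success.
func runWorkerSketch(q workqueue.RateLimitingInterface, updateService func(key string) error) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		key := item.(string) // keys are "namespace/name" strings, as enqueued above
		if err := updateService(key); err != nil {
			q.AddRateLimited(key) // requeue with backoff on failure
		} else {
			q.Forget(key) // drop rate-limit history on success
		}
		q.Done(item)
	}
}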
@@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
	pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{
	var readyNodes map[string]bool

	if servicehelper.RequestsOnlyLocalTraffic(svc) {
		readyNodes = map[string]bool{}
		eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{
			discovery.LabelServiceName: svc.Name,
		}))
		if err != nil {
			return nil, err
		}

		for _, ep := range eps {
			for _, endpoint := range ep.Endpoints {
				isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
				isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
				if isPod && isReady && endpoint.NodeName != nil {
					readyNodes[*endpoint.NodeName] = true
				}
			}
		}
	}

	pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
		svcNameLabel: svc.Name,
		svcNamespaceLabel: svc.Namespace,
	}))

	if err != nil {
		return nil, err
	}

	expectedIPs, err := k.podIPs(pods, svc)
	expectedIPs, err := k.podIPs(pods, svc, readyNodes)
	if err != nil {
		return nil, err
	}
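Pulled out of the controller, the ready-node filter added to getStatus reduces to a small pure function over EndpointSlices. A self-contained sketch mirroring the logic above; the function name is illustrative.

package sketch

import (
	discovery "k8s.io/api/discovery/v1"
)

// readyNodesFromSlices returns the set of node names hosting at least one
// ready Pod endpoint, the same filter getStatus applies when the Service
// requests only local traffic.
func readyNodesFromSlices(slices []*discovery.EndpointSlice) map[string]bool {
	readyNodes := map[string]bool{}
	for _, eps := range slices {
		for _, endpoint := range eps.Endpoints {
			isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
			isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
			if isPod && isReady && endpoint.NodeName != nil {
				readyNodes[*endpoint.NodeName] = true
			}
		}
	}
	return readyNodes
}

Nodes absent from the returned set are then skipped when building the address list, which is what the readyNodes parameter threaded into podIPs below accomplishes.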
@@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) {
	// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
	// to determine the unique set of IPs in use by pods.
	extIPs := map[string]bool{}
@@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
		if !Ready.IsTrue(pod) {
			continue
		}
		if readyNodes != nil && !readyNodes[pod.Spec.NodeName] {
			continue
		}

		node, err := k.nodeCache.Get(pod.Spec.NodeName)
		if apierrors.IsNotFound(err) {
@@ -405,54 +449,12 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
	name := generateName(svc)
	oneInt := intstr.FromInt(1)

	localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
	sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
	if err != nil {
		return nil, err
	}

	ds := &apps.DaemonSet{
		ObjectMeta: meta.ObjectMeta{
			Name: name,
			Namespace: k.LBNamespace,
			Labels: map[string]string{
				nodeSelectorLabel: "false",
				svcNameLabel: svc.Name,
				svcNamespaceLabel: svc.Namespace,
			},
		},
		TypeMeta: meta.TypeMeta{
			Kind: "DaemonSet",
			APIVersion: "apps/v1",
		},
		Spec: apps.DaemonSetSpec{
			Selector: &meta.LabelSelector{
				MatchLabels: map[string]string{
					"app": name,
				},
			},
			Template: core.PodTemplateSpec{
				ObjectMeta: meta.ObjectMeta{
					Labels: map[string]string{
						"app": name,
						svcNameLabel: svc.Name,
						svcNamespaceLabel: svc.Namespace,
					},
				},
				Spec: core.PodSpec{
					ServiceAccountName: "svclb",
					AutomountServiceAccountToken: utilpointer.Bool(false),
				},
			},
			UpdateStrategy: apps.DaemonSetUpdateStrategy{
				Type: apps.RollingUpdateDaemonSetStrategyType,
				RollingUpdate: &apps.RollingUpdateDaemonSet{
					MaxUnavailable: &oneInt,
				},
			},
		},
	}

	var sysctls []core.Sysctl
	for _, ipFamily := range svc.Spec.IPFamilies {
		switch ipFamily {
@@ -463,7 +465,66 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
		}
	}

	ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}
	ds := &apps.DaemonSet{
		ObjectMeta: meta.ObjectMeta{
			Name: name,
			Namespace: k.LBNamespace,
			Labels: labels.Set{
				nodeSelectorLabel: "false",
				svcNameLabel: svc.Name,
				svcNamespaceLabel: svc.Namespace,
			},
		},
		TypeMeta: meta.TypeMeta{
			Kind: "DaemonSet",
			APIVersion: "apps/v1",
		},
		Spec: apps.DaemonSetSpec{
			Selector: &meta.LabelSelector{
				MatchLabels: labels.Set{
					"app": name,
				},
			},
			Template: core.PodTemplateSpec{
				ObjectMeta: meta.ObjectMeta{
					Labels: labels.Set{
						"app": name,
						svcNameLabel: svc.Name,
						svcNamespaceLabel: svc.Namespace,
					},
				},
				Spec: core.PodSpec{
					ServiceAccountName: "svclb",
					AutomountServiceAccountToken: utilpointer.Bool(false),
					SecurityContext: &core.PodSecurityContext{
						Sysctls: sysctls,
					},
					Tolerations: []core.Toleration{
						{
							Key: "node-role.kubernetes.io/master",
							Operator: "Exists",
							Effect: "NoSchedule",
						},
						{
							Key: "node-role.kubernetes.io/control-plane",
							Operator: "Exists",
							Effect: "NoSchedule",
						},
						{
							Key: "CriticalAddonsOnly",
							Operator: "Exists",
						},
					},
				},
			},
			UpdateStrategy: apps.DaemonSetUpdateStrategy{
				Type: apps.RollingUpdateDaemonSetStrategyType,
				RollingUpdate: &apps.RollingUpdateDaemonSet{
					MaxUnavailable: &oneInt,
				},
			},
		},
	}

	for _, port := range svc.Spec.Ports {
		portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
@@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
					Name: "DEST_PROTO",
					Value: string(port.Protocol),
				},
				{
					Name: "DEST_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				{
					Name: "DEST_IPS",
					Value: strings.Join(svc.Spec.ClusterIPs, " "),
				},
			},
			SecurityContext: &core.SecurityContext{
				Capabilities: &core.Capabilities{
@@ -510,32 +563,37 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
			},
		}

		if localTraffic {
			container.Env = append(container.Env,
				core.EnvVar{
					Name: "DEST_PORT",
					Value: strconv.Itoa(int(port.NodePort)),
				},
				core.EnvVar{
					Name: "DEST_IPS",
					ValueFrom: &core.EnvVarSource{
						FieldRef: &core.ObjectFieldSelector{
							FieldPath: "status.hostIP",
						},
					},
				},
			)
		} else {
			container.Env = append(container.Env,
				core.EnvVar{
					Name: "DEST_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				core.EnvVar{
					Name: "DEST_IPS",
					Value: strings.Join(svc.Spec.ClusterIPs, " "),
				},
			)
		}

		ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
	}

	// Add toleration to noderole.kubernetes.io/master=*:NoSchedule
	masterToleration := core.Toleration{
		Key: "node-role.kubernetes.io/master",
		Operator: "Exists",
		Effect: "NoSchedule",
	}
	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)

	// Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule
	controlPlaneToleration := core.Toleration{
		Key: "node-role.kubernetes.io/control-plane",
		Operator: "Exists",
		Effect: "NoSchedule",
	}
	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)

	// Add toleration to CriticalAddonsOnly
	criticalAddonsOnlyToleration := core.Toleration{
		Key: "CriticalAddonsOnly",
		Operator: "Exists",
	}
	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)

	// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
	enableNodeSelector, err := k.nodeHasDaemonSetLabel()
	if err != nil {
@@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
		}
		ds.Labels[nodeSelectorLabel] = "true"
	}

	return ds, nil
}
@@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error {
		return err
	}

	nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
	nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
	daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
	if err != nil {
		return err
@@ -371,9 +371,9 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error {
	return util.WaitForRBACReady(ctx, runtime.KubeConfigAdmin, timeout, authorizationv1.ResourceAttributes{
		Namespace: metav1.NamespaceSystem,
		Verb: "*",
		Resource: "daemonsets",
		Group: "apps",
		Verb: "watch",
		Resource: "endpointslices",
		Group: "discovery.k8s.io",
	}, version.Program+"-cloud-controller-manager")
}
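The startup privilege check now waits for watch access to endpointslices.discovery.k8s.io instead of full access to daemonsets.apps, matching the ClusterRole rule added above. util.WaitForRBACReady itself is outside this diff and checks access on behalf of the named cloud-controller-manager user; as a rough standalone approximation with plain client-go, a SelfSubjectAccessReview against the same resource attributes could look like the sketch below (an illustration, not the k3s implementation).

package sketch

import (
	"context"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// canWatchEndpointSlices reports whether the current credentials may watch
// EndpointSlices in kube-system, roughly the condition the cloud controller
// now waits for before starting.
func canWatchEndpointSlices(ctx context.Context, client kubernetes.Interface) (bool, error) {
	review := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace: metav1.NamespaceSystem,
				Verb:      "watch",
				Group:     "discovery.k8s.io",
				Resource:  "endpointslices",
			},
		},
	}
	result, err := client.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, review, metav1.CreateOptions{})
	if err != nil {
		return false, err
	}
	return result.Status.Allowed, nil
}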