You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
k3s/pkg/servicelb/controller.go

408 lines
9.3 KiB

package servicelb
import (
"context"
"fmt"
"sort"
"strconv"
appclient "github.com/rancher/wrangler-api/pkg/generated/controllers/apps/v1"
coreclient "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/rancher/wrangler/pkg/slice"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
)
// Names and labels used by the service load balancer controller.
const (
	// image is the container image used for every load balancer container.
	image = "rancher/klipper-lb:v0.1.2"

	// svcNameLabel is put on load balancer pods and carries the name of the
	// service the pod belongs to.
	svcNameLabel = "svccontroller.k3s.cattle.io/svcname"

	// daemonsetNodeLabel is the node label key that, once present on any node,
	// restricts load balancer pods to nodes where its value is "true".
	daemonsetNodeLabel = "svccontroller.k3s.cattle.io/enablelb"

	// nodeSelectorLabel is set on generated DaemonSets to "true" or "false" to
	// record whether the node selector has been applied to them.
	nodeSelectorLabel = "svccontroller.k3s.cattle.io/nodeselector"

	// Ready is the condition checked on load balancer pods before their node
	// addresses are published.
	Ready = condition.Cond("Ready")
)
// trueVal exists so that its address can be used for *bool fields such as
// OwnerReference.Controller.
var trueVal = true
// Register builds the service load balancer handler and hooks it up to the
// given controllers.
//
// The handler reacts to service changes and node changes directly, and is
// additionally re-triggered for a service whenever a related pod or endpoints
// object changes. enabled and rootless are stored on the handler and consulted
// when daemon sets are deployed and when service status is computed.
// Register itself always returns nil.
func Register(ctx context.Context,
	kubernetes kubernetes.Interface,
	apply apply.Apply,
	daemonSetController appclient.DaemonSetController,
	deployments appclient.DeploymentController,
	nodes coreclient.NodeController,
	pods coreclient.PodController,
	services coreclient.ServiceController,
	endpoints coreclient.EndpointsController,
	enabled, rootless bool) error {
	const controllerName = "svccontroller"

	h := &handler{
		rootless: rootless,
		enabled:  enabled,
	}

	// Read-only caches used for lookups.
	h.nodeCache = nodes.Cache()
	h.podCache = pods.Cache()
	h.deploymentCache = deployments.Cache()

	// Apply processor that owns the generated daemon sets.
	h.processor = apply.WithSetID(controllerName).
		WithCacheTypes(daemonSetController)

	h.serviceCache = services.Cache()

	// Typed clients used for writes.
	h.services = kubernetes.CoreV1()
	h.daemonsets = kubernetes.AppsV1()
	h.deployments = kubernetes.AppsV1()

	services.OnChange(ctx, controllerName, h.onChangeService)
	nodes.OnChange(ctx, controllerName, h.onChangeNode)

	// Map pod and endpoints events back onto the service they belong to.
	relatedresource.Watch(ctx, "svccontroller-watcher",
		h.onResourceChange,
		services,
		pods,
		endpoints)

	return nil
}
// handler holds the configuration, caches and clients used by the service
// load balancer controller callbacks.
type handler struct {
	// rootless makes podIPs report 127.0.0.1 instead of node addresses.
	rootless bool
	// enabled gates daemon set creation and service status updates.
	enabled bool

	// Caches used for lookups.
	nodeCache       coreclient.NodeCache
	podCache        coreclient.PodCache
	deploymentCache appclient.DeploymentCache
	serviceCache    coreclient.ServiceCache

	// processor applies the object set generated for each service.
	processor apply.Apply

	// Typed clients used for writes.
	services    coregetter.ServicesGetter
	daemonsets  v1getter.DaemonSetsGetter
	deployments v1getter.DeploymentsGetter
}
// onResourceChange maps a changed object onto the service that should be
// re-processed because of it.
//
// Endpoints resolve to the object with the same name and namespace. Pods
// resolve to the service named in their svcNameLabel label, but only once the
// pod has been assigned an IP. Every other object resolves to nothing.
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
	switch o := obj.(type) {
	case *core.Endpoints:
		return []relatedresource.Key{
			{Name: o.Name, Namespace: o.Namespace},
		}, nil
	case *core.Pod:
		svcName := o.Labels[svcNameLabel]
		if svcName == "" || o.Status.PodIP == "" {
			return nil, nil
		}
		return []relatedresource.Key{
			{Name: svcName, Namespace: o.Namespace},
		}, nil
	default:
		return nil, nil
	}
}
// onChangeService reconciles the load balancer workload and status of a
// service.
//
// Services that are deleted, are not of type LoadBalancer, or have no usable
// cluster IP (empty or "None") are left alone. For all others the daemon set
// is deployed first and the service status is refreshed afterwards.
func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, error) {
	if svc == nil {
		return nil, nil
	}

	clusterIP := svc.Spec.ClusterIP
	isLoadBalancer := svc.Spec.Type == core.ServiceTypeLoadBalancer
	hasClusterIP := clusterIP != "" && clusterIP != "None"
	if !isLoadBalancer || !hasClusterIP {
		return svc, nil
	}

	if err := h.deployPod(svc); err != nil {
		return svc, err
	}

	// The service is deliberately not returned here, because we don't want
	// to cause another update.
	if _, err := h.updateService(svc); err != nil {
		return nil, err
	}
	return nil, nil
}
// onChangeNode updates the generated daemon sets whenever a node carrying the
// daemonsetNodeLabel label changes. Nodes without that label, and deleted
// nodes, are ignored.
func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) {
	if node == nil {
		return nil, nil
	}

	if _, labelled := node.Labels[daemonsetNodeLabel]; labelled {
		return node, h.updateDaemonSets()
	}
	return node, nil
}
// updateService makes the load balancer ingress IPs in the service status
// match the addresses currently served by the service's load balancer pods.
//
// Nothing is written when the controller is disabled or when the sorted set
// of IPs already matches. Otherwise a deep copy of the service has its ingress
// list replaced and is sent through UpdateStatus; the result of that call is
// returned.
func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
	if !h.enabled {
		return svc, nil
	}

	// Load balancer pods are found through the service-name label.
	selector := labels.SelectorFromSet(map[string]string{
		svcNameLabel: svc.Name,
	})
	lbPods, err := h.podCache.List(svc.Namespace, selector)
	if err != nil {
		return svc, err
	}

	current := serviceIPs(svc)
	wanted, err := h.podIPs(lbPods)
	if err != nil {
		return svc, err
	}

	// Compare order-independently.
	sort.Strings(wanted)
	sort.Strings(current)
	if slice.StringsEqual(wanted, current) {
		return svc, nil
	}

	// Left nil when there are no IPs, so the status field is cleared.
	var ingress []core.LoadBalancerIngress
	for _, ip := range wanted {
		ingress = append(ingress, core.LoadBalancerIngress{IP: ip})
	}

	// Never modify the object handed in by the caller.
	svc = svc.DeepCopy()
	svc.Status.LoadBalancer.Ingress = ingress

	logrus.Debugf("Setting service loadbalancer %s/%s to IPs %v", svc.Namespace, svc.Name, wanted)
	return h.services.Services(svc.Namespace).UpdateStatus(svc)
}
// serviceIPs returns the non-empty IPs listed in the load balancer ingress
// status of svc, in status order. The result is nil when there are none.
func serviceIPs(svc *core.Service) []string {
	var ips []string
	entries := svc.Status.LoadBalancer.Ingress
	for i := range entries {
		ip := entries[i].IP
		if ip == "" {
			continue
		}
		ips = append(ips, ip)
	}
	return ips
}
// podIPs returns the de-duplicated addresses of the nodes on which the given
// load balancer pods are running.
//
// Pods that have no node assigned, have no pod IP, or are not Ready are
// skipped, and so are pods whose node is not found in the node cache. Any
// other node cache error is returned.
//
// External node IPs are preferred. Internal node IPs are used only when none
// of the considered nodes has an external IP, so the result never mixes the
// two kinds and does not depend on the order of pods. In rootless mode a
// non-empty result is replaced by the single address 127.0.0.1.
//
// The order of the returned slice is unspecified; the result is nil when no
// address was found.
func (h *handler) podIPs(pods []*core.Pod) ([]string, error) {
	externalIPs := map[string]bool{}
	internalIPs := map[string]bool{}

	for _, pod := range pods {
		if pod.Spec.NodeName == "" || pod.Status.PodIP == "" {
			continue
		}
		if !Ready.IsTrue(pod) {
			continue
		}

		node, err := h.nodeCache.Get(pod.Spec.NodeName)
		if errors.IsNotFound(err) {
			continue
		} else if err != nil {
			return nil, err
		}

		// Collect both kinds separately; which one is used is decided once,
		// after all pods have been looked at.
		for _, addr := range node.Status.Addresses {
			switch addr.Type {
			case core.NodeExternalIP:
				externalIPs[addr.Address] = true
			case core.NodeInternalIP:
				internalIPs[addr.Address] = true
			}
		}
	}

	selected := externalIPs
	if len(selected) == 0 {
		selected = internalIPs
	}

	var ipList []string
	for ip := range selected {
		ipList = append(ipList, ip)
	}

	if len(ipList) > 0 && h.rootless {
		return []string{"127.0.0.1"}, nil
	}
	return ipList, nil
}
// deployPod applies the set of objects that should exist for svc.
//
// Any old Deployment named after the service is deleted first. When the
// controller is enabled the set contains the generated daemon set; when it is
// disabled an empty set is applied with svc as owner.
// NOTE(review): applying the empty set presumably removes objects applied
// earlier for this service — confirm against the apply package.
func (h *handler) deployPod(svc *core.Service) error {
	if err := h.deleteOldDeployments(svc); err != nil {
		return err
	}

	desired := objectset.NewObjectSet()
	if h.enabled {
		ds, err := h.newDaemonSet(svc)
		if err != nil {
			return err
		}
		if ds != nil {
			desired.Add(ds)
		}
	}

	return h.processor.WithOwner(svc).Apply(desired)
}
// newDaemonSet builds the DaemonSet that exposes svc on node host ports.
//
// The DaemonSet is named "svclb-<service name>", lives in the service's
// namespace and is controlled by the service through an owner reference. Its
// pods carry svcNameLabel so they can be found again from the service. It is
// rolled out with a rolling update that allows one unavailable pod.
//
// One container is created per service port. Each container runs image with
// the NET_ADMIN capability, binds the service port as host port using the
// service port's protocol, and receives SRC_PORT, DEST_PROTO, DEST_PORT and
// DEST_IP (the service's cluster IP) as environment variables.
//
// A node selector on daemonsetNodeLabel="true" is added only if at least one
// node in the cache carries that label key. Whether the selector was added is
// recorded in the DaemonSet's nodeSelectorLabel label ("true" or "false").
//
// An error is returned only if the label selector cannot be parsed or the
// node cache cannot be listed.
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
	name := fmt.Sprintf("svclb-%s", svc.Name)
	oneInt := intstr.FromInt(1)

	ds := &apps.DaemonSet{
		ObjectMeta: meta.ObjectMeta{
			Name:      name,
			Namespace: svc.Namespace,
			OwnerReferences: []meta.OwnerReference{
				{
					Name:       svc.Name,
					APIVersion: "v1",
					Kind:       "Service",
					UID:        svc.UID,
					Controller: &trueVal,
				},
			},
			Labels: map[string]string{
				// Switched to "true" below once the node selector is applied.
				nodeSelectorLabel: "false",
			},
		},
		TypeMeta: meta.TypeMeta{
			Kind:       "DaemonSet",
			APIVersion: "apps/v1",
		},
		Spec: apps.DaemonSetSpec{
			Selector: &meta.LabelSelector{
				MatchLabels: map[string]string{
					"app": name,
				},
			},
			Template: core.PodTemplateSpec{
				ObjectMeta: meta.ObjectMeta{
					Labels: map[string]string{
						"app":        name,
						svcNameLabel: svc.Name,
					},
				},
			},
			UpdateStrategy: apps.DaemonSetUpdateStrategy{
				Type: apps.RollingUpdateDaemonSetStrategyType,
				RollingUpdate: &apps.RollingUpdateDaemonSet{
					MaxUnavailable: &oneInt,
				},
			},
		},
	}

	for _, port := range svc.Spec.Ports {
		// NOTE(review): the name depends on the port number only, so a service
		// exposing the same port number on two protocols would produce two
		// containers with the same name. Left unchanged here because renaming
		// would alter the pod template of every existing DaemonSet.
		portName := fmt.Sprintf("lb-port-%d", port.Port)

		// The container port must carry the service port's protocol: an unset
		// ContainerPort.Protocol defaults to TCP, which would declare the host
		// port of a UDP (or SCTP) service port with the wrong protocol.
		container := core.Container{
			Name:            portName,
			Image:           image,
			ImagePullPolicy: core.PullIfNotPresent,
			Ports: []core.ContainerPort{
				{
					Name:          portName,
					ContainerPort: port.Port,
					HostPort:      port.Port,
					Protocol:      port.Protocol,
				},
			},
			Env: []core.EnvVar{
				{
					Name:  "SRC_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				{
					Name:  "DEST_PROTO",
					Value: string(port.Protocol),
				},
				{
					Name:  "DEST_PORT",
					Value: strconv.Itoa(int(port.Port)),
				},
				{
					Name:  "DEST_IP",
					Value: svc.Spec.ClusterIP,
				},
			},
			SecurityContext: &core.SecurityContext{
				Capabilities: &core.Capabilities{
					Add: []core.Capability{
						"NET_ADMIN",
					},
				},
			},
		}
		ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
	}

	// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
	selector, err := labels.Parse(daemonsetNodeLabel)
	if err != nil {
		return nil, err
	}
	nodesWithLabel, err := h.nodeCache.List(selector)
	if err != nil {
		return nil, err
	}
	if len(nodesWithLabel) > 0 {
		ds.Spec.Template.Spec.NodeSelector = map[string]string{
			daemonsetNodeLabel: "true",
		}
		ds.Labels[nodeSelectorLabel] = "true"
	}

	return ds, nil
}
// updateDaemonSets adds the daemonsetNodeLabel="true" node selector to every
// DaemonSet, in any namespace, that is still labelled nodeSelectorLabel=false,
// and flips that label to "true". It stops at the first list or update error.
func (h *handler) updateDaemonSets() error {
	opts := meta.ListOptions{
		LabelSelector: nodeSelectorLabel + "=false",
	}
	pending, err := h.daemonsets.DaemonSets("").List(opts)
	if err != nil {
		return err
	}

	for i := range pending.Items {
		ds := &pending.Items[i]
		ds.Spec.Template.Spec.NodeSelector = map[string]string{
			daemonsetNodeLabel: "true",
		}
		ds.Labels[nodeSelectorLabel] = "true"

		_, err := h.daemonsets.DaemonSets(ds.Namespace).Update(ds)
		if err != nil {
			return err
		}
	}
	return nil
}
// deleteOldDeployments deletes the Deployment named "svclb-<service name>" in
// the service's namespace if the deployment cache knows about it. A missing
// Deployment is not an error.
func (h *handler) deleteOldDeployments(svc *core.Service) error {
	name := fmt.Sprintf("svclb-%s", svc.Name)

	_, err := h.deploymentCache.Get(svc.Namespace, name)
	switch {
	case err == nil:
		// Found: fall through to the delete below.
	case errors.IsNotFound(err):
		return nil
	default:
		return err
	}

	return h.deployments.Deployments(svc.Namespace).Delete(name, &meta.DeleteOptions{})
}