mirror of https://github.com/k3s-io/k3s
Support MixedProtocolLBService and clean up Daemonsets on type change.
Also add event support to increase visibility of change events. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/5221/head
parent
0ed46d27c7
commit
44c53743dd
|
@ -8,6 +8,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
util "github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/rancher/wrangler/pkg/apply"
|
||||
"github.com/rancher/wrangler/pkg/condition"
|
||||
|
@ -16,7 +17,6 @@ import (
|
|||
"github.com/rancher/wrangler/pkg/objectset"
|
||||
"github.com/rancher/wrangler/pkg/relatedresource"
|
||||
"github.com/rancher/wrangler/pkg/slice"
|
||||
"github.com/sirupsen/logrus"
|
||||
apps "k8s.io/api/apps/v1"
|
||||
core "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/client-go/kubernetes"
|
||||
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
utilsnet "k8s.io/utils/net"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
@ -40,7 +41,8 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
Ready = condition.Cond("Ready")
|
||||
Ready = condition.Cond("Ready")
|
||||
ControllerName = "svccontroller"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -63,17 +65,17 @@ func Register(ctx context.Context,
|
|||
nodeCache: nodes.Cache(),
|
||||
podCache: pods.Cache(),
|
||||
deploymentCache: deployments.Cache(),
|
||||
processor: apply.WithSetID("svccontroller").
|
||||
WithCacheTypes(daemonSetController),
|
||||
serviceCache: services.Cache(),
|
||||
services: kubernetes.CoreV1(),
|
||||
daemonsets: kubernetes.AppsV1(),
|
||||
deployments: kubernetes.AppsV1(),
|
||||
processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController),
|
||||
serviceCache: services.Cache(),
|
||||
services: kubernetes.CoreV1(),
|
||||
daemonsets: kubernetes.AppsV1(),
|
||||
deployments: kubernetes.AppsV1(),
|
||||
recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName),
|
||||
}
|
||||
|
||||
services.OnChange(ctx, "svccontroller", h.onChangeService)
|
||||
nodes.OnChange(ctx, "svccontroller", h.onChangeNode)
|
||||
relatedresource.Watch(ctx, "svccontroller-watcher",
|
||||
services.OnChange(ctx, ControllerName, h.onChangeService)
|
||||
nodes.OnChange(ctx, ControllerName, h.onChangeNode)
|
||||
relatedresource.Watch(ctx, ControllerName+"-watcher",
|
||||
h.onResourceChange,
|
||||
services,
|
||||
pods,
|
||||
|
@ -93,6 +95,7 @@ type handler struct {
|
|||
services coregetter.ServicesGetter
|
||||
daemonsets v1getter.DaemonSetsGetter
|
||||
deployments v1getter.DeploymentsGetter
|
||||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
|
||||
|
@ -133,9 +136,10 @@ func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service,
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" ||
|
||||
svc.Spec.ClusterIP == "None" {
|
||||
return svc, nil
|
||||
if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
|
||||
// If the Service type changes from LoadBalancer to something else, make sure we remove the DaemonSet
|
||||
err := h.deletePod(svc)
|
||||
return svc, err
|
||||
}
|
||||
|
||||
if err := h.deployPod(svc); err != nil {
|
||||
|
@ -200,7 +204,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
|
|||
})
|
||||
}
|
||||
|
||||
logrus.Debugf("Setting service loadbalancer %s/%s to IPs %v", svc.Namespace, svc.Name, expectedIPs)
|
||||
h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", "))
|
||||
return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{})
|
||||
}
|
||||
|
||||
|
@ -325,7 +329,22 @@ func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
|
|||
return nil, errors.New("unhandled ipFamilyPolicy")
|
||||
}
|
||||
|
||||
// deployPod ensures that there is a DaemonSet for each service.
|
||||
// deletePod ensures that there is not a DaemonSet for the service.
|
||||
func (h *handler) deletePod(svc *core.Service) error {
|
||||
name := fmt.Sprintf("svclb-%s", svc.Name)
|
||||
ds, err := h.daemonsets.DaemonSets(svc.Namespace).Get(context.TODO(), name, meta.GetOptions{})
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
|
||||
objs := objectset.NewObjectSet()
|
||||
return h.processor.WithOwner(svc).Apply(objs)
|
||||
}
|
||||
|
||||
// deployPod ensures that there is a DaemonSet for the service.
|
||||
// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted.
|
||||
func (h *handler) deployPod(svc *core.Service) error {
|
||||
if err := h.deleteOldDeployments(svc); err != nil {
|
||||
|
@ -342,6 +361,7 @@ func (h *handler) deployPod(svc *core.Service) error {
|
|||
}
|
||||
if ds != nil {
|
||||
objs.Add(ds)
|
||||
h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
|
||||
}
|
||||
return h.processor.WithOwner(svc).Apply(objs)
|
||||
}
|
||||
|
@ -421,7 +441,7 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
|
|||
}
|
||||
|
||||
for _, port := range svc.Spec.Ports {
|
||||
portName := fmt.Sprintf("lb-port-%d", port.Port)
|
||||
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
|
||||
container := core.Container{
|
||||
Name: portName,
|
||||
Image: DefaultLBImage,
|
||||
|
@ -525,7 +545,6 @@ func (h *handler) updateDaemonSets() error {
|
|||
if _, err := h.daemonsets.DaemonSets(ds.Namespace).Update(context.TODO(), &ds, meta.UpdateOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue