mirror of https://github.com/k3s-io/k3s
209 lines
7.1 KiB
Go
209 lines
7.1 KiB
Go
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
|
|
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol.go
|
|
|
|
//go:build !windows
|
|
// +build !windows
|
|
|
|
package netpol
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
cloudproviderapi "k8s.io/cloud-provider/api"
|
|
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/version"
|
|
"github.com/coreos/go-iptables/iptables"
|
|
"github.com/k3s-io/k3s/pkg/daemons/config"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
v1core "k8s.io/api/core/v1"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/component-base/metrics/legacyregistry"
|
|
)
|
|
|
|
func init() {
|
|
// ensure that kube-router exposes metrics through the same registry used by Kubernetes components
|
|
metrics.DefaultRegisterer = legacyregistry.Registerer()
|
|
metrics.DefaultGatherer = legacyregistry.DefaultGatherer
|
|
}
|
|
|
|
// Run creates and starts a new instance of the kube-router network policy controller
|
|
// The code in this function is cribbed from the upstream controller at:
|
|
// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78
|
|
// It converts the k3s config.Node into kube-router configuration (only the
|
|
// subset of options needed for netpol controller).
|
|
func Run(ctx context.Context, nodeConfig *config.Node) error {
|
|
set, err := utils.NewIPSet(false)
|
|
if err != nil {
|
|
logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err)
|
|
return nil
|
|
}
|
|
|
|
if err := set.Save(); err != nil {
|
|
logrus.Warnf("Skipping network policy controller start, ipset save failed: %v", err)
|
|
return nil
|
|
}
|
|
|
|
restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client, err := kubernetes.NewForConfig(restConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// As kube-router netpol requires addresses to be available in the node object
|
|
// Wait until the node has ready addresses to avoid race conditions (max 1 minute).
|
|
// TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it
|
|
if err := wait.PollImmediateWithContext(ctx, 2*time.Second, 60*time.Second, func(ctx context.Context) (bool, error) {
|
|
// Get the node object
|
|
node, err := client.CoreV1().Nodes().Get(ctx, nodeConfig.AgentConfig.NodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
logrus.Errorf("Error getting the node object: %v", err)
|
|
return false, err
|
|
}
|
|
// Check for the uninitialized taint that should be removed by cloud-provider
|
|
// If there is no cloud-provider, the taint will not be there
|
|
for _, taint := range node.Spec.Taints {
|
|
if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
|
|
return false, nil
|
|
}
|
|
}
|
|
return true, nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
krConfig := options.NewKubeRouterConfig()
|
|
var serviceIPs []string
|
|
for _, elem := range nodeConfig.AgentConfig.ServiceCIDRs {
|
|
serviceIPs = append(serviceIPs, elem.String())
|
|
}
|
|
krConfig.ClusterIPCIDRs = serviceIPs
|
|
krConfig.EnableIPv4 = nodeConfig.AgentConfig.EnableIPv4
|
|
krConfig.EnableIPv6 = nodeConfig.AgentConfig.EnableIPv6
|
|
krConfig.NodePortRange = strings.ReplaceAll(nodeConfig.AgentConfig.ServiceNodePortRange.String(), "-", ":")
|
|
krConfig.HostnameOverride = nodeConfig.AgentConfig.NodeName
|
|
krConfig.MetricsEnabled = true
|
|
krConfig.RunFirewall = true
|
|
krConfig.RunRouter = false
|
|
krConfig.RunServiceProxy = false
|
|
|
|
stopCh := ctx.Done()
|
|
healthCh := make(chan *healthcheck.ControllerHeartbeat)
|
|
|
|
// We don't use this WaitGroup, but kube-router components require it.
|
|
var wg sync.WaitGroup
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
podInformer := informerFactory.Core().V1().Pods().Informer()
|
|
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
|
|
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
|
|
informerFactory.Start(stopCh)
|
|
informerFactory.WaitForCacheSync(stopCh)
|
|
|
|
iptablesCmdHandlers := make(map[v1core.IPFamily]utils.IPTablesHandler, 2)
|
|
ipSetHandlers := make(map[v1core.IPFamily]utils.IPSetHandler, 2)
|
|
|
|
if nodeConfig.AgentConfig.EnableIPv4 {
|
|
iptHandler, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to create iptables handler")
|
|
}
|
|
iptablesCmdHandlers[v1core.IPv4Protocol] = iptHandler
|
|
|
|
ipset, err := utils.NewIPSet(false)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to create ipset handler")
|
|
}
|
|
ipSetHandlers[v1core.IPv4Protocol] = ipset
|
|
}
|
|
|
|
if nodeConfig.AgentConfig.EnableIPv6 {
|
|
ipt6Handler, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to create iptables handler")
|
|
}
|
|
iptablesCmdHandlers[v1core.IPv6Protocol] = ipt6Handler
|
|
|
|
ipset, err := utils.NewIPSet(true)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to create ipset handler")
|
|
}
|
|
ipSetHandlers[v1core.IPv6Protocol] = ipset
|
|
}
|
|
|
|
// Start kube-router healthcheck controller; netpol requires it
|
|
hc, err := healthcheck.NewHealthController(krConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start kube-router metrics controller to avoid complaints about metrics heartbeat missing
|
|
mc, err := metrics.NewMetricsController(krConfig)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Initialize all healthcheck timers. Otherwise, the system reports heartbeat missing messages
|
|
hc.SetAlive()
|
|
|
|
wg.Add(1)
|
|
go hc.RunCheck(healthCh, stopCh, &wg)
|
|
|
|
wg.Add(1)
|
|
go metricsRunCheck(mc, healthCh, stopCh, &wg)
|
|
|
|
npc, err := netpol.NewNetworkPolicyController(client, krConfig, podInformer, npInformer, nsInformer, &sync.Mutex{},
|
|
iptablesCmdHandlers, ipSetHandlers)
|
|
if err != nil {
|
|
return errors.Wrap(err, "unable to initialize network policy controller")
|
|
}
|
|
|
|
podInformer.AddEventHandler(npc.PodEventHandler)
|
|
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
|
|
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)
|
|
|
|
wg.Add(1)
|
|
logrus.Infof("Starting network policy controller version %s, built on %s, %s", version.Version, version.BuildDate, runtime.Version())
|
|
go npc.Run(healthCh, stopCh, &wg)
|
|
|
|
return nil
|
|
}
|
|
|
|
// metricsRunCheck is a stub version of mc.Run() that doesn't start up a dedicated http server.
|
|
func metricsRunCheck(mc *metrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
|
t := time.NewTicker(3 * time.Second)
|
|
defer wg.Done()
|
|
|
|
// register metrics for this controller
|
|
metrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
|
|
metrics.DefaultRegisterer.MustRegister(metrics.BuildInfo)
|
|
|
|
for {
|
|
healthcheck.SendHeartBeat(healthChan, "MC")
|
|
select {
|
|
case <-stopCh:
|
|
t.Stop()
|
|
return
|
|
case <-t.C:
|
|
logrus.Debugf("Kube-router network policy controller metrics tick")
|
|
}
|
|
}
|
|
}
|