2021-07-07 15:46:10 +00:00
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol.go
2021-11-18 09:12:11 +00:00
//go:build !windows
2021-02-01 19:20:24 +00:00
// +build !windows
package netpol
import (
"context"
2022-11-02 14:29:50 +00:00
"runtime"
2021-11-18 09:12:11 +00:00
"strings"
2021-07-07 15:46:10 +00:00
"sync"
2023-12-20 13:27:07 +00:00
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
cloudproviderapi "k8s.io/cloud-provider/api"
2021-02-01 19:20:24 +00:00
2023-04-05 16:47:55 +00:00
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
2024-01-10 00:10:28 +00:00
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
2023-04-05 16:47:55 +00:00
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
2024-01-10 00:10:28 +00:00
"github.com/cloudnativelabs/kube-router/v2/pkg/version"
2022-02-23 12:42:21 +00:00
"github.com/coreos/go-iptables/iptables"
2022-03-02 23:47:27 +00:00
"github.com/k3s-io/k3s/pkg/daemons/config"
2022-02-23 12:42:21 +00:00
"github.com/pkg/errors"
2021-02-01 19:20:24 +00:00
"github.com/sirupsen/logrus"
2022-02-23 12:42:21 +00:00
v1core "k8s.io/api/core/v1"
2021-02-01 19:20:24 +00:00
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
2024-01-10 00:10:28 +00:00
"k8s.io/component-base/metrics/legacyregistry"
2021-02-01 19:20:24 +00:00
)
2024-01-10 00:10:28 +00:00
func init ( ) {
// ensure that kube-router exposes metrics through the same registry used by Kubernetes components
metrics . DefaultRegisterer = legacyregistry . Registerer ( )
metrics . DefaultGatherer = legacyregistry . DefaultGatherer
}
2021-02-01 19:20:24 +00:00
// Run creates and starts a new instance of the kube-router network policy controller
// The code in this function is cribbed from the upstream controller at:
// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78
2021-11-18 09:12:11 +00:00
// It converts the k3s config.Node into kube-router configuration (only the
// subset of options needed for netpol controller).
2021-02-01 19:20:24 +00:00
func Run ( ctx context . Context , nodeConfig * config . Node ) error {
set , err := utils . NewIPSet ( false )
if err != nil {
logrus . Warnf ( "Skipping network policy controller start, ipset unavailable: %v" , err )
return nil
}
if err := set . Save ( ) ; err != nil {
logrus . Warnf ( "Skipping network policy controller start, ipset save failed: %v" , err )
return nil
}
restConfig , err := clientcmd . BuildConfigFromFlags ( "" , nodeConfig . AgentConfig . KubeConfigK3sController )
if err != nil {
return err
}
client , err := kubernetes . NewForConfig ( restConfig )
if err != nil {
return err
}
2023-12-20 13:27:07 +00:00
// As kube-router netpol requires addresses to be available in the node object
// Wait until the node has ready addresses to avoid race conditions (max 1 minute).
// TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it
if err := wait . PollImmediateWithContext ( ctx , 2 * time . Second , 60 * time . Second , func ( ctx context . Context ) ( bool , error ) {
// Get the node object
node , err := client . CoreV1 ( ) . Nodes ( ) . Get ( ctx , nodeConfig . AgentConfig . NodeName , metav1 . GetOptions { } )
if err != nil {
2024-02-27 01:17:23 +00:00
logrus . Debugf ( "Network policy controller waiting to get Node %s: %v" , nodeConfig . AgentConfig . NodeName , err )
2024-02-26 18:44:50 +00:00
return false , nil
2023-12-20 13:27:07 +00:00
}
// Check for the uninitialized taint that should be removed by cloud-provider
// If there is no cloud-provider, the taint will not be there
for _ , taint := range node . Spec . Taints {
if taint . Key == cloudproviderapi . TaintExternalCloudProvider {
2024-02-27 01:17:23 +00:00
logrus . Debugf ( "Network policy controller waiting for removal of %s taint" , cloudproviderapi . TaintExternalCloudProvider )
2023-12-20 13:27:07 +00:00
return false , nil
}
}
return true , nil
} ) ; err != nil {
2024-02-27 01:17:23 +00:00
return errors . Wrapf ( err , "network policy controller timed out waiting for %s taint to be removed from Node %s" , cloudproviderapi . TaintExternalCloudProvider , nodeConfig . AgentConfig . NodeName )
2023-12-20 13:27:07 +00:00
}
2024-01-10 00:10:28 +00:00
2021-11-18 09:12:11 +00:00
krConfig := options . NewKubeRouterConfig ( )
2023-03-11 03:57:16 +00:00
var serviceIPs [ ] string
for _ , elem := range nodeConfig . AgentConfig . ServiceCIDRs {
serviceIPs = append ( serviceIPs , elem . String ( ) )
}
krConfig . ClusterIPCIDRs = serviceIPs
2022-04-20 14:01:49 +00:00
krConfig . EnableIPv4 = nodeConfig . AgentConfig . EnableIPv4
2022-02-23 12:42:21 +00:00
krConfig . EnableIPv6 = nodeConfig . AgentConfig . EnableIPv6
2021-11-18 09:12:11 +00:00
krConfig . NodePortRange = strings . ReplaceAll ( nodeConfig . AgentConfig . ServiceNodePortRange . String ( ) , "-" , ":" )
krConfig . HostnameOverride = nodeConfig . AgentConfig . NodeName
2024-01-10 00:10:28 +00:00
krConfig . MetricsEnabled = true
2021-11-18 09:12:11 +00:00
krConfig . RunFirewall = true
krConfig . RunRouter = false
krConfig . RunServiceProxy = false
2021-02-01 19:20:24 +00:00
stopCh := ctx . Done ( )
2021-11-18 09:12:11 +00:00
healthCh := make ( chan * healthcheck . ControllerHeartbeat )
// We don't use this WaitGroup, but kube-router components require it.
var wg sync . WaitGroup
2021-02-01 19:20:24 +00:00
informerFactory := informers . NewSharedInformerFactory ( client , 0 )
podInformer := informerFactory . Core ( ) . V1 ( ) . Pods ( ) . Informer ( )
nsInformer := informerFactory . Core ( ) . V1 ( ) . Namespaces ( ) . Informer ( )
npInformer := informerFactory . Networking ( ) . V1 ( ) . NetworkPolicies ( ) . Informer ( )
informerFactory . Start ( stopCh )
informerFactory . WaitForCacheSync ( stopCh )
2022-02-23 12:42:21 +00:00
iptablesCmdHandlers := make ( map [ v1core . IPFamily ] utils . IPTablesHandler , 2 )
ipSetHandlers := make ( map [ v1core . IPFamily ] utils . IPSetHandler , 2 )
2022-04-25 13:41:49 +00:00
if nodeConfig . AgentConfig . EnableIPv4 {
iptHandler , err := iptables . NewWithProtocol ( iptables . ProtocolIPv4 )
if err != nil {
return errors . Wrap ( err , "failed to create iptables handler" )
}
iptablesCmdHandlers [ v1core . IPv4Protocol ] = iptHandler
2022-02-23 12:42:21 +00:00
2022-04-25 13:41:49 +00:00
ipset , err := utils . NewIPSet ( false )
if err != nil {
return errors . Wrap ( err , "failed to create ipset handler" )
}
ipSetHandlers [ v1core . IPv4Protocol ] = ipset
2022-02-23 12:42:21 +00:00
}
if nodeConfig . AgentConfig . EnableIPv6 {
ipt6Handler , err := iptables . NewWithProtocol ( iptables . ProtocolIPv6 )
if err != nil {
return errors . Wrap ( err , "failed to create iptables handler" )
}
iptablesCmdHandlers [ v1core . IPv6Protocol ] = ipt6Handler
ipset , err := utils . NewIPSet ( true )
if err != nil {
return errors . Wrap ( err , "failed to create ipset handler" )
}
ipSetHandlers [ v1core . IPv6Protocol ] = ipset
}
2024-01-10 00:10:28 +00:00
// Start kube-router healthcheck controller; netpol requires it
2021-11-18 09:12:11 +00:00
hc , err := healthcheck . NewHealthController ( krConfig )
if err != nil {
return err
}
2022-02-10 18:53:36 +00:00
2024-01-10 00:10:28 +00:00
// Start kube-router metrics controller to avoid complaints about metrics heartbeat missing
mc , err := metrics . NewMetricsController ( krConfig )
if err != nil {
return nil
}
// Initialize all healthcheck timers. Otherwise, the system reports heartbeat missing messages
2022-02-10 18:53:36 +00:00
hc . SetAlive ( )
2021-11-18 09:12:11 +00:00
wg . Add ( 1 )
go hc . RunCheck ( healthCh , stopCh , & wg )
2024-01-10 00:10:28 +00:00
wg . Add ( 1 )
go metricsRunCheck ( mc , healthCh , stopCh , & wg )
2022-02-23 12:42:21 +00:00
npc , err := netpol . NewNetworkPolicyController ( client , krConfig , podInformer , npInformer , nsInformer , & sync . Mutex { } ,
iptablesCmdHandlers , ipSetHandlers )
2021-02-01 19:20:24 +00:00
if err != nil {
2024-01-10 00:10:28 +00:00
return errors . Wrap ( err , "unable to initialize network policy controller" )
2021-02-01 19:20:24 +00:00
}
podInformer . AddEventHandler ( npc . PodEventHandler )
nsInformer . AddEventHandler ( npc . NamespaceEventHandler )
npInformer . AddEventHandler ( npc . NetworkPolicyEventHandler )
2021-11-18 09:12:11 +00:00
wg . Add ( 1 )
2024-01-10 00:10:28 +00:00
logrus . Infof ( "Starting network policy controller version %s, built on %s, %s" , version . Version , version . BuildDate , runtime . Version ( ) )
2021-11-18 09:12:11 +00:00
go npc . Run ( healthCh , stopCh , & wg )
2021-02-01 19:20:24 +00:00
return nil
}
2024-01-10 00:10:28 +00:00
// metricsRunCheck is a stub version of mc.Run() that doesn't start up a dedicated http server.
// It registers the build-info metric once, then sends an "MC" heartbeat on
// healthChan before each wait, waking every 3 seconds until stopCh yields.
// wg.Done is called when the function returns.
func metricsRunCheck(mc *metrics.Controller, healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
	ticker := time.NewTicker(3 * time.Second)
	defer wg.Done()
	// Deferred calls run last-in first-out, so the ticker is stopped before wg.Done.
	defer ticker.Stop()

	// register metrics for this controller
	metrics.BuildInfo.WithLabelValues(runtime.Version(), version.Version).Set(1)
	metrics.DefaultRegisterer.MustRegister(metrics.BuildInfo)

	for {
		healthcheck.SendHeartBeat(healthChan, "MC")

		select {
		case <-ticker.C:
			logrus.Debugf("Kube-router network policy controller metrics tick")
		case <-stopCh:
			return
		}
	}
}