Update embedded kube-router (#3557)

* Update embedded kube-router

Signed-off-by: dereknola <derek.nola@suse.com>
pull/3483/head
Derek Nola 2021-07-07 08:46:10 -07:00 committed by GitHub
parent 2093ee384f
commit 73df2d806b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 990 additions and 619 deletions

1
go.mod
View File

@ -126,6 +126,7 @@ require (
k8s.io/controller-manager v0.21.2
k8s.io/cri-api v0.21.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.8.0
k8s.io/kubectl v0.21.2
k8s.io/kubernetes v1.21.2
k8s.io/utils v0.0.0-20201110183641-67b214c5f920

View File

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/namespace.go
// +build !windows
@ -10,7 +10,7 @@ import (
api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
"k8s.io/klog/v2"
)
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
@ -32,7 +32,7 @@ func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEve
return
}
default:
glog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", obj)
}
},
}
@ -42,7 +42,7 @@ func (npc *NetworkPolicyController) handleNamespaceAdd(obj *api.Namespace) {
if obj.Labels == nil {
return
}
glog.V(2).Infof("Received update for namespace: %s", obj.Name)
klog.V(2).Infof("Received update for namespace: %s", obj.Name)
npc.RequestFullSync()
}
@ -51,7 +51,7 @@ func (npc *NetworkPolicyController) handleNamespaceUpdate(oldObj, newObj *api.Na
if reflect.DeepEqual(oldObj.Labels, newObj.Labels) {
return
}
glog.V(2).Infof("Received update for namespace: %s", newObj.Name)
klog.V(2).Infof("Received update for namespace: %s", newObj.Name)
npc.RequestFullSync()
}
@ -60,7 +60,7 @@ func (npc *NetworkPolicyController) handleNamespaceDelete(obj *api.Namespace) {
if obj.Labels == nil {
return
}
glog.V(2).Infof("Received namespace: %s delete event", obj.Name)
klog.V(2).Infof("Received namespace: %s delete event", obj.Name)
npc.RequestFullSync()
}

View File

@ -1,9 +1,13 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol.go
// +build !windows
package netpol
import (
"context"
"sync"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config"
@ -48,7 +52,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)
npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer)
npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer, &sync.Mutex{})
if err != nil {
return err
}

View File

@ -1,11 +1,12 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/network_policy_controller.go
// +build !windows
package netpol
import (
"bytes"
"crypto/sha256"
"encoding/base32"
"fmt"
@ -22,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
"k8s.io/klog/v2"
)
const (
@ -33,6 +34,7 @@ const (
kubeInputChainName = "KUBE-ROUTER-INPUT"
kubeForwardChainName = "KUBE-ROUTER-FORWARD"
kubeOutputChainName = "KUBE-ROUTER-OUTPUT"
kubeDefaultNetpolChain = "KUBE-NWPLCY-DEFAULT"
defaultSyncPeriod = 5 * time.Minute
)
@ -47,7 +49,7 @@ const (
// followed by one or more network policy chains, till there is a match which will accept the packet, or gets
// dropped by the rule in the pod chain, if there is no match.
// NetworkPolicyController strcut to hold information required by NetworkPolicyController
// NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
@ -57,6 +59,7 @@ type NetworkPolicyController struct {
mu sync.Mutex
syncPeriod time.Duration
fullSyncRequestChan chan struct{}
ipsetMutex *sync.Mutex
ipSetHandler *utils.IPSet
@ -67,6 +70,8 @@ type NetworkPolicyController struct {
PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler
filterTableRules bytes.Buffer
}
// internal structure to represent a network policy
@ -119,6 +124,7 @@ type egressRule struct {
type protocolAndPort struct {
protocol string
port string
endport string
}
type endPoints struct {
@ -135,29 +141,32 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
glog.Info("Starting network policy controller")
klog.Info("Starting network policy controller")
// setup kube-router specific top level cutoms chains
// setup kube-router specific top level custom chains (KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT)
npc.ensureTopLevelChains()
// setup default network policy chain that is applied to traffic from/to the pods that does not match any network policy
npc.ensureDefaultNetworkPolicyChain()
// Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
// therefore, we start it in it's own goroutine and request a sync through a single item channel
glog.Info("Starting network policy controller full sync goroutine")
klog.Info("Starting network policy controller full sync goroutine")
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) {
for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select {
case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine")
klog.Info("Shutting down network policies full sync goroutine")
return
default:
}
select {
case <-stopCh:
glog.Info("Shutting down network policies full sync goroutine")
klog.Info("Shutting down network policies full sync goroutine")
return
case <-fullSyncRequest:
glog.V(3).Info("Received request for a full sync, processing")
klog.V(3).Info("Received request for a full sync, processing")
npc.fullPolicySync() // fullPolicySync() is a blocking request here
}
}
@ -165,11 +174,11 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
// loop forever till notified to stop on stopCh
for {
glog.V(1).Info("Requesting periodic sync of iptables to reflect network policies")
klog.V(1).Info("Requesting periodic sync of iptables to reflect network policies")
npc.RequestFullSync()
select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
klog.Infof("Shutting down network policies controller")
return
case <-t.C:
}
@ -180,9 +189,9 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
func (npc *NetworkPolicyController) RequestFullSync() {
select {
case npc.fullSyncRequestChan <- struct{}{}:
glog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent")
klog.V(3).Info("Full sync request queue was empty so a full sync request was successfully sent")
default: // Don't block if the buffered channel is full, return quickly so that we don't block callee execution
glog.V(1).Info("Full sync request queue was full, skipping...")
klog.V(1).Info("Full sync request queue was full, skipping...")
}
}
@ -198,35 +207,55 @@ func (npc *NetworkPolicyController) fullPolicySync() {
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
defer func() {
endTime := time.Since(start)
glog.V(1).Infof("sync iptables took %v", endTime)
klog.V(1).Infof("sync iptables took %v", endTime)
}()
glog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
klog.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
// ensure kube-router specific top level chains and corresponding rules exist
npc.ensureTopLevelChains()
// ensure default network policy chain that is applied to traffic from/to the pods that does not match any network policy
npc.ensureDefaultNetworkPolicyChain()
networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error())
klog.Errorf("Aborting sync. Failed to build network policies: %v", err.Error())
return
}
npc.filterTableRules.Reset()
if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil {
klog.Errorf("Aborting sync. Failed to run iptables-save: %v" + err.Error())
return
}
activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion)
if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
klog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
return
}
activePodFwChains, err := npc.syncPodFirewallChains(networkPoliciesInfo, syncVersion)
if err != nil {
glog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error())
klog.Errorf("Aborting sync. Failed to sync pod firewalls: %v", err.Error())
return
}
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets)
err = npc.cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error())
klog.Errorf("Aborting sync. Failed to cleanup stale iptables rules: %v", err.Error())
return
}
if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil {
klog.Errorf("Aborting sync. Failed to run iptables-restore: %v\n%s", err.Error(), npc.filterTableRules.String())
return
}
err = npc.cleanupStaleIPSets(activePolicyIPSets)
if err != nil {
klog.Errorf("Failed to cleanup stale ipsets: %v", err.Error())
return
}
}
@ -240,7 +269,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
}
addUUIDForRuleSpec := func(chain string, ruleSpec *[]string) (string, error) {
@ -258,18 +287,18 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) {
exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error())
klog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
return
}
rules, err := iptablesCmdHandler.List("filter", chain)
if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error())
klog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error())
}
var ruleNo, ruleIndexOffset int
@ -290,11 +319,11 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
if ruleNo != position {
err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
klog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1))
if err != nil {
glog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error())
klog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error())
}
}
}
@ -304,12 +333,12 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
klog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
}
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
uuid, err := addUUIDForRuleSpec(builtinChain, &args)
if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error())
klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
}
ensureRuleAtPosition(builtinChain, args, uuid, 1)
}
@ -317,7 +346,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange.String(), "-j", "RETURN"}
uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips)
if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error())
klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
}
ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, 1)
@ -325,7 +354,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports)
if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error())
klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
}
ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, 2)
@ -333,7 +362,7 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports)
if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error())
klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
}
ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, 3)
@ -341,40 +370,66 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"}
uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips)
if err != nil {
glog.Fatalf("Failed to get uuid for rule: %s", err.Error())
klog.Fatalf("Failed to get uuid for rule: %s", err.Error())
}
ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+4)
}
// for the traffic to/from the local pod's let network policy controller be
// authoritative entity to ACCEPT the traffic if it complies to network policies
for _, chain := range chains {
comment := "rule to explicitly ACCEPT traffic that comply to network policies"
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"}
err = iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
klog.Fatalf("Failed to run iptables command: %s", err.Error())
}
}
}
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error {
// Creates custom chains KUBE-NWPLCY-DEFAULT
func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() {
iptablesCmdHandler, err := iptables.New()
if err != nil {
klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
}
markArgs := make([]string, 0)
markComment := "rule to mark traffic matching a network policy"
markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000")
err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
klog.Fatalf("Failed to run iptables command to create %s chain due to %s", kubeDefaultNetpolChain, err.Error())
}
err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...)
if err != nil {
klog.Fatalf("Failed to run iptables command: %s", err.Error())
}
}
func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) error {
cleanupPodFwChains := make([]string, 0)
cleanupPolicyChains := make([]string, 0)
cleanupPolicyIPSets := make([]*utils.Set, 0)
// initialize tool sets for working with iptables and ipset
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
}
ipsets, err := utils.NewIPSet(false)
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
glog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
return fmt.Errorf("failed to initialize iptables command executor due to %s", err.Error())
}
// find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed
chains, err := iptablesCmdHandler.ListChains("filter")
if err != nil {
return fmt.Errorf("Unable to list chains: %s", err)
return fmt.Errorf("unable to list chains: %s", err)
}
for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
if chain == kubeDefaultNetpolChain {
continue
}
if _, ok := activePolicyChains[chain]; !ok {
cleanupPolicyChains = append(cleanupPolicyChains, chain)
}
@ -385,6 +440,58 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
}
}
}
var newChains, newRules, desiredFilterTable bytes.Buffer
rules := strings.Split(npc.filterTableRules.String(), "\n")
if len(rules) > 0 && rules[len(rules)-1] == "" {
rules = rules[:len(rules)-1]
}
for _, rule := range rules {
skipRule := false
for _, podFWChainName := range cleanupPodFwChains {
if strings.Contains(rule, podFWChainName) {
skipRule = true
break
}
}
for _, policyChainName := range cleanupPolicyChains {
if strings.Contains(rule, policyChainName) {
skipRule = true
break
}
}
if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") {
skipRule = true
}
if skipRule {
continue
}
if strings.HasPrefix(rule, ":") {
newChains.WriteString(rule + " - [0:0]\n")
}
if strings.HasPrefix(rule, "-") {
newRules.WriteString(rule + "\n")
}
}
desiredFilterTable.WriteString("*filter" + "\n")
desiredFilterTable.Write(newChains.Bytes())
desiredFilterTable.Write(newRules.Bytes())
desiredFilterTable.WriteString("COMMIT" + "\n")
npc.filterTableRules = desiredFilterTable
return nil
}
func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[string]bool) error {
cleanupPolicyIPSets := make([]*utils.Set, 0)
ipsets, err := utils.NewIPSet(false)
if err != nil {
return fmt.Errorf("failed to create ipsets command executor due to %s", err.Error())
}
err = ipsets.Save()
if err != nil {
klog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error())
}
for _, set := range ipsets.Sets {
if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) ||
strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) {
@ -393,83 +500,11 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
}
}
}
// remove stale iptables podFwChain references from the filter table chains
for _, podFwChain := range cleanupPodFwChains {
primaryChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, egressChain := range primaryChains {
forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error())
}
// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error())
}
realRuleNo++
}
}
}
}
// cleanup pod firewall chain
for _, chain := range cleanupPodFwChains {
glog.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
}
glog.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
}
// cleanup network policy chains
for _, policyChain := range cleanupPolicyChains {
glog.V(2).Infof("Found policy chain to cleanup %s", policyChain)
// first clean up any references from active pod firewall chains
for podFwChain := range activePodFwChains {
podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
if err != nil {
return fmt.Errorf("Unable to list rules from the chain %s: %s", podFwChain, err)
}
for i, rule := range podFwChainRules {
if strings.Contains(rule, policyChain) {
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
if err != nil {
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
}
break
}
}
}
// now that all stale and active references to the network policy chain have been removed, delete the chain
err = iptablesCmdHandler.ClearChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
if err != nil {
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
}
glog.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
}
// cleanup network policy ipsets
for _, set := range cleanupPolicyIPSets {
err = set.Destroy()
if err != nil {
return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err)
return fmt.Errorf("failed to delete ipset %s due to %s", set.Name, err)
}
}
return nil
@ -478,17 +513,18 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
// Cleanup cleanup configurations done
func (npc *NetworkPolicyController) Cleanup() {
glog.Info("Cleaning up iptables configuration permanently done by kube-router")
klog.Info("Cleaning up iptables configuration permanently done by kube-router")
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Errorf("Failed to initialize iptables executor: %s", err.Error())
klog.Errorf("Failed to initialize iptables executor: %s", err.Error())
return
}
// delete jump rules in FORWARD chain to pod specific firewall chain
forwardChainRules, err := iptablesCmdHandler.List("filter", kubeForwardChainName)
if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup")
klog.Errorf("Failed to delete iptables rules as part of cleanup")
return
}
@ -498,7 +534,7 @@ func (npc *NetworkPolicyController) Cleanup() {
if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", kubeForwardChainName, strconv.Itoa(i-realRuleNo))
if err != nil {
glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
}
realRuleNo++
}
@ -507,7 +543,7 @@ func (npc *NetworkPolicyController) Cleanup() {
// delete jump rules in OUTPUT chain to pod specific firewall chain
forwardChainRules, err = iptablesCmdHandler.List("filter", kubeOutputChainName)
if err != nil {
glog.Errorf("Failed to delete iptables rules as part of cleanup")
klog.Errorf("Failed to delete iptables rules as part of cleanup")
return
}
@ -517,7 +553,7 @@ func (npc *NetworkPolicyController) Cleanup() {
if strings.Contains(rule, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.Delete("filter", kubeOutputChainName, strconv.Itoa(i-realRuleNo))
if err != nil {
glog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
klog.Errorf("Failed to delete iptables rule as part of cleanup: %s", err)
}
realRuleNo++
}
@ -526,19 +562,19 @@ func (npc *NetworkPolicyController) Cleanup() {
// flush and delete pod specific firewall chain
chains, err := iptablesCmdHandler.ListChains("filter")
if err != nil {
glog.Errorf("Unable to list chains: %s", err)
klog.Errorf("Unable to list chains: %s", err)
return
}
for _, chain := range chains {
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
}
@ -547,45 +583,53 @@ func (npc *NetworkPolicyController) Cleanup() {
// flush and delete per network policy specific chain
chains, err = iptablesCmdHandler.ListChains("filter")
if err != nil {
glog.Errorf("Unable to list chains: %s", err)
klog.Errorf("Unable to list chains: %s", err)
return
}
for _, chain := range chains {
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
err = iptablesCmdHandler.ClearChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
err = iptablesCmdHandler.DeleteChain("filter", chain)
if err != nil {
glog.Errorf("Failed to cleanup iptables rules: " + err.Error())
klog.Errorf("Failed to cleanup iptables rules: " + err.Error())
return
}
}
}
// delete all ipsets
klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(false)
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
klog.Errorf("Failed to clean up ipsets: " + err.Error())
return
}
err = ipset.Save()
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
klog.Errorf("Failed to clean up ipsets: " + err.Error())
}
err = ipset.DestroyAllWithin()
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
klog.Errorf("Failed to clean up ipsets: " + err.Error())
}
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
klog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}
// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *config.Node, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{ipsetMutex: ipsetMutex}
// Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time,
// additional requests would be pointless to queue since after the first one was processed the system would already
@ -601,24 +645,12 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
return nil, err
}
npc.nodeHostName = node.Name
nodeIP, err := utils.GetNodeIP(node)
if err != nil {
return nil, err
}
npc.nodeIP = nodeIP
ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, err
}
err = ipset.Save()
if err != nil {
return nil, err
}
npc.ipSetHandler = ipset
npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler()

View File

@ -1,19 +1,24 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/network_policy_controller_test.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/network_policy_controller_test.go
// +build !windows
package netpol
import (
"bytes"
"context"
"fmt"
"net"
"strings"
"sync"
"testing"
"time"
"github.com/rancher/k3s/pkg/daemons/config"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
v1 "k8s.io/api/core/v1"
@ -23,8 +28,6 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/rancher/k3s/pkg/daemons/config"
)
// newFakeInformersFromClient creates the different informers used in the uneventful network policy controller
@ -213,6 +216,7 @@ type tNetpolTestCase struct {
targetPods tPodNamespaceMap
inSourcePods tPodNamespaceMap
outDestPods tPodNamespaceMap
expectedRule string
}
// tGetNotTargetedPods finds set of pods that should not be targeted by netpol selectors
@ -417,6 +421,182 @@ func TestNewNetworkPolicySelectors(t *testing.T) {
}
}
func TestNetworkPolicyBuilder(t *testing.T) {
port, port1 := intstr.FromInt(30000), intstr.FromInt(34000)
ingressPort := intstr.FromInt(37000)
endPort, endPort1 := int32(31000), int32(35000)
testCases := []tNetpolTestCase{
{
name: "Simple Egress Destination Port",
netpol: tNetpol{name: "simple-egress", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Simple Ingress/Egress Destination Port",
netpol: tNetpol{name: "simple-ingress-egress", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
},
},
},
},
ingress: []netv1.NetworkPolicyIngressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &ingressPort,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Simple Egress Destination Port Range",
netpol: tNetpol{name: "simple-egress-pr", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port,
EndPort: &endPort,
},
{
Port: &port1,
EndPort: &endPort1,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
{
name: "Port > EndPort (invalid condition, should drop endport)",
netpol: tNetpol{name: "invalid-endport", namespace: "nsA",
podSelector: metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: "In",
Values: []string{"a"},
},
},
},
egress: []netv1.NetworkPolicyEgressRule{
{
Ports: []netv1.NetworkPolicyPort{
{
Port: &port1,
EndPort: &endPort,
},
},
},
},
},
expectedRule: "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" +
"-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n",
},
}
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}})
informerFactory, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
krNetPol, _ := newUneventfulNetworkPolicyController(podInformer, netpolInformer, nsInformer)
tCreateFakePods(t, podInformer, nsInformer)
for _, test := range testCases {
test.netpol.createFakeNetpol(t, netpolInformer)
netpols, err := krNetPol.buildNetworkPoliciesInfo()
if err != nil {
t.Errorf("Problems building policies: %s", err)
}
for _, np := range netpols {
fmt.Printf(np.policyType)
if np.policyType == "egress" || np.policyType == "both" {
err = krNetPol.processEgressRules(np, "", nil, "1")
if err != nil {
t.Errorf("Error syncing the rules: %s", err)
}
}
if np.policyType == "ingress" || np.policyType == "both" {
err = krNetPol.processIngressRules(np, "", nil, "1")
if err != nil {
t.Errorf("Error syncing the rules: %s", err)
}
}
}
if !bytes.Equal([]byte(test.expectedRule), krNetPol.filterTableRules.Bytes()) {
t.Errorf("Invalid rule %s created:\nExpected:\n%s \nGot:\n%s", test.name, test.expectedRule, krNetPol.filterTableRules.String())
}
key := fmt.Sprintf("%s/%s", test.netpol.namespace, test.netpol.name)
obj, exists, err := krNetPol.npLister.GetByKey(key)
if err != nil {
t.Errorf("Failed to get Netpol from store: %s", err)
}
if exists {
err = krNetPol.npLister.Delete(obj)
if err != nil {
t.Errorf("Failed to remove Netpol from store: %s", err)
}
}
krNetPol.filterTableRules.Reset()
}
}
func TestNetworkPolicyController(t *testing.T) {
testCases := []tNetPolConfigTestCase{
{
@ -429,7 +609,7 @@ func TestNetworkPolicyController(t *testing.T) {
"Missing nodename fails appropriately",
newMinimalNodeConfig("", "", "", nil),
true,
"Failed to identify the node by NODE_NAME, hostname or --hostname-override",
"failed to identify the node by NODE_NAME, hostname or --hostname-override",
},
{
"Test good cluster CIDR (using single IP with a /32)",
@ -466,7 +646,7 @@ func TestNetworkPolicyController(t *testing.T) {
_, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer)
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{})
if err == nil && test.expectError {
t.Error("This config should have failed, but it was successful instead")
} else if err != nil {

View File

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/pod.go
// +build !windows
@ -8,26 +8,40 @@ package netpol
import (
"crypto/sha256"
"encoding/base32"
"fmt"
"strings"
"github.com/coreos/go-iptables/iptables"
api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
"k8s.io/klog/v2"
)
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
if podObj, ok := obj.(*api.Pod); ok {
// If the pod isn't yet actionable there is no action to take here anyway, so skip it. When it becomes
// actionable, we'll get an update below.
if isNetPolActionable(podObj) {
npc.OnPodUpdate(obj)
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPoObj := newObj.(*api.Pod)
oldPoObj := oldObj.(*api.Pod)
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
// for the network policies, we are only interested in pod status phase change or IP change
var newPodObj, oldPodObj *api.Pod
var ok bool
// If either of these objects are not pods, quit now
if newPodObj, ok = newObj.(*api.Pod); !ok {
return
}
if oldPodObj, ok = oldObj.(*api.Pod); !ok {
return
}
// We don't check isNetPolActionable here, because if it is transitioning in or out of the actionable state
// we want to run the full sync so that it can be added or removed from the existing network policy of the host
// For the network policies, we are only interested in some changes, most pod changes aren't relevant to network policy
if isPodUpdateNetPolRelevant(oldPodObj, newPodObj) {
npc.OnPodUpdate(newObj)
}
},
@ -40,7 +54,7 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand
// OnPodUpdate handles updates to pods from the Kubernetes api server
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
klog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
npc.RequestFullSync()
}
@ -50,15 +64,15 @@ func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", obj)
return
}
if pod, ok = tombstone.Obj.(*api.Pod); !ok {
glog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", obj)
return
}
}
glog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name)
klog.V(2).Infof("Received pod: %s/%s delete event", pod.Namespace, pod.Name)
npc.RequestFullSync()
}
@ -67,315 +81,184 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
activePodFwChains := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor: %s", err.Error())
}
dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error {
// add rule to log the packets that will be dropped due to network policy enforcement
comment := "rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\""
args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"}
// This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop unmarked for this chain already
if strings.Contains(npc.filterTableRules.String(), strings.Join(args, " ")) {
return nil
}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// add rule to DROP if no applicable network policy permits the traffic
comment = "rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace
args = []string{"-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\""
args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// reset mark to let traffic pass through rest of the chains
args = []string{"-j", "MARK", "--set-mark", "0/0x10000"}
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
return nil
}
// loop through the pods running on the node which to which ingress network policies to be applied
ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
// loop through the pods running on the node
allLocalPods, err := npc.getLocalPods(npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *ingressNetworkPolicyEnabledPods {
// below condition occurs when we get trasient update while removing or adding pod
// subsequent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
for _, pod := range *allLocalPods {
// ensure pod specific firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
npc.filterTableRules.WriteString(":" + podFwChainName + "\n")
activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
// setup rules to run through applicable ingress/egress network policies for the pod
npc.setupPodNetpolRules(&pod, podFwChainName, networkPoliciesInfo, version)
comment := "rule to permit the traffic traffic to pods when source is the pod's local node"
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// setup rules to intercept inbound traffic to the pods
npc.interceptPodInboundTraffic(&pod, podFwChainName)
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment = "rule for stateful firewall for pod"
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
exists, err = iptablesCmdHandler.Exists("filter", kubeOutputChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", kubeOutputChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
// setup rules to intercept inbound traffic to the pods
npc.interceptPodOutboundTraffic(&pod, podFwChainName)
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil {
return nil, err
}
}
// loop through the pods running on the node which egress network policies to be applied
egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(networkPoliciesInfo, npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *egressNetworkPolicyEnabledPods {
// below condition occurs when we get trasient update while removing or adding pod
// subsequent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}
// ensure pod specific firewall chain exist for all the pods that need egress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
activePodFwChains[podFwChainName] = true
// add entries in pod firewall to run through required network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; ok {
comment := "run through nw policy " + policy.name
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
}
// ensure statefull firewall, that permits return traffic for the traffic originated by the pod
comment := "rule for stateful firewall for pod"
args := []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
// to pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeForwardChainName, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", kubeForwardChainName, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
err = dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName)
if err != nil {
return nil, err
}
// set mark to indicate traffic from/to the pod passed network policies.
// Mark will be checked to explicitly ACCEPT the traffic
comment := "\"set mark to ACCEPT traffic that comply to network policies\""
args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
return activePodFwChains, nil
}
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)
// setup rules to jump to applicable network policy chains for the traffic from/to the pod
func (npc *NetworkPolicyController) setupPodNetpolRules(pod *podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) {
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
hasIngressPolicy := false
hasEgressPolicy := false
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 {
// add entries in pod firewall to run through applicable network policies
for _, policy := range networkPoliciesInfo {
if _, ok := policy.targetPods[pod.ip]; !ok {
continue
}
for _, policy := range networkPoliciesInfo {
if policy.namespace != pod.ObjectMeta.Namespace {
continue
}
_, ok := policy.targetPods[pod.Status.PodIP]
if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
break
}
comment := "\"run through nw policy " + policy.name + "\""
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
var args []string
switch policy.policyType {
case "both":
hasIngressPolicy = true
hasEgressPolicy = true
args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
case "ingress":
hasIngressPolicy = true
args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
case "egress":
hasEgressPolicy = true
args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"}
}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
return &nodePods, nil
// if pod does not have any network policy which applies rules for pod's ingress traffic
// then apply default network policy
if !hasIngressPolicy {
comment := "\"run through default ingress network policy chain\""
args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// if pod does not have any network policy which applies rules for pod's egress traffic
// then apply default network policy
if !hasEgressPolicy {
comment := "\"run through default egress network policy chain\""
args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\""
args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// ensure statefull firewall that permits RELATED,ESTABLISHED traffic from/to the pod
comment = "\"rule for stateful firewall for pod\""
args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) {
func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod *podInfo, podFwChainName string) {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", kubeForwardChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
nodePods := make(map[string]podInfo)
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
args = []string{"-I", kubeOutputChainName, "1", "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args = []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// setup iptable rules to intercept outbound traffic from pods and run it across the
// firewall chain corresponding to the pod so that egress network policies are enforced
func (npc *NetworkPolicyController) interceptPodOutboundTraffic(pod *podInfo, podFwChainName string) {
egressFilterChains := []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
// to pod on a different node)
comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", chain, "1", "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName + "\""
args := []string{"-I", kubeForwardChainName, "1", "-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName, "\n"}
npc.filterTableRules.WriteString(strings.Join(args, " "))
}
func (npc *NetworkPolicyController) getLocalPods(nodeIP string) (*map[string]podInfo, error) {
localPods := make(map[string]podInfo)
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 {
// ignore the pods running on the different node and pods that are not actionable
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) {
continue
}
for _, policy := range networkPoliciesInfo {
if policy.namespace != pod.ObjectMeta.Namespace {
continue
}
_, ok := policy.targetPods[pod.Status.PodIP]
if ok && (policy.policyType == "both" || policy.policyType == "egress") {
glog.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
break
}
}
localPods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
namespace: pod.ObjectMeta.Namespace,
labels: pod.ObjectMeta.Labels}
}
return &nodePods, nil
return &localPods, nil
}
func podFirewallChainName(namespace, podName string, version string) string {

View File

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/pod.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/policy.go
// +build !windows
@ -14,7 +14,6 @@ import (
"strings"
"time"
"github.com/coreos/go-iptables/iptables"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
api "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
@ -23,7 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
"k8s.io/klog/v2"
)
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
@ -45,7 +44,7 @@ func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.Resourc
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
netpol := obj.(*networking.NetworkPolicy)
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
klog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
npc.RequestFullSync()
}
@ -55,15 +54,15 @@ func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", obj)
return
}
if netpol, ok = tombstone.Obj.(*networking.NetworkPolicy); !ok {
glog.Errorf("unexpected object type: %v", obj)
klog.Errorf("unexpected object type: %v", obj)
return
}
}
glog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name)
klog.V(2).Infof("Received network policy: %s/%s delete event", netpol.Namespace, netpol.Name)
npc.RequestFullSync()
}
@ -77,25 +76,36 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
start := time.Now()
defer func() {
endTime := time.Since(start)
glog.V(2).Infof("Syncing network policy chains took %v", endTime)
klog.V(2).Infof("Syncing network policy chains took %v", endTime)
}()
klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, nil, err
}
err = ipset.Save()
if err != nil {
return nil, nil, err
}
npc.ipSetHandler = ipset
activePolicyChains := make(map[string]bool)
activePolicyIPSets := make(map[string]bool)
iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
}
// run through all network policies
for _, policy := range networkPoliciesInfo {
// ensure there is a unique chain per network policy in filter table
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
err := iptablesCmdHandler.NewChain("filter", policyChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
npc.filterTableRules.WriteString(":" + policyChainName + "\n")
activePolicyChains[policyChainName] = true
@ -107,42 +117,39 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
if policy.policyType == "both" || policy.policyType == "ingress" {
// create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name)
targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetDestPodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetDestPodIPSet,: " + err.Error())
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP)
err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIPSets[targetDestPodIPSet.Name] = true
activePolicyIPSets[targetDestPodIPSetName] = true
}
if policy.policyType == "both" || policy.policyType == "egress" {
// create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name)
targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
err = targetSourcePodIPSet.Refresh(currnetPodIps)
if err != nil {
glog.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error())
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP)
err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
}
activePolicyIPSets[targetSourcePodIPSet.Name] = true
activePolicyIPSets[targetSourcePodIPSetName] = true
}
}
glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
err = npc.ipSetHandler.Restore()
if err != nil {
return nil, nil, fmt.Errorf("failed to perform ipset restore: %s", err.Error())
}
klog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.")
return activePolicyChains, activePolicyIPSets, nil
}
@ -156,11 +163,6 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
return nil
}
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the ingress rules in the spec and create iptables rules
@ -169,21 +171,12 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if len(ingressRule.srcPods) != 0 {
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i)
srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[srcPodIPSet.Name] = true
ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods))
activePolicyIPSets[srcPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range ingressRule.srcPods {
ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip)
}
err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs)
if err != nil {
glog.Errorf("failed to refresh srcPodIPSet: " + err.Error())
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries, utils.TypeHashIP)
if len(ingressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the ingress rule
@ -191,7 +184,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
@ -200,18 +193,16 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if len(ingressRule.namedPorts) != 0 {
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err
}
}
@ -222,7 +213,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
// so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", "", ""); err != nil {
return err
}
}
@ -234,27 +225,23 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
}
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err
}
}
@ -265,47 +252,36 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, "", "", ""); err != nil {
return err
}
}
if len(ingressRule.srcIPBlocks) != 0 {
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i)
srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[srcIPBlockIPSet.Name] = true
err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
if err != nil {
glog.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error())
}
activePolicyIPSets[srcIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet)
if !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashNet)
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err
}
}
@ -313,7 +289,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
if ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", "", ""); err != nil {
return err
}
}
@ -332,11 +308,6 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil
}
iptablesCmdHandler, err := iptables.New()
if err != nil {
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
}
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
// run through all the egress rules in the spec and create iptables rules
@ -345,28 +316,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if len(egressRule.dstPods) != 0 {
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i)
dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[dstPodIPSet.Name] = true
egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods))
activePolicyIPSets[dstPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range egressRule.dstPods {
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip)
}
err = dstPodIPSet.Refresh(egressRuleDstPodIps)
if err != nil {
glog.Errorf("failed to refresh dstPodIPSet: " + err.Error())
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries, utils.TypeHashIP)
if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
@ -375,20 +337,15 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if len(egressRule.namedPorts) != 0 {
for j, endPoints := range egressRule.namedPorts {
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j)
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[namedPortIPSet.Name] = true
err = namedPortIPSet.Refresh(endPoints.ips)
if err != nil {
glog.Errorf("failed to refresh namedPortIPSet: " + err.Error())
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
return err
}
}
@ -400,7 +357,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
// so match on specified source and destination ip with all port and protocol
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", "", ""); err != nil {
return err
}
}
@ -412,7 +369,14 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
for _, portProtocol := range egressRule.namedPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
@ -423,26 +387,19 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", "", "", ""); err != nil {
return err
}
}
if len(egressRule.dstIPBlocks) != 0 {
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i)
dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
activePolicyIPSets[dstIPBlockIPSet.Name] = true
err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
if err != nil {
glog.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error())
}
activePolicyIPSets[dstIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet)
if !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
@ -450,7 +407,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
if egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", "", ""); err != nil {
return err
}
}
@ -459,13 +416,13 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return nil
}
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error {
if iptablesCmdHandler == nil {
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
}
func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort, endDport string) error {
args := make([]string, 0)
args = append(args, "-A", policyChainName)
if comment != "" {
args = append(args, "-m", "comment", "--comment", comment)
args = append(args, "-m", "comment", "--comment", "\""+comment+"\"")
}
if srcIPSetName != "" {
args = append(args, "-m", "set", "--match-set", srcIPSetName, "src")
@ -477,22 +434,19 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *
args = append(args, "-p", protocol)
}
if dPort != "" {
args = append(args, "--dport", dPort)
if endDport != "" {
multiport := fmt.Sprintf("%s:%s", dPort, endDport)
args = append(args, "--dport", multiport)
} else {
args = append(args, "--dport", dPort)
}
}
markComment := "rule to mark traffic matching a network policy"
markArgs := append(args, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000")
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, markArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
markArgs := append(args, "-j", "MARK", "--set-xmark", "0x10000/0x10000", "\n")
npc.filterTableRules.WriteString(strings.Join(markArgs, " "))
returnComment := "rule to RETURN traffic matching a network policy"
returnArgs := append(args, "-m", "comment", "--comment", returnComment, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN")
err = iptablesCmdHandler.AppendUnique("filter", policyChainName, returnArgs...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
returnArgs := append(args, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN", "\n")
npc.filterTableRules.WriteString(strings.Join(returnArgs, " "))
return nil
}
@ -506,7 +460,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
policy, ok := policyObj.(*networking.NetworkPolicy)
podSelector, _ := v1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if !ok {
return nil, fmt.Errorf("Failed to convert")
return nil, fmt.Errorf("failed to convert")
}
newPolicy := networkPolicyInfo{
name: policy.Name,
@ -537,7 +491,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
namedPort2IngressEps := make(namedPort2eps)
if err == nil {
for _, matchingPod := range matchingPods {
if matchingPod.Status.PodIP == "" {
if !isNetPolActionable(matchingPod) {
continue
}
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
@ -573,7 +527,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
for _, peer := range specIngressRule.From {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" {
if !isNetPolActionable(peerPod) {
continue
}
ingressRule.srcPods = append(ingressRule.srcPods,
@ -609,12 +563,23 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI
// If this field is empty or missing in the spec, this rule matches all sources
if len(specEgressRule.To) == 0 {
egressRule.matchAllDestinations = true
// if rule.To is empty but rule.Ports not, we must try to grab NamedPort from pods that in same namespace,
// so that we can design iptables rule to describe "match all dst but match some named dst-port" egress rule
if policyRulePortsHasNamedPort(specEgressRule.Ports) {
matchingPeerPods, _ := npc.ListPodsByNamespaceAndLabels(policy.Namespace, labels.Everything())
for _, peerPod := range matchingPeerPods {
if !isNetPolActionable(peerPod) {
continue
}
npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps)
}
}
} else {
egressRule.matchAllDestinations = false
for _, peer := range specEgressRule.To {
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
for _, peerPod := range peerPods {
if peerPod.Status.PodIP == "" {
if !isNetPolActionable(peerPod) {
continue
}
egressRule.dstPods = append(egressRule.dstPods,
@ -683,13 +648,24 @@ func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy
func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
for _, npPort := range npPorts {
var protocol string
if npPort.Protocol != nil {
protocol = string(*npPort.Protocol)
}
if npPort.Port == nil {
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: protocol})
} else if npPort.Port.Type == intstr.Int {
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
var portproto protocolAndPort
if npPort.EndPort != nil {
if *npPort.EndPort >= npPort.Port.IntVal {
portproto.endport = strconv.Itoa(int(*npPort.EndPort))
}
}
portproto.protocol, portproto.port = protocol, npPort.Port.String()
numericPorts = append(numericPorts, portproto)
} else {
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
if numericPort2eps, ok := protocol2eps[protocol]; ok {
for _, eps := range numericPort2eps {
namedPorts = append(namedPorts, *eps)
}
@ -818,3 +794,12 @@ func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressR
encoded := base32.StdEncoding.EncodeToString(hash[:])
return kubeDestinationIPSetPrefix + encoded[:16]
}
func policyRulePortsHasNamedPort(npPorts []networking.NetworkPolicyPort) bool {
for _, npPort := range npPorts {
if npPort.Port != nil && npPort.Port.Type == intstr.String {
return true
}
}
return false
}

66
pkg/agent/netpol/utils.go Normal file
View File

@ -0,0 +1,66 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/controllers/netpol/utils.go
package netpol
import (
"fmt"
"reflect"
"regexp"
"strconv"
api "k8s.io/api/core/v1"
)
const (
PodCompleted api.PodPhase = "Completed"
)
// isPodUpdateNetPolRelevant checks the attributes that we care about for building NetworkPolicies on the host and if it
// finds a relevant change, it returns true otherwise it returns false. The things we care about for NetworkPolicies:
// 1) Is the phase of the pod changing? (matters for catching completed, succeeded, or failed jobs)
// 2) Is the pod IP changing? (changes how the network policy is applied to the host)
// 3) Is the pod's host IP changing? (should be caught in the above, with the CNI kube-router runs with but we check this as well for sanity)
// 4) Is a pod's label changing? (potentially changes which NetworkPolicies select this pod)
func isPodUpdateNetPolRelevant(oldPod, newPod *api.Pod) bool {
return newPod.Status.Phase != oldPod.Status.Phase ||
newPod.Status.PodIP != oldPod.Status.PodIP ||
!reflect.DeepEqual(newPod.Status.PodIPs, oldPod.Status.PodIPs) ||
newPod.Status.HostIP != oldPod.Status.HostIP ||
!reflect.DeepEqual(newPod.Labels, oldPod.Labels)
}
func isNetPolActionable(pod *api.Pod) bool {
return !isFinished(pod) && pod.Status.PodIP != "" && !pod.Spec.HostNetwork
}
func isFinished(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodFailed, api.PodSucceeded, PodCompleted:
return true
}
return false
}
func validateNodePortRange(nodePortOption string) (string, error) {
nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]([0-9]+)$`)
if matched := nodePortValidator.MatchString(nodePortOption); !matched {
return "", fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", nodePortOption)
}
matches := nodePortValidator.FindStringSubmatch(nodePortOption)
if len(matches) != 3 {
return "", fmt.Errorf("could not parse port number from range given: '%s'", nodePortOption)
}
port1, err := strconv.ParseUint(matches[1], 10, 16)
if err != nil {
return "", fmt.Errorf("could not parse first port number from range given: '%s'", nodePortOption)
}
port2, err := strconv.ParseUint(matches[2], 10, 16)
if err != nil {
return "", fmt.Errorf("could not parse second port number from range given: '%s'", nodePortOption)
}
if port1 >= port2 {
return "", fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", nodePortOption)
}
return fmt.Sprintf("%d:%d", port1, port2), nil
}

View File

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/ipset.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset.go
// +build !windows
@ -7,15 +7,18 @@ package utils
import (
"bytes"
"crypto/sha1"
"encoding/base32"
"errors"
"fmt"
"os/exec"
"sort"
"strings"
)
var (
// Error returned when ipset binary is not found.
errIpsetNotFound = errors.New("Ipset utility not found")
errIpsetNotFound = errors.New("ipset utility not found")
)
const (
@ -82,6 +85,9 @@ const (
OptionNoMatch = "nomatch"
// OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set.
OptionForceAdd = "forceadd"
// tmpIPSetPrefix Is the prefix added to temporary ipset names used in the atomic swap operations during ipset restore. You should never see these on your system because they only exist during the restore.
tmpIPSetPrefix = "TMP-"
)
// IPSet represent ipset sets managed by.
@ -181,7 +187,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error
// Determine if set with the same name is already active on the system
setIsActive, err := ipset.Sets[setName].IsActive()
if err != nil {
return nil, fmt.Errorf("Failed to determine if ipset set %s exists: %s",
return nil, fmt.Errorf("failed to determine if ipset set %s exists: %s",
setName, err)
}
@ -193,20 +199,20 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error
args = append(args, createOptions...)
args = append(args, "family", "inet6")
if _, err := ipset.run(args...); err != nil {
return nil, fmt.Errorf("Failed to create ipset set on system: %s", err)
return nil, fmt.Errorf("failed to create ipset set on system: %s", err)
}
} else {
_, err := ipset.run(append([]string{"create", "-exist", setName},
createOptions...)...)
if err != nil {
return nil, fmt.Errorf("Failed to create ipset set on system: %s", err)
return nil, fmt.Errorf("failed to create ipset set on system: %s", err)
}
}
}
return ipset.Sets[setName], nil
}
// Adds a given Set to an IPSet
// Add a given Set to an IPSet
func (ipset *IPSet) Add(set *Set) error {
_, err := ipset.Create(set.Name, set.Options...)
if err != nil {
@ -226,6 +232,22 @@ func (ipset *IPSet) Add(set *Set) error {
return nil
}
// RefreshSet add/update internal Sets with a Set of entries but does not run restore command
func (ipset *IPSet) RefreshSet(setName string, entriesWithOptions [][]string, setType string) {
if ipset.Get(setName) == nil {
ipset.Sets[setName] = &Set{
Name: setName,
Options: []string{setType, OptionTimeout, "0"},
Parent: ipset,
}
}
entries := make([]*Entry, len(entriesWithOptions))
for i, entry := range entriesWithOptions {
entries[i] = &Entry{Set: ipset.Sets[setName], Options: entry}
}
ipset.Get(setName).Entries = entries
}
// Add a given entry to the set. If the -exist option is specified, ipset
// ignores if the entry already added to the set.
// Note: if you need to add multiple entries (e.g., in a loop), use BatchAdd instead,
@ -243,7 +265,7 @@ func (set *Set) Add(addOptions ...string) (*Entry, error) {
return entry, nil
}
// Adds given entries (with their options) to the set.
// BatchAdd given entries (with their options) to the set.
// For multiple items, this is much faster than Add().
func (set *Set) BatchAdd(addOptions [][]string) error {
newEntries := make([]*Entry, len(addOptions))
@ -389,14 +411,59 @@ func parseIPSetSave(ipset *IPSet, result string) map[string]*Set {
// create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0
// add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0
func buildIPSetRestore(ipset *IPSet) string {
ipSetRestore := ""
for _, set := range ipset.Sets {
ipSetRestore += fmt.Sprintf("create %s %s\n", set.Name, strings.Join(set.Options[:], " "))
for _, entry := range set.Entries {
ipSetRestore += fmt.Sprintf("add %s %s\n", set.Name, strings.Join(entry.Options[:], " "))
}
setNames := make([]string, 0, len(ipset.Sets))
for setName := range ipset.Sets {
// we need setNames in some consistent order so that we can unit-test this method has a predictable output:
setNames = append(setNames, setName)
}
return ipSetRestore
sort.Strings(setNames)
tmpSets := map[string]string{}
ipSetRestore := &strings.Builder{}
for _, setName := range setNames {
set := ipset.Sets[setName]
setOptions := strings.Join(set.Options, " ")
tmpSetName := tmpSets[setOptions]
if tmpSetName == "" {
// create a temporary set per unique set-options:
hash := sha1.Sum([]byte("tmp:" + setOptions))
tmpSetName = tmpIPSetPrefix + base32.StdEncoding.EncodeToString(hash[:10])
ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", tmpSetName, setOptions))
// just in case we are starting up after a crash, we should flush the TMP ipset to be safe if it
// already existed, so we do not pollute other ipsets:
ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName))
tmpSets[setOptions] = tmpSetName
}
for _, entry := range set.Entries {
// add entries to the tmp set:
ipSetRestore.WriteString(fmt.Sprintf("add %s %s\n", tmpSetName, strings.Join(entry.Options, " ")))
}
// now create the actual IPSet (this is a noop if it already exists, because we run with -exists):
ipSetRestore.WriteString(fmt.Sprintf("create %s %s\n", set.Name, setOptions))
// now that both exist, we can swap them:
ipSetRestore.WriteString(fmt.Sprintf("swap %s %s\n", tmpSetName, set.Name))
// empty the tmp set (which is actually the old one now):
ipSetRestore.WriteString(fmt.Sprintf("flush %s\n", tmpSetName))
}
setsToDestroy := make([]string, 0, len(tmpSets))
for _, tmpSetName := range tmpSets {
setsToDestroy = append(setsToDestroy, tmpSetName)
}
// need to destroy the sets in a predictable order for unit test!
sort.Strings(setsToDestroy)
for _, tmpSetName := range setsToDestroy {
// finally, destroy the tmp sets.
ipSetRestore.WriteString(fmt.Sprintf("destroy %s\n", tmpSetName))
}
return ipSetRestore.String()
}
// Save the given set, or all sets if none is given to stdout in a format that
@ -489,7 +556,7 @@ func (set *Set) Refresh(entries []string, extraOptions ...string) error {
return set.RefreshWithBuiltinOptions(entriesWithOptions)
}
// Refresh a Set with new entries with built-in options.
// RefreshWithBuiltinOptions refresh a Set with new entries with built-in options.
func (set *Set) RefreshWithBuiltinOptions(entries [][]string) error {
var err error

View File

@ -0,0 +1,77 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/ipset_test.go
package utils
import "testing"
func Test_buildIPSetRestore(t *testing.T) {
type args struct {
ipset *IPSet
}
tests := []struct {
name string
args args
want string
}{
{
name: "simple-restore",
args: args{
ipset: &IPSet{Sets: map[string]*Set{
"foo": {
Name: "foo",
Options: []string{"hash:ip", "yolo", "things", "12345"},
Entries: []*Entry{
{Options: []string{"1.2.3.4"}},
},
},
"google-dns-servers": {
Name: "google-dns-servers",
Options: []string{"hash:ip", "lol"},
Entries: []*Entry{
{Options: []string{"4.4.4.4"}},
{Options: []string{"8.8.8.8"}},
},
},
// this one and the one above share the same exact options -- and therefore will reuse the same
// tmp ipset:
"more-ip-addresses": {
Name: "google-dns-servers",
Options: []string{"hash:ip", "lol"},
Entries: []*Entry{
{Options: []string{"5.5.5.5"}},
{Options: []string{"6.6.6.6"}},
},
},
}},
},
want: "create TMP-7NOTZDOMLXBX6DAJ hash:ip yolo things 12345\n" +
"flush TMP-7NOTZDOMLXBX6DAJ\n" +
"add TMP-7NOTZDOMLXBX6DAJ 1.2.3.4\n" +
"create foo hash:ip yolo things 12345\n" +
"swap TMP-7NOTZDOMLXBX6DAJ foo\n" +
"flush TMP-7NOTZDOMLXBX6DAJ\n" +
"create TMP-XD7BSSQZELS7TP35 hash:ip lol\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"add TMP-XD7BSSQZELS7TP35 4.4.4.4\n" +
"add TMP-XD7BSSQZELS7TP35 8.8.8.8\n" +
"create google-dns-servers hash:ip lol\n" +
"swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"add TMP-XD7BSSQZELS7TP35 5.5.5.5\n" +
"add TMP-XD7BSSQZELS7TP35 6.6.6.6\n" +
"create google-dns-servers hash:ip lol\n" +
"swap TMP-XD7BSSQZELS7TP35 google-dns-servers\n" +
"flush TMP-XD7BSSQZELS7TP35\n" +
"destroy TMP-7NOTZDOMLXBX6DAJ\n" +
"destroy TMP-XD7BSSQZELS7TP35\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildIPSetRestore(tt.args.ipset); got != tt.want {
t.Errorf("buildIPSetRestore() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,75 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/iptables.go
package utils
import (
"bytes"
"fmt"
"os/exec"
"strings"
)
var hasWait bool
func init() {
path, err := exec.LookPath("iptables-restore")
if err != nil {
return
}
args := []string{"iptables-restore", "--help"}
cmd := exec.Cmd{
Path: path,
Args: args,
}
cmdOutput, err := cmd.CombinedOutput()
if err != nil {
return
}
hasWait = strings.Contains(string(cmdOutput), "wait")
}
// SaveInto calls `iptables-save` for given table and stores result in a given buffer.
func SaveInto(table string, buffer *bytes.Buffer) error {
path, err := exec.LookPath("iptables-save")
if err != nil {
return err
}
stderrBuffer := bytes.NewBuffer(nil)
args := []string{"iptables-save", "-t", table}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdout: buffer,
Stderr: stderrBuffer,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("%v (%s)", err, stderrBuffer)
}
return nil
}
// Restore runs `iptables-restore` passing data through []byte.
func Restore(table string, data []byte) error {
path, err := exec.LookPath("iptables-restore")
if err != nil {
return err
}
var args []string
if hasWait {
args = []string{"iptables-restore", "--wait", "-T", table}
} else {
args = []string{"iptables-restore", "-T", table}
}
cmd := exec.Cmd{
Path: path,
Args: args,
Stdin: bytes.NewBuffer(data),
}
b, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%v (%s)", err, b)
}
return nil
}

View File

@ -1,5 +1,5 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/utils/node.go
// - modified from https://github.com/cloudnativelabs/kube-router/blob/73b1b03b32c5755b240f6c077bb097abe3888314/pkg/utils/node.go
// +build !windows
@ -43,7 +43,7 @@ func GetNodeObject(clientset kubernetes.Interface, hostnameOverride string) (*ap
}
}
return nil, fmt.Errorf("Failed to identify the node by NODE_NAME, hostname or --hostname-override")
return nil, fmt.Errorf("failed to identify the node by NODE_NAME, hostname or --hostname-override")
}
// GetNodeIP returns the most valid external facing IP address for a node.

1
vendor/modules.txt vendored
View File

@ -2142,6 +2142,7 @@ k8s.io/heapster/metrics/api/v1/types
## explicit
k8s.io/klog
# k8s.io/klog/v2 v2.8.0
## explicit
k8s.io/klog/v2
# k8s.io/kube-aggregator v0.18.0 => github.com/k3s-io/kubernetes/staging/src/k8s.io/kube-aggregator v1.21.2-k3s1
k8s.io/kube-aggregator/pkg/apis/apiregistration