mirror of https://github.com/k3s-io/k3s
1745 lines
65 KiB
Go
1745 lines
65 KiB
Go
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
|
|
// - modified from https://github.com/cloudnativelabs/kube-router/blob/d6f9f31a7b/pkg/controllers/netpol/network_policy_controller.go
|
|
|
|
// +build !windows
|
|
|
|
package netpol
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/base32"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
// "github.com/cloudnativelabs/kube-router/pkg/healthcheck"
|
|
// "github.com/cloudnativelabs/kube-router/pkg/metrics"
|
|
// "github.com/cloudnativelabs/kube-router/pkg/options"
|
|
// "github.com/cloudnativelabs/kube-router/pkg/utils"
|
|
|
|
"github.com/coreos/go-iptables/iptables"
|
|
api "k8s.io/api/core/v1"
|
|
apiextensions "k8s.io/api/extensions/v1beta1"
|
|
networking "k8s.io/api/networking/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
listers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
log "k8s.io/klog"
|
|
)
|
|
|
|
const (
|
|
networkPolicyAnnotation = "net.beta.kubernetes.io/network-policy"
|
|
kubePodFirewallChainPrefix = "KUBE-POD-FW-"
|
|
kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-"
|
|
kubeSourceIPSetPrefix = "KUBE-SRC-"
|
|
kubeDestinationIPSetPrefix = "KUBE-DST-"
|
|
)
|
|
|
|
// Network policy controller provides both ingress and egress filtering for the pods as per the defined network
|
|
// policies. Two different types of iptables chains are used. Each pod running on the node which either
|
|
// requires ingress or egress filtering gets a pod specific chains. Each network policy has a iptables chain, which
|
|
// has rules expressed through ipsets matching source and destination pod ip's. In the FORWARD chain of the
|
|
// filter table a rule is added to jump the traffic originating (in case of egress network policy) from the pod
|
|
// or destined (in case of ingress network policy) to the pod specific iptables chain. Each
|
|
// pod specific iptables chain has rules to jump to the network polices chains, that pod matches. So packet
|
|
// originating/destined from/to pod goes through filter table's, FORWARD chain, followed by pod specific chain,
|
|
// followed by one or more network policy chains, till there is a match which will accept the packet, or gets
|
|
// dropped by the rule in the pod chain, if there is no match.
|
|
|
|
// NetworkPolicyController struct to hold information required by NetworkPolicyController
|
|
type NetworkPolicyController struct {
|
|
nodeIP net.IP
|
|
nodeHostName string
|
|
mu sync.Mutex
|
|
syncPeriod time.Duration
|
|
MetricsEnabled bool
|
|
v1NetworkPolicy bool
|
|
readyForUpdates bool
|
|
// healthChan chan<- *healthcheck.ControllerHeartbeat
|
|
|
|
// list of all active network policies expressed as networkPolicyInfo
|
|
networkPoliciesInfo *[]networkPolicyInfo
|
|
ipSetHandler *IPSet
|
|
|
|
podLister cache.Indexer
|
|
npLister cache.Indexer
|
|
nsLister cache.Indexer
|
|
|
|
PodEventHandler cache.ResourceEventHandler
|
|
NamespaceEventHandler cache.ResourceEventHandler
|
|
NetworkPolicyEventHandler cache.ResourceEventHandler
|
|
}
|
|
|
|
// internal structure to represent a network policy
|
|
type networkPolicyInfo struct {
|
|
name string
|
|
namespace string
|
|
labels map[string]string
|
|
|
|
// set of pods matching network policy spec podselector label selector
|
|
targetPods map[string]podInfo
|
|
|
|
// whitelist ingress rules from the network policy spec
|
|
ingressRules []ingressRule
|
|
|
|
// whitelist egress rules from the network policy spec
|
|
egressRules []egressRule
|
|
|
|
// policy type "ingress" or "egress" or "both" as defined by PolicyType in the spec
|
|
policyType string
|
|
}
|
|
|
|
// internal structure to represent Pod
|
|
type podInfo struct {
|
|
ip string
|
|
name string
|
|
namespace string
|
|
labels map[string]string
|
|
}
|
|
|
|
// internal structure to represent NetworkPolicyIngressRule in the spec
|
|
type ingressRule struct {
|
|
matchAllPorts bool
|
|
ports []protocolAndPort
|
|
namedPorts []endPoints
|
|
matchAllSource bool
|
|
srcPods []podInfo
|
|
srcIPBlocks [][]string
|
|
}
|
|
|
|
// internal structure to represent NetworkPolicyEgressRule in the spec
|
|
type egressRule struct {
|
|
matchAllPorts bool
|
|
ports []protocolAndPort
|
|
namedPorts []endPoints
|
|
matchAllDestinations bool
|
|
dstPods []podInfo
|
|
dstIPBlocks [][]string
|
|
}
|
|
|
|
type protocolAndPort struct {
|
|
protocol string
|
|
port string
|
|
}
|
|
|
|
type endPoints struct {
|
|
ips []string
|
|
protocolAndPort
|
|
}
|
|
|
|
type numericPort2eps map[string]*endPoints
|
|
type protocol2eps map[string]numericPort2eps
|
|
type namedPort2eps map[string]protocol2eps
|
|
|
|
// Run runs forever till we receive notification on stopCh
|
|
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
|
|
t := time.NewTicker(npc.syncPeriod)
|
|
defer t.Stop()
|
|
|
|
log.Info("Starting network policy controller")
|
|
// npc.healthChan = healthChan
|
|
|
|
// loop forever till notified to stop on stopCh
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
log.Info("Shutting down network policies controller")
|
|
return
|
|
default:
|
|
}
|
|
|
|
log.V(1).Info("Performing periodic sync of iptables to reflect network policies")
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
log.Errorf("Error during periodic sync of network policies in network policy controller. Error: " + err.Error())
|
|
log.Errorf("Skipping sending heartbeat from network policy controller as periodic sync failed.")
|
|
}
|
|
// else {
|
|
// healthcheck.SendHeartBeat(healthChan, "NPC")
|
|
// }
|
|
npc.readyForUpdates = true
|
|
select {
|
|
case <-stopCh:
|
|
log.Infof("Shutting down network policies controller")
|
|
return
|
|
case <-t.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnPodUpdate handles updates to pods from the Kubernetes api server
|
|
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
|
|
pod := obj.(*api.Pod)
|
|
log.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
|
|
|
|
if !npc.readyForUpdates {
|
|
log.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name)
|
|
return
|
|
}
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
log.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err)
|
|
}
|
|
}
|
|
|
|
// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
|
|
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
|
|
netpol := obj.(*networking.NetworkPolicy)
|
|
log.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
|
|
|
|
if !npc.readyForUpdates {
|
|
log.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
|
|
return
|
|
}
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
log.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err)
|
|
}
|
|
}
|
|
|
|
// OnNamespaceUpdate handles updates to namespace from kubernetes api server
|
|
func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
|
|
namespace := obj.(*api.Namespace)
|
|
// namespace (and annotations on it) has no significance in GA ver of network policy
|
|
if npc.v1NetworkPolicy {
|
|
return
|
|
}
|
|
log.V(2).Infof("Received update for namespace: %s", namespace.Name)
|
|
|
|
err := npc.Sync()
|
|
if err != nil {
|
|
log.Errorf("Error syncing on namespace update: %s", err)
|
|
}
|
|
}
|
|
|
|
// Sync synchronizes iptables to desired state of network policies
|
|
func (npc *NetworkPolicyController) Sync() error {
|
|
|
|
var err error
|
|
npc.mu.Lock()
|
|
defer npc.mu.Unlock()
|
|
|
|
// healthcheck.SendHeartBeat(npc.healthChan, "NPC")
|
|
start := time.Now()
|
|
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
|
|
defer func() {
|
|
endTime := time.Since(start)
|
|
// if npc.MetricsEnabled {
|
|
// metrics.ControllerIPtablesSyncTime.Observe(endTime.Seconds())
|
|
// }
|
|
log.V(1).Infof("sync iptables took %v", endTime)
|
|
}()
|
|
|
|
log.V(1).Infof("Starting sync of iptables with version: %s", syncVersion)
|
|
if npc.v1NetworkPolicy {
|
|
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
|
|
}
|
|
} else {
|
|
// TODO remove the Beta support
|
|
npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
|
|
}
|
|
}
|
|
|
|
activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(syncVersion)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
|
|
}
|
|
|
|
activePodFwChains, err := npc.syncPodFirewallChains(syncVersion)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
|
|
}
|
|
|
|
err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets)
|
|
if err != nil {
|
|
return errors.New("Aborting sync. Failed to cleanup stale iptables rules: " + err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Configure iptables rules representing each network policy. All pod's matched by
|
|
// network policy spec podselector labels are grouped together in one ipset which
|
|
// is used for matching destination ip address. Each ingress rule in the network
|
|
// policyspec is evaluated to set of matching pods, which are grouped in to a
|
|
// ipset used for source ip addr matching.
|
|
func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map[string]bool, map[string]bool, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
endTime := time.Since(start)
|
|
// metrics.ControllerPolicyChainsSyncTime.Observe(endTime.Seconds())
|
|
log.V(2).Infof("Syncing network policy chains took %v", endTime)
|
|
}()
|
|
activePolicyChains := make(map[string]bool)
|
|
activePolicyIPSets := make(map[string]bool)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
log.Fatalf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
// run through all network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
|
|
// ensure there is a unique chain per network policy in filter table
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
err := iptablesCmdHandler.NewChain("filter", policyChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
activePolicyChains[policyChainName] = true
|
|
|
|
// create a ipset for all destination pod ip's matched by the policy spec PodSelector
|
|
targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name)
|
|
targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
// create a ipset for all source pod ip's matched by the policy spec PodSelector
|
|
targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name)
|
|
targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[targetDestPodIPSet.Name] = true
|
|
activePolicyIPSets[targetSourcePodIPSet.Name] = true
|
|
|
|
currentPodIPs := make([]string, 0, len(policy.targetPods))
|
|
for ip := range policy.targetPods {
|
|
currentPodIPs = append(currentPodIPs, ip)
|
|
}
|
|
|
|
err = targetSourcePodIPSet.Refresh(currentPodIPs, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error())
|
|
}
|
|
err = targetDestPodIPSet.Refresh(currentPodIPs, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh targetDestPodIPSet: " + err.Error())
|
|
}
|
|
|
|
err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
log.V(2).Infof("IPtables chains in the filter table are synchronized with the network policies.")
|
|
|
|
return activePolicyChains, activePolicyIPSets, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
|
|
targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error {
|
|
|
|
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
|
|
// so no whitelist rules to be added to the network policy
|
|
if policy.ingressRules == nil {
|
|
return nil
|
|
}
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
|
|
// run through all the ingress rules in the spec and create iptables rules
|
|
// in the chain for the network policy
|
|
for i, ingressRule := range policy.ingressRules {
|
|
|
|
if len(ingressRule.srcPods) != 0 {
|
|
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i)
|
|
srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[srcPodIPSet.Name] = true
|
|
|
|
ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods))
|
|
for _, pod := range ingressRule.srcPods {
|
|
ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip)
|
|
}
|
|
err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh srcPodIPSet: " + err.Error())
|
|
}
|
|
|
|
if len(ingressRule.ports) != 0 {
|
|
// case where 'ports' details and 'from' details specified in the ingress rule
|
|
// so match on specified source and destination ip's and specified port (if any) and protocol
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.namedPorts) != 0 {
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIPSets[namedPortIPSet.Name] = true
|
|
err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh namedPortIPSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 {
|
|
// case where no 'ports' details specified in the ingress rule but 'from' details specified
|
|
// so match on specified source and destination ip with all port and protocol
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// case where only 'ports' details specified but no 'from' details in the ingress rule
|
|
// so match on all sources, with specified port (if any) and protocol
|
|
if ingressRule.matchAllSource && !ingressRule.matchAllPorts {
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[namedPortIPSet.Name] = true
|
|
|
|
err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh namedPortIPSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// case where nether ports nor from details are specified in the ingress rule
|
|
// so match on all ports, protocol, source IP's
|
|
if ingressRule.matchAllSource && ingressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(ingressRule.srcIPBlocks) != 0 {
|
|
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i)
|
|
srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, TypeHashNet, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIPSets[srcIPBlockIPSet.Name] = true
|
|
err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks)
|
|
if err != nil {
|
|
log.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error())
|
|
}
|
|
if !ingressRule.matchAllPorts {
|
|
for _, portProtocol := range ingressRule.ports {
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for j, endPoints := range ingressRule.namedPorts {
|
|
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[namedPortIPSet.Name] = true
|
|
|
|
err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh namedPortIPSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if ingressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
|
|
targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error {
|
|
|
|
// From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic "
|
|
// so no whitelist rules to be added to the network policy
|
|
if policy.egressRules == nil {
|
|
return nil
|
|
}
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to initialize iptables executor due to: %s", err.Error())
|
|
}
|
|
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
|
|
// run through all the egress rules in the spec and create iptables rules
|
|
// in the chain for the network policy
|
|
for i, egressRule := range policy.egressRules {
|
|
|
|
if len(egressRule.dstPods) != 0 {
|
|
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i)
|
|
dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[dstPodIPSet.Name] = true
|
|
|
|
egressRuleDstPodIPs := make([]string, 0, len(egressRule.dstPods))
|
|
for _, pod := range egressRule.dstPods {
|
|
egressRuleDstPodIPs = append(egressRuleDstPodIPs, pod.ip)
|
|
}
|
|
err = dstPodIPSet.Refresh(egressRuleDstPodIPs, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh dstPodIPSet: " + err.Error())
|
|
}
|
|
if len(egressRule.ports) != 0 {
|
|
// case where 'ports' details and 'from' details specified in the egress rule
|
|
// so match on specified source and destination ip's and specified port (if any) and protocol
|
|
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(egressRule.namedPorts) != 0 {
|
|
for j, endPoints := range egressRule.namedPorts {
|
|
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j)
|
|
namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, TypeHashIP, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
|
|
activePolicyIPSets[namedPortIPSet.Name] = true
|
|
|
|
err = namedPortIPSet.Refresh(endPoints.ips, OptionTimeout, "0")
|
|
if err != nil {
|
|
log.Errorf("failed to refresh namedPortIPSet: " + err.Error())
|
|
}
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if len(egressRule.ports) == 0 && len(egressRule.namedPorts) == 0 {
|
|
// case where no 'ports' details specified in the ingress rule but 'from' details specified
|
|
// so match on specified source and destination ip with all port and protocol
|
|
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// case where only 'ports' details specified but no 'to' details in the egress rule
|
|
// so match on all sources, with specified port (if any) and protocol
|
|
if egressRule.matchAllDestinations && !egressRule.matchAllPorts {
|
|
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// case where nether ports nor from details are specified in the egress rule
|
|
// so match on all ports, protocol, source IP's
|
|
if egressRule.matchAllDestinations && egressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if len(egressRule.dstIPBlocks) != 0 {
|
|
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i)
|
|
dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, TypeHashNet, OptionTimeout, "0")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ipset: %s", err.Error())
|
|
}
|
|
activePolicyIPSets[dstIPBlockIPSet.Name] = true
|
|
err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks)
|
|
if err != nil {
|
|
log.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error())
|
|
}
|
|
if !egressRule.matchAllPorts {
|
|
for _, portProtocol := range egressRule.ports {
|
|
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if egressRule.matchAllPorts {
|
|
comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " +
|
|
policy.name + " namespace " + policy.namespace
|
|
if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error {
|
|
if iptablesCmdHandler == nil {
|
|
return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil")
|
|
}
|
|
args := make([]string, 0)
|
|
if comment != "" {
|
|
args = append(args, "-m", "comment", "--comment", comment)
|
|
}
|
|
if srcIPSetName != "" {
|
|
args = append(args, "-m", "set", "--set", srcIPSetName, "src")
|
|
}
|
|
if dstIPSetName != "" {
|
|
args = append(args, "-m", "set", "--set", dstIPSetName, "dst")
|
|
}
|
|
if protocol != "" {
|
|
args = append(args, "-p", protocol)
|
|
}
|
|
if dPort != "" {
|
|
args = append(args, "--dport", dPort)
|
|
}
|
|
args = append(args, "-j", "ACCEPT")
|
|
err := iptablesCmdHandler.AppendUnique("filter", policyChainName, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) syncPodFirewallChains(version string) (map[string]bool, error) {
|
|
|
|
activePodFwChains := make(map[string]bool)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
log.Fatalf("Failed to initialize iptables executor: %s", err.Error())
|
|
}
|
|
|
|
// loop through the pods running on the node which to which ingress network policies to be applied
|
|
ingressNetworkPolicyEnabledPods, err := npc.getIngressNetworkPolicyEnabledPods(npc.nodeIP.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range *ingressNetworkPolicyEnabledPods {
|
|
|
|
// below condition occurs when we get trasient update while removing or adding pod
|
|
// subseqent update will do the correct action
|
|
if len(pod.ip) == 0 || pod.ip == "" {
|
|
continue
|
|
}
|
|
|
|
// ensure pod specific firewall chain exist for all the pods that need ingress firewall
|
|
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
|
|
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
activePodFwChains[podFwChainName] = true
|
|
|
|
// add entries in pod firewall to run through required network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if _, ok := policy.targetPods[pod.ip]; ok {
|
|
comment := "run through nw policy " + policy.name
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
comment := "rule to permit the traffic traffic to pods when source is the pod's local node"
|
|
args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT"}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting routed (coming for other node pods)
|
|
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
|
|
exists, err = iptablesCmdHandler.Exists("filter", "OUTPUT", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", "OUTPUT", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting switched (coming for same node pods)
|
|
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "physdev", "--physdev-is-bridged",
|
|
"-m", "comment", "--comment", comment,
|
|
"-d", pod.ip,
|
|
"-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// add default DROP rule at the end of chain
|
|
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
// ensure stateful firewall, that permits return traffic for the traffic originated by the pod
|
|
comment = "rule for stateful firewall for pod"
|
|
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
|
|
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// loop through the pods running on the node which egress network policies to be applied
|
|
egressNetworkPolicyEnabledPods, err := npc.getEgressNetworkPolicyEnabledPods(npc.nodeIP.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range *egressNetworkPolicyEnabledPods {
|
|
|
|
// below condition occurs when we get trasient update while removing or adding pod
|
|
// subseqent update will do the correct action
|
|
if len(pod.ip) == 0 || pod.ip == "" {
|
|
continue
|
|
}
|
|
|
|
// ensure pod specific firewall chain exist for all the pods that need egress firewall
|
|
podFwChainName := podFirewallChainName(pod.namespace, pod.name, version)
|
|
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
activePodFwChains[podFwChainName] = true
|
|
|
|
// add entries in pod firewall to run through required network policies
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if _, ok := policy.targetPods[pod.ip]; ok {
|
|
comment := "run through nw policy " + policy.name
|
|
policyChainName := networkPolicyChainName(policy.namespace, policy.name, version)
|
|
args := []string{"-m", "comment", "--comment", comment, "-j", policyChainName}
|
|
exists, err := iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting routed (coming for other node pods)
|
|
comment := "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args := []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
|
|
exists, err := iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
|
|
// this rule applies to the traffic getting switched (coming for same node pods)
|
|
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
|
|
" to chain " + podFwChainName
|
|
args = []string{"-m", "physdev", "--physdev-is-bridged",
|
|
"-m", "comment", "--comment", comment,
|
|
"-s", pod.ip,
|
|
"-j", podFwChainName}
|
|
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// add default DROP rule at the end of chain
|
|
comment = "default rule to REJECT traffic destined for POD name:" + pod.name + " namespace: " + pod.namespace
|
|
args = []string{"-m", "comment", "--comment", comment, "-j", "REJECT"}
|
|
err = iptablesCmdHandler.AppendUnique("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
|
|
// ensure stateful firewall, that permits return traffic for the traffic originated by the pod
|
|
comment = "rule for stateful firewall for pod"
|
|
args = []string{"-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
|
|
exists, err = iptablesCmdHandler.Exists("filter", podFwChainName, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
if !exists {
|
|
err := iptablesCmdHandler.Insert("filter", podFwChainName, 1, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
return activePodFwChains, nil
|
|
}
|
|
|
|
func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets map[string]bool) error {
|
|
|
|
cleanupPodFwChains := make([]string, 0)
|
|
cleanupPolicyChains := make([]string, 0)
|
|
cleanupPolicyIPSets := make([]*Set, 0)
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
log.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
|
|
}
|
|
ipset, err := NewSavedIPSet(false)
|
|
if err != nil {
|
|
log.Fatalf("failed to create ipset command executor due to %s", err.Error())
|
|
}
|
|
|
|
// get the list of chains created for pod firewall and network policies
|
|
chains, err := iptablesCmdHandler.ListChains("filter")
|
|
for _, chain := range chains {
|
|
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
|
|
if _, ok := activePolicyChains[chain]; !ok {
|
|
cleanupPolicyChains = append(cleanupPolicyChains, chain)
|
|
}
|
|
}
|
|
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
|
|
if _, ok := activePodFwChains[chain]; !ok {
|
|
cleanupPodFwChains = append(cleanupPodFwChains, chain)
|
|
}
|
|
}
|
|
}
|
|
for _, set := range ipset.Sets {
|
|
if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) ||
|
|
strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) {
|
|
if _, ok := activePolicyIPSets[set.Name]; !ok {
|
|
cleanupPolicyIPSets = append(cleanupPolicyIPSets, set)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup FORWARD chain rules to jump to pod firewall
|
|
for _, chain := range cleanupPodFwChains {
|
|
|
|
forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list rules in filter table, FORWARD chain due to %s", err.Error())
|
|
}
|
|
outputChainRules, err := iptablesCmdHandler.List("filter", "OUTPUT")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list rules in filter table, OUTPUT chain due to %s", err.Error())
|
|
}
|
|
|
|
// TODO delete rule by spec, than rule number to avoid extra loop
|
|
var realRuleNo int
|
|
for i, rule := range forwardChainRules {
|
|
if strings.Contains(rule, chain) {
|
|
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete rule: %s from the FORWARD chain of filter table due to %s", rule, err.Error())
|
|
}
|
|
realRuleNo++
|
|
}
|
|
}
|
|
realRuleNo = 0
|
|
for i, rule := range outputChainRules {
|
|
if strings.Contains(rule, chain) {
|
|
err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete rule: %s from the OUTPUT chain of filter table due to %s", rule, err.Error())
|
|
}
|
|
realRuleNo++
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup pod firewall chain
|
|
for _, chain := range cleanupPodFwChains {
|
|
log.V(2).Infof("Found pod fw chain to cleanup: %s", chain)
|
|
err = iptablesCmdHandler.ClearChain("filter", chain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", chain, err.Error())
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", chain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete the chain %s due to %s", chain, err.Error())
|
|
}
|
|
log.V(2).Infof("Deleted pod specific firewall chain: %s from the filter table", chain)
|
|
}
|
|
|
|
// cleanup network policy chains
|
|
for _, policyChain := range cleanupPolicyChains {
|
|
log.V(2).Infof("Found policy chain to cleanup %s", policyChain)
|
|
|
|
// first clean up any references from pod firewall chain
|
|
for podFwChain := range activePodFwChains {
|
|
podFwChainRules, err := iptablesCmdHandler.List("filter", podFwChain)
|
|
if err != nil {
|
|
|
|
}
|
|
for i, rule := range podFwChainRules {
|
|
if strings.Contains(rule, policyChain) {
|
|
err = iptablesCmdHandler.Delete("filter", podFwChain, strconv.Itoa(i))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete rule %s from the chain %s", rule, podFwChain)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
err = iptablesCmdHandler.ClearChain("filter", policyChain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", policyChain)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to flush the rules in chain %s due to %s", policyChain, err)
|
|
}
|
|
log.V(2).Infof("Deleted network policy chain: %s from the filter table", policyChain)
|
|
}
|
|
|
|
// cleanup network policy ipsets
|
|
for _, set := range cleanupPolicyIPSets {
|
|
err = set.Destroy()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to delete ipset %s due to %s", set.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIP string) (*map[string]podInfo, error) {
|
|
nodePods := make(map[string]podInfo)
|
|
|
|
for _, obj := range npc.podLister.List() {
|
|
pod := obj.(*api.Pod)
|
|
|
|
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 {
|
|
continue
|
|
}
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if policy.namespace != pod.ObjectMeta.Namespace {
|
|
continue
|
|
}
|
|
_, ok := policy.targetPods[pod.Status.PodIP]
|
|
if ok && (policy.policyType == "both" || policy.policyType == "ingress") {
|
|
log.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
|
|
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
|
|
name: pod.ObjectMeta.Name,
|
|
namespace: pod.ObjectMeta.Namespace,
|
|
labels: pod.ObjectMeta.Labels}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return &nodePods, nil
|
|
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIP string) (*map[string]podInfo, error) {
|
|
|
|
nodePods := make(map[string]podInfo)
|
|
|
|
for _, obj := range npc.podLister.List() {
|
|
pod := obj.(*api.Pod)
|
|
|
|
if strings.Compare(pod.Status.HostIP, nodeIP) != 0 {
|
|
continue
|
|
}
|
|
for _, policy := range *npc.networkPoliciesInfo {
|
|
if policy.namespace != pod.ObjectMeta.Namespace {
|
|
continue
|
|
}
|
|
_, ok := policy.targetPods[pod.Status.PodIP]
|
|
if ok && (policy.policyType == "both" || policy.policyType == "egress") {
|
|
log.V(2).Infof("Found pod name: " + pod.ObjectMeta.Name + " namespace: " + pod.ObjectMeta.Namespace + " for which network policies need to be applied.")
|
|
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
|
|
name: pod.ObjectMeta.Name,
|
|
namespace: pod.ObjectMeta.Namespace,
|
|
labels: pod.ObjectMeta.Labels}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return &nodePods, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
|
|
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
|
|
for _, npPort := range npPorts {
|
|
if npPort.Port == nil {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
|
|
} else if npPort.Port.Type == intstr.Int {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
|
|
} else {
|
|
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
|
|
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
|
|
for _, eps := range numericPort2eps {
|
|
namedPorts = append(namedPorts, *eps)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) processBetaNetworkPolicyPorts(npPorts []apiextensions.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) {
|
|
numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0)
|
|
for _, npPort := range npPorts {
|
|
if npPort.Port == nil {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: "", protocol: string(*npPort.Protocol)})
|
|
} else if npPort.Port.Type == intstr.Int {
|
|
numericPorts = append(numericPorts, protocolAndPort{port: npPort.Port.String(), protocol: string(*npPort.Protocol)})
|
|
} else {
|
|
if protocol2eps, ok := namedPort2eps[npPort.Port.String()]; ok {
|
|
if numericPort2eps, ok := protocol2eps[string(*npPort.Protocol)]; ok {
|
|
for _, eps := range numericPort2eps {
|
|
namedPorts = append(namedPorts, *eps)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
|
|
|
|
NetworkPolicies := make([]networkPolicyInfo, 0)
|
|
|
|
for _, policyObj := range npc.npLister.List() {
|
|
|
|
policy, ok := policyObj.(*networking.NetworkPolicy)
|
|
if !ok {
|
|
return nil, fmt.Errorf("Failed to convert")
|
|
}
|
|
newPolicy := networkPolicyInfo{
|
|
name: policy.Name,
|
|
namespace: policy.Namespace,
|
|
labels: policy.Spec.PodSelector.MatchLabels,
|
|
policyType: "ingress",
|
|
}
|
|
|
|
// check if there is explicitly specified PolicyTypes in the spec
|
|
if len(policy.Spec.PolicyTypes) > 0 {
|
|
ingressType, egressType := false, false
|
|
for _, policyType := range policy.Spec.PolicyTypes {
|
|
if policyType == networking.PolicyTypeIngress {
|
|
ingressType = true
|
|
}
|
|
if policyType == networking.PolicyTypeEgress {
|
|
egressType = true
|
|
}
|
|
}
|
|
if ingressType && egressType {
|
|
newPolicy.policyType = "both"
|
|
} else if egressType {
|
|
newPolicy.policyType = "egress"
|
|
} else if ingressType {
|
|
newPolicy.policyType = "ingress"
|
|
}
|
|
} else {
|
|
if policy.Spec.Egress != nil && policy.Spec.Ingress != nil {
|
|
newPolicy.policyType = "both"
|
|
} else if policy.Spec.Egress != nil {
|
|
newPolicy.policyType = "egress"
|
|
} else if policy.Spec.Ingress != nil {
|
|
newPolicy.policyType = "ingress"
|
|
}
|
|
}
|
|
|
|
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
|
|
newPolicy.targetPods = make(map[string]podInfo)
|
|
namedPort2IngressEps := make(namedPort2eps)
|
|
if err == nil {
|
|
for _, matchingPod := range matchingPods {
|
|
if matchingPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
|
|
name: matchingPod.ObjectMeta.Name,
|
|
namespace: matchingPod.ObjectMeta.Namespace,
|
|
labels: matchingPod.ObjectMeta.Labels}
|
|
npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
|
|
}
|
|
}
|
|
|
|
if policy.Spec.Ingress == nil {
|
|
newPolicy.ingressRules = nil
|
|
} else {
|
|
newPolicy.ingressRules = make([]ingressRule, 0)
|
|
}
|
|
|
|
if policy.Spec.Egress == nil {
|
|
newPolicy.egressRules = nil
|
|
} else {
|
|
newPolicy.egressRules = make([]egressRule, 0)
|
|
}
|
|
|
|
for _, specIngressRule := range policy.Spec.Ingress {
|
|
ingressRule := ingressRule{}
|
|
ingressRule.srcPods = make([]podInfo, 0)
|
|
ingressRule.srcIPBlocks = make([][]string, 0)
|
|
|
|
// If this field is empty or missing in the spec, this rule matches all sources
|
|
if len(specIngressRule.From) == 0 {
|
|
ingressRule.matchAllSource = true
|
|
} else {
|
|
ingressRule.matchAllSource = false
|
|
for _, peer := range specIngressRule.From {
|
|
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
|
|
for _, peerPod := range peerPods {
|
|
if peerPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
ingressRule.srcPods = append(ingressRule.srcPods,
|
|
podInfo{ip: peerPod.Status.PodIP,
|
|
name: peerPod.ObjectMeta.Name,
|
|
namespace: peerPod.ObjectMeta.Namespace,
|
|
labels: peerPod.ObjectMeta.Labels})
|
|
}
|
|
}
|
|
ingressRule.srcIPBlocks = append(ingressRule.srcIPBlocks, npc.evalIPBlockPeer(peer)...)
|
|
}
|
|
}
|
|
|
|
ingressRule.ports = make([]protocolAndPort, 0)
|
|
ingressRule.namedPorts = make([]endPoints, 0)
|
|
// If this field is empty or missing in the spec, this rule matches all ports
|
|
if len(specIngressRule.Ports) == 0 {
|
|
ingressRule.matchAllPorts = true
|
|
} else {
|
|
ingressRule.matchAllPorts = false
|
|
ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
|
|
}
|
|
|
|
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
|
|
}
|
|
|
|
for _, specEgressRule := range policy.Spec.Egress {
|
|
egressRule := egressRule{}
|
|
egressRule.dstPods = make([]podInfo, 0)
|
|
egressRule.dstIPBlocks = make([][]string, 0)
|
|
namedPort2EgressEps := make(namedPort2eps)
|
|
|
|
// If this field is empty or missing in the spec, this rule matches all sources
|
|
if len(specEgressRule.To) == 0 {
|
|
egressRule.matchAllDestinations = true
|
|
} else {
|
|
egressRule.matchAllDestinations = false
|
|
for _, peer := range specEgressRule.To {
|
|
if peerPods, err := npc.evalPodPeer(policy, peer); err == nil {
|
|
for _, peerPod := range peerPods {
|
|
if peerPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
egressRule.dstPods = append(egressRule.dstPods,
|
|
podInfo{ip: peerPod.Status.PodIP,
|
|
name: peerPod.ObjectMeta.Name,
|
|
namespace: peerPod.ObjectMeta.Namespace,
|
|
labels: peerPod.ObjectMeta.Labels})
|
|
npc.grabNamedPortFromPod(peerPod, &namedPort2EgressEps)
|
|
}
|
|
|
|
}
|
|
egressRule.dstIPBlocks = append(egressRule.dstIPBlocks, npc.evalIPBlockPeer(peer)...)
|
|
}
|
|
}
|
|
|
|
egressRule.ports = make([]protocolAndPort, 0)
|
|
egressRule.namedPorts = make([]endPoints, 0)
|
|
// If this field is empty or missing in the spec, this rule matches all ports
|
|
if len(specEgressRule.Ports) == 0 {
|
|
egressRule.matchAllPorts = true
|
|
} else {
|
|
egressRule.matchAllPorts = false
|
|
egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts(specEgressRule.Ports, namedPort2EgressEps)
|
|
}
|
|
|
|
newPolicy.egressRules = append(newPolicy.egressRules, egressRule)
|
|
}
|
|
NetworkPolicies = append(NetworkPolicies, newPolicy)
|
|
}
|
|
|
|
return &NetworkPolicies, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, peer networking.NetworkPolicyPeer) ([]*api.Pod, error) {
|
|
|
|
var matchingPods []*api.Pod
|
|
matchingPods = make([]*api.Pod, 0)
|
|
var err error
|
|
// spec can have both PodSelector AND NamespaceSelector
|
|
if peer.NamespaceSelector != nil {
|
|
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
|
|
if err != nil {
|
|
return nil, errors.New("Failed to build network policies info due to " + err.Error())
|
|
}
|
|
|
|
var podSelectorLabels map[string]string
|
|
if peer.PodSelector != nil {
|
|
podSelectorLabels = peer.PodSelector.MatchLabels
|
|
}
|
|
for _, namespace := range namespaces {
|
|
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, podSelectorLabels)
|
|
if err != nil {
|
|
return nil, errors.New("Failed to build network policies info due to " + err.Error())
|
|
}
|
|
matchingPods = append(matchingPods, namespacePods...)
|
|
}
|
|
} else if peer.PodSelector != nil {
|
|
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
|
|
}
|
|
|
|
return matchingPods, err
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
|
|
podLister := listers.NewPodLister(npc.podLister)
|
|
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return allMatchedNameSpacePods, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) {
|
|
namespaceLister := listers.NewNamespaceLister(npc.nsLister)
|
|
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return matchedNamespaces, nil
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) [][]string {
|
|
ipBlock := make([][]string, 0)
|
|
if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil {
|
|
if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") {
|
|
ipBlock = append(ipBlock, []string{"0.0.0.0/1", OptionTimeout, "0"}, []string{"128.0.0.0/1", OptionTimeout, "0"})
|
|
} else {
|
|
ipBlock = append(ipBlock, []string{cidr, OptionTimeout, "0"})
|
|
}
|
|
for _, except := range peer.IPBlock.Except {
|
|
if strings.HasSuffix(except, "/0") {
|
|
ipBlock = append(ipBlock, []string{"0.0.0.0/1", OptionTimeout, "0", OptionNoMatch}, []string{"128.0.0.0/1", OptionTimeout, "0", OptionNoMatch})
|
|
} else {
|
|
ipBlock = append(ipBlock, []string{except, OptionTimeout, "0", OptionNoMatch})
|
|
}
|
|
}
|
|
}
|
|
return ipBlock
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort2eps *namedPort2eps) {
|
|
if pod == nil || namedPort2eps == nil {
|
|
return
|
|
}
|
|
for k := range pod.Spec.Containers {
|
|
for _, port := range pod.Spec.Containers[k].Ports {
|
|
name := port.Name
|
|
protocol := string(port.Protocol)
|
|
containerPort := strconv.Itoa(int(port.ContainerPort))
|
|
|
|
if (*namedPort2eps)[name] == nil {
|
|
(*namedPort2eps)[name] = make(protocol2eps)
|
|
}
|
|
if (*namedPort2eps)[name][protocol] == nil {
|
|
(*namedPort2eps)[name][protocol] = make(numericPort2eps)
|
|
}
|
|
if eps, ok := (*namedPort2eps)[name][protocol][containerPort]; !ok {
|
|
(*namedPort2eps)[name][protocol][containerPort] = &endPoints{
|
|
ips: []string{pod.Status.PodIP},
|
|
protocolAndPort: protocolAndPort{port: containerPort, protocol: protocol},
|
|
}
|
|
} else {
|
|
eps.ips = append(eps.ips, pod.Status.PodIP)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
|
|
|
|
NetworkPolicies := make([]networkPolicyInfo, 0)
|
|
|
|
for _, policyObj := range npc.npLister.List() {
|
|
|
|
policy, _ := policyObj.(*apiextensions.NetworkPolicy)
|
|
newPolicy := networkPolicyInfo{
|
|
name: policy.Name,
|
|
namespace: policy.Namespace,
|
|
labels: policy.Spec.PodSelector.MatchLabels,
|
|
}
|
|
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
|
|
newPolicy.targetPods = make(map[string]podInfo)
|
|
newPolicy.ingressRules = make([]ingressRule, 0)
|
|
namedPort2IngressEps := make(namedPort2eps)
|
|
if err == nil {
|
|
for _, matchingPod := range matchingPods {
|
|
if matchingPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
|
|
name: matchingPod.ObjectMeta.Name,
|
|
namespace: matchingPod.ObjectMeta.Namespace,
|
|
labels: matchingPod.ObjectMeta.Labels}
|
|
npc.grabNamedPortFromPod(matchingPod, &namedPort2IngressEps)
|
|
}
|
|
}
|
|
|
|
for _, specIngressRule := range policy.Spec.Ingress {
|
|
ingressRule := ingressRule{}
|
|
|
|
ingressRule.ports = make([]protocolAndPort, 0)
|
|
ingressRule.namedPorts = make([]endPoints, 0)
|
|
ingressRule.ports, ingressRule.namedPorts = npc.processBetaNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps)
|
|
ingressRule.srcPods = make([]podInfo, 0)
|
|
for _, peer := range specIngressRule.From {
|
|
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
|
|
if err == nil {
|
|
for _, matchingPod := range matchingPods {
|
|
if matchingPod.Status.PodIP == "" {
|
|
continue
|
|
}
|
|
ingressRule.srcPods = append(ingressRule.srcPods,
|
|
podInfo{ip: matchingPod.Status.PodIP,
|
|
name: matchingPod.ObjectMeta.Name,
|
|
namespace: matchingPod.ObjectMeta.Namespace,
|
|
labels: matchingPod.ObjectMeta.Labels})
|
|
}
|
|
}
|
|
}
|
|
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
|
|
}
|
|
NetworkPolicies = append(NetworkPolicies, newPolicy)
|
|
}
|
|
|
|
return &NetworkPolicies, nil
|
|
}
|
|
|
|
func podFirewallChainName(namespace, podName string, version string) string {
|
|
hash := sha256.Sum256([]byte(namespace + podName + version))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubePodFirewallChainPrefix + encoded[:16]
|
|
}
|
|
|
|
func networkPolicyChainName(namespace, policyName string, version string) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + version))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeNetworkPolicyChainPrefix + encoded[:16]
|
|
}
|
|
|
|
func policySourcePodIPSetName(namespace, policyName string) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeSourceIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyDestinationPodIPSetName(namespace, policyName string) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeDestinationIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedSourcePodIPSetName(namespace, policyName string, ingressRuleNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeSourceIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedDestinationPodIPSetName(namespace, policyName string, egressRuleNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeDestinationIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedSourceIPBlockIPSetName(namespace, policyName string, ingressRuleNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeSourceIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egressRuleNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeDestinationIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeDestinationIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string {
|
|
hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport"))
|
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
|
return kubeDestinationIPSetPrefix + encoded[:16]
|
|
}
|
|
|
|
// Cleanup cleanup configurations done
|
|
func (npc *NetworkPolicyController) Cleanup() {
|
|
|
|
log.Info("Cleaning up iptables configuration permanently done by kube-router")
|
|
|
|
iptablesCmdHandler, err := iptables.New()
|
|
if err != nil {
|
|
log.Errorf("Failed to initialize iptables executor: %s", err.Error())
|
|
}
|
|
|
|
// delete jump rules in FORWARD chain to pod specific firewall chain
|
|
forwardChainRules, err := iptablesCmdHandler.List("filter", "FORWARD")
|
|
if err != nil {
|
|
log.Errorf("Failed to delete iptables rules as part of cleanup")
|
|
return
|
|
}
|
|
|
|
// TODO: need a better way to delte rule with out using number
|
|
var realRuleNo int
|
|
for i, rule := range forwardChainRules {
|
|
if strings.Contains(rule, kubePodFirewallChainPrefix) {
|
|
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
|
|
realRuleNo++
|
|
}
|
|
}
|
|
|
|
// delete jump rules in OUTPUT chain to pod specific firewall chain
|
|
forwardChainRules, err = iptablesCmdHandler.List("filter", "OUTPUT")
|
|
if err != nil {
|
|
log.Errorf("Failed to delete iptables rules as part of cleanup")
|
|
return
|
|
}
|
|
|
|
// TODO: need a better way to delte rule with out using number
|
|
realRuleNo = 0
|
|
for i, rule := range forwardChainRules {
|
|
if strings.Contains(rule, kubePodFirewallChainPrefix) {
|
|
err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
|
|
realRuleNo++
|
|
}
|
|
}
|
|
|
|
// flush and delete pod specific firewall chain
|
|
chains, err := iptablesCmdHandler.ListChains("filter")
|
|
for _, chain := range chains {
|
|
if strings.HasPrefix(chain, kubePodFirewallChainPrefix) {
|
|
err = iptablesCmdHandler.ClearChain("filter", chain)
|
|
if err != nil {
|
|
log.Errorf("Failed to cleanup iptables rules: " + err.Error())
|
|
return
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", chain)
|
|
if err != nil {
|
|
log.Errorf("Failed to cleanup iptables rules: " + err.Error())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flush and delete per network policy specific chain
|
|
chains, err = iptablesCmdHandler.ListChains("filter")
|
|
for _, chain := range chains {
|
|
if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) {
|
|
err = iptablesCmdHandler.ClearChain("filter", chain)
|
|
if err != nil {
|
|
log.Errorf("Failed to cleanup iptables rules: " + err.Error())
|
|
return
|
|
}
|
|
err = iptablesCmdHandler.DeleteChain("filter", chain)
|
|
if err != nil {
|
|
log.Errorf("Failed to cleanup iptables rules: " + err.Error())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// delete all ipsets
|
|
ipset, err := NewSavedIPSet(false)
|
|
if err != nil {
|
|
log.Errorf("Failed to clean up ipsets: " + err.Error())
|
|
}
|
|
err = ipset.DestroyAllWithin()
|
|
if err != nil {
|
|
log.Errorf("Failed to clean up ipsets: " + err.Error())
|
|
}
|
|
log.Infof("Successfully cleaned the iptables configuration done by kube-router")
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
|
|
return cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
npc.OnPodUpdate(obj)
|
|
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
newPoObj := newObj.(*api.Pod)
|
|
oldPoObj := oldObj.(*api.Pod)
|
|
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
|
|
// for the network policies, we are only interested in pod status phase change or IP change
|
|
npc.OnPodUpdate(newObj)
|
|
}
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
npc.OnPodUpdate(obj)
|
|
},
|
|
}
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
|
|
return cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
npc.OnNamespaceUpdate(obj)
|
|
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
npc.OnNamespaceUpdate(newObj)
|
|
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
npc.OnNamespaceUpdate(obj)
|
|
|
|
},
|
|
}
|
|
}
|
|
|
|
func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
|
|
return cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
npc.OnNetworkPolicyUpdate(obj)
|
|
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
npc.OnNetworkPolicyUpdate(newObj)
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
npc.OnNetworkPolicyUpdate(obj)
|
|
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewNetworkPolicyController returns new NetworkPolicyController object
|
|
func NewNetworkPolicyController(
|
|
stopCh <-chan struct{},
|
|
clientset kubernetes.Interface,
|
|
ipTablesSyncPeriod time.Duration,
|
|
hostnameOverride string) (*NetworkPolicyController, error) {
|
|
|
|
npc := NetworkPolicyController{}
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
|
|
podInformer := informerFactory.Core().V1().Pods().Informer()
|
|
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
|
|
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
|
|
informerFactory.Start(stopCh)
|
|
|
|
if err := CacheSyncOrTimeout(informerFactory, stopCh, 1*time.Minute); err != nil {
|
|
return nil, errors.New("Failed to synchronize cache: " + err.Error())
|
|
}
|
|
|
|
// if config.MetricsEnabled {
|
|
// //Register the metrics for this controller
|
|
// prometheus.MustRegister(metrics.ControllerIPtablesSyncTime)
|
|
// prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
|
|
// npc.MetricsEnabled = true
|
|
// }
|
|
|
|
npc.syncPeriod = ipTablesSyncPeriod
|
|
|
|
npc.v1NetworkPolicy = true
|
|
v, err := clientset.Discovery().ServerVersion()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
valid := regexp.MustCompile("[0-9]")
|
|
v.Minor = strings.Join(valid.FindAllString(v.Minor, -1), "")
|
|
minorVer, _ := strconv.Atoi(v.Minor)
|
|
if v.Major == "1" && minorVer < 7 {
|
|
npc.v1NetworkPolicy = false
|
|
}
|
|
|
|
node, err := clientset.CoreV1().Nodes().Get(context.TODO(), hostnameOverride, metav1.GetOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
npc.nodeHostName = node.Name
|
|
|
|
nodeIP, err := GetNodeIP(node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
npc.nodeIP = nodeIP
|
|
|
|
ipset, err := NewSavedIPSet(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
npc.ipSetHandler = ipset
|
|
|
|
npc.podLister = podInformer.GetIndexer()
|
|
npc.PodEventHandler = npc.newPodEventHandler()
|
|
podInformer.AddEventHandler(npc.PodEventHandler)
|
|
|
|
npc.nsLister = nsInformer.GetIndexer()
|
|
npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
|
|
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
|
|
|
|
npc.npLister = npInformer.GetIndexer()
|
|
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
|
|
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)
|
|
|
|
return &npc, nil
|
|
}
|