diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index 43cf636313..6f61baa4f1 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -56,9 +56,6 @@ const (
 	// kubeServicesChain is the services portal chain
 	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
 
-	// KubeServiceIPSetsChain is the services access IP chain
-	KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS"
-
 	// KubeFireWallChain is the kubernetes firewall chain.
 	KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL"
 
@@ -74,6 +71,9 @@ const (
 	// KubeMarkDropChain is the mark-for-drop chain
 	KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
 
+	// KubeForwardChain is the kubernetes forward chain
+	KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
+
 	// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
 	DefaultScheduler = "rr"
 
@@ -149,7 +149,9 @@ type Proxier struct {
 	// that are significantly impacting performance.
 	iptablesData *bytes.Buffer
 	natChains    *bytes.Buffer
+	filterChains *bytes.Buffer
 	natRules     *bytes.Buffer
+	filterRules  *bytes.Buffer
 	// Added as a member to the struct to allow injection for testing.
 	netlinkHandle NetLinkHandle
 	// loopbackSet is the ipset which stores all endpoints IP:Port,IP for solving hairpin mode purpose.
@@ -339,6 +341,8 @@ func NewProxier(ipt utiliptables.Interface,
 		iptablesData:  bytes.NewBuffer(nil),
 		natChains:     bytes.NewBuffer(nil),
 		natRules:      bytes.NewBuffer(nil),
+		filterChains:  bytes.NewBuffer(nil),
+		filterRules:   bytes.NewBuffer(nil),
 		netlinkHandle: NewNetLinkHandle(),
 		ipset:         ipset,
 		loopbackSet:   NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
@@ -458,7 +462,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
 	for _, tc := range tableChainsWithJumpService {
 		if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
-				glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
+				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
 			}
 		}
@@ -471,26 +475,52 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
 	}
 	if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
 		if !utiliptables.IsNotFoundError(err) {
-			glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
+			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 			encounteredError = true
 		}
 	}
 
-	// Flush and remove all of our chains.
+	// Unlink the forwarding chain.
+	args = []string{
+		"-m", "comment", "--comment", "kubernetes forwarding rules",
+		"-j", string(KubeForwardChain),
+	}
+	if err := ipt.DeleteRule(utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
+		if !utiliptables.IsNotFoundError(err) {
+			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
+			encounteredError = true
+		}
+	}
+
+	// Flush and remove all of our "-t nat" chains.
 	for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain} {
 		if err := ipt.FlushChain(utiliptables.TableNAT, chain); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
-				glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
+				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
 			}
 		}
 		if err := ipt.DeleteChain(utiliptables.TableNAT, chain); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
-				glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
+				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
 			}
 		}
 	}
+
+	// Flush and remove all of our "-t filter" chains.
+	if err := ipt.FlushChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
+		if !utiliptables.IsNotFoundError(err) {
+			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
+			encounteredError = true
+		}
+	}
+	if err := ipt.DeleteChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
+		if !utiliptables.IsNotFoundError(err) {
+			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
+			encounteredError = true
+		}
+	}
+
 	return encounteredError
 }
@@ -651,13 +681,21 @@ func (proxier *Proxier) syncProxyRules() {
 	glog.V(3).Infof("Syncing ipvs Proxier rules")
 
-	// TODO: UT output result
 	// Begin install iptables
 	// Get iptables-save output so we can check for existing chains and rules.
 	// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
+	existingFilterChains := make(map[utiliptables.Chain]string)
+	proxier.iptablesData.Reset()
+	err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData)
+	if err != nil { // if we failed to get any rules
+		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
+	} else { // otherwise parse the output
+		existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes())
+	}
+
 	existingNATChains := make(map[utiliptables.Chain]string)
 	proxier.iptablesData.Reset()
-	err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
+	err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
 	if err != nil { // if we failed to get any rules
 		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
 	} else { // otherwise parse the output
@@ -667,8 +705,11 @@ func (proxier *Proxier) syncProxyRules() {
 	// This is to avoid memory reallocations and thus improve performance.
 	proxier.natChains.Reset()
 	proxier.natRules.Reset()
+	// Write table headers.
+	writeLine(proxier.filterChains, "*filter")
 	writeLine(proxier.natChains, "*nat")
+
 	// Make sure we keep stats for the top-level chains, if they existed
 	// (which most should have because we created them above).
 	if chain, ok := existingNATChains[kubePostroutingChain]; ok {
@@ -747,6 +788,11 @@ func (proxier *Proxier) syncProxyRules() {
 		glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
 		return
 	}
+	// Kube forward
+	if err := proxier.linkKubeForwardChain(existingFilterChains, proxier.filterChains); err != nil {
+		glog.Errorf("Failed to create and link KUBE-FORWARD chain: %v", err)
+		return
+	}
 
 	// `iptables -t nat -N KUBE-NODE-PORT`
 	if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
@@ -1272,7 +1318,42 @@ func (proxier *Proxier) syncProxyRules() {
 		writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
 	}
 
+	// If the masqueradeMark has been added then we want to forward that same
+	// traffic, this allows NodePort traffic to be forwarded even if the default
+	// FORWARD policy is not accept.
+	writeLine(proxier.filterRules,
+		"-A", string(KubeForwardChain),
+		"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
+		"-m", "mark", "--mark", proxier.masqueradeMark,
+		"-j", "ACCEPT",
+	)
+
+	// The following rules can only be set if clusterCIDR has been defined.
+	if len(proxier.clusterCIDR) != 0 {
+		// The following two rules ensure the traffic after the initial packet
+		// accepted by the "kubernetes forwarding rules" rule above will be
+		// accepted, to be as specific as possible the traffic must be sourced
+		// or destined to the clusterCIDR (to/from a pod).
+		writeLine(proxier.filterRules,
+			"-A", string(KubeForwardChain),
+			"-s", proxier.clusterCIDR,
+			"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
+			"-m", "conntrack",
+			"--ctstate", "RELATED,ESTABLISHED",
+			"-j", "ACCEPT",
+		)
+		writeLine(proxier.filterRules,
+			"-A", string(KubeForwardChain),
+			"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
+			"-d", proxier.clusterCIDR,
+			"-m", "conntrack",
+			"--ctstate", "RELATED,ESTABLISHED",
+			"-j", "ACCEPT",
+		)
+	}
+
+	// Write the end-of-table markers.
+	writeLine(proxier.filterRules, "COMMIT")
 	writeLine(proxier.natRules, "COMMIT")
 
 	// Sync iptables rules.
@@ -1529,6 +1610,25 @@ func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain
 	return nil
 }
 
+func (proxier *Proxier) linkKubeForwardChain(existingFilterChains map[utiliptables.Chain]string, filterChains *bytes.Buffer) error {
+	if _, err := proxier.iptables.EnsureChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
+		return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableFilter, KubeForwardChain, err)
+	}
+
+	comment := "kubernetes forward rules"
+	args := []string{"-m", "comment", "--comment", comment, "-j", string(KubeForwardChain)}
+	if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
+		return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, err)
+	}
+
+	if chain, ok := existingFilterChains[KubeForwardChain]; ok {
+		writeLine(filterChains, chain)
+	} else {
+		writeLine(filterChains, utiliptables.MakeChainLine(KubeForwardChain))
+	}
+	return nil
+}
+
 // Join all words with spaces, terminate with newline and write to buff.
 func writeLine(buf *bytes.Buffer, words ...string) {
 	// We avoid strings.Join for performance reasons.
diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go
index f5d062b8d5..e766486093 100644
--- a/pkg/proxy/ipvs/proxier_test.go
+++ b/pkg/proxy/ipvs/proxier_test.go
@@ -138,6 +138,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
 		iptablesData:  bytes.NewBuffer(nil),
 		natChains:     bytes.NewBuffer(nil),
 		natRules:      bytes.NewBuffer(nil),
+		filterChains:  bytes.NewBuffer(nil),
+		filterRules:   bytes.NewBuffer(nil),
 		netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
 		loopbackSet:   NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
 		clusterIPSet:  NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),