fix nodeport FORWARD chain

pull/8/head
m1093782566 2018-02-22 12:12:50 +08:00
parent e7ed9b408a
commit 00430b4b6c
2 changed files with 112 additions and 10 deletions

View File

@ -56,9 +56,6 @@ const (
// kubeServicesChain is the services portal chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// KubeServiceIPSetsChain is the services access IP chain
KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS"
// KubeFireWallChain is the kubernetes firewall chain.
KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL"
@ -74,6 +71,9 @@ const (
// KubeMarkDropChain is the mark-for-drop chain
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
// KubeForwardChain is the kubernetes forward chain
KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
DefaultScheduler = "rr"
@ -149,7 +149,9 @@ type Proxier struct {
// that are significantly impacting performance.
iptablesData *bytes.Buffer
natChains *bytes.Buffer
filterChains *bytes.Buffer
natRules *bytes.Buffer
filterRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
// loopbackSet is the ipset which stores all endpoints IP:Port,IP for solving hairpin mode purpose.
@ -339,6 +341,8 @@ func NewProxier(ipt utiliptables.Interface,
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
@ -458,7 +462,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
for _, tc := range tableChainsWithJumpService {
if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
@ -471,26 +475,52 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
// Flush and remove all of our chains.
// Unlink the forwarding chain.
args = []string{
"-m", "comment", "--comment", "kubernetes forwarding rules",
"-j", string(KubeForwardChain),
}
if err := ipt.DeleteRule(utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
// Flush and remove all of our "-t nat" chains.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain} {
if err := ipt.FlushChain(utiliptables.TableNAT, chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
if err := ipt.DeleteChain(utiliptables.TableNAT, chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing ipvs Proxier iptables rule: %v", err)
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
}
// Flush and remove all of our "-t filter" chains.
if err := ipt.FlushChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
if err := ipt.DeleteChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
encounteredError = true
}
}
return encounteredError
}
@ -651,13 +681,21 @@ func (proxier *Proxier) syncProxyRules() {
glog.V(3).Infof("Syncing ipvs Proxier rules")
// TODO: UT output result
// Begin install iptables
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain]string)
proxier.iptablesData.Reset()
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes())
}
existingNATChains := make(map[utiliptables.Chain]string)
proxier.iptablesData.Reset()
err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
@ -667,8 +705,11 @@ func (proxier *Proxier) syncProxyRules() {
// This is to avoid memory reallocations and thus improve performance.
proxier.natChains.Reset()
proxier.natRules.Reset()
// Write table headers.
writeLine(proxier.filterChains, "*filter")
writeLine(proxier.natChains, "*nat")
// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
@ -747,6 +788,11 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
return
}
// Kube forward
if err := proxier.linkKubeForwardChain(existingFilterChains, proxier.filterChains); err != nil {
glog.Errorf("Failed to create and link KUBE-FORWARD chain: %v", err)
return
}
// `iptables -t nat -N KUBE-NODE-PORT`
if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
@ -1272,7 +1318,42 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
// If the masqueradeMark has been added then we want to forward that same
// traffic, this allows NodePort traffic to be forwarded even if the default
// FORWARD policy is not accept.
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "ACCEPT",
)
// The following rules can only be set if clusterCIDR has been defined.
if len(proxier.clusterCIDR) != 0 {
// The following two rules ensure the traffic after the initial packet
// accepted by the "kubernetes forwarding rules" rule above will be
// accepted, to be as specific as possible the traffic must be sourced
// or destined to the clusterCIDR (to/from a pod).
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-s", proxier.clusterCIDR,
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
writeLine(proxier.filterRules,
"-A", string(KubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
"-d", proxier.clusterCIDR,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
}
// Write the end-of-table markers.
writeLine(proxier.filterRules, "COMMIT")
writeLine(proxier.natRules, "COMMIT")
// Sync iptables rules.
@ -1529,6 +1610,25 @@ func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain
return nil
}
func (proxier *Proxier) linkKubeForwardChain(existingFilterChains map[utiliptables.Chain]string, filterChains *bytes.Buffer) error {
if _, err := proxier.iptables.EnsureChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableFilter, KubeForwardChain, err)
}
comment := "kubernetes forward rules"
args := []string{"-m", "comment", "--comment", comment, "-j", string(KubeForwardChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, err)
}
if chain, ok := existingFilterChains[KubeForwardChain]; ok {
writeLine(filterChains, chain)
} else {
writeLine(filterChains, utiliptables.MakeChainLine(KubeForwardChain))
}
return nil
}
// Join all words with spaces, terminate with newline and write to buff.
func writeLine(buf *bytes.Buffer, words ...string) {
// We avoid strings.Join for performance reasons.

View File

@ -138,6 +138,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),