From 6e50f39dbd67b8e6f418773454858c5cfb59d67f Mon Sep 17 00:00:00 2001 From: wojtekt Date: Fri, 6 Jul 2018 13:34:15 +0200 Subject: [PATCH] Avoid allocations when parsing iptables --- .../network/hostport/hostport_manager.go | 2 +- .../network/hostport/hostport_syncer.go | 13 ++++-- pkg/proxy/iptables/proxier.go | 44 +++++++++++-------- pkg/proxy/iptables/proxier_test.go | 3 +- pkg/proxy/ipvs/proxier.go | 25 +++++++---- pkg/util/iptables/save_restore.go | 17 ++++--- 6 files changed, 63 insertions(+), 41 deletions(-) diff --git a/pkg/kubelet/dockershim/network/hostport/hostport_manager.go b/pkg/kubelet/dockershim/network/hostport/hostport_manager.go index 6d85c75efe..67ef0e5aea 100644 --- a/pkg/kubelet/dockershim/network/hostport/hostport_manager.go +++ b/pkg/kubelet/dockershim/network/hostport/hostport_manager.go @@ -341,7 +341,7 @@ func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[util for chain := range existingNATChains { if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) { - existingHostportChains[chain] = existingNATChains[chain] + existingHostportChains[chain] = string(existingNATChains[chain]) } } diff --git a/pkg/kubelet/dockershim/network/hostport/hostport_syncer.go b/pkg/kubelet/dockershim/network/hostport/hostport_syncer.go index 3d7bfd6e4d..43c3c52ecb 100644 --- a/pkg/kubelet/dockershim/network/hostport/hostport_syncer.go +++ b/pkg/kubelet/dockershim/network/hostport/hostport_syncer.go @@ -137,6 +137,11 @@ func writeLine(buf *bytes.Buffer, words ...string) { buf.WriteString(strings.Join(words, " ") + "\n") } +func writeBytesLine(buf *bytes.Buffer, bytes []byte) { + buf.Write(bytes) + buf.WriteByte('\n') +} + //hostportChainName takes containerPort for a pod and returns associated iptables chain. // This is computed by hashing (sha256) // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do @@ -189,7 +194,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingNATChains := make(map[utiliptables.Chain]string) + existingNATChains := make(map[utiliptables.Chain][]byte) iptablesData := bytes.NewBuffer(nil) err = h.iptables.SaveInto(utiliptables.TableNAT, iptablesData) if err != nil { // if we failed to get any rules @@ -204,7 +209,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). if chain, ok := existingNATChains[kubeHostportsChain]; ok { - writeLine(natChains, chain) + writeBytesLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain)) } @@ -216,7 +221,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap protocol := strings.ToLower(string(port.Protocol)) hostportChain := hostportChainName(port, target.podFullName) if chain, ok := existingNATChains[hostportChain]; ok { - writeLine(natChains, chain) + writeBytesLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) } @@ -264,7 +269,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. - writeLine(natChains, existingNATChains[chain]) + writeBytesLine(natChains, existingNATChains[chain]) writeLine(natRules, "-X", chainString) } } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4338f62d3..f53790089a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -403,16 +403,16 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete + writeBytesLine(natChains, existingNATChains[chain]) // flush + writeLine(natRules, "-X", chainString) // delete } } // Hunt for service and endpoint chains. for chain := range existingNATChains { chainString := string(chain) if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { - writeLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete + writeBytesLine(natChains, existingNATChains[chain]) // flush + writeLine(natRules, "-X", chainString) // delete } } writeLine(natRules, "COMMIT") @@ -426,7 +426,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { } // Flush and remove all of our "-t filter" chains. - iptablesData = bytes.NewBuffer(nil) + iptablesData.Reset() if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil { glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err) encounteredError = true @@ -438,7 +438,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { if _, found := existingFilterChains[chain]; found { chainString := string(chain) - writeLine(filterChains, existingFilterChains[chain]) + writeBytesLine(filterChains, existingFilterChains[chain]) writeLine(filterRules, "-X", chainString) } } @@ -663,16 +663,19 @@ func (proxier *Proxier) syncProxyRules() { // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingFilterChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) + existingFilterChains := make(map[utiliptables.Chain][]byte) + // TODO: Filter table is small so we're not reusing this buffer over rounds. + // However, to optimize it further, we should do that. + existingFilterChainsData := bytes.NewBuffer(nil) + err := proxier.iptables.SaveInto(utiliptables.TableFilter, existingFilterChainsData) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output - existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) + existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, existingFilterChainsData.Bytes()) } - existingNATChains := make(map[utiliptables.Chain]string) + // IMPORTANT: existingNATChains may share memory with proxier.iptablesData. + existingNATChains := make(map[utiliptables.Chain][]byte) proxier.iptablesData.Reset() err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) if err != nil { // if we failed to get any rules @@ -696,14 +699,14 @@ func (proxier *Proxier) syncProxyRules() { // (which most should have because we created them above). for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { if chain, ok := existingFilterChains[chainName]; ok { - writeLine(proxier.filterChains, chain) + writeBytesLine(proxier.filterChains, chain) } else { writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName)) } } for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { if chain, ok := existingNATChains[chainName]; ok { - writeLine(proxier.natChains, chain) + writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) } @@ -763,7 +766,7 @@ func (proxier *Proxier) syncProxyRules() { if hasEndpoints { // Create the per-service chain, retaining counters if possible. if chain, ok := existingNATChains[svcChain]; ok { - writeLine(proxier.natChains, chain) + writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) } @@ -775,7 +778,7 @@ func (proxier *Proxier) syncProxyRules() { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { - writeLine(proxier.natChains, lbChain) + writeBytesLine(proxier.natChains, lbChain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) } @@ -891,7 +894,7 @@ func (proxier *Proxier) syncProxyRules() { if ingress.IP != "" { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { - writeLine(proxier.natChains, chain) + writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) } @@ -1067,7 +1070,7 @@ func (proxier *Proxier) syncProxyRules() { // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { - writeLine(proxier.natChains, chain) + writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) } @@ -1215,7 +1218,7 @@ func (proxier *Proxier) syncProxyRules() { // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. - writeLine(proxier.natChains, existingNATChains[chain]) + writeBytesLine(proxier.natChains, existingNATChains[chain]) writeLine(proxier.natRules, "-X", chainString) } } @@ -1356,6 +1359,11 @@ func writeLine(buf *bytes.Buffer, words ...string) { } } +func writeBytesLine(buf *bytes.Buffer, bytes []byte) { + buf.Write(bytes) + buf.WriteByte('\n') +} + func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { // For ports on node IPs, open the actual port and hold it, even though we // use iptables to redirect traffic. diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index db5fce2065..a2930e819a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -45,7 +45,8 @@ import ( func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { chainLines := utiliptables.GetChainLines(table, save) - for chain, line := range chainLines { + for chain, lineBytes := range chainLines { + line := string(lineBytes) if expected, exists := expectedLines[chain]; exists { if expected != line { t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 2f7af79923..10dd36f226 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1367,8 +1367,11 @@ func (proxier *Proxier) acceptIPVSTraffic() { // createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link. func (proxier *Proxier) createAndLinkeKubeChain() { - existingFilterChains := proxier.getExistingChains(utiliptables.TableFilter) - existingNATChains := proxier.getExistingChains(utiliptables.TableNAT) + // TODO: Filter table is small so we're not reusing this buffer over rounds. + // However, to optimize it further, we should do that. + filterBuffer := bytes.NewBuffer(nil) + existingFilterChains := proxier.getExistingChains(filterBuffer, utiliptables.TableFilter) + existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT) // Make sure we keep stats for the top-level chains for _, ch := range iptablesChains { @@ -1378,13 +1381,13 @@ func (proxier *Proxier) createAndLinkeKubeChain() { } if ch.table == utiliptables.TableNAT { if chain, ok := existingNATChains[ch.chain]; ok { - writeLine(proxier.natChains, chain) + writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } } else { if chain, ok := existingFilterChains[KubeForwardChain]; ok { - writeLine(proxier.filterChains, chain) + writeBytesLine(proxier.filterChains, chain) } else { writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain)) } @@ -1419,13 +1422,14 @@ func (proxier *Proxier) createAndLinkeKubeChain() { // getExistingChains get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore -func (proxier *Proxier) getExistingChains(table utiliptables.Table) map[utiliptables.Chain]string { - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(table, proxier.iptablesData) +// Result may SHARE memory with contents of buffer. +func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte { + buffer.Reset() + err := proxier.iptables.SaveInto(table, buffer) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output - return utiliptables.GetChainLines(table, proxier.iptablesData.Bytes()) + return utiliptables.GetChainLines(table, buffer.Bytes()) } return nil } @@ -1610,6 +1614,11 @@ func writeLine(buf *bytes.Buffer, words ...string) { } } +func writeBytesLine(buf *bytes.Buffer, bytes []byte) { + buf.Write(bytes) + buf.WriteByte('\n') +} + // listenPortOpener opens ports by calling bind() and listen(). type listenPortOpener struct{} diff --git a/pkg/util/iptables/save_restore.go b/pkg/util/iptables/save_restore.go index 08671839f2..172f07e7fa 100644 --- a/pkg/util/iptables/save_restore.go +++ b/pkg/util/iptables/save_restore.go @@ -19,11 +19,11 @@ package iptables import ( "bytes" "fmt" - "strings" ) var ( commitBytes = []byte("COMMIT") + spaceBytes = []byte(" ") ) // MakeChainLine return an iptables-save/restore formatted chain line given a Chain @@ -32,9 +32,11 @@ func MakeChainLine(chain Chain) string { } // GetChainLines parses a table's iptables-save data to find chains in the table. -// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc). -func GetChainLines(table Table, save []byte) map[Chain]string { - chainsMap := make(map[Chain]string) +// It returns a map of iptables.Chain to []byte where the []byte is the chain line +// from save (with counters etc.). +// Note that to avoid allocations memory is SHARED with save. +func GetChainLines(table Table, save []byte) map[Chain][]byte { + chainsMap := make(map[Chain][]byte) tablePrefix := []byte("*" + string(table)) readIndex := 0 // find beginning of table @@ -59,11 +61,8 @@ func GetChainLines(table Table, save []byte) map[Chain]string { } else if line[0] == ':' && len(line) > 1 { // We assume that the contains space - chain lines have 3 fields, // space delimited. If there is no space, this line will panic. - // - // TODO: Try to avoid these allocations by reusing memory with the input. - lineString := string(line) - chain := Chain(line[1:strings.Index(lineString, " ")]) - chainsMap[chain] = lineString + chain := Chain(line[1:bytes.Index(line, spaceBytes)]) + chainsMap[chain] = line } } return chainsMap