Merge pull request #65902 from wojtek-t/kube_proxy_less_allocations_2

Automatic merge from submit-queue (batch tested with PRs 65902, 65781). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Avoid unnecessary allocations in kube-proxy
pull/8/head
Kubernetes Submit Queue 2018-07-09 23:07:01 -07:00 committed by GitHub
commit 13f9c26fd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 63 additions and 41 deletions

View File

@ -341,7 +341,7 @@ func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[util
for chain := range existingNATChains { for chain := range existingNATChains {
if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) { if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) {
existingHostportChains[chain] = existingNATChains[chain] existingHostportChains[chain] = string(existingNATChains[chain])
} }
} }

View File

@ -137,6 +137,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n") buf.WriteString(strings.Join(words, " ") + "\n")
} }
// writeBytesLine appends line to buf, followed by a single newline.
// It is the []byte counterpart of writeLine: a chain line that is already
// held as raw bytes is written directly, without a string conversion.
// The parameter is named line (not bytes) so it does not shadow the
// imported bytes package inside the function body.
func writeBytesLine(buf *bytes.Buffer, line []byte) {
	buf.Write(line)
	buf.WriteByte('\n')
}
//hostportChainName takes containerPort for a pod and returns associated iptables chain. //hostportChainName takes containerPort for a pod and returns associated iptables chain.
// This is computed by hashing (sha256) // This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
@ -189,7 +194,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
// Get iptables-save output so we can check for existing chains and rules. // Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingNATChains := make(map[utiliptables.Chain]string) existingNATChains := make(map[utiliptables.Chain][]byte)
iptablesData := bytes.NewBuffer(nil) iptablesData := bytes.NewBuffer(nil)
err = h.iptables.SaveInto(utiliptables.TableNAT, iptablesData) err = h.iptables.SaveInto(utiliptables.TableNAT, iptablesData)
if err != nil { // if we failed to get any rules if err != nil { // if we failed to get any rules
@ -204,7 +209,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
// Make sure we keep stats for the top-level chains, if they existed // Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above). // (which most should have because we created them above).
if chain, ok := existingNATChains[kubeHostportsChain]; ok { if chain, ok := existingNATChains[kubeHostportsChain]; ok {
writeLine(natChains, chain) writeBytesLine(natChains, chain)
} else { } else {
writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain)) writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain))
} }
@ -216,7 +221,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
protocol := strings.ToLower(string(port.Protocol)) protocol := strings.ToLower(string(port.Protocol))
hostportChain := hostportChainName(port, target.podFullName) hostportChain := hostportChainName(port, target.podFullName)
if chain, ok := existingNATChains[hostportChain]; ok { if chain, ok := existingNATChains[hostportChain]; ok {
writeLine(natChains, chain) writeBytesLine(natChains, chain)
} else { } else {
writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) writeLine(natChains, utiliptables.MakeChainLine(hostportChain))
} }
@ -264,7 +269,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
// We must (as per iptables) write a chain-line for it, which has // We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the // the nice effect of flushing the chain. Then we can remove the
// chain. // chain.
writeLine(natChains, existingNATChains[chain]) writeBytesLine(natChains, existingNATChains[chain])
writeLine(natRules, "-X", chainString) writeLine(natRules, "-X", chainString)
} }
} }

View File

@ -409,7 +409,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if _, found := existingNATChains[chain]; found { if _, found := existingNATChains[chain]; found {
chainString := string(chain) chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush writeBytesLine(natChains, existingNATChains[chain]) // flush
writeLine(natRules, "-X", chainString) // delete writeLine(natRules, "-X", chainString) // delete
} }
} }
@ -417,7 +417,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
for chain := range existingNATChains { for chain := range existingNATChains {
chainString := string(chain) chainString := string(chain)
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") {
writeLine(natChains, existingNATChains[chain]) // flush writeBytesLine(natChains, existingNATChains[chain]) // flush
writeLine(natRules, "-X", chainString) // delete writeLine(natRules, "-X", chainString) // delete
} }
} }
@ -432,7 +432,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
} }
// Flush and remove all of our "-t filter" chains. // Flush and remove all of our "-t filter" chains.
iptablesData = bytes.NewBuffer(nil) iptablesData.Reset()
if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil { if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err) glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err)
encounteredError = true encounteredError = true
@ -444,7 +444,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
if _, found := existingFilterChains[chain]; found { if _, found := existingFilterChains[chain]; found {
chainString := string(chain) chainString := string(chain)
writeLine(filterChains, existingFilterChains[chain]) writeBytesLine(filterChains, existingFilterChains[chain])
writeLine(filterRules, "-X", chainString) writeLine(filterRules, "-X", chainString)
} }
} }
@ -682,16 +682,19 @@ func (proxier *Proxier) syncProxyRules() {
// Get iptables-save output so we can check for existing chains and rules. // Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain]string) existingFilterChains := make(map[utiliptables.Chain][]byte)
proxier.iptablesData.Reset() // TODO: Filter table is small so we're not reusing this buffer over rounds.
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) // However, to optimize it further, we should do that.
existingFilterChainsData := bytes.NewBuffer(nil)
err := proxier.iptables.SaveInto(utiliptables.TableFilter, existingFilterChainsData)
if err != nil { // if we failed to get any rules if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output } else { // otherwise parse the output
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, existingFilterChainsData.Bytes())
} }
existingNATChains := make(map[utiliptables.Chain]string) // IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
existingNATChains := make(map[utiliptables.Chain][]byte)
proxier.iptablesData.Reset() proxier.iptablesData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil { // if we failed to get any rules if err != nil { // if we failed to get any rules
@ -715,14 +718,14 @@ func (proxier *Proxier) syncProxyRules() {
// (which most should have because we created them above). // (which most should have because we created them above).
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
if chain, ok := existingFilterChains[chainName]; ok { if chain, ok := existingFilterChains[chainName]; ok {
writeLine(proxier.filterChains, chain) writeBytesLine(proxier.filterChains, chain)
} else { } else {
writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName)) writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
} }
} }
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if chain, ok := existingNATChains[chainName]; ok { if chain, ok := existingNATChains[chainName]; ok {
writeLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
} }
@ -788,7 +791,7 @@ func (proxier *Proxier) syncProxyRules() {
if hasEndpoints { if hasEndpoints {
// Create the per-service chain, retaining counters if possible. // Create the per-service chain, retaining counters if possible.
if chain, ok := existingNATChains[svcChain]; ok { if chain, ok := existingNATChains[svcChain]; ok {
writeLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
} }
@ -800,7 +803,7 @@ func (proxier *Proxier) syncProxyRules() {
// Only for services request OnlyLocal traffic // Only for services request OnlyLocal traffic
// create the per-service LB chain, retaining counters if possible. // create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok { if lbChain, ok := existingNATChains[svcXlbChain]; ok {
writeLine(proxier.natChains, lbChain) writeBytesLine(proxier.natChains, lbChain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
} }
@ -916,7 +919,7 @@ func (proxier *Proxier) syncProxyRules() {
if ingress.IP != "" { if ingress.IP != "" {
// create service firewall chain // create service firewall chain
if chain, ok := existingNATChains[fwChain]; ok { if chain, ok := existingNATChains[fwChain]; ok {
writeLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
} }
@ -1092,7 +1095,7 @@ func (proxier *Proxier) syncProxyRules() {
// Create the endpoint chain, retaining counters if possible. // Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
writeLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
} }
@ -1240,7 +1243,7 @@ func (proxier *Proxier) syncProxyRules() {
// We must (as per iptables) write a chain-line for it, which has // We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the // the nice effect of flushing the chain. Then we can remove the
// chain. // chain.
writeLine(proxier.natChains, existingNATChains[chain]) writeBytesLine(proxier.natChains, existingNATChains[chain])
writeLine(proxier.natRules, "-X", chainString) writeLine(proxier.natRules, "-X", chainString)
} }
} }
@ -1381,6 +1384,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
} }
} }
// writeBytesLine appends line to buf, followed by a single newline.
// It is the []byte counterpart of writeLine: a chain line that is already
// held as raw bytes is written directly, without a string conversion.
// The parameter is named line (not bytes) so it does not shadow the
// imported bytes package inside the function body.
func writeBytesLine(buf *bytes.Buffer, line []byte) {
	buf.Write(line)
	buf.WriteByte('\n')
}
func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we // For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic. // use iptables to redirect traffic.

View File

@ -45,7 +45,8 @@ import (
func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
chainLines := utiliptables.GetChainLines(table, save) chainLines := utiliptables.GetChainLines(table, save)
for chain, line := range chainLines { for chain, lineBytes := range chainLines {
line := string(lineBytes)
if expected, exists := expectedLines[chain]; exists { if expected, exists := expectedLines[chain]; exists {
if expected != line { if expected != line {
t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line) t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)

View File

@ -1367,8 +1367,11 @@ func (proxier *Proxier) acceptIPVSTraffic() {
// createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link. // createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link.
func (proxier *Proxier) createAndLinkeKubeChain() { func (proxier *Proxier) createAndLinkeKubeChain() {
existingFilterChains := proxier.getExistingChains(utiliptables.TableFilter) // TODO: Filter table is small so we're not reusing this buffer over rounds.
existingNATChains := proxier.getExistingChains(utiliptables.TableNAT) // However, to optimize it further, we should do that.
filterBuffer := bytes.NewBuffer(nil)
existingFilterChains := proxier.getExistingChains(filterBuffer, utiliptables.TableFilter)
existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
// Make sure we keep stats for the top-level chains // Make sure we keep stats for the top-level chains
for _, ch := range iptablesChains { for _, ch := range iptablesChains {
@ -1378,13 +1381,13 @@ func (proxier *Proxier) createAndLinkeKubeChain() {
} }
if ch.table == utiliptables.TableNAT { if ch.table == utiliptables.TableNAT {
if chain, ok := existingNATChains[ch.chain]; ok { if chain, ok := existingNATChains[ch.chain]; ok {
writeLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
} else { } else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
} }
} else { } else {
if chain, ok := existingFilterChains[KubeForwardChain]; ok { if chain, ok := existingFilterChains[KubeForwardChain]; ok {
writeLine(proxier.filterChains, chain) writeBytesLine(proxier.filterChains, chain)
} else { } else {
writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain)) writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
} }
@ -1419,13 +1422,14 @@ func (proxier *Proxier) createAndLinkeKubeChain() {
// getExistingChains get iptables-save output so we can check for existing chains and rules. // getExistingChains get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
func (proxier *Proxier) getExistingChains(table utiliptables.Table) map[utiliptables.Chain]string { // Result may SHARE memory with contents of buffer.
proxier.iptablesData.Reset() func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
err := proxier.iptables.SaveInto(table, proxier.iptablesData) buffer.Reset()
err := proxier.iptables.SaveInto(table, buffer)
if err != nil { // if we failed to get any rules if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output } else { // otherwise parse the output
return utiliptables.GetChainLines(table, proxier.iptablesData.Bytes()) return utiliptables.GetChainLines(table, buffer.Bytes())
} }
return nil return nil
} }
@ -1610,6 +1614,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
} }
} }
// writeBytesLine appends line to buf, followed by a single newline.
// It is the []byte counterpart of writeLine: a chain line that is already
// held as raw bytes is written directly, without a string conversion.
// The parameter is named line (not bytes) so it does not shadow the
// imported bytes package inside the function body.
func writeBytesLine(buf *bytes.Buffer, line []byte) {
	buf.Write(line)
	buf.WriteByte('\n')
}
// listenPortOpener opens ports by calling bind() and listen(). // listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{} type listenPortOpener struct{}

View File

@ -19,11 +19,11 @@ package iptables
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"strings"
) )
var ( var (
commitBytes = []byte("COMMIT") commitBytes = []byte("COMMIT")
spaceBytes = []byte(" ")
) )
// MakeChainLine return an iptables-save/restore formatted chain line given a Chain // MakeChainLine return an iptables-save/restore formatted chain line given a Chain
@ -32,9 +32,11 @@ func MakeChainLine(chain Chain) string {
} }
// GetChainLines parses a table's iptables-save data to find chains in the table. // GetChainLines parses a table's iptables-save data to find chains in the table.
// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc). // It returns a map of iptables.Chain to []byte where the []byte is the chain line
func GetChainLines(table Table, save []byte) map[Chain]string { // from save (with counters etc.).
chainsMap := make(map[Chain]string) // Note that to avoid allocations memory is SHARED with save.
func GetChainLines(table Table, save []byte) map[Chain][]byte {
chainsMap := make(map[Chain][]byte)
tablePrefix := []byte("*" + string(table)) tablePrefix := []byte("*" + string(table))
readIndex := 0 readIndex := 0
// find beginning of table // find beginning of table
@ -59,11 +61,8 @@ func GetChainLines(table Table, save []byte) map[Chain]string {
} else if line[0] == ':' && len(line) > 1 { } else if line[0] == ':' && len(line) > 1 {
// We assume that the <line> contains space - chain lines have 3 fields, // We assume that the <line> contains space - chain lines have 3 fields,
// space delimited. If there is no space, this line will panic. // space delimited. If there is no space, this line will panic.
// chain := Chain(line[1:bytes.Index(line, spaceBytes)])
// TODO: Try to avoid these allocations by reusing memory with the input. chainsMap[chain] = line
lineString := string(line)
chain := Chain(line[1:strings.Index(lineString, " ")])
chainsMap[chain] = lineString
} }
} }
return chainsMap return chainsMap