mirror of https://github.com/k3s-io/k3s
Avoid allocations when parsing iptables
parent
cb9ddd3456
commit
6e50f39dbd
|
@ -341,7 +341,7 @@ func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[util
|
|||
|
||||
for chain := range existingNATChains {
|
||||
if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) {
|
||||
existingHostportChains[chain] = existingNATChains[chain]
|
||||
existingHostportChains[chain] = string(existingNATChains[chain])
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -137,6 +137,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
|
|||
buf.WriteString(strings.Join(words, " ") + "\n")
|
||||
}
|
||||
|
||||
func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
|
||||
buf.Write(bytes)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
//hostportChainName takes containerPort for a pod and returns associated iptables chain.
|
||||
// This is computed by hashing (sha256)
|
||||
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
|
||||
|
@ -189,7 +194,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
|
|||
|
||||
// Get iptables-save output so we can check for existing chains and rules.
|
||||
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
|
||||
existingNATChains := make(map[utiliptables.Chain]string)
|
||||
existingNATChains := make(map[utiliptables.Chain][]byte)
|
||||
iptablesData := bytes.NewBuffer(nil)
|
||||
err = h.iptables.SaveInto(utiliptables.TableNAT, iptablesData)
|
||||
if err != nil { // if we failed to get any rules
|
||||
|
@ -204,7 +209,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
|
|||
// Make sure we keep stats for the top-level chains, if they existed
|
||||
// (which most should have because we created them above).
|
||||
if chain, ok := existingNATChains[kubeHostportsChain]; ok {
|
||||
writeLine(natChains, chain)
|
||||
writeBytesLine(natChains, chain)
|
||||
} else {
|
||||
writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain))
|
||||
}
|
||||
|
@ -216,7 +221,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
|
|||
protocol := strings.ToLower(string(port.Protocol))
|
||||
hostportChain := hostportChainName(port, target.podFullName)
|
||||
if chain, ok := existingNATChains[hostportChain]; ok {
|
||||
writeLine(natChains, chain)
|
||||
writeBytesLine(natChains, chain)
|
||||
} else {
|
||||
writeLine(natChains, utiliptables.MakeChainLine(hostportChain))
|
||||
}
|
||||
|
@ -264,7 +269,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
|
|||
// We must (as per iptables) write a chain-line for it, which has
|
||||
// the nice effect of flushing the chain. Then we can remove the
|
||||
// chain.
|
||||
writeLine(natChains, existingNATChains[chain])
|
||||
writeBytesLine(natChains, existingNATChains[chain])
|
||||
writeLine(natRules, "-X", chainString)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -403,16 +403,16 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
|||
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
|
||||
if _, found := existingNATChains[chain]; found {
|
||||
chainString := string(chain)
|
||||
writeLine(natChains, existingNATChains[chain]) // flush
|
||||
writeLine(natRules, "-X", chainString) // delete
|
||||
writeBytesLine(natChains, existingNATChains[chain]) // flush
|
||||
writeLine(natRules, "-X", chainString) // delete
|
||||
}
|
||||
}
|
||||
// Hunt for service and endpoint chains.
|
||||
for chain := range existingNATChains {
|
||||
chainString := string(chain)
|
||||
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") {
|
||||
writeLine(natChains, existingNATChains[chain]) // flush
|
||||
writeLine(natRules, "-X", chainString) // delete
|
||||
writeBytesLine(natChains, existingNATChains[chain]) // flush
|
||||
writeLine(natRules, "-X", chainString) // delete
|
||||
}
|
||||
}
|
||||
writeLine(natRules, "COMMIT")
|
||||
|
@ -426,7 +426,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
|||
}
|
||||
|
||||
// Flush and remove all of our "-t filter" chains.
|
||||
iptablesData = bytes.NewBuffer(nil)
|
||||
iptablesData.Reset()
|
||||
if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
|
||||
glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err)
|
||||
encounteredError = true
|
||||
|
@ -438,7 +438,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
|||
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
|
||||
if _, found := existingFilterChains[chain]; found {
|
||||
chainString := string(chain)
|
||||
writeLine(filterChains, existingFilterChains[chain])
|
||||
writeBytesLine(filterChains, existingFilterChains[chain])
|
||||
writeLine(filterRules, "-X", chainString)
|
||||
}
|
||||
}
|
||||
|
@ -663,16 +663,19 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
|
||||
// Get iptables-save output so we can check for existing chains and rules.
|
||||
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
|
||||
existingFilterChains := make(map[utiliptables.Chain]string)
|
||||
proxier.iptablesData.Reset()
|
||||
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData)
|
||||
existingFilterChains := make(map[utiliptables.Chain][]byte)
|
||||
// TODO: Filter table is small so we're not reusing this buffer over rounds.
|
||||
// However, to optimize it further, we should do that.
|
||||
existingFilterChainsData := bytes.NewBuffer(nil)
|
||||
err := proxier.iptables.SaveInto(utiliptables.TableFilter, existingFilterChainsData)
|
||||
if err != nil { // if we failed to get any rules
|
||||
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
|
||||
} else { // otherwise parse the output
|
||||
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes())
|
||||
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, existingFilterChainsData.Bytes())
|
||||
}
|
||||
|
||||
existingNATChains := make(map[utiliptables.Chain]string)
|
||||
// IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
|
||||
existingNATChains := make(map[utiliptables.Chain][]byte)
|
||||
proxier.iptablesData.Reset()
|
||||
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
|
||||
if err != nil { // if we failed to get any rules
|
||||
|
@ -696,14 +699,14 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// (which most should have because we created them above).
|
||||
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
|
||||
if chain, ok := existingFilterChains[chainName]; ok {
|
||||
writeLine(proxier.filterChains, chain)
|
||||
writeBytesLine(proxier.filterChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
|
||||
}
|
||||
}
|
||||
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
|
||||
if chain, ok := existingNATChains[chainName]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
writeBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
|
||||
}
|
||||
|
@ -763,7 +766,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
if hasEndpoints {
|
||||
// Create the per-service chain, retaining counters if possible.
|
||||
if chain, ok := existingNATChains[svcChain]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
writeBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
|
||||
}
|
||||
|
@ -775,7 +778,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// Only for services request OnlyLocal traffic
|
||||
// create the per-service LB chain, retaining counters if possible.
|
||||
if lbChain, ok := existingNATChains[svcXlbChain]; ok {
|
||||
writeLine(proxier.natChains, lbChain)
|
||||
writeBytesLine(proxier.natChains, lbChain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
|
||||
}
|
||||
|
@ -891,7 +894,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
if ingress.IP != "" {
|
||||
// create service firewall chain
|
||||
if chain, ok := existingNATChains[fwChain]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
writeBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
|
||||
}
|
||||
|
@ -1067,7 +1070,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
|
||||
// Create the endpoint chain, retaining counters if possible.
|
||||
if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
writeBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
|
||||
}
|
||||
|
@ -1215,7 +1218,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// We must (as per iptables) write a chain-line for it, which has
|
||||
// the nice effect of flushing the chain. Then we can remove the
|
||||
// chain.
|
||||
writeLine(proxier.natChains, existingNATChains[chain])
|
||||
writeBytesLine(proxier.natChains, existingNATChains[chain])
|
||||
writeLine(proxier.natRules, "-X", chainString)
|
||||
}
|
||||
}
|
||||
|
@ -1356,6 +1359,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
|
|||
}
|
||||
}
|
||||
|
||||
func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
|
||||
buf.Write(bytes)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
|
||||
// For ports on node IPs, open the actual port and hold it, even though we
|
||||
// use iptables to redirect traffic.
|
||||
|
|
|
@ -45,7 +45,8 @@ import (
|
|||
|
||||
func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
|
||||
chainLines := utiliptables.GetChainLines(table, save)
|
||||
for chain, line := range chainLines {
|
||||
for chain, lineBytes := range chainLines {
|
||||
line := string(lineBytes)
|
||||
if expected, exists := expectedLines[chain]; exists {
|
||||
if expected != line {
|
||||
t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
|
||||
|
|
|
@ -1367,8 +1367,11 @@ func (proxier *Proxier) acceptIPVSTraffic() {
|
|||
|
||||
// createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link.
|
||||
func (proxier *Proxier) createAndLinkeKubeChain() {
|
||||
existingFilterChains := proxier.getExistingChains(utiliptables.TableFilter)
|
||||
existingNATChains := proxier.getExistingChains(utiliptables.TableNAT)
|
||||
// TODO: Filter table is small so we're not reusing this buffer over rounds.
|
||||
// However, to optimize it further, we should do that.
|
||||
filterBuffer := bytes.NewBuffer(nil)
|
||||
existingFilterChains := proxier.getExistingChains(filterBuffer, utiliptables.TableFilter)
|
||||
existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
|
||||
|
||||
// Make sure we keep stats for the top-level chains
|
||||
for _, ch := range iptablesChains {
|
||||
|
@ -1378,13 +1381,13 @@ func (proxier *Proxier) createAndLinkeKubeChain() {
|
|||
}
|
||||
if ch.table == utiliptables.TableNAT {
|
||||
if chain, ok := existingNATChains[ch.chain]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
writeBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
|
||||
}
|
||||
} else {
|
||||
if chain, ok := existingFilterChains[KubeForwardChain]; ok {
|
||||
writeLine(proxier.filterChains, chain)
|
||||
writeBytesLine(proxier.filterChains, chain)
|
||||
} else {
|
||||
writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
|
||||
}
|
||||
|
@ -1419,13 +1422,14 @@ func (proxier *Proxier) createAndLinkeKubeChain() {
|
|||
|
||||
// getExistingChains get iptables-save output so we can check for existing chains and rules.
|
||||
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
|
||||
func (proxier *Proxier) getExistingChains(table utiliptables.Table) map[utiliptables.Chain]string {
|
||||
proxier.iptablesData.Reset()
|
||||
err := proxier.iptables.SaveInto(table, proxier.iptablesData)
|
||||
// Result may SHARE memory with contents of buffer.
|
||||
func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
|
||||
buffer.Reset()
|
||||
err := proxier.iptables.SaveInto(table, buffer)
|
||||
if err != nil { // if we failed to get any rules
|
||||
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
|
||||
} else { // otherwise parse the output
|
||||
return utiliptables.GetChainLines(table, proxier.iptablesData.Bytes())
|
||||
return utiliptables.GetChainLines(table, buffer.Bytes())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1610,6 +1614,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
|
|||
}
|
||||
}
|
||||
|
||||
func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
|
||||
buf.Write(bytes)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
// listenPortOpener opens ports by calling bind() and listen().
|
||||
type listenPortOpener struct{}
|
||||
|
||||
|
|
|
@ -19,11 +19,11 @@ package iptables
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
commitBytes = []byte("COMMIT")
|
||||
spaceBytes = []byte(" ")
|
||||
)
|
||||
|
||||
// MakeChainLine return an iptables-save/restore formatted chain line given a Chain
|
||||
|
@ -32,9 +32,11 @@ func MakeChainLine(chain Chain) string {
|
|||
}
|
||||
|
||||
// GetChainLines parses a table's iptables-save data to find chains in the table.
|
||||
// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc).
|
||||
func GetChainLines(table Table, save []byte) map[Chain]string {
|
||||
chainsMap := make(map[Chain]string)
|
||||
// It returns a map of iptables.Chain to []byte where the []byte is the chain line
|
||||
// from save (with counters etc.).
|
||||
// Note that to avoid allocations memory is SHARED with save.
|
||||
func GetChainLines(table Table, save []byte) map[Chain][]byte {
|
||||
chainsMap := make(map[Chain][]byte)
|
||||
tablePrefix := []byte("*" + string(table))
|
||||
readIndex := 0
|
||||
// find beginning of table
|
||||
|
@ -59,11 +61,8 @@ func GetChainLines(table Table, save []byte) map[Chain]string {
|
|||
} else if line[0] == ':' && len(line) > 1 {
|
||||
// We assume that the <line> contains space - chain lines have 3 fields,
|
||||
// space delimited. If there is no space, this line will panic.
|
||||
//
|
||||
// TODO: Try to avoid these allocations by reusing memory with the input.
|
||||
lineString := string(line)
|
||||
chain := Chain(line[1:strings.Index(lineString, " ")])
|
||||
chainsMap[chain] = lineString
|
||||
chain := Chain(line[1:bytes.Index(line, spaceBytes)])
|
||||
chainsMap[chain] = line
|
||||
}
|
||||
}
|
||||
return chainsMap
|
||||
|
|
Loading…
Reference in New Issue