Merge pull request #28717 from freehan/ebtable

Automatic merge from submit-queue

Filter duplicate network packets in promiscuous bridge mode (with ebtables)

also fixes: #30783
pull/6/head
Kubernetes Submit Queue 2016-08-25 19:12:09 -07:00 committed by GitHub
commit d3ecad111e
8 changed files with 442 additions and 1 deletions

View File

@ -22,5 +22,6 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/
# cleanup has no effect.
RUN DEBIAN_FRONTEND=noninteractive apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y ebtables \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \
&& rm -rf /var/lib/apt/lists/*

View File

@ -16,7 +16,7 @@
REGISTRY?="gcr.io/google_containers"
IMAGE=debian-iptables
TAG=v3
TAG=v4
ARCH?=amd64
TEMP_DIR:=$(shell mktemp -d)

View File

@ -21,6 +21,7 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/
RUN DEBIAN_FRONTEND=noninteractive apt-get update -y \
&& DEBIAN_FRONTEND=noninteractive apt-get -yy -q install \
iptables \
ebtables \
ethtool \
ca-certificates \
conntrack \

View File

@ -2,6 +2,7 @@ pkg-core:
pkg.installed:
- names:
- curl
- ebtables
{% if grains['os_family'] == 'RedHat' %}
- python
- git

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/util/bandwidth"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilebtables "k8s.io/kubernetes/pkg/util/ebtables"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -45,6 +46,7 @@ import (
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
"strconv"
)
const (
@ -56,6 +58,15 @@ const (
// fallbackMTU is used if an MTU is not specified, and we cannot determine the MTU
fallbackMTU = 1460
// private mac prefix safe to use
// Universally administered and locally administered addresses are distinguished by setting the second-least-significant
// bit of the first octet of the address. If it is 1, the address is locally administered. For example, for address 0a:00:00:00:00:00,
// the first octet is 0a (hex), the binary form of which is 00001010, where the second-least-significant bit is 1.
privateMACPrefix = "0a:58"
// ebtables Chain to store dedup rules
dedupChain = utilebtables.Chain("KUBE-DEDUP")
)
type kubenetNetworkPlugin struct {
@ -75,10 +86,13 @@ type kubenetNetworkPlugin struct {
hostportHandler hostport.HostportHandler
iptables utiliptables.Interface
sysctl utilsysctl.Interface
ebtables utilebtables.Interface
// vendorDir is passed by kubelet network-plugin-dir parameter.
// kubenet will search for cni binaries in DefaultCNIDir first, then continue to vendorDir.
vendorDir string
nonMasqueradeCIDR string
podCidr string
gateway net.IP
}
func NewPlugin(networkPluginDir string) network.NetworkPlugin {
@ -243,6 +257,8 @@ func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interf
// plugin will bail out if the bridge has an unexpected one
plugin.clearBridgeAddressesExcept(cidr)
}
plugin.podCidr = podCIDR
plugin.gateway = cidr.IP
}
if err != nil {
@ -327,6 +343,22 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4)
}
// Explicitly assign a MAC address to cbr0. If the bridge MAC address is not explicitly set, the bridge will adopt the lowest MAC address of the attached veths.
// TODO: Remove this once upstream cni bridge plugin handles this
link, err := netlink.LinkByName(BridgeName)
if err != nil {
return fmt.Errorf("failed to lookup %q: %v", BridgeName, err)
}
macAddr, err := generateHardwareAddr(plugin.gateway)
if err != nil {
return err
}
glog.V(3).Infof("Configure %q mac address to %v", BridgeName, macAddr)
err = netlink.LinkSetHardwareAddr(link, macAddr)
if err != nil {
return fmt.Errorf("Failed to configure %q mac address to %q: %v", BridgeName, macAddr, err)
}
// Put the container bridge into promiscuous mode to force it to accept hairpin packets.
// TODO: Remove this once the kernel bug (#20096) is fixed.
// TODO: check and set promiscuous mode with netlink once vishvananda/netlink supports it
@ -338,6 +370,8 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return fmt.Errorf("Error setting promiscuous mode on %s: %v", BridgeName, err)
}
}
// configure the ebtables rules to eliminate duplicate packets by best effort
plugin.syncEbtablesDedupRules(macAddr)
}
// The first SetUpPod call creates the bridge; get a shaper for the sake of
@ -583,3 +617,63 @@ func (plugin *kubenetNetworkPlugin) shaper() bandwidth.BandwidthShaper {
}
return plugin.bandwidthShaper
}
// syncEbtablesDedupRules installs, on a best-effort basis, the ebtables rules
// used to filter duplicate packets seen while the bridge is in promiscuous mode.
//
// On the first call it creates the ebtables runner and flushes the dedup chain.
// On every call it then makes sure that:
//   - the dedup chain exists in the filter table,
//   - the filter OUTPUT chain jumps to the dedup chain,
//   - IPv4 frames leaving through "veth+" with source MAC macAddr and source IP
//     equal to the gateway are accepted,
//   - IPv4 frames leaving through "veth+" with source MAC macAddr and a source
//     IP inside the pod CIDR are dropped.
//
// Failures are only logged; nothing is returned to the caller.
//
// TODO: make this into a goroutine and rectify the dedup rules periodically
func (plugin *kubenetNetworkPlugin) syncEbtablesDedupRules(macAddr net.HardwareAddr) {
	if plugin.ebtables == nil {
		plugin.ebtables = utilebtables.New(plugin.execer)
		glog.V(3).Infof("Flushing dedup chain")
		if err := plugin.ebtables.FlushChain(utilebtables.TableFilter, dedupChain); err != nil {
			glog.Errorf("Failed to flush dedup chain: %v", err)
		}
	}

	// Without a working ebtables binary none of the rules below can be installed.
	if _, err := plugin.ebtables.GetVersion(); err != nil {
		glog.Warningf("Failed to get ebtables version. Skip syncing ebtables dedup rules: %v", err)
		return
	}

	glog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), plugin.gateway.String(), plugin.podCidr)
	if _, err := plugin.ebtables.EnsureChain(utilebtables.TableFilter, dedupChain); err != nil {
		glog.Errorf("Failed to ensure %v chain %v: %v", utilebtables.TableFilter, dedupChain, err)
		return
	}

	if _, err := plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, utilebtables.ChainOutput, "-j", string(dedupChain)); err != nil {
		glog.Errorf("Failed to ensure %v chain %v jump to %v chain: %v", utilebtables.TableFilter, utilebtables.ChainOutput, dedupChain, err)
		return
	}

	// Match criteria shared by the ACCEPT and DROP rules. The ACCEPT rule is
	// prepended so that it is evaluated before the appended DROP rule.
	commonArgs := []string{"-p", "IPv4", "-s", macAddr.String(), "-o", "veth+"}
	if _, err := plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.gateway.String(), "-j", "ACCEPT")...); err != nil {
		glog.Errorf("Failed to ensure packets from cbr0 gateway to be accepted: %v", err)
		return
	}
	if _, err := plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.podCidr, "-j", "DROP")...); err != nil {
		glog.Errorf("Failed to ensure packets from podCidr but has mac address of cbr0 to get dropped: %v", err)
		return
	}
}
// generateHardwareAddr generates a 48 bit virtual mac address based on the IP input.
//
// The result is privateMACPrefix followed by the four octets of the IPv4
// address written in hex, e.g. 10.0.0.2 becomes "<prefix>:0a:00:00:02".
//
// An error is returned when ip is not an IPv4 address, when one of its octets
// cannot be converted, or when the assembled string is not a valid MAC.
func generateHardwareAddr(ip net.IP) (net.HardwareAddr, error) {
	if ip.To4() == nil {
		return nil, fmt.Errorf("generateHardwareAddr only support valid ipv4 address as input")
	}

	mac := privateMACPrefix
	for _, s := range strings.Split(ip.String(), ".") {
		octet, err := strconv.Atoi(s)
		if err != nil {
			// Not expected after the To4 check above, but never turn a bad
			// octet silently into "00".
			return nil, fmt.Errorf("Failed to convert octet %q of ip %s to an integer: %v", s, ip, err)
		}
		mac += fmt.Sprintf(":%02x", octet)
	}

	hwAddr, err := net.ParseMAC(mac)
	if err != nil {
		return nil, fmt.Errorf("Failed to parse mac address %s generated based on ip %s due to: %v", mac, ip, err)
	}
	return hwAddr, nil
}

View File

@ -18,6 +18,7 @@ package kubenet
import (
"fmt"
"net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -198,4 +199,34 @@ func TestInit_MTU(t *testing.T) {
assert.Equal(t, 1, sysctl.Settings["net/bridge/bridge-nf-call-iptables"], "net/bridge/bridge-nf-call-iptables sysctl should have been set")
}
// TestGenerateMacAddress verifies that generateHardwareAddr appends the four
// IPv4 octets, in hex, to privateMACPrefix.
func TestGenerateMacAddress(t *testing.T) {
	type macCase struct {
		ip          net.IP
		expectedMAC string
	}
	cases := []macCase{
		{ip: net.ParseIP("10.0.0.2"), expectedMAC: privateMACPrefix + ":0a:00:00:02"},
		{ip: net.ParseIP("10.250.0.244"), expectedMAC: privateMACPrefix + ":0a:fa:00:f4"},
		{ip: net.ParseIP("172.17.0.2"), expectedMAC: privateMACPrefix + ":ac:11:00:02"},
	}

	for i := range cases {
		generated, err := generateHardwareAddr(cases[i].ip)
		if err != nil {
			t.Errorf("Did not expect error: %v", err)
		}
		if actual := generated.String(); actual != cases[i].expectedMAC {
			t.Errorf("generated mac: %q, expecting: %q", actual, cases[i].expectedMAC)
		}
	}
}
//TODO: add unit test for each implementation of network plugin interface

View File

@ -0,0 +1,189 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ebtables
import (
"fmt"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"regexp"
"strings"
"sync"
)
const (
// cmdebtables is the executable name passed to exec for every ebtables invocation.
cmdebtables = "ebtables"
// Flag to show full mac in output. The default representation omits leading zeroes.
// EnsureRule passes it when listing a chain so the listing can be compared with the requested rule.
fullMac = "--Lmac2"
)
// RulePosition is the ebtables command flag EnsureRule uses when it has to add a missing rule.
type RulePosition string
const (
// Prepend adds the rule with the ebtables "-I" (insert) command.
Prepend RulePosition = "-I"
// Append adds the rule with the ebtables "-A" (append) command.
Append RulePosition = "-A"
)
// Table is the name of an ebtables table; it is passed to ebtables after the "-t" flag.
type Table string
const (
// TableNAT selects the "nat" table.
TableNAT Table = "nat"
// TableFilter selects the "filter" table.
TableFilter Table = "filter"
)
// Chain is the name of an ebtables chain. Callers may also convert their own chain names to Chain.
type Chain string
const (
// Predefined chain names.
ChainPostrouting Chain = "POSTROUTING"
ChainPrerouting Chain = "PREROUTING"
ChainOutput Chain = "OUTPUT"
ChainInput Chain = "INPUT"
)
// operation is the ebtables command flag that makeFullArgs places between the table and the chain name.
type operation string
const (
// Chain-level commands.
opCreateChain operation = "-N"
opFlushChain operation = "-F"
opDeleteChain operation = "-X"
opListChain operation = "-L"
// Rule-level commands; "-A" and "-I" carry the same values as the Append and Prepend rule positions.
opAppendRule operation = "-A"
opPrependRule operation = "-I"
opDeleteRule operation = "-D"
)
// Interface is an injectable interface for running ebtables commands. Implementations must be goroutine-safe.
type Interface interface {
// GetVersion returns the "X.Y.Z" semver string for ebtables.
GetVersion() (string, error)
// EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
// WARNING: ebtables does not provide a check operation like iptables does. Hence we have to do a string match of args.
// Input args must follow the format and sequence of ebtables list output. Otherwise, EnsureRule will always create
// new rules, causing duplicates.
EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
// EnsureChain checks if the specified chain is present and, if not, creates it. If the chain existed, return true.
EnsureChain(table Table, chain Chain) (bool, error)
// DeleteChain deletes the specified chain. If the chain did not exist, return error.
DeleteChain(table Table, chain Chain) error
// FlushChain flushes the specified chain. If the chain did not exist, return error.
FlushChain(table Table, chain Chain) error
}
// runner implements Interface in terms of exec("ebtables").
type runner struct {
// mu is a mutex available to serialize the ebtables invocations issued through this runner.
mu sync.Mutex
// exec creates the ebtables commands; it is injected through New so it can be replaced by a fake.
exec utilexec.Interface
}
// New builds an Interface whose methods shell out to the ebtables binary
// through the supplied exec implementation.
func New(exec utilexec.Interface) Interface {
	return &runner{exec: exec}
}
// makeFullArgs assembles an ebtables argument list in the order
// "-t <table> <op> <chain>" followed by any extra args.
func makeFullArgs(table Table, op operation, chain Chain, args ...string) []string {
	full := make([]string, 0, 4+len(args))
	full = append(full, "-t", string(table), string(op), string(chain))
	full = append(full, args...)
	return full
}
// ebtablesVersionMatcher captures the "X.Y.Z" part of a "vX.Y.Z" token. It is
// compiled once at package initialization instead of on every version query.
var ebtablesVersionMatcher = regexp.MustCompile("v([0-9]+\\.[0-9]+\\.[0-9]+)")

// getEbtablesVersionString runs "ebtables --version" to get the version string
// in the form "X.X.X".
//
// It returns the error from running the command unchanged, or an error that
// quotes the command output when no version token is found in it.
func getEbtablesVersionString(exec utilexec.Interface) (string, error) {
	// this doesn't access mutable state so we don't need to use the interface / runner
	out, err := exec.Command(cmdebtables, "--version").CombinedOutput()
	if err != nil {
		return "", err
	}
	match := ebtablesVersionMatcher.FindStringSubmatch(string(out))
	if match == nil {
		return "", fmt.Errorf("no ebtables version found in string: %s", out)
	}
	return match[1], nil
}
// GetVersion reports the ebtables version by delegating to
// getEbtablesVersionString with this runner's exec implementation.
func (runner *runner) GetVersion() (string, error) {
	version, err := getEbtablesVersionString(runner.exec)
	return version, err
}
// EnsureRule makes sure the rule described by args is present in the given
// table and chain, adding it with the requested position when it is missing.
//
// It returns true when the rule was already listed, and false when it had to
// be added (or when adding it failed, in which case the error is non-nil).
// A failure to list the chain is treated as "rule not present".
//
// The rule is detected by comparing args, joined with spaces, against each
// line of the chain listing, so args must match the ebtables list format.
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
	// Listing and adding are two separate ebtables invocations. Hold the
	// runner lock across both so concurrent callers sharing this runner cannot
	// both see the rule as missing and add it twice.
	runner.mu.Lock()
	defer runner.mu.Unlock()

	exist := false
	listArgs := makeFullArgs(table, opListChain, chain, fullMac)
	if out, err := runner.exec.Command(cmdebtables, listArgs...).CombinedOutput(); err == nil {
		exist = checkIfRuleExists(string(out), args...)
	}
	if exist {
		return true, nil
	}

	addArgs := makeFullArgs(table, operation(position), chain, args...)
	if out, err := runner.exec.Command(cmdebtables, addArgs...).CombinedOutput(); err != nil {
		return false, fmt.Errorf("Failed to ensure rule: %v, output: %v", err, string(out))
	}
	return false, nil
}
// EnsureChain makes sure the chain exists in the given table, creating it when
// listing the chain fails.
//
// It returns true when the chain could already be listed, and false when it
// had to be created (or when creating it failed, in which case the error is
// non-nil).
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
	// Listing and creating are two separate ebtables invocations. Hold the
	// runner lock across both so concurrent callers sharing this runner do not
	// race to create the same chain.
	runner.mu.Lock()
	defer runner.mu.Unlock()

	listArgs := makeFullArgs(table, opListChain, chain)
	if _, err := runner.exec.Command(cmdebtables, listArgs...).CombinedOutput(); err == nil {
		return true, nil
	}

	createArgs := makeFullArgs(table, opCreateChain, chain)
	if out, err := runner.exec.Command(cmdebtables, createArgs...).CombinedOutput(); err != nil {
		return false, fmt.Errorf("Failed to ensure %v chain: %v, output: %v", chain, err, string(out))
	}
	return false, nil
}
// checkIfRuleExists reports whether any line of the ebtables chain listing,
// after trimming surrounding whitespace, equals args joined by single spaces.
// WARNING: the comparison is textual, so args must follow the format and
// ordering that ebtables uses when it lists a chain.
func checkIfRuleExists(listChainOutput string, args ...string) bool {
	wanted := strings.Join(args, " ")
	lines := strings.Split(listChainOutput, "\n")
	for i := range lines {
		if strings.TrimSpace(lines[i]) != wanted {
			continue
		}
		return true
	}
	return false
}
// DeleteChain deletes the given chain from the given table by running
// ebtables with the "-X" operation. It returns an error carrying the command
// output when the command fails.
func (runner *runner) DeleteChain(table Table, chain Chain) error {
	// Serialize with other ebtables invocations made through this runner, as
	// Interface requires implementations to be goroutine-safe.
	runner.mu.Lock()
	defer runner.mu.Unlock()

	fullArgs := makeFullArgs(table, opDeleteChain, chain)
	if out, err := runner.exec.Command(cmdebtables, fullArgs...).CombinedOutput(); err != nil {
		return fmt.Errorf("Failed to delete %v chain %v: %v, output: %v", string(table), string(chain), err, string(out))
	}
	return nil
}
// FlushChain flushes the given chain in the given table by running ebtables
// with the "-F" operation. It returns an error carrying the command output
// when the command fails.
func (runner *runner) FlushChain(table Table, chain Chain) error {
	// Serialize with other ebtables invocations made through this runner, as
	// Interface requires implementations to be goroutine-safe.
	runner.mu.Lock()
	defer runner.mu.Unlock()

	fullArgs := makeFullArgs(table, opFlushChain, chain)
	if out, err := runner.exec.Command(cmdebtables, fullArgs...).CombinedOutput(); err != nil {
		return fmt.Errorf("Failed to flush %v chain %v: %v, output: %v", string(table), string(chain), err, string(out))
	}
	return nil
}

View File

@ -0,0 +1,124 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ebtables
import (
"k8s.io/kubernetes/pkg/util/exec"
"strings"
"testing"
)
// TestEnsureChain is the entry point discovered by "go test"; functions whose
// names do not start with "Test" are never run by the test runner.
func TestEnsureChain(t *testing.T) {
	testEnsureChain(t)
}

// testEnsureChain drives EnsureChain through three scenarios using a scripted
// fake exec: chain missing and created, chain already present, and chain
// missing with a failing create.
func testEnsureChain(t *testing.T) {
	fcmd := exec.FakeCmd{
		CombinedOutputScript: []exec.FakeCombinedOutputAction{
			// Does not Exists
			func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 1} },
			// Success
			func() ([]byte, error) { return []byte{}, nil },
			// Exists
			func() ([]byte, error) { return nil, nil },
			// Does not Exists
			func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 1} },
			// Fail to create chain
			func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 2} },
		},
	}
	// One command action per scripted output above.
	fexec := exec.FakeExec{
		CommandScript: []exec.FakeCommandAction{
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
		},
	}

	runner := New(&fexec)

	// Listing fails, creation succeeds.
	exists, err := runner.EnsureChain(TableFilter, "TEST-CHAIN")
	if exists {
		t.Errorf("expected exists = false")
	}
	if err != nil {
		t.Errorf("expected err = nil, got %v", err)
	}

	// Listing succeeds, nothing to create.
	exists, err = runner.EnsureChain(TableFilter, "TEST-CHAIN")
	if !exists {
		t.Errorf("expected exists = true")
	}
	if err != nil {
		t.Errorf("expected err = nil, got %v", err)
	}

	// Listing fails and creation fails too.
	exists, err = runner.EnsureChain(TableFilter, "TEST-CHAIN")
	if exists {
		t.Errorf("expected exists = false")
	}
	errStr := "Failed to ensure TEST-CHAIN chain: exit 2, output:"
	if err == nil || !strings.Contains(err.Error(), errStr) {
		t.Errorf("expected error: %q, got %v", errStr, err)
	}
}
// TestEnsureRule is the entry point discovered by "go test"; functions whose
// names do not start with "Test" are never run by the test runner.
func TestEnsureRule(t *testing.T) {
	testEnsureRule(t)
}

// testEnsureRule drives EnsureRule through two scenarios using a scripted fake
// exec: the rule is already listed, and the rule is missing with a failing add.
func testEnsureRule(t *testing.T) {
	fcmd := exec.FakeCmd{
		CombinedOutputScript: []exec.FakeCombinedOutputAction{
			// Exists
			func() ([]byte, error) {
				return []byte(`Bridge table: filter
Bridge chain: OUTPUT, entries: 4, policy: ACCEPT
-j TEST
`), nil
			},
			// Does not Exists.
			func() ([]byte, error) {
				return []byte(`Bridge table: filter
Bridge chain: TEST, entries: 0, policy: ACCEPT`), nil
			},
			// Fail to create
			func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 2} },
		},
	}
	// One command action per scripted output above.
	fexec := exec.FakeExec{
		CommandScript: []exec.FakeCommandAction{
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
		},
	}

	runner := New(&fexec)

	// The listing contains "-j TEST", so nothing is added.
	exists, err := runner.EnsureRule(Append, TableFilter, ChainOutput, "-j", "TEST")
	if !exists {
		t.Errorf("expected exists = true")
	}
	if err != nil {
		t.Errorf("expected err = nil, got %v", err)
	}

	// The listing has no matching line and the add command fails.
	exists, err = runner.EnsureRule(Append, TableFilter, ChainOutput, "-j", "NEXT-TEST")
	if exists {
		t.Errorf("expected exists = false")
	}
	// Same "exit 2" rendering of the fake exit error that testEnsureChain expects.
	errStr := "Failed to ensure rule: exit 2, output: "
	if err == nil || err.Error() != errStr {
		t.Errorf("expected error: %q, got %v", errStr, err)
	}
}