Watch for firewalld restart, to allow reloading iptables rules

pull/6/head
Dan Winship 2015-08-14 12:36:15 -04:00
parent c9570e34d0
commit 8bc9c40796
4 changed files with 251 additions and 21 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
"k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -105,7 +106,8 @@ func (s *ProxyServer) Run(_ []string) error {
// remove iptables rules and exit
if s.CleanupAndExit {
execer := exec.New()
ipt := utiliptables.New(execer, protocol)
dbus := utildbus.New()
ipt := utiliptables.New(execer, dbus, protocol)
encounteredError := userspace.CleanupLeftovers(ipt)
encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError
if encounteredError {
@ -165,6 +167,10 @@ func (s *ProxyServer) Run(_ []string) error {
var proxier proxy.ProxyProvider
var endpointsHandler config.EndpointsConfigHandler
execer := exec.New()
dbus := utildbus.New()
ipt := utiliptables.New(execer, dbus, protocol)
shouldUseIptables := false
if !s.ForceUserspaceProxy {
var err error
@ -178,8 +184,6 @@ func (s *ProxyServer) Run(_ []string) error {
if shouldUseIptables {
glog.V(2).Info("Using iptables Proxier.")
execer := exec.New()
ipt := utiliptables.New(execer, protocol)
proxierIptables, err := iptables.NewProxier(ipt, execer, s.SyncPeriod, s.MasqueradeAll)
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
@ -198,8 +202,6 @@ func (s *ProxyServer) Run(_ []string) error {
// set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer
execer := exec.New()
ipt := utiliptables.New(execer, protocol)
proxierUserspace, err := userspace.NewProxier(loadBalancer, s.BindAddress, ipt, s.PortRange, s.SyncPeriod)
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)

View File

@ -121,6 +121,12 @@ func (fake *fakeIptables) RestoreAll(data []byte, flush iptables.FlushFlag, coun
return nil
}
func (fake *fakeIptables) AddReloadFunc(reloadFunc func()) {
}
func (fake *fakeIptables) Destroy() {
}
var tcpServerPort int
var udpServerPort int

View File

@ -25,7 +25,9 @@ import (
"sync"
"github.com/coreos/go-semver/semver"
godbus "github.com/godbus/dbus"
"github.com/golang/glog"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -64,6 +66,10 @@ type Interface interface {
Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// RestoreAll is the same as Restore except that no table is specified.
RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// AddReloadFunc adds a function to call on iptables reload
AddReloadFunc(reloadFunc func())
// Destroy cleans up resources used by the Interface
Destroy()
}
type Protocol byte
@ -118,24 +124,66 @@ const MinWait2Version = "1.4.22"
type runner struct {
mu sync.Mutex
exec utilexec.Interface
dbus utildbus.Interface
protocol Protocol
hasCheck bool
waitFlag []string
reloadFuncs []func()
signal chan *godbus.Signal
}
// New returns a new Interface which will exec iptables.
func New(exec utilexec.Interface, protocol Protocol) Interface {
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
vstring, err := GetIptablesVersionString(exec)
if err != nil {
glog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
vstring = MinCheckVersion
}
return &runner{
runner := &runner{
exec: exec,
dbus: dbus,
protocol: protocol,
hasCheck: getIptablesHasCheckCommand(vstring),
waitFlag: getIptablesWaitFlag(vstring),
}
runner.connectToFirewallD()
return runner
}
// Destroy is part of Interface.
func (runner *runner) Destroy() {
if runner.signal != nil {
runner.signal <- nil
}
}
const (
firewalldName = "org.fedoraproject.FirewallD1"
firewalldPath = "/org/fedoraproject/FirewallD1"
firewalldInterface = "org.fedoraproject.FirewallD1"
)
// Connects to D-Bus and listens for FirewallD start/restart. (On non-FirewallD-using
// systems, this is effectively a no-op; we listen for the signals, but they will never be
// emitted, so reload() will never be called.)
func (runner *runner) connectToFirewallD() {
bus, err := runner.dbus.SystemBus()
if err != nil {
glog.V(1).Info("Could not connect to D-Bus system bus: %s", err)
return
}
rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface)
bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName)
bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
runner.signal = make(chan *godbus.Signal, 10)
bus.Signal(runner.signal)
go runner.dbusSignalHandler(bus)
}
// EnsureChain is part of Interface.
@ -472,3 +520,50 @@ func GetIptablesVersionString(exec utilexec.Interface) (string, error) {
}
return match[1], nil
}
// goroutine to listen for D-Bus signals
func (runner *runner) dbusSignalHandler(bus utildbus.Connection) {
firewalld := bus.Object(firewalldName, firewalldPath)
for s := range runner.signal {
if s == nil {
// Unregister
bus.Signal(runner.signal)
return
}
switch s.Name {
case "org.freedesktop.DBus.NameOwnerChanged":
name := s.Body[0].(string)
new_owner := s.Body[2].(string)
if name != firewalldName || len(new_owner) == 0 {
continue
}
// FirewallD startup (specifically the part where it deletes
// all existing iptables rules) may not yet be complete when
// we get this signal, so make a dummy request to it to
// synchronize.
firewalld.Call(firewalldInterface+".getDefaultZone", 0)
runner.reload()
case firewalldInterface + ".Reloaded":
runner.reload()
}
}
}
// AddReloadFunc is part of Interface
func (runner *runner) AddReloadFunc(reloadFunc func()) {
runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc)
}
// runs all reload funcs to re-sync iptables rules
func (runner *runner) reload() {
glog.V(1).Infof("reloading iptables rules")
for _, f := range runner.reloadFuncs {
f()
}
}

View File

@ -19,7 +19,9 @@ package iptables
import (
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/util/dbus"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -55,7 +57,8 @@ func testEnsureChain(t *testing.T, protocol Protocol) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, protocol)
runner := New(&fexec, dbus.NewFake(nil, nil), protocol)
defer runner.Destroy()
// Success.
exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -112,7 +115,8 @@ func TestFlushChain(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
// Success.
err := runner.FlushChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -149,7 +153,8 @@ func TestDeleteChain(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
// Success.
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -185,7 +190,8 @@ func TestEnsureRuleAlreadyExists(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -221,7 +227,8 @@ func TestEnsureRuleNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -254,7 +261,8 @@ func TestEnsureRuleErrorChecking(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -284,7 +292,8 @@ func TestEnsureRuleErrorCreating(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -311,7 +320,8 @@ func TestDeleteRuleAlreadyExists(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -344,7 +354,8 @@ func TestDeleteRuleNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -374,7 +385,8 @@ func TestDeleteRuleErrorChecking(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -404,7 +416,8 @@ func TestDeleteRuleErrorCreating(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -565,7 +578,8 @@ func TestWaitFlagUnavailable(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -593,7 +607,8 @@ func TestWaitFlagOld(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -624,7 +639,8 @@ func TestWaitFlagNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -639,3 +655,114 @@ func TestWaitFlagNew(t *testing.T) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1])
}
}
func TestReload(t *testing.T) {
dbusConn := dbus.NewFakeConnection()
dbusConn.SetBusObject(func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil })
dbusConn.AddObject(firewalldName, firewalldPath, func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil })
fdbus := dbus.NewFake(dbusConn, nil)
reloaded := make(chan bool, 2)
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
// iptables version check
func() ([]byte, error) { return []byte("iptables v1.4.22"), nil },
// first reload
// EnsureChain
func() ([]byte, error) { return []byte{}, nil },
// EnsureRule abc check
func() ([]byte, error) { return []byte{}, &exec.FakeExitError{1} },
// EnsureRule abc
func() ([]byte, error) { return []byte{}, nil },
// second reload
// EnsureChain
func() ([]byte, error) { return []byte{}, nil },
// EnsureRule abc check
func() ([]byte, error) { return []byte{}, &exec.FakeExitError{1} },
// EnsureRule abc
func() ([]byte, error) { return []byte{}, nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, fdbus, ProtocolIpv4)
defer runner.Destroy()
runner.AddReloadFunc(func() {
exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
reloaded <- true
})
runner.AddReloadFunc(func() {
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
reloaded <- true
})
dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", firewalldName, "", ":1.1")
<-reloaded
<-reloaded
if fcmd.CombinedOutputCalls != 4 {
t.Errorf("expected 4 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1])
}
if !sets.NewString(fcmd.CombinedOutputLog[2]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[2])
}
if !sets.NewString(fcmd.CombinedOutputLog[3]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[3])
}
go func() { time.Sleep(time.Second / 100); reloaded <- true }()
dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "DefaultZoneChanged", "public")
dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", "io.k8s.Something", "", ":1.1")
<-reloaded
if fcmd.CombinedOutputCalls != 4 {
t.Errorf("Incorrect signal caused a reload")
}
dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "Reloaded")
<-reloaded
<-reloaded
if fcmd.CombinedOutputCalls != 7 {
t.Errorf("expected 7 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[4]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[4])
}
if !sets.NewString(fcmd.CombinedOutputLog[5]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[5])
}
if !sets.NewString(fcmd.CombinedOutputLog[6]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[6])
}
}