From 4fa6f3841ad362844c3c1e04c8657cacad5decde Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Fri, 4 Mar 2016 17:01:25 -0800 Subject: [PATCH 1/2] fixing dead endpoint black hole udp traffic --- build/common.sh | 8 ++-- build/debian-iptables/Dockerfile | 1 + build/debian-iptables/Makefile | 2 +- pkg/proxy/iptables/proxier.go | 79 ++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 5 deletions(-) diff --git a/build/common.sh b/build/common.sh index be757325a9..17b91ccaa2 100755 --- a/build/common.sh +++ b/build/common.sh @@ -103,28 +103,28 @@ kube::build::get_docker_wrapped_binaries() { kube-apiserver,busybox kube-controller-manager,busybox kube-scheduler,busybox - kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v2 + kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v3 );; "arm") local targets=( kube-apiserver,armel/busybox kube-controller-manager,armel/busybox kube-scheduler,armel/busybox - kube-proxy,gcr.io/google_containers/debian-iptables-arm:v2 + kube-proxy,gcr.io/google_containers/debian-iptables-arm:v3 );; "arm64") local targets=( kube-apiserver,aarch64/busybox kube-controller-manager,aarch64/busybox kube-scheduler,aarch64/busybox - kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v2 + kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v3 );; "ppc64le") local targets=( kube-apiserver,ppc64le/busybox kube-controller-manager,ppc64le/busybox kube-scheduler,ppc64le/busybox - kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v2 + kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v3 );; esac diff --git a/build/debian-iptables/Dockerfile b/build/debian-iptables/Dockerfile index 80fa3ecc44..2d5c9d8c47 100644 --- a/build/debian-iptables/Dockerfile +++ b/build/debian-iptables/Dockerfile @@ -22,4 +22,5 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/ # cleanup has no effect. RUN DEBIAN_FRONTEND=noninteractive apt-get update \ && DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \ + && DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \ && rm -rf /var/lib/apt/lists/* diff --git a/build/debian-iptables/Makefile b/build/debian-iptables/Makefile index d1a4e0d873..79e1deccf0 100644 --- a/build/debian-iptables/Makefile +++ b/build/debian-iptables/Makefile @@ -16,7 +16,7 @@ REGISTRY?="gcr.io/google_containers" IMAGE=debian-iptables -TAG=v2 +TAG=v3 ARCH?=amd64 TEMP_DIR:=$(shell mktemp -d) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 2e5db0461f..7367142094 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/types" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/slice" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" ) @@ -70,6 +71,8 @@ const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" // TODO(thockin): Remove this for v1.3 or v1.4. const oldIptablesMasqueradeMark = "0x4d415351" +const noConnectionToDelete = "0 flow entries have been deleted" + // IptablesVersioner can query the current iptables version. 
type IptablesVersioner interface { // returns "X.Y.Z" @@ -160,6 +163,7 @@ type Proxier struct { iptables utiliptables.Interface masqueradeAll bool masqueradeMark string + exec utilexec.Interface } type localPort struct { @@ -220,6 +224,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, + exec: exec, }, nil } @@ -434,15 +439,21 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } } + staleUDPService := sets.NewString() // Remove services missing from the update. for name := range proxier.serviceMap { if !activeServices[name] { glog.V(1).Infof("Removing service %q", name) + if proxier.serviceMap[name].protocol == api.ProtocolUDP { + staleUDPService.Insert(proxier.serviceMap[name].clusterIP.String()) + } delete(proxier.serviceMap, name) } } proxier.syncProxyRules() + proxier.deleteServiceConnection(staleUDPService.List()) + } // OnEndpointsUpdate takes in a slice of updated endpoints. @@ -457,6 +468,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.haveReceivedEndpointsUpdate = true activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set + staleConnections := make(map[endpointServicePair]bool) // Update endpoints for services. for i := range allEndpoints { @@ -480,7 +492,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} curEndpoints := proxier.endpointsMap[svcPort] newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints) + for _, ep := range removedEndpoints { + staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true + } glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) proxier.endpointsMap[svcPort] = newEndpoints } @@ -491,12 +508,18 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Remove endpoints missing from the update. for name := range proxier.endpointsMap { if !activeEndpoints[name] { + // record endpoints of unactive service to stale connections + for _, ep := range proxier.endpointsMap[name] { + staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true + } + glog.V(2).Infof("Removing endpoints for %q", name) delete(proxier.endpointsMap, name) } } proxier.syncProxyRules() + proxier.deleteEndpointConnection(staleConnections) } // used in OnEndpointsUpdate @@ -552,6 +575,62 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } +// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints +func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string { + return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List() +} + +type endpointServicePair struct { + endpoint string + servicePortName proxy.ServicePortName +} + +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). 
+// This assumes the proxier mutex is held +func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServicePair]bool) { + for epSvcPair := range connectionMap { + if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { + endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] + glog.V(2).Infof("Deleting connection to service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) + err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. + // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it + // is expensive to baby sit all udp connections to kubernetes services. + glog.Errorf("conntrack return with error: %v", err) + } + } + } +} + +// deleteServiceConnection use conntrack-tool to delete udp connection specified by service ip +func (proxier *Proxier) deleteServiceConnection(svcIPs []string) { + for _, ip := range svcIPs { + glog.V(2).Infof("Deleting udp connection to service IP %s", ip) + err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. + // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it + // is expensive to baby sit all udp connections to kubernetes services. + glog.Errorf("conntrack return with error: %v", err) + } + } +} + +//execConntrackTool executes conntrack tool using given paramters +func (proxier *Proxier) execConntrackTool(parameters ...string) error { + conntrackPath, err := proxier.exec.LookPath("conntrack") + if err != nil { + return fmt.Errorf("Error looking for path of conntrack: %v", err) + } + output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput() + if err != nil { + return fmt.Errorf("Conntrack command returns: %s, error message: %s", string(output), err) + } + return nil +} + // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held From ad8c67723a62664ac667555acaed76421615e6e6 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Tue, 12 Apr 2016 17:45:39 -0700 Subject: [PATCH 2/2] add test for udp connection flush --- pkg/proxy/iptables/proxier.go | 28 ++-- pkg/proxy/iptables/proxier_test.go | 218 +++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 7367142094..1a1196e3d6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -71,8 +71,6 @@ const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" // TODO(thockin): Remove this for v1.3 or v1.4. const oldIptablesMasqueradeMark = "0x4d415351" -const noConnectionToDelete = "0 flow entries have been deleted" - // IptablesVersioner can query the current iptables version. 
type IptablesVersioner interface { // returns "X.Y.Z" @@ -439,20 +437,20 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } } - staleUDPService := sets.NewString() + staleUDPServices := sets.NewString() // Remove services missing from the update. for name := range proxier.serviceMap { if !activeServices[name] { glog.V(1).Infof("Removing service %q", name) if proxier.serviceMap[name].protocol == api.ProtocolUDP { - staleUDPService.Insert(proxier.serviceMap[name].clusterIP.String()) + staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String()) } delete(proxier.serviceMap, name) } } proxier.syncProxyRules() - proxier.deleteServiceConnection(staleUDPService.List()) + proxier.deleteServiceConnections(staleUDPServices.List()) } @@ -519,7 +517,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } proxier.syncProxyRules() - proxier.deleteEndpointConnection(staleConnections) + proxier.deleteEndpointConnections(staleConnections) } // used in OnEndpointsUpdate @@ -585,17 +583,19 @@ type endpointServicePair struct { servicePortName proxy.ServicePortName } +const noConnectionToDelete = "0 flow entries have been deleted" + // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held -func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServicePair]bool) { +func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { for epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] - glog.V(2).Infof("Deleting connection to service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) + glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. + // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. glog.Errorf("conntrack return with error: %v", err) @@ -604,13 +604,13 @@ func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServi } } -// deleteServiceConnection use conntrack-tool to delete udp connection specified by service ip -func (proxier *Proxier) deleteServiceConnection(svcIPs []string) { +// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip +func (proxier *Proxier) deleteServiceConnections(svcIPs []string) { for _, ip := range svcIPs { - glog.V(2).Infof("Deleting udp connection to service IP %s", ip) + glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp") if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TO DO: Better handling for deletion failure. 
When failure occur, stale udp connection may not get flushed. + // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. glog.Errorf("conntrack return with error: %v", err) @@ -626,7 +626,7 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error { } output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput() if err != nil { - return fmt.Errorf("Conntrack command returns: %s, error message: %s", string(output), err) + return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err) } return nil } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 2d3a9c8582..c961e1f041 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -19,7 +19,14 @@ package iptables import ( "testing" + "fmt" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "net" + "strings" ) func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { @@ -156,4 +163,215 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } +func TestGetRemovedEndpoints(t *testing.T) { + testCases := []struct { + currentEndpoints []string + newEndpoints []string + removedEndpoints []string + }{ + { + currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + removedEndpoints: []string{}, + }, + { + currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}, + newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + removedEndpoints: []string{"10.0.2.3:80"}, + }, + { + currentEndpoints: []string{}, + newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + removedEndpoints: []string{}, + }, + { + currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + newEndpoints: []string{}, + removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + }, + { + currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"}, + newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, + removedEndpoints: []string{"10.0.2.2:443"}, + }, + } + + for i := range testCases { + res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints) + if !slicesEquiv(res, testCases[i].removedEndpoints) { + t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res) + } + } +} + +func TestExecConntrackTool(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + fakeProxier := Proxier{exec: &fexec} + + testCases := [][]string{ + {"-L", "-p", "udp"}, + {"-D", "-p", "udp", "-d", "10.0.240.1"}, + {"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, + } + + expectErr := []bool{false, false, true} + + for i := range testCases { + err := fakeProxier.execConntrackTool(testCases[i]...) + + if expectErr[i] { + if err == nil { + t.Errorf("expected err, got %v", err) + } + } else { + if err != nil { + t.Errorf("expected success, got %v", err) + } + } + + execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") + expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) + + if execCmd != expectCmd { + t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) + } + } +} + +func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol) *serviceInfo { + return &serviceInfo{ + sessionAffinityType: api.ServiceAffinityNone, // default + stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API. + clusterIP: ip, + protocol: protocol, + } +} + +func TestDeleteEndpointConnections(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + serviceMap := make(map[proxy.ServicePortName]*serviceInfo) + svc1 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, ""} + svc2 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc2"}, ""} + serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP) + serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), api.ProtocolTCP) + + fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} + + testCases := []endpointServicePair{ + { + endpoint: "10.240.0.3:80", + servicePortName: svc1, + }, + { + endpoint: "10.240.0.4:80", + servicePortName: svc1, + }, + { + endpoint: "10.240.0.5:80", + servicePortName: svc2, + }, + } + + expectCommandExecCount := 0 + for i := range testCases { + input := map[endpointServicePair]bool{testCases[i]: true} + fakeProxier.deleteEndpointConnections(input) + svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName] + if svcInfo.protocol == api.ProtocolUDP { + svcIp := svcInfo.clusterIP.String() + endpointIp := strings.Split(testCases[i].endpoint, ":")[0] + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp) + execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + expectCommandExecCount += 1 + } + + if expectCommandExecCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls) + } + } +} + +func TestDeleteServiceConnections(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + fakeProxier := Proxier{exec: &fexec} + + testCases := [][]string{ + { + "10.240.0.3", + "10.240.0.5", + }, + { + "10.240.0.4", + }, + } + + svcCount := 0 + for i := range testCases { + fakeProxier.deleteServiceConnections(testCases[i]) + for _, ip := range testCases[i] { + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + svcCount += 1 + } + if svcCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + } + } +} + // TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.
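
Note on the conntrack invocation: the fix works by shelling out to the conntrack CLI (now installed in the debian-iptables image bumped to v3) whenever a UDP service or endpoint goes away, deleting tracking entries whose original destination is the service cluster IP and, for removed endpoints, whose DNAT destination is the endpoint IP. The standalone sketch below reproduces that call pattern with the standard library's os/exec; flushStaleUDPEntry and the sample IPs are illustrative names, not part of the proxier, but the flags and the "0 flow entries have been deleted" handling mirror the patch.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// noConnectionToDelete is the message conntrack prints when nothing matched;
// the proxier treats that case as success rather than an error.
const noConnectionToDelete = "0 flow entries have been deleted"

// flushStaleUDPEntry mirrors the conntrack invocation added in this patch:
// delete UDP entries whose original destination is the service cluster IP and
// whose DNAT destination is the removed endpoint IP.
func flushStaleUDPEntry(serviceIP, endpointIP string) error {
	conntrackPath, err := exec.LookPath("conntrack")
	if err != nil {
		return fmt.Errorf("error looking for path of conntrack: %v", err)
	}
	args := []string{"-D", "--orig-dst", serviceIP, "--dst-nat", endpointIP, "-p", "udp"}
	output, err := exec.Command(conntrackPath, args...).CombinedOutput()
	if err != nil && !strings.Contains(string(output), noConnectionToDelete) {
		return fmt.Errorf("conntrack command returned: %q, error: %v", string(output), err)
	}
	return nil
}

func main() {
	// Example: endpoint 10.240.0.3 backing the UDP service at 10.0.0.10 was removed
	// (both addresses are made up for illustration).
	if err := flushStaleUDPEntry("10.0.0.10", "10.240.0.3"); err != nil {
		fmt.Println(err)
	}
}

Treating "nothing to delete" as success matters because conntrack exits non-zero when no entries match, which is the common case; the proxier only logs other failures, making cleanup a best-effort operation as the comments in the patch note.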
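
Note on stale-endpoint detection: OnEndpointsUpdate decides which connections to flush by taking a set difference, current endpoints minus new endpoints, via pkg/util/sets. A minimal standalone equivalent of getRemovedEndpoints using plain maps (removedEndpoints is an illustrative name, not the patch's function):

package main

import (
	"fmt"
	"sort"
)

// removedEndpoints returns the entries of cur that no longer appear in next,
// the same set difference getRemovedEndpoints computes with util/sets.
func removedEndpoints(cur, next []string) []string {
	keep := make(map[string]bool, len(next))
	for _, ep := range next {
		keep[ep] = true
	}
	removedSet := make(map[string]bool)
	for _, ep := range cur {
		if !keep[ep] {
			removedSet[ep] = true
		}
	}
	removed := make([]string, 0, len(removedSet))
	for ep := range removedSet {
		removed = append(removed, ep)
	}
	sort.Strings(removed) // util/sets List() also returns a sorted slice
	return removed
}

func main() {
	cur := []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}
	next := []string{"10.0.2.1:80", "10.0.2.2:80"}
	fmt.Println(removedEndpoints(cur, next)) // [10.0.2.3:80]
}

Each removed "ip:port" string is then paired with its ServicePortName, and only pairs whose service is UDP result in a conntrack call.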
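
Note on the test strategy: the new unit tests never execute a real conntrack binary. They script exec.FakeExec and exec.FakeCmd so that each Command() call pops the next canned CombinedOutputScript entry, then assert on the argv recorded in CombinedOutputLog and on CommandCalls. A reduced sketch of that pattern, assuming it sits in the same iptables package as proxier_test.go (the test name is hypothetical):

package iptables

import (
	"strings"
	"testing"

	"k8s.io/kubernetes/pkg/util/exec"
)

// TestConntrackInvocationIsRecorded distills the scripted-exec pattern used above:
// the fake returns a canned success message, and the full command line is captured
// for later assertions.
func TestConntrackInvocationIsRecorded(t *testing.T) {
	fcmd := exec.FakeCmd{
		CombinedOutputScript: []exec.FakeCombinedOutputAction{
			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
		},
	}
	fexec := exec.FakeExec{
		CommandScript: []exec.FakeCommandAction{
			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
		},
		LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
	}

	fakeProxier := Proxier{exec: &fexec}
	if err := fakeProxier.execConntrackTool("-D", "--orig-dst", "10.0.0.10", "-p", "udp"); err != nil {
		t.Fatalf("expected success, got %v", err)
	}

	got := strings.Join(fcmd.CombinedOutputLog[0], " ")
	want := "conntrack -D --orig-dst 10.0.0.10 -p udp"
	if got != want {
		t.Errorf("expected command %q, got %q", want, got)
	}
	if fexec.CommandCalls != 1 {
		t.Errorf("expected 1 command call, got %d", fexec.CommandCalls)
	}
}

The same scripting approach lets TestDeleteEndpointConnections verify that TCP services are skipped entirely, since no command is recorded for them.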