From f20effb5e15725115e45fe23ad95fed22e8406a8 Mon Sep 17 00:00:00 2001 From: MrHohn Date: Tue, 16 Aug 2016 21:07:17 -0700 Subject: [PATCH] Add e2e test for Source IP preservation (pod to service cluster IP) --- test/e2e/service.go | 193 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 180 insertions(+), 13 deletions(-) diff --git a/test/e2e/service.go b/test/e2e/service.go index 9c1d61037c..7ca80c2593 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -228,6 +228,58 @@ var _ = framework.KubeDescribe("Services", func() { validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) }) + // TODO: verify source IP preservation for LoadBalancer type services when applicable + It("should preserve source pod IP for traffic thru service cluster IP", func() { + + serviceName := "sourceip-test" + ns := f.Namespace.Name + + By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns) + jig := NewServiceTestJig(c, serviceName) + servicePort := 8080 + tcpService := jig.CreateTCPServiceWithPort(ns, nil, int32(servicePort)) + jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP) + defer func() { + framework.Logf("Cleaning up the sourceip test service") + err := c.Services(ns).Delete(serviceName) + Expect(err).NotTo(HaveOccurred()) + }() + serviceIp := tcpService.Spec.ClusterIP + framework.Logf("sourceip-test cluster ip: %s", serviceIp) + + By("Picking multiple nodes") + nodes := framework.GetReadySchedulableNodesOrDie(f.Client) + + if len(nodes.Items) == 1 { + framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider) + } + + node1 := nodes.Items[0] + node2 := nodes.Items[1] + + By("Creating a webserver pod be part of the TCP service which echoes back source ip") + serverPodName := "echoserver-sourceip" + jig.launchEchoserverPodOnNode(f, node1.Name, serverPodName) + defer func() { + framework.Logf("Cleaning up the echo server pod") + err := c.Pods(ns).Delete(serverPodName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + + // Waiting for service to expose endpoint + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{serverPodName: {servicePort}}) + + By("Retrieve sourceip from a pod on the same node") + sourceIp1, execPodIp1 := execSourceipTest(f, c, ns, node1.Name, serviceIp, servicePort) + By("Verifying the preserved source ip") + Expect(sourceIp1).To(Equal(execPodIp1)) + + By("Retrieve sourceip from a pod on a different node") + sourceIp2, execPodIp2 := execSourceipTest(f, c, ns, node2.Name, serviceIp, servicePort) + By("Verifying the preserved source ip") + Expect(sourceIp2).To(Equal(execPodIp2)) + }) + It("should be able to up and down services", func() { // TODO: use the ServiceTestJig here // this test uses framework.NodeSSHHosts that does not work if a Node only reports LegacyHostIP @@ -1232,11 +1284,8 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex framework.Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, framework.ServiceStartTimeout) } -// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a -// vessel for kubectl exec commands. -// Returns the name of the created pod. -func createExecPodOrFail(c *client.Client, ns, generateName string) string { - framework.Logf("Creating new exec pod") +// newExecPodSpec returns the pod spec of exec pod +func newExecPodSpec(ns, generateName string) *api.Pod { immediate := int64(0) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -1254,10 +1303,38 @@ func createExecPodOrFail(c *client.Client, ns, generateName string) string { }, }, } - created, err := c.Pods(ns).Create(pod) + return pod +} + +// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a +// vessel for kubectl exec commands. +// Returns the name of the created pod. +func createExecPodOrFail(client *client.Client, ns, generateName string) string { + framework.Logf("Creating new exec pod") + execPod := newExecPodSpec(ns, generateName) + created, err := client.Pods(ns).Create(execPod) Expect(err).NotTo(HaveOccurred()) err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) { - retrievedPod, err := c.Pods(pod.Namespace).Get(created.Name) + retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name) + if err != nil { + return false, nil + } + return retrievedPod.Status.Phase == api.PodRunning, nil + }) + Expect(err).NotTo(HaveOccurred()) + return created.Name +} + +// createExecPodOnNode launches a exec pod in the given namespace and node +// waits until it's Running, created pod name would be returned +func createExecPodOnNode(client *client.Client, ns, nodeName, generateName string) string { + framework.Logf("Creating exec pod %q in namespace %q", generateName, ns) + execPod := newExecPodSpec(ns, generateName) + execPod.Spec.NodeName = nodeName + created, err := client.Pods(ns).Create(execPod) + Expect(err).NotTo(HaveOccurred()) + err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) { + retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name) if err != nil { return false, nil } @@ -1712,8 +1789,8 @@ func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig { // newServiceTemplate returns the default api.Service template for this jig, but // does not actually create the Service. The default Service has the same name -// as the jig and exposes port 80. -func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service { +// as the jig and exposes the given port. +func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol, port int32) *api.Service { service := &api.Service{ ObjectMeta: api.ObjectMeta{ Namespace: namespace, @@ -1725,7 +1802,7 @@ func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol Ports: []api.ServicePort{ { Protocol: proto, - Port: 80, + Port: port, }, }, }, @@ -1733,11 +1810,26 @@ func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol return service } +// CreateTCPServiceWithPort creates a new TCP Service with given port based on the +// jig's defaults. Callers can provide a function to tweak the Service object before +// it is created. +func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *api.Service), port int32) *api.Service { + svc := j.newServiceTemplate(namespace, api.ProtocolTCP, port) + if tweak != nil { + tweak(svc) + } + result, err := j.Client.Services(namespace).Create(svc) + if err != nil { + framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err) + } + return result +} + // CreateTCPServiceOrFail creates a new TCP Service based on the jig's // defaults. Callers can provide a function to tweak the Service object before // it is created. func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service { - svc := j.newServiceTemplate(namespace, api.ProtocolTCP) + svc := j.newServiceTemplate(namespace, api.ProtocolTCP, 80) if tweak != nil { tweak(svc) } @@ -1752,7 +1844,7 @@ func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc // defaults. Callers can provide a function to tweak the Service object before // it is created. func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service { - svc := j.newServiceTemplate(namespace, api.ProtocolUDP) + svc := j.newServiceTemplate(namespace, api.ProtocolUDP, 80) if tweak != nil { tweak(svc) } @@ -2181,3 +2273,78 @@ func (t *ServiceTestFixture) Cleanup() []error { return errs } + +// newEchoServerPodSpec returns the pod spec of echo server pod +func newEchoServerPodSpec(podName string) *api.Pod { + port := 8080 + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: podName, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "echoserver", + Image: "gcr.io/google_containers/echoserver:1.4", + Ports: []api.ContainerPort{{ContainerPort: int32(port)}}, + }, + }, + RestartPolicy: api.RestartPolicyNever, + }, + } + return pod +} + +// launchEchoserverPodOnNode launches a pod serving http on port 8080 to act +// as the target for source IP preservation test. The client's source ip would +// be echoed back by the web server. +func (j *ServiceTestJig) launchEchoserverPodOnNode(f *framework.Framework, nodeName, podName string) { + framework.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name) + pod := newEchoServerPodSpec(podName) + pod.Spec.NodeName = nodeName + pod.ObjectMeta.Labels = j.Labels + podClient := f.Client.Pods(f.Namespace.Name) + _, err := podClient.Create(pod) + framework.ExpectNoError(err) + framework.ExpectNoError(f.WaitForPodRunning(podName)) + framework.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name) +} + +func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIp string, servicePort int) (string, string) { + framework.Logf("Creating an exec pod on the same node") + execPodName := createExecPodOnNode(f.Client, ns, nodeName, fmt.Sprintf("execpod-sourceip-%s", nodeName)) + defer func() { + framework.Logf("Cleaning up the exec pod") + err := c.Pods(ns).Delete(execPodName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + podClient := f.Client.Pods(ns) + execPod, err := podClient.Get(execPodName) + ExpectNoError(err) + execPodIp := execPod.Status.PodIP + framework.Logf("Exec pod ip: %s", execPodIp) + + framework.Logf("Getting echo response from service") + var stdout string + timeout := 2 * time.Minute + framework.Logf("Waiting up to %v for sourceIp test to be executed", timeout) + cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIp, servicePort) + // need timeout mechanism because it may takes more times for iptables to be populated + for start := time.Now(); time.Since(start) < timeout; time.Sleep(2) { + stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + framework.Logf("got err: %v, retry until timeout", err) + continue + } + break + } + + ExpectNoError(err) + + // the stdout return from RunHostCmd seems to come with "\n", so TrimSpace is needed + // desired stdout in this format: client_address=x.x.x.x + outputs := strings.Split(strings.TrimSpace(stdout), "=") + sourceIp := outputs[1] + + return execPodIp, sourceIp +}