Add e2e test for Source IP preservation (pod to service cluster IP)

pull/6/head
MrHohn 2016-08-16 21:07:17 -07:00
parent 09e3fb355b
commit f20effb5e1
1 changed files with 180 additions and 13 deletions

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -228,6 +228,58 @@ var _ = framework.KubeDescribe("Services", func() {
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
})
// TODO: verify source IP preservation for LoadBalancer type services when applicable
It("should preserve source pod IP for traffic thru service cluster IP", func() {
serviceName := "sourceip-test"
ns := f.Namespace.Name
By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
jig := NewServiceTestJig(c, serviceName)
servicePort := 8080
tcpService := jig.CreateTCPServiceWithPort(ns, nil, int32(servicePort))
jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
defer func() {
framework.Logf("Cleaning up the sourceip test service")
err := c.Services(ns).Delete(serviceName)
Expect(err).NotTo(HaveOccurred())
}()
serviceIp := tcpService.Spec.ClusterIP
framework.Logf("sourceip-test cluster ip: %s", serviceIp)
By("Picking multiple nodes")
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
if len(nodes.Items) == 1 {
framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider)
}
node1 := nodes.Items[0]
node2 := nodes.Items[1]
By("Creating a webserver pod be part of the TCP service which echoes back source ip")
serverPodName := "echoserver-sourceip"
jig.launchEchoserverPodOnNode(f, node1.Name, serverPodName)
defer func() {
framework.Logf("Cleaning up the echo server pod")
err := c.Pods(ns).Delete(serverPodName, nil)
Expect(err).NotTo(HaveOccurred())
}()
// Waiting for service to expose endpoint
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{serverPodName: {servicePort}})
By("Retrieve sourceip from a pod on the same node")
sourceIp1, execPodIp1 := execSourceipTest(f, c, ns, node1.Name, serviceIp, servicePort)
By("Verifying the preserved source ip")
Expect(sourceIp1).To(Equal(execPodIp1))
By("Retrieve sourceip from a pod on a different node")
sourceIp2, execPodIp2 := execSourceipTest(f, c, ns, node2.Name, serviceIp, servicePort)
By("Verifying the preserved source ip")
Expect(sourceIp2).To(Equal(execPodIp2))
})
It("should be able to up and down services", func() {
// TODO: use the ServiceTestJig here
// this test uses framework.NodeSSHHosts that does not work if a Node only reports LegacyHostIP
@ -1232,11 +1284,8 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex
framework.Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, framework.ServiceStartTimeout)
}
// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func createExecPodOrFail(c *client.Client, ns, generateName string) string {
framework.Logf("Creating new exec pod")
// newExecPodSpec returns the pod spec of exec pod
func newExecPodSpec(ns, generateName string) *api.Pod {
immediate := int64(0)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -1254,10 +1303,38 @@ func createExecPodOrFail(c *client.Client, ns, generateName string) string {
},
},
}
created, err := c.Pods(ns).Create(pod)
return pod
}
// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func createExecPodOrFail(client *client.Client, ns, generateName string) string {
framework.Logf("Creating new exec pod")
execPod := newExecPodSpec(ns, generateName)
created, err := client.Pods(ns).Create(execPod)
Expect(err).NotTo(HaveOccurred())
err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := c.Pods(pod.Namespace).Get(created.Name)
retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name)
if err != nil {
return false, nil
}
return retrievedPod.Status.Phase == api.PodRunning, nil
})
Expect(err).NotTo(HaveOccurred())
return created.Name
}
// createExecPodOnNode launches a exec pod in the given namespace and node
// waits until it's Running, created pod name would be returned
func createExecPodOnNode(client *client.Client, ns, nodeName, generateName string) string {
framework.Logf("Creating exec pod %q in namespace %q", generateName, ns)
execPod := newExecPodSpec(ns, generateName)
execPod.Spec.NodeName = nodeName
created, err := client.Pods(ns).Create(execPod)
Expect(err).NotTo(HaveOccurred())
err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := client.Pods(execPod.Namespace).Get(created.Name)
if err != nil {
return false, nil
}
@ -1712,8 +1789,8 @@ func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig {
// newServiceTemplate returns the default api.Service template for this jig, but
// does not actually create the Service. The default Service has the same name
// as the jig and exposes port 80.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service {
// as the jig and exposes the given port.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol, port int32) *api.Service {
service := &api.Service{
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
@ -1725,7 +1802,7 @@ func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol
Ports: []api.ServicePort{
{
Protocol: proto,
Port: 80,
Port: port,
},
},
},
@ -1733,11 +1810,26 @@ func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol
return service
}
// CreateTCPServiceWithPort creates a new TCP Service with given port based on the
// jig's defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *api.Service), port int32) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolTCP, port)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.Services(namespace).Create(svc)
if err != nil {
framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
}
return result
}
// CreateTCPServiceOrFail creates a new TCP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolTCP)
svc := j.newServiceTemplate(namespace, api.ProtocolTCP, 80)
if tweak != nil {
tweak(svc)
}
@ -1752,7 +1844,7 @@ func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
svc := j.newServiceTemplate(namespace, api.ProtocolUDP)
svc := j.newServiceTemplate(namespace, api.ProtocolUDP, 80)
if tweak != nil {
tweak(svc)
}
@ -2181,3 +2273,78 @@ func (t *ServiceTestFixture) Cleanup() []error {
return errs
}
// newEchoServerPodSpec returns the pod spec of echo server pod
func newEchoServerPodSpec(podName string) *api.Pod {
port := 8080
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "echoserver",
Image: "gcr.io/google_containers/echoserver:1.4",
Ports: []api.ContainerPort{{ContainerPort: int32(port)}},
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
return pod
}
// launchEchoserverPodOnNode launches a pod serving http on port 8080 to act
// as the target for source IP preservation test. The client's source ip would
// be echoed back by the web server.
func (j *ServiceTestJig) launchEchoserverPodOnNode(f *framework.Framework, nodeName, podName string) {
framework.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
pod := newEchoServerPodSpec(podName)
pod.Spec.NodeName = nodeName
pod.ObjectMeta.Labels = j.Labels
podClient := f.Client.Pods(f.Namespace.Name)
_, err := podClient.Create(pod)
framework.ExpectNoError(err)
framework.ExpectNoError(f.WaitForPodRunning(podName))
framework.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}
func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIp string, servicePort int) (string, string) {
framework.Logf("Creating an exec pod on the same node")
execPodName := createExecPodOnNode(f.Client, ns, nodeName, fmt.Sprintf("execpod-sourceip-%s", nodeName))
defer func() {
framework.Logf("Cleaning up the exec pod")
err := c.Pods(ns).Delete(execPodName, nil)
Expect(err).NotTo(HaveOccurred())
}()
podClient := f.Client.Pods(ns)
execPod, err := podClient.Get(execPodName)
ExpectNoError(err)
execPodIp := execPod.Status.PodIP
framework.Logf("Exec pod ip: %s", execPodIp)
framework.Logf("Getting echo response from service")
var stdout string
timeout := 2 * time.Minute
framework.Logf("Waiting up to %v for sourceIp test to be executed", timeout)
cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIp, servicePort)
// need timeout mechanism because it may takes more times for iptables to be populated
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2) {
stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
if err != nil {
framework.Logf("got err: %v, retry until timeout", err)
continue
}
break
}
ExpectNoError(err)
// the stdout return from RunHostCmd seems to come with "\n", so TrimSpace is needed
// desired stdout in this format: client_address=x.x.x.x
outputs := strings.Split(strings.TrimSpace(stdout), "=")
sourceIp := outputs[1]
return execPodIp, sourceIp
}