mirror of https://github.com/k3s-io/k3s
E2E tests for the Source IP Preservation for LoadBalancers
@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
@ -31,6 +32,7 @@ import (
. "github.com/onsi/gomega"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -966,6 +968,89 @@ var _ = framework.KubeDescribe("Services", func() {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:externalTrafficLocalOnly]", func() {
// requires cloud load-balancer support - this feature currently supported only on GCE/GKE
framework.SkipUnlessProviderIs("gce", "gke")
loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault
largeClusterMinNodesNumber := 100
if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
namespace := f.Namespace.Name
serviceName := "external-local"
jig := NewServiceTestJig(c, serviceName)
By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only")
svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = api.ServiceAffinityNone
svc.ObjectMeta.Annotations = map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
By("creating a pod to be part of the service " + serviceName)
// This container is an nginx container listening on port 80
// See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response
jig.RunOrFail(namespace, nil)
By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout)
jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer)
svcTcpPort := int(svc.Spec.Ports[0].Port)
framework.Logf("service port : %d", svcTcpPort)
tcpNodePort := int(svc.Spec.Ports[0].NodePort)
framework.Logf("TCP node port: %d", tcpNodePort)
ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
framework.Logf("TCP load balancer: %s", ingressIP)
healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
By("checking health check node port allocated")
if healthCheckNodePort == 0 {
framework.Failf("Service HealthCheck NodePort was not allocated")
nodeIP := pickNodeIP(jig.Client)
By("hitting the TCP service's NodePort on " + nodeIP + ":" + fmt.Sprintf("%d", tcpNodePort))
jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort))
jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout)
By("reading clientIP using the TCP service's NodePort")
content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, "/clientip")
clientIP := content.String()
framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
By("reading clientIP using the TCP service's service port via its external VIP")
content = jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip")
clientIP = content.String()
framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP)
By("checking if Source IP is preserved")
if strings.HasPrefix(clientIP, "10.") {
framework.Failf("Source IP was NOT preserved")
By("finding nodes for all service endpoints")
endpoints, err := c.Endpoints(namespace).Get(serviceName)
if err != nil {
framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
if len(endpoints.Subsets[0].Addresses) == 0 {
framework.Failf("Expected Ready endpoints - found none")
readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName
framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName)
// HealthCheck responder validation - iterate over all node IPs and check their HC responses
// Collect all node names and their public IPs - the nodes and ips slices parallel each other
nodes := framework.GetReadySchedulableNodesOrDie(jig.Client)
ips := collectAddresses(nodes, api.NodeExternalIP)
if len(ips) == 0 {
ips = collectAddresses(nodes, api.NodeLegacyHostIP)
By("checking kube-proxy health check responses are correct")
for n, publicIP := range ips {
framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP)
// HealthCheck should pass only on the node where num(endpoints) > 0
// All other nodes should fail the healthcheck on the service healthCheckNodePort
expectedSuccess := nodes.Items[n].Name == readyHostName
jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess)
// updateService fetches a service, calls the update function on it,
@ -1200,6 +1285,10 @@ func pickNodeIP(c *client.Client) string {
func testReachableHTTP(ip string, port int, request string, expect string) (bool, error) {
return testReachableHTTPWithContent(ip, port, request, expect, nil)
func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
if ip == "" {
framework.Failf("Got empty IP for reachability check (%s)", url)
@ -1224,15 +1313,46 @@ func testReachableHTTP(ip string, port int, request string, expect string) (bool
return false, nil
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body))
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s",
resp.Status, url, string(body))
if !strings.Contains(string(body), expect) {
return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
framework.Logf("Successfully reached %v", url)
if content != nil {
return true, nil
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
if ip == "" || port == 0 {
framework.Failf("Got empty IP for reachability check (%s)", url)
return false, fmt.Errorf("Invalid input ip or port")
framework.Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPool(url)
if err != nil {
framework.Logf("Got error testing for reachability of %s: %v", url, err)
return false, err
defer resp.Body.Close()
if err != nil {
framework.Logf("Got error reading response from %s: %v", url, err)
return false, err
// HealthCheck responder returns 503 for no local endpoints
if resp.StatusCode == 503 {
return false, nil
// HealthCheck responder returns 200 for non-zero local endpoints
if resp.StatusCode == 200 {
return true, nil
return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
func testNotReachableHTTP(ip string, port int) (bool, error) {
url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" {
@ -1759,6 +1879,29 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil {
framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err)
return body
return body
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) {
success, err := testHTTPHealthCheckNodePort(host, port, request)
if expectedSuccess && success {
framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port)
} else if !expectedSuccess && (!success || err != nil) {
framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port)
} else if expectedSuccess {
framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err)
framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err)
func getIngressPoint(ing *api.LoadBalancerIngress) string {
host := ing.IP
if host == "" {
@ -1788,7 +1931,7 @@ func (j *ServiceTestJig) newRCTemplate(namespace string) *api.ReplicationControl
Containers: []api.Container{
Name: "netexec",
Image: "gcr.io/google_containers/netexec:1.4",
Image: "gcr.io/google_containers/netexec:1.6",
Args: []string{"--http-port=80", "--udp-port=80"},
ReadinessProbe: &api.Probe{
PeriodSeconds: 3,
Reference in New Issue