mirror of https://github.com/k3s-io/k3s

Commit eb235a8218 (parent a46a849b9e): E2e tests
@@ -65,6 +65,13 @@ const (
    //TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
    loadBalancerCreateTimeoutDefault = 20 * time.Minute
    loadBalancerCreateTimeoutLarge   = time.Hour

    largeClusterMinNodesNumber = 100

    // Don't test with more than 3 nodes.
    // Many tests create an endpoint per node, in large clusters, this is
    // resource and time intensive.
    maxNodesForEndpointsTests = 3
)

// This should match whatever the default/configured range is
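For orientation, the large-cluster constants above are only used to stretch the load-balancer creation timeout once a cluster crosses the node-count threshold (see the BeforeEach further down). A minimal standalone sketch of that selection logic, with names mirroring the constants above and a placeholder node count:

package main

import (
    "fmt"
    "time"
)

const (
    loadBalancerCreateTimeoutDefault = 20 * time.Minute
    loadBalancerCreateTimeoutLarge   = time.Hour
    largeClusterMinNodesNumber       = 100
)

// pickLBTimeout returns the larger timeout once the cluster crosses the
// "large cluster" node-count threshold, as the test setup below does.
func pickLBTimeout(readyNodes int) time.Duration {
    if readyNodes > largeClusterMinNodesNumber {
        return loadBalancerCreateTimeoutLarge
    }
    return loadBalancerCreateTimeoutDefault
}

func main() {
    fmt.Println(pickLBTimeout(3))   // 20m0s
    fmt.Println(pickLBTimeout(150)) // 1h0m0s
}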
@@ -1073,79 +1080,293 @@ var _ = framework.KubeDescribe("Services", func() {
                framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
            }
        })
    })

    It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:ExternalTrafficLocalOnly]", func() {

var _ = framework.KubeDescribe("ESIPP [Slow][Feature:ExternalTrafficLocalOnly]", func() {
    f := framework.NewDefaultFramework("esipp")
    loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault

    var c *client.Client
    var cs clientset.Interface

    BeforeEach(func() {
        // requires cloud load-balancer support - this feature currently supported only on GCE/GKE
        framework.SkipUnlessProviderIs("gce", "gke")
        loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault

        largeClusterMinNodesNumber := 100
        if nodes := framework.GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > largeClusterMinNodesNumber {
        c = f.Client
        cs = f.ClientSet
        if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
            loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
        }
    })

    It("should work for type=LoadBalancer [Slow][Feature:ExternalTrafficLocalOnly]", func() {
        namespace := f.Namespace.Name
        serviceName := "external-local"
        jig := NewServiceTestJig(c, cs, serviceName)
        By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only")
        svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
            svc.Spec.Type = api.ServiceTypeLoadBalancer
            // We need to turn affinity off for our LB distribution tests
            svc.Spec.SessionAffinity = api.ServiceAffinityNone
            svc.ObjectMeta.Annotations = map[string]string{
                service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
            svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
        })
        By("creating a pod to be part of the service " + serviceName)
        // This container is an nginx container listening on port 80
        // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response
        jig.RunOrFail(namespace, nil)
        By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
        svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout)
        jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer)
        svcTcpPort := int(svc.Spec.Ports[0].Port)
        framework.Logf("service port : %d", svcTcpPort)
        ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
        framework.Logf("TCP load balancer: %s", ingressIP)

        svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
        healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
        By("checking health check node port allocated")
        if healthCheckNodePort == 0 {
            framework.Failf("Service HealthCheck NodePort was not allocated")
        }
        // TODO(33957): test localOnly nodePort Services.
        By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort))
        jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout)
        defer func() {
            jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout)

            // Make sure we didn't leak the health check node port.
            for name, ips := range jig.getEndpointNodes(svc) {
                _, fail, status := jig.TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", 5)
                if fail < 2 {
                    framework.Failf("Health check node port %v not released on node %v: %v", healthCheckNodePort, name, status)
                }
                break
            }
            Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred())
        }()

        svcTCPPort := int(svc.Spec.Ports[0].Port)
        ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])

        By("reading clientIP using the TCP service's service port via its external VIP")
        content := jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip")
        content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip")
        clientIP := content.String()
        framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP)

        By("checking if Source IP is preserved")
        if strings.HasPrefix(clientIP, "10.") {
            framework.Failf("Source IP was NOT preserved")
        }
        By("finding nodes for all service endpoints")
        endpoints, err := c.Endpoints(namespace).Get(serviceName)
        if err != nil {
            framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
    })

It("should work for type=NodePort [Slow][Feature:ExternalTrafficLocalOnly]", func() {
|
||||
namespace := f.Namespace.Name
|
||||
serviceName := "external-local"
|
||||
jig := NewServiceTestJig(c, cs, serviceName)
|
||||
|
||||
svc := jig.createOnlyLocalNodePortService(namespace, serviceName, true)
|
||||
defer func() {
|
||||
Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
tcpNodePort := int(svc.Spec.Ports[0].NodePort)
|
||||
endpointsNodeMap := jig.getEndpointNodes(svc)
|
||||
path := "/clientip"
|
||||
|
||||
for nodeName, nodeIPs := range endpointsNodeMap {
|
||||
nodeIP := nodeIPs[0]
|
||||
By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v%v%v", nodeName, nodeIP, tcpNodePort, path))
|
||||
content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, path)
|
||||
clientIP := content.String()
|
||||
framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
|
||||
if strings.HasPrefix(clientIP, "10.") {
|
||||
framework.Failf("Source IP was NOT preserved")
|
||||
}
|
||||
}
|
||||
if len(endpoints.Subsets[0].Addresses) == 0 {
|
||||
framework.Failf("Expected Ready endpoints - found none")
|
||||
})
|
||||
|
||||
It("should only target nodes with endpoints [Slow][Feature:ExternalTrafficLocalOnly]", func() {
|
||||
namespace := f.Namespace.Name
|
||||
serviceName := "external-local"
|
||||
jig := NewServiceTestJig(c, cs, serviceName)
|
||||
nodes := jig.getNodes(maxNodesForEndpointsTests)
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false)
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
|
||||
if healthCheckNodePort == 0 {
|
||||
framework.Failf("Service HealthCheck NodePort was not allocated")
|
||||
}
|
||||
readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName
|
||||
framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName)
|
||||
// HealthCheck responder validation - iterate over all node IPs and check their HC responses
|
||||
// Collect all node names and their public IPs - the nodes and ips slices parallel each other
|
||||
nodes := framework.GetReadySchedulableNodesOrDie(jig.ClientSet)
|
||||
|
||||
ips := collectAddresses(nodes, api.NodeExternalIP)
|
||||
if len(ips) == 0 {
|
||||
ips = collectAddresses(nodes, api.NodeLegacyHostIP)
|
||||
}
|
||||
By("checking kube-proxy health check responses are correct")
|
||||
for n, publicIP := range ips {
|
||||
framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP)
|
||||
|
||||
ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
|
||||
svcTCPPort := int(svc.Spec.Ports[0].Port)
|
||||
|
||||
threshold := 2
|
||||
path := "/healthz"
|
||||
for i := 0; i < len(nodes.Items); i++ {
|
||||
endpointNodeName := nodes.Items[i].Name
|
||||
|
||||
By("creating a pod to be part of the service " + serviceName + " on node " + endpointNodeName)
|
||||
jig.RunOrFail(namespace, func(rc *api.ReplicationController) {
|
||||
rc.Name = serviceName
|
||||
if endpointNodeName != "" {
|
||||
rc.Spec.Template.Spec.NodeName = endpointNodeName
|
||||
}
|
||||
})
|
||||
|
||||
By(fmt.Sprintf("waiting for service endpoint on node %v", endpointNodeName))
|
||||
jig.waitForEndpointOnNode(namespace, serviceName, endpointNodeName)
|
||||
|
||||
// HealthCheck should pass only on the node where num(endpoints) > 0
|
||||
// All other nodes should fail the healthcheck on the service healthCheckNodePort
|
||||
expectedSuccess := nodes.Items[n].Name == readyHostName
|
||||
jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess)
|
||||
for n, publicIP := range ips {
|
||||
expectedSuccess := nodes.Items[n].Name == endpointNodeName
|
||||
framework.Logf("Health checking %s, http://%s:%d/%s, expectedSuccess %v", nodes.Items[n].Name, publicIP, healthCheckNodePort, path, expectedSuccess)
|
||||
pass, fail, err := jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, 5)
|
||||
if expectedSuccess && pass < threshold {
|
||||
framework.Failf("Expected %s successes on %v/%v, got %d, err %v", threshold, endpointNodeName, path, pass, err)
|
||||
} else if !expectedSuccess && fail < threshold {
|
||||
framework.Failf("Expected %s failures on %v/%v, got %d, err %v", threshold, endpointNodeName, path, fail, err)
|
||||
}
|
||||
// Make sure the loadbalancer picked up the helth check change
|
||||
jig.TestReachableHTTP(ingressIP, svcTCPPort, kubeProxyLagTimeout)
|
||||
}
|
||||
framework.ExpectNoError(framework.DeleteRCAndPods(c, f.ClientSet, namespace, serviceName))
|
||||
}
|
||||
})
|
||||
|
||||
It("should work from pods [Slow][Feature:ExternalTrafficLocalOnly]", func() {
|
||||
namespace := f.Namespace.Name
|
||||
serviceName := "external-local"
|
||||
jig := NewServiceTestJig(c, cs, serviceName)
|
||||
nodes := jig.getNodes(maxNodesForEndpointsTests)
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
|
||||
path := fmt.Sprintf("%s:%d/clientip", ingressIP, int(svc.Spec.Ports[0].Port))
|
||||
nodeName := nodes.Items[0].Name
|
||||
podName := "execpod-sourceip"
|
||||
|
||||
By(fmt.Sprintf("Creating %v on node %v", podName, nodeName))
|
||||
execPodName := createExecPodOnNode(f.Client, namespace, nodeName, podName)
|
||||
defer func() {
|
||||
err := c.Pods(namespace).Delete(execPodName, nil)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}()
|
||||
execPod, err := f.Client.Pods(namespace).Get(execPodName)
|
||||
ExpectNoError(err)
|
||||
|
||||
framework.Logf("Waiting up to %v wget %v", kubeProxyLagTimeout, path)
|
||||
cmd := fmt.Sprintf(`wget -T 30 -qO- %v`, path)
|
||||
|
||||
var srcIP string
|
||||
By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, podName, nodeName))
|
||||
if pollErr := wait.PollImmediate(framework.Poll, loadBalancerCreateTimeoutDefault, func() (bool, error) {
|
||||
stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
|
||||
if err != nil {
|
||||
framework.Logf("got err: %v, retry until timeout", err)
|
||||
return false, nil
|
||||
}
|
||||
srcIP = strings.TrimSpace(strings.Split(stdout, ":")[0])
|
||||
return srcIP == execPod.Status.PodIP, nil
|
||||
}); pollErr != nil {
|
||||
framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", podName, execPod.Status.PodIP, srcIP)
|
||||
}
|
||||
})
|
||||
|
||||
It("should handle updates to source ip annotation [Slow][Feature:ExternalTrafficLocalOnly]", func() {
|
||||
namespace := f.Namespace.Name
|
||||
serviceName := "external-local"
|
||||
jig := NewServiceTestJig(c, cs, serviceName)
|
||||
|
||||
nodes := jig.getNodes(maxNodesForEndpointsTests)
|
||||
if len(nodes.Items) < 2 {
|
||||
framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
|
||||
}
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, api.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
Expect(c.Services(svc.Namespace).Delete(svc.Name)).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
// save the health check node port because it disappears when lift the annotation.
|
||||
healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
|
||||
|
||||
By("turning ESIPP off")
|
||||
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *api.Service) {
|
||||
svc.ObjectMeta.Annotations[service.BetaAnnotationExternalTraffic] =
|
||||
service.AnnotationValueExternalTrafficGlobal
|
||||
})
|
||||
if service.GetServiceHealthCheckNodePort(svc) > 0 {
|
||||
framework.Failf("Service HealthCheck NodePort annotation still present")
|
||||
}
|
||||
|
||||
endpointNodeMap := jig.getEndpointNodes(svc)
|
||||
noEndpointNodeMap := map[string][]string{}
|
||||
for _, n := range nodes.Items {
|
||||
if _, ok := endpointNodeMap[n.Name]; ok {
|
||||
continue
|
||||
}
|
||||
noEndpointNodeMap[n.Name] = getNodeAddresses(&n, api.NodeExternalIP)
|
||||
}
|
||||
|
||||
svcTCPPort := int(svc.Spec.Ports[0].Port)
|
||||
svcNodePort := int(svc.Spec.Ports[0].NodePort)
|
||||
ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
|
||||
path := "/clientip"
|
||||
|
||||
By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
|
||||
for nodeName, nodeIPs := range noEndpointNodeMap {
|
||||
By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], path, svcNodePort))
|
||||
jig.GetHTTPContent(nodeIPs[0], svcNodePort, kubeProxyLagTimeout, path)
|
||||
}
|
||||
|
||||
for nodeName, nodeIPs := range endpointNodeMap {
|
||||
By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
|
||||
var body bytes.Buffer
|
||||
var result bool
|
||||
var err error
|
||||
if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
|
||||
result, err = testReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body)
|
||||
return !result, nil
|
||||
}); pollErr != nil {
|
||||
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v",
|
||||
nodeName, healthCheckNodePort, err, body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Poll till kube-proxy re-adds the MASQUERADE rule on the node.
|
||||
By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
|
||||
pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
|
||||
content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip")
|
||||
clientIP := content.String()
|
||||
if strings.HasPrefix(clientIP, "10.") {
|
||||
return true, nil
|
||||
}
|
||||
return false, fmt.Errorf("Source IP (%v) is the client IP, expected a ten-dot cluster ip.", clientIP)
|
||||
})
|
||||
if pollErr != nil {
|
||||
framework.Failf("Source IP WAS preserved even after ESIPP turned off: %v", pollErr)
|
||||
}
|
||||
|
||||
// TODO: We need to attempt to create another service with the previously
|
||||
// allocated healthcheck nodePort. If the health check nodePort has been
|
||||
// freed, the new service creation will succeed, upon which we cleanup.
|
||||
// If the health check nodePort has NOT been freed, the new service
|
||||
// creation will fail.
|
||||
|
||||
By("turning ESIPP annotation back on")
|
||||
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *api.Service) {
|
||||
svc.ObjectMeta.Annotations[service.BetaAnnotationExternalTraffic] =
|
||||
service.AnnotationValueExternalTrafficLocal
|
||||
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
|
||||
svc.ObjectMeta.Annotations[service.BetaAnnotationHealthCheckNodePort] =
|
||||
fmt.Sprintf("%d", healthCheckNodePort)
|
||||
})
|
||||
pollErr = wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
|
||||
content := jig.GetHTTPContent(ingressIP, svcTCPPort, kubeProxyLagTimeout, "/clientip")
|
||||
clientIP := content.String()
|
||||
if !strings.HasPrefix(clientIP, "10.") {
|
||||
return true, nil
|
||||
}
|
||||
return false, fmt.Errorf("Source IP (%v) is not the client IP, expected a public IP.", clientIP)
|
||||
})
|
||||
if pollErr != nil {
|
||||
framework.Failf("Source IP was not preserved when the ESIPP annotation was on: %v", pollErr)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
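The preservation checks above treat any client IP starting with "10." as SNAT'd cluster traffic. As an aside, a standalone and slightly more general check against an explicit cluster CIDR could use net.ParseCIDR; the 10.0.0.0/8 range here is only an assumed example, not something the tests configure:

package main

import (
    "fmt"
    "net"
)

// sourceIPPreserved reports whether clientIP falls outside the given cluster
// CIDR, i.e. whether the original (external) source address survived.
func sourceIPPreserved(clientIP, clusterCIDR string) (bool, error) {
    _, ipnet, err := net.ParseCIDR(clusterCIDR)
    if err != nil {
        return false, err
    }
    ip := net.ParseIP(clientIP)
    if ip == nil {
        return false, fmt.Errorf("invalid client IP %q", clientIP)
    }
    return !ipnet.Contains(ip), nil
}

func main() {
    for _, ip := range []string{"10.180.2.5", "203.0.113.7"} {
        ok, err := sourceIPPreserved(ip, "10.0.0.0/8")
        fmt.Println(ip, ok, err)
    }
}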
@@ -1372,16 +1593,20 @@ func deletePodOrFail(c *client.Client, ns, name string) {
    Expect(err).NotTo(HaveOccurred())
}

func getNodeAddresses(node *api.Node, addressType api.NodeAddressType) (ips []string) {
    for j := range node.Status.Addresses {
        nodeAddress := &node.Status.Addresses[j]
        if nodeAddress.Type == addressType {
            ips = append(ips, nodeAddress.Address)
        }
    }
    return
}

func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string {
    ips := []string{}
    for i := range nodes.Items {
        item := &nodes.Items[i]
        for j := range item.Status.Addresses {
            nodeAddress := &item.Status.Addresses[j]
            if nodeAddress.Type == addressType {
                ips = append(ips, nodeAddress.Address)
            }
        }
        ips = append(ips, getNodeAddresses(&nodes.Items[i], addressType)...)
    }
    return ips
}

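The refactor above turns collectAddresses into a thin wrapper over getNodeAddresses, which just filters a node's status addresses by type. A self-contained sketch of the same filtering pattern, using stand-in Node/NodeAddress types instead of the real api package types:

package main

import "fmt"

// Stand-in types; the real code uses api.Node, api.NodeAddress, api.NodeAddressType.
type NodeAddress struct {
    Type    string
    Address string
}

type Node struct {
    Name      string
    Addresses []NodeAddress
}

// getNodeAddresses returns all addresses of the given type on one node.
func getNodeAddresses(node Node, addressType string) (ips []string) {
    for _, a := range node.Addresses {
        if a.Type == addressType {
            ips = append(ips, a.Address)
        }
    }
    return
}

// collectAddresses flattens the per-node results across a node list.
func collectAddresses(nodes []Node, addressType string) []string {
    ips := []string{}
    for _, n := range nodes {
        ips = append(ips, getNodeAddresses(n, addressType)...)
    }
    return ips
}

func main() {
    nodes := []Node{
        {Name: "node-a", Addresses: []NodeAddress{{Type: "ExternalIP", Address: "198.51.100.10"}, {Type: "InternalIP", Address: "10.0.0.4"}}},
        {Name: "node-b", Addresses: []NodeAddress{{Type: "ExternalIP", Address: "198.51.100.11"}}},
    }
    fmt.Println(collectAddresses(nodes, "ExternalIP")) // [198.51.100.10 198.51.100.11]
}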
@@ -1855,6 +2080,128 @@ func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc
    return result
}

func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType api.ServiceType, timeout time.Duration) {
    ingressIP := ""
    svc := j.UpdateServiceOrFail(namespace, name, func(s *api.Service) {
        for _, ing := range s.Status.LoadBalancer.Ingress {
            if ing.IP != "" {
                ingressIP = ing.IP
            }
        }
        s.Spec.Type = newType
        s.Spec.Ports[0].NodePort = 0
    })
    if ingressIP != "" {
        j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
    }
}

// createOnlyLocalNodePortService creates a NodePort service with the local-traffic-only
// annotation and sanity checks its nodePort. If createPod is true, it also creates an RC
// with 1 replica of the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) createOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *api.Service {
    By("creating a service " + namespace + "/" + namespace + " with type=NodePort and annotation for local-traffic-only")
    svc := j.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
        svc.Spec.Type = api.ServiceTypeNodePort
        svc.ObjectMeta.Annotations = map[string]string{
            service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
        svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
    })

    if createPod {
        By("creating a pod to be part of the service " + serviceName)
        j.RunOrFail(namespace, nil)
    }
    j.SanityCheckService(svc, api.ServiceTypeNodePort)
    return svc
}

// createOnlyLocalLoadBalancerService creates a loadbalancer service and waits for it to
// acquire an ingress IP. If createPod is true, it also creates an RC with 1
// replica of the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool) *api.Service {
    By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only")
    svc := j.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
        svc.Spec.Type = api.ServiceTypeLoadBalancer
        // We need to turn affinity off for our LB distribution tests
        svc.Spec.SessionAffinity = api.ServiceAffinityNone
        svc.ObjectMeta.Annotations = map[string]string{
            service.AlphaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
        svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
    })

    if createPod {
        By("creating a pod to be part of the service " + serviceName)
        j.RunOrFail(namespace, nil)
    }
    By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
    svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
    j.SanityCheckService(svc, api.ServiceTypeLoadBalancer)
    return svc
}

// getEndpointNodes returns a map of nodenames:external-ip on which the
// endpoints of the given Service are running.
func (j *ServiceTestJig) getEndpointNodes(svc *api.Service) map[string][]string {
    nodes := j.getNodes(maxNodesForEndpointsTests)
    endpoints, err := j.Client.Endpoints(svc.Namespace).Get(svc.Name)
    if err != nil {
        framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
    }
    if len(endpoints.Subsets) == 0 {
        framework.Failf("Endpoint has no subsets, cannot determine node addresses.")
    }
    epNodes := sets.NewString()
    for _, ss := range endpoints.Subsets {
        for _, e := range ss.Addresses {
            if e.NodeName != nil {
                epNodes.Insert(*e.NodeName)
            }
        }
    }
    nodeMap := map[string][]string{}
    for _, n := range nodes.Items {
        if epNodes.Has(n.Name) {
            nodeMap[n.Name] = getNodeAddresses(&n, api.NodeExternalIP)
        }
    }
    return nodeMap
}

// getNodes returns the first maxNodesForTest nodes. Useful in large clusters
// where we don't, e.g., want to create an endpoint per node.
func (j *ServiceTestJig) getNodes(maxNodesForTest int) (nodes *api.NodeList) {
    nodes = framework.GetReadySchedulableNodesOrDie(j.Client)
    if len(nodes.Items) <= maxNodesForTest {
        maxNodesForTest = len(nodes.Items)
    }
    nodes.Items = nodes.Items[:maxNodesForTest]
    return nodes
}

func (j *ServiceTestJig) waitForEndpointOnNode(namespace, serviceName, nodeName string) {
    err := wait.PollImmediate(framework.Poll, loadBalancerCreateTimeoutDefault, func() (bool, error) {
        endpoints, err := j.Client.Endpoints(namespace).Get(serviceName)
        if err != nil {
            framework.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
            return false, nil
        }
        // TODO: Handle multiple endpoints
        if len(endpoints.Subsets[0].Addresses) == 0 {
            framework.Logf("Expected Ready endpoints - found none")
            return false, nil
        }
        epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
        framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
        if epHostName != nodeName {
            framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
            return false, nil
        }
        return true, nil
    })
    framework.ExpectNoError(err)
}

func (j *ServiceTestJig) SanityCheckService(svc *api.Service, svcType api.ServiceType) {
    if svc.Spec.Type != svcType {
        framework.Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)

@@ -1900,7 +2247,6 @@ func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*api.
    if err != nil {
        return nil, fmt.Errorf("Failed to get Service %q: %v", name, err)
    }

    update(service)
    service, err = j.Client.Services(namespace).Update(service)
    if err == nil {

@@ -2020,25 +2366,32 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time

func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
    var body bytes.Buffer
    if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil {
        framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err)
        return body
    var err error
    if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
        result, err := testReachableHTTPWithContent(host, port, url, "", &body)
        if err != nil {
            framework.Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err)
            return false, nil
        }
        return result, nil
    }); pollErr != nil {
        framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err)
    }
    return body
}

func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) {
    success, err := testHTTPHealthCheckNodePort(host, port, request)
    if expectedSuccess && success {
        framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port)
        return
    } else if !expectedSuccess && (!success || err != nil) {
        framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port)
        return
    } else if expectedSuccess {
        framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err)
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, tries int) (pass, fail int, statusMsg string) {
    for i := 0; i < tries; i++ {
        success, err := testHTTPHealthCheckNodePort(host, port, request)
        if success {
            pass++
        } else {
            fail++
        }
        statusMsg += fmt.Sprintf("\nAttempt %d Error %v", i, err)
        time.Sleep(1 * time.Second)
    }
    framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err)
    return pass, fail, statusMsg
}

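The reworked TestHTTPHealthCheckNodePort above just tallies successes and failures over a fixed number of probes and leaves the assertion to the caller. A self-contained sketch of that counting pattern against a throwaway local server, with net/http/httptest standing in for the kube-proxy health-check endpoint:

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "time"
)

// probeHealthCheck hits url `tries` times and returns pass/fail counts plus a
// status log, mirroring the (pass, fail, statusMsg) shape of the jig helper above.
func probeHealthCheck(url string, tries int) (pass, fail int, statusMsg string) {
    for i := 0; i < tries; i++ {
        resp, err := http.Get(url)
        if err == nil && resp.StatusCode == http.StatusOK {
            pass++
        } else {
            fail++
        }
        if resp != nil {
            resp.Body.Close()
        }
        statusMsg += fmt.Sprintf("\nAttempt %d Error %v", i, err)
        time.Sleep(10 * time.Millisecond) // the real helper sleeps 1s between attempts
    }
    return pass, fail, statusMsg
}

func main() {
    // A server that reports "no local endpoints", like a node without endpoints would.
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        http.Error(w, "no local endpoints", http.StatusServiceUnavailable)
    }))
    defer srv.Close()

    pass, fail, status := probeHealthCheck(srv.URL+"/healthz", 5)
    fmt.Printf("pass=%d fail=%d%s\n", pass, fail, status)
}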
func getIngressPoint(ing *api.LoadBalancerIngress) string {

@@ -2310,26 +2663,21 @@ func (j *ServiceTestJig) launchEchoserverPodOnNode(f *framework.Framework, nodeN
    framework.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}

func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIp string, servicePort int) (string, string) {
    framework.Logf("Creating an exec pod on the same node")
func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, serviceIP string, servicePort int) (string, string) {
    framework.Logf("Creating an exec pod on node %v", nodeName)
    execPodName := createExecPodOnNode(f.Client, ns, nodeName, fmt.Sprintf("execpod-sourceip-%s", nodeName))
    defer func() {
        framework.Logf("Cleaning up the exec pod")
        err := c.Pods(ns).Delete(execPodName, nil)
        Expect(err).NotTo(HaveOccurred())
    }()
    podClient := f.Client.Pods(ns)
    execPod, err := podClient.Get(execPodName)
    execPod, err := f.Client.Pods(ns).Get(execPodName)
    ExpectNoError(err)
    execPodIp := execPod.Status.PodIP
    framework.Logf("Exec pod ip: %s", execPodIp)

    framework.Logf("Getting echo response from service")
    var stdout string
    timeout := 2 * time.Minute
    framework.Logf("Waiting up to %v for sourceIp test to be executed", timeout)
    cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIp, servicePort)
    // Need a timeout mechanism because it may take more time for iptables to be populated.
    framework.Logf("Waiting up to %v wget %s:%d", timeout, serviceIP, servicePort)
    cmd := fmt.Sprintf(`wget -T 30 -qO- %s:%d | grep client_address`, serviceIP, servicePort)
    for start := time.Now(); time.Since(start) < timeout; time.Sleep(2) {
        stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
        if err != nil {

@@ -2349,12 +2697,9 @@ func execSourceipTest(f *framework.Framework, c *client.Client, ns, nodeName, se
    // The stdout return from RunHostCmd seems to come with "\n", so TrimSpace is needed.
    // Desired stdout in this format: client_address=x.x.x.x
    outputs := strings.Split(strings.TrimSpace(stdout), "=")
    sourceIp := ""
    if len(outputs) != 2 {
        // Fail the test if output format is unexpected.
        framework.Failf("exec pod returned unexpected stdout format: [%v]\n", stdout)
    } else {
        sourceIp = outputs[1]
    }
    return execPodIp, sourceIp
    return execPod.Status.PodIP, outputs[1]
}
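For reference, the parsing step above reduces netexec's "client_address=x.x.x.x" line to just the IP. A minimal standalone sketch of that extraction, with the sample input hard-coded since the real value comes from wget inside the exec pod:

package main

import (
    "fmt"
    "strings"
)

// parseClientAddress mirrors the split-and-trim done in execSourceipTest.
func parseClientAddress(stdout string) (string, error) {
    // RunHostCmd output tends to end in "\n", so trim before splitting.
    outputs := strings.Split(strings.TrimSpace(stdout), "=")
    if len(outputs) != 2 {
        return "", fmt.Errorf("unexpected stdout format: [%v]", stdout)
    }
    return outputs[1], nil
}

func main() {
    ip, err := parseClientAddress("client_address=10.244.1.7\n")
    fmt.Println(ip, err) // 10.244.1.7 <nil>
}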