k3s/test/e2e/framework/service_util.go

1542 lines
53 KiB
Go

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"
v1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
)
// Timeouts, polling intervals and size limits shared by the service test helpers.
const (
// KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
// to not notice a Service update, such as type=NodePort.
// TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
// liberal. Fix tracked in #20567.
KubeProxyLagTimeout = 5 * time.Minute
// KubeProxyEndpointLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
// to not notice an Endpoint update.
KubeProxyEndpointLagTimeout = 30 * time.Second
// LoadBalancerLagTimeoutDefault is the maximum time a load balancer is allowed to
// not respond after creation.
LoadBalancerLagTimeoutDefault = 2 * time.Minute
// LoadBalancerLagTimeoutAWS is the delay between ELB creation and serving traffic
// on AWS. A few minutes is typical, so use 10m.
LoadBalancerLagTimeoutAWS = 10 * time.Minute
// LoadBalancerCreateTimeoutDefault is the default time to wait for a load balancer to be created/modified.
// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable.
LoadBalancerCreateTimeoutDefault = 20 * time.Minute
// LoadBalancerCreateTimeoutLarge is the maximum time to wait for a load balancer to be created/modified.
LoadBalancerCreateTimeoutLarge = 2 * time.Hour
// LoadBalancerCleanupTimeout is the time required by the load balancer to clean up, proportional to numApps/Ing.
// Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
LoadBalancerCleanupTimeout = 15 * time.Minute
// LoadBalancerPollTimeout is the time required by the load balancer to poll.
// On average it takes ~6 minutes for a single backend to come online in GCE.
LoadBalancerPollTimeout = 15 * time.Minute
// LoadBalancerPollInterval is the interval value in which the load balancer polls.
LoadBalancerPollInterval = 30 * time.Second
// LargeClusterMinNodesNumber is the number of nodes which a large cluster consists of
// (presumably the minimum node count for a cluster to count as large; confirm against callers).
LargeClusterMinNodesNumber = 100
// MaxNodesForEndpointsTests is the maximum number of nodes used when testing endpoints.
// Don't test with more than 3 nodes.
// Many tests create an endpoint per node; in large clusters this is
// resource and time intensive.
MaxNodesForEndpointsTests = 3
// ServiceTestTimeout is used for most polling/waiting activities.
ServiceTestTimeout = 60 * time.Second
// GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in
// one instance group on GCP.
GCPMaxInstancesInInstanceGroup = 2000
// AffinityConfirmCount is the number of consecutive requests needed to confirm that
// affinity is enabled.
AffinityConfirmCount = 15
)
// ServiceNodePortRange should match whatever the default/configured node port
// range of the cluster under test is. It starts at port 30000 and spans 2768
// ports; SanityCheckService uses it to validate allocated node ports and
// ChangeServiceNodePortOrFail uses it to pick replacement ports.
var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// ServiceTestJig is a test jig to help service testing. It bundles an API
// client with the name, ID and labels that the jig's helpers apply to the
// objects they create.
type ServiceTestJig struct {
// ID identifies this jig; NewServiceTestJig sets it to Name + "-" + a
// generated UUID.
ID string
// Name is the object name used by the jig's Service, ReplicationController
// and PodDisruptionBudget templates.
Name string
// Client is the clientset used for every API call made by the jig.
Client clientset.Interface
// Labels are put on objects built from the jig's templates and double as
// their selectors; NewServiceTestJig sets the single label "testid" to ID.
Labels map[string]string
}
// PodNode is a pod-node pair indicating which node a given pod is running on.
// PodNodePairs fills it from a pod's Name and Spec.NodeName.
type PodNode struct {
// Pod is the name of the pod.
Pod string
// Node is the name of the node recorded in the pod's spec.
Node string
}
// NewServiceTestJig builds a ServiceTestJig bound to the given client and name.
// The jig's ID is the name suffixed with "-" and a freshly generated UUID, and
// its Labels contain the single entry "testid" mapped to that ID.
func NewServiceTestJig(client clientset.Interface, name string) *ServiceTestJig {
	id := name + "-" + string(uuid.NewUUID())
	return &ServiceTestJig{
		ID:     id,
		Name:   name,
		Client: client,
		Labels: map[string]string{"testid": id},
	}
}
// newServiceTemplate builds, without creating it, the default v1.Service for
// this jig. The Service carries the jig's name and labels, selects pods by the
// jig's labels, and exposes a single port with the given protocol and number.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
	meta := metav1.ObjectMeta{
		Namespace: namespace,
		Name:      j.Name,
		Labels:    j.Labels,
	}
	spec := v1.ServiceSpec{
		Selector: j.Labels,
		Ports:    []v1.ServicePort{{Protocol: proto, Port: port}},
	}
	return &v1.Service{ObjectMeta: meta, Spec: spec}
}
// CreateTCPServiceWithPort creates a TCP Service exposing the given port,
// starting from the jig's default template. A non-nil tweak is applied to the
// template before it is sent to the API server. A creation error is reported
// through Failf.
func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
	template := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
	if tweak != nil {
		tweak(template)
	}

	created, err := j.Client.CoreV1().Services(namespace).Create(template)
	if err != nil {
		Failf("Failed to create TCP Service %q: %v", template.Name, err)
	}
	return created
}
// CreateTCPServiceOrFail creates a TCP Service on port 80 from the jig's
// default template. A non-nil tweak is applied to the template before it is
// sent to the API server. A creation error is reported through Failf.
func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
	template := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
	if tweak != nil {
		tweak(template)
	}

	created, err := j.Client.CoreV1().Services(namespace).Create(template)
	if err != nil {
		Failf("Failed to create TCP Service %q: %v", template.Name, err)
	}
	return created
}
// CreateUDPServiceOrFail creates a UDP Service on port 80 from the jig's
// default template. A non-nil tweak is applied to the template before it is
// sent to the API server. A creation error is reported through Failf.
func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
	template := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
	if tweak != nil {
		tweak(template)
	}

	created, err := j.Client.CoreV1().Services(namespace).Create(template)
	if err != nil {
		Failf("Failed to create UDP Service %q: %v", template.Name, err)
	}
	return created
}
// CreateExternalNameServiceOrFail creates a Service of type ExternalName that
// points at "foo.example.com" and carries the jig's name, labels and selector.
// A non-nil tweak is applied to the object before it is sent to the API
// server. A creation error is reported through Failf.
func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
	meta := metav1.ObjectMeta{
		Namespace: namespace,
		Name:      j.Name,
		Labels:    j.Labels,
	}
	spec := v1.ServiceSpec{
		Selector:     j.Labels,
		ExternalName: "foo.example.com",
		Type:         v1.ServiceTypeExternalName,
	}
	template := &v1.Service{ObjectMeta: meta, Spec: spec}
	if tweak != nil {
		tweak(template)
	}

	created, err := j.Client.CoreV1().Services(namespace).Create(template)
	if err != nil {
		Failf("Failed to create ExternalName Service %q: %v", template.Name, err)
	}
	return created
}
// CreateServiceWithServicePort creates a Service named after the jig that
// selects pods by the given labels and exposes the given ports. Unlike the
// jig's other creators it sets no labels on the Service itself and returns
// the creation error to the caller instead of failing the test.
func (j *ServiceTestJig) CreateServiceWithServicePort(labels map[string]string, namespace string, ports []v1.ServicePort) (*v1.Service, error) {
	spec := v1.ServiceSpec{
		Selector: labels,
		Ports:    ports,
	}
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: j.Name},
		Spec:       spec,
	}
	return j.Client.CoreV1().Services(namespace).Create(svc)
}
// ChangeServiceType switches the named service to newType and resets the node
// port of its first port to 0. If the service had a load balancer ingress IP
// before the update, it then waits up to timeout for that load balancer to be
// torn down.
func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
	var ingressIP string
	mutate := func(s *v1.Service) {
		// Remember the last non-empty ingress IP so its teardown can be awaited.
		for i := range s.Status.LoadBalancer.Ingress {
			if ip := s.Status.LoadBalancer.Ingress[i].IP; ip != "" {
				ingressIP = ip
			}
		}
		s.Spec.Type = newType
		s.Spec.Ports[0].NodePort = 0
	}

	updated := j.UpdateServiceOrFail(namespace, name, mutate)
	if ingressIP == "" {
		return
	}
	j.WaitForLoadBalancerDestroyOrFail(namespace, updated.Name, ingressIP, int(updated.Spec.Ports[0].Port), timeout)
}
// CreateOnlyLocalNodePortService creates a TCP NodePort service on port 80
// with ExternalTrafficPolicy=Local and sanity checks the result. When
// createPod is true it also runs the jig's default ReplicationController.
// serviceName is used only in the step descriptions; the service itself is
// created from the jig's template.
func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
	ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
	configure := func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeNodePort
		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
		svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
	}
	created := j.CreateTCPServiceOrFail(namespace, configure)

	if createPod {
		ginkgo.By("creating a pod to be part of the service " + serviceName)
		j.RunOrFail(namespace, nil)
	}

	j.SanityCheckService(created, v1.ServiceTypeNodePort)
	return created
}
// CreateOnlyLocalLoadBalancerService creates a TCP LoadBalancer service with
// ExternalTrafficPolicy=Local and session affinity disabled, waits up to
// timeout for it to acquire a load balancer ingress, and sanity checks the
// result. A non-nil tweak runs after the defaults are applied, so it can
// override them. When createPod is true the jig's default
// ReplicationController is started as well.
//
// serviceName is used in the step descriptions. The wait uses the name of the
// Service that was actually created (the jig's name unless tweak changed it),
// so a serviceName that differs from it no longer makes the wait look up a
// non-existent object.
func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
	tweak func(svc *v1.Service)) *v1.Service {
	ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
	svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeLoadBalancer
		// We need to turn affinity off for our LB distribution tests
		svc.Spec.SessionAffinity = v1.ServiceAffinityNone
		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
		if tweak != nil {
			tweak(svc)
		}
	})

	if createPod {
		ginkgo.By("creating a pod to be part of the service " + serviceName)
		j.RunOrFail(namespace, nil)
	}

	ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
	// Poll the object we just created rather than trusting serviceName to match it.
	svc = j.WaitForLoadBalancerOrFail(namespace, svc.Name, timeout)
	j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
	return svc
}
// CreateLoadBalancerService creates a TCP LoadBalancer service with session
// affinity disabled, waits up to timeout for it to acquire a load balancer
// ingress, and sanity checks the result. A non-nil tweak runs after the
// defaults are applied, so it can override them.
//
// serviceName is used in the step descriptions. The wait uses the name of the
// Service that was actually created (the jig's name unless tweak changed it),
// so a serviceName that differs from it no longer makes the wait look up a
// non-existent object.
func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
	ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
	svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeLoadBalancer
		// We need to turn affinity off for our LB distribution tests
		svc.Spec.SessionAffinity = v1.ServiceAffinityNone
		if tweak != nil {
			tweak(svc)
		}
	})

	ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
	// Poll the object we just created rather than trusting serviceName to match it.
	svc = j.WaitForLoadBalancerOrFail(namespace, svc.Name, timeout)
	j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
	return svc
}
// GetNodeAddresses collects every non-empty address of the requested type
// from the node's status, in the order the node reports them. The result is
// nil when the node has no such address.
func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
	for _, addr := range node.Status.Addresses {
		if addr.Type != addressType || addr.Address == "" {
			continue
		}
		ips = append(ips, addr.Address)
	}
	return ips
}
// CollectAddresses gathers the addresses of the requested type from every
// node in the list, preserving node order. The result is always non-nil.
func CollectAddresses(nodes *v1.NodeList, addressType v1.NodeAddressType) []string {
	collected := make([]string, 0)
	for idx := range nodes.Items {
		node := &nodes.Items[idx]
		collected = append(collected, GetNodeAddresses(node, addressType)...)
	}
	return collected
}
// GetNodePublicIps returns the external IPs of the ready, schedulable nodes.
// When no node reports an external IP it falls back to the internal IPs, on
// the assumption that the test programs can reach those. The returned error
// is always nil.
func GetNodePublicIps(c clientset.Interface) ([]string, error) {
	readyNodes := GetReadySchedulableNodesOrDie(c)
	if external := CollectAddresses(readyNodes, v1.NodeExternalIP); len(external) != 0 {
		return external, nil
	}
	// If ExternalIP isn't set, assume the test programs can reach the InternalIP
	return CollectAddresses(readyNodes, v1.NodeInternalIP), nil
}
// PickNodeIP returns the first address reported by GetNodePublicIps. It
// reports a failure through Failf when no address is available.
func PickNodeIP(c clientset.Interface) string {
	candidates, err := GetNodePublicIps(c)
	ExpectNoError(err)
	if count := len(candidates); count == 0 {
		Failf("got unexpected number (%d) of public IPs", count)
	}
	return candidates[0]
}
// PodNodePairs lists every pod in the namespace and returns one PodNode per
// pod, pairing the pod's name with the node name from its spec. A list error
// is returned to the caller with a nil slice.
func PodNodePairs(c clientset.Interface, ns string) ([]PodNode, error) {
	pods, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	var pairs []PodNode
	for idx := range pods.Items {
		p := &pods.Items[idx]
		pairs = append(pairs, PodNode{Pod: p.Name, Node: p.Spec.NodeName})
	}
	return pairs, nil
}
// GetEndpointNodes maps node name to external IPs for the nodes that host an
// endpoint of the given Service. Only the first MaxNodesForEndpointsTests
// ready, schedulable nodes are considered. Failing to fetch the Endpoints
// object, or finding it without subsets, is reported through Failf.
func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
	candidates := j.GetNodes(MaxNodesForEndpointsTests)

	eps, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
	if err != nil {
		Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
	}
	if len(eps.Subsets) == 0 {
		Failf("Endpoint has no subsets, cannot determine node addresses.")
	}

	// Names of all nodes that back at least one ready endpoint address.
	hosting := sets.NewString()
	for _, subset := range eps.Subsets {
		for _, addr := range subset.Addresses {
			if addr.NodeName == nil {
				continue
			}
			hosting.Insert(*addr.NodeName)
		}
	}

	result := make(map[string][]string)
	for idx := range candidates.Items {
		node := &candidates.Items[idx]
		if !hosting.Has(node.Name) {
			continue
		}
		result[node.Name] = GetNodeAddresses(node, v1.NodeExternalIP)
	}
	return result
}
// GetNodes returns at most maxNodesForTest of the ready, schedulable nodes,
// keeping the leading entries of the list. Useful in large clusters where we
// don't want to, e.g., create an endpoint per node.
func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
	nodes = GetReadySchedulableNodesOrDie(j.Client)

	limit := maxNodesForTest
	if available := len(nodes.Items); available <= limit {
		limit = available
	}
	nodes.Items = nodes.Items[:limit]
	return nodes
}
// GetNodesNames returns the names of the nodes selected by
// GetNodes(maxNodesForTest), in the same order. The result is always non-nil.
func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
	selected := j.GetNodes(maxNodesForTest)
	names := make([]string, 0)
	for idx := range selected.Items {
		names = append(names, selected.Items[idx].Name)
	}
	return names
}
// WaitForEndpointOnNode polls, for up to LoadBalancerCreateTimeoutDefault,
// until the first address of the first subset of the service's Endpoints
// object is located on nodeName. Transient conditions (lookup errors, no
// subsets, no ready addresses, an address without a node name, or an address
// on another node) are logged and retried. A poll failure is passed to
// ExpectNoError.
func (j *ServiceTestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
	err := wait.PollImmediate(Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
		endpoints, err := j.Client.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
		if err != nil {
			e2elog.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
			return false, nil
		}
		if len(endpoints.Subsets) == 0 {
			e2elog.Logf("Expect endpoints with subsets, got none.")
			return false, nil
		}
		// TODO: Handle multiple endpoints
		if len(endpoints.Subsets[0].Addresses) == 0 {
			e2elog.Logf("Expected Ready endpoints - found none")
			return false, nil
		}
		// NodeName is an optional pointer; retry instead of panicking on a
		// nil dereference when it has not been populated.
		epNodeName := endpoints.Subsets[0].Addresses[0].NodeName
		if epNodeName == nil {
			e2elog.Logf("Endpoint for service %s/%s has no node name set", namespace, serviceName)
			return false, nil
		}
		epHostName := *epNodeName
		e2elog.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
		if epHostName != nodeName {
			e2elog.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
			return false, nil
		}
		return true, nil
	})
	ExpectNoError(err)
}
// SanityCheckService checks that svc is consistent with the expected service
// type svcType and reports each violation through Failf:
//   - Spec.Type must equal svcType.
//   - An ExternalName service must have an empty ClusterIP; any other type
//     must have an empty ExternalName and a non-empty ClusterIP.
//   - Every port must have a node port unless svcType is ClusterIP or
//     ExternalName, in which case none may have one; node ports must lie
//     within ServiceNodePortRange.
//   - Load balancer ingress entries must be present exactly when svcType is
//     LoadBalancer, and each entry must carry an IP or a hostname.
func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
	if svc.Spec.Type != svcType {
		Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
	}

	if svcType != v1.ServiceTypeExternalName {
		if svc.Spec.ExternalName != "" {
			Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
		}
		if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
			Failf("didn't get ClusterIP for non-ExternalName service")
		}
	} else {
		if svc.Spec.ClusterIP != "" {
			Failf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
		}
	}

	// Only ClusterIP and ExternalName services are expected to lack node ports.
	expectNodePorts := svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName
	for i, port := range svc.Spec.Ports {
		hasNodePort := port.NodePort != 0
		if hasNodePort != expectNodePorts {
			Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
		}
		if hasNodePort && !ServiceNodePortRange.Contains(int(port.NodePort)) {
			Failf("out-of-range nodePort (%d) for service", port.NodePort)
		}
	}

	expectIngress := svcType == v1.ServiceTypeLoadBalancer
	hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
	if hasIngress != expectIngress {
		Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
	}
	for i, ing := range svc.Status.LoadBalancer.Ingress {
		if ing.IP == "" && ing.Hostname == "" {
			Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
		}
	}
}
// UpdateService runs a get-mutate-update cycle on the named service: it
// fetches the current object, applies update to it and submits the result.
// The cycle is attempted at most 3 times; it is repeated only when the update
// is rejected with a conflict or a server timeout. Any other failure,
// including a failed fetch, is returned immediately.
func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
	const maxAttempts = 3
	for attempt := 0; attempt < maxAttempts; attempt++ {
		current, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get Service %q: %v", name, err)
		}

		update(current)

		updated, err := j.Client.CoreV1().Services(namespace).Update(current)
		switch {
		case err == nil:
			return updated, nil
		case errors.IsConflict(err), errors.IsServerTimeout(err):
			// Retriable: start over from a fresh copy of the object.
		default:
			return nil, fmt.Errorf("failed to update Service %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries updating Service %q", name)
}
// UpdateServiceOrFail fetches a service, calls the update function on it, and
// then attempts to send the updated service, delegating to UpdateService (up
// to 3 attempts in the face of timeouts and conflicts). An error from
// UpdateService is reported through Failf.
func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
	svc, err := j.UpdateService(namespace, name, update)
	if err != nil {
		// Pass the error as an argument, not as the format string, so that
		// any '%' in the error text is printed verbatim.
		Failf("%v", err)
	}
	return svc
}
// WaitForNewIngressIPOrFail waits up to timeout until the first load balancer
// ingress entry of the named service has a non-empty IP that differs from
// existingIP, and returns the service observed at that point.
func (j *ServiceTestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string, timeout time.Duration) *v1.Service {
	e2elog.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, name)

	hasNewIP := func(svc *v1.Service) bool {
		ingress := svc.Status.LoadBalancer.Ingress
		if len(ingress) == 0 {
			return false
		}
		first := ingress[0].IP
		return first != "" && first != existingIP
	}
	return j.waitForConditionOrFail(namespace, name, timeout, "have a new ingress IP", hasNewIP)
}
// ChangeServiceNodePortOrFail moves the first port of the named service to a
// different node port. Candidates are tried in order starting just after
// initial and wrapping around inside ServiceNodePortRange; a candidate that
// the allocator reports as already allocated is skipped. Any other update
// error stops the search and is reported through Failf.
func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
	var (
		updated *v1.Service
		err     error
	)
	for step := 1; step < ServiceNodePortRange.Size; step++ {
		startOffset := initial - ServiceNodePortRange.Base
		candidate := ServiceNodePortRange.Base + (startOffset+step)%ServiceNodePortRange.Size
		updated, err = j.UpdateService(namespace, name, func(s *v1.Service) {
			s.Spec.Ports[0].NodePort = int32(candidate)
		})
		// Stop on success or on any error other than "port already allocated".
		if err == nil || !strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
			break
		}
		e2elog.Logf("tried nodePort %d, but it is in use, will try another", candidate)
	}
	if err != nil {
		Failf("Could not change the nodePort: %v", err)
	}
	return updated
}
// WaitForLoadBalancerOrFail waits up to timeout until the named service
// reports at least one load balancer ingress entry, and returns the service
// observed at that point.
func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
	e2elog.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)

	hasIngress := func(svc *v1.Service) bool {
		return len(svc.Status.LoadBalancer.Ingress) > 0
	}
	return j.waitForConditionOrFail(namespace, name, timeout, "have a load balancer", hasIngress)
}
// WaitForLoadBalancerDestroyOrFail waits up to timeout until the named
// service reports no load balancer ingress entries, and returns the service
// observed at that point. On the way out it calls
// EnsureLoadBalancerResourcesDeleted for ip and port; an error from that call
// is only logged.
func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
	// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
	defer func() {
		err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port))
		if err == nil {
			return
		}
		e2elog.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
	}()

	e2elog.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
	noIngress := func(svc *v1.Service) bool {
		return len(svc.Status.LoadBalancer.Ingress) == 0
	}
	return j.waitForConditionOrFail(namespace, name, timeout, "have no load balancer", noIngress)
}
// waitForConditionOrFail polls the named service every Poll interval, for up
// to timeout, until conditionFn returns true for the fetched object, and
// returns that object. message completes the phrase "waiting for service X
// to ..." in failure output. A failed Get aborts the poll immediately.
//
// Failures are reported through Failf. A genuine timeout and an aborted poll
// are reported separately, so that an API error is not mislabelled as a
// timeout and its cause is not lost.
func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) *v1.Service {
	var service *v1.Service
	pollFunc := func() (bool, error) {
		svc, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if conditionFn(svc) {
			service = svc
			return true, nil
		}
		return false, nil
	}
	if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
		if err == wait.ErrWaitTimeout {
			Failf("Timed out waiting for service %q to %s", name, message)
		} else {
			Failf("Failed waiting for service %q to %s: %v", name, message, err)
		}
	}
	return service
}
// newRCTemplate builds, without creating it, the default
// v1.ReplicationController for this jig. The RC carries the jig's name and
// labels, asks for one replica, and runs a single "netexec" container that
// serves HTTP and UDP on port 80 and is probed for readiness via HTTP GET
// /hostName every 3 seconds.
func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
	replicas := int32(1)
	// Short grace period so we don't race with kube-proxy when scaling up/down.
	grace := int64(3)

	netexec := v1.Container{
		Name:  "netexec",
		Image: imageutils.GetE2EImage(imageutils.Netexec),
		Args:  []string{"--http-port=80", "--udp-port=80"},
		ReadinessProbe: &v1.Probe{
			PeriodSeconds: 3,
			Handler: v1.Handler{
				HTTPGet: &v1.HTTPGetAction{
					Port: intstr.FromInt(80),
					Path: "/hostName",
				},
			},
		},
	}
	podTemplate := &v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: j.Labels},
		Spec: v1.PodSpec{
			Containers:                    []v1.Container{netexec},
			TerminationGracePeriodSeconds: &grace,
		},
	}

	return &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      j.Name,
			Labels:    j.Labels,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: &replicas,
			Selector: j.Labels,
			Template: podTemplate,
		},
	}
}
// AddRCAntiAffinity sets the RC's replica count to 2 and appends a required
// pod anti-affinity term that matches the jig's labels with topology key
// "kubernetes.io/hostname". Missing Affinity and PodAntiAffinity structs are
// allocated on demand; existing terms are kept.
func (j *ServiceTestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
	replicas := int32(2)
	rc.Spec.Replicas = &replicas

	podSpec := &rc.Spec.Template.Spec
	if podSpec.Affinity == nil {
		podSpec.Affinity = &v1.Affinity{}
	}
	if podSpec.Affinity.PodAntiAffinity == nil {
		podSpec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
	}

	antiAffinity := podSpec.Affinity.PodAntiAffinity
	term := v1.PodAffinityTerm{
		LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
		Namespaces:    nil,
		TopologyKey:   "kubernetes.io/hostname",
	}
	antiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
		antiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, term)
}
// CreatePDBOrFail creates the jig's default PodDisruptionBudget for the given
// ReplicationController and then waits, via waitForPdbReady, for it to become
// ready. Both a creation error and a failed wait are reported through Failf.
// The returned object is the one returned by the create call.
func (j *ServiceTestJig) CreatePDBOrFail(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
	template := j.newPDBTemplate(namespace, rc)

	created, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(template)
	if err != nil {
		Failf("Failed to create PDB %q %v", template.Name, err)
	}

	if readyErr := j.waitForPdbReady(namespace); readyErr != nil {
		Failf("Failed waiting for PDB to be ready: %v", readyErr)
	}
	return created
}
// newPDBTemplate builds, without creating it, the default
// policyv1beta1.PodDisruptionBudget for this jig. The PDB carries the jig's
// name and labels, selects pods by the jig's labels, and sets MinAvailable to
// one less than the RC's replica count.
func (j *ServiceTestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
	minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)

	meta := metav1.ObjectMeta{
		Namespace: namespace,
		Name:      j.Name,
		Labels:    j.Labels,
	}
	spec := policyv1beta1.PodDisruptionBudgetSpec{
		MinAvailable: &minAvailable,
		Selector:     &metav1.LabelSelector{MatchLabels: j.Labels},
	}
	return &policyv1beta1.PodDisruptionBudget{ObjectMeta: meta, Spec: spec}
}
// RunOrFail creates the jig's ReplicationController, waits for the requested
// number of pods to exist, and then waits for those pods to be ready. A
// non-nil tweak is applied to the RC template before creation. Any failure
// along the way is reported through Failf. The returned object is the one
// returned by the create call.
func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
	template := j.newRCTemplate(namespace)
	if tweak != nil {
		tweak(template)
	}

	created, err := j.Client.CoreV1().ReplicationControllers(namespace).Create(template)
	if err != nil {
		Failf("Failed to create RC %q: %v", template.Name, err)
	}

	// The expected pod count comes from the (possibly tweaked) template.
	wanted := int(*(template.Spec.Replicas))
	podNames, err := j.waitForPodsCreated(namespace, wanted)
	if err != nil {
		Failf("Failed to create pods: %v", err)
	}

	if readyErr := j.waitForPodsReady(namespace, podNames); readyErr != nil {
		Failf("Failed waiting for pods to be running: %v", readyErr)
	}
	return created
}
// Scale sets the replica count of the jig's ReplicationController through its
// scale subresource, then waits for exactly that many pods to exist and to be
// ready. Any failure along the way is reported through Failf.
func (j *ServiceTestJig) Scale(namespace string, replicas int) {
	rcName := j.Name

	current, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rcName, metav1.GetOptions{})
	if err != nil {
		Failf("Failed to get scale for RC %q: %v", rcName, err)
	}

	current.Spec.Replicas = int32(replicas)
	if _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rcName, current); err != nil {
		Failf("Failed to scale RC %q: %v", rcName, err)
	}

	podNames, err := j.waitForPodsCreated(namespace, replicas)
	if err != nil {
		Failf("Failed waiting for pods: %v", err)
	}

	if readyErr := j.waitForPodsReady(namespace, podNames); readyErr != nil {
		Failf("Failed waiting for pods to be running: %v", readyErr)
	}
}
// waitForPdbReady checks the jig's PodDisruptionBudget every 2 seconds, for
// up to 2 minutes, until its status allows at least one pod disruption. A
// failed Get is returned immediately; running out of time yields a timeout
// error.
func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
	const (
		timeout  = 2 * time.Minute
		interval = 2 * time.Second
	)

	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(j.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if pdb.Status.PodDisruptionsAllowed > 0 {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
}
// waitForPodsCreated lists the pods matching the jig's labels every 2
// seconds, for up to 2 minutes, until exactly replicas of them exist. Pods
// with a deletion timestamp are not counted. It returns the names of the
// counted pods; a failed list is returned immediately, and running out of
// time yields a timeout error.
func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
	const (
		timeout  = 2 * time.Minute
		interval = 2 * time.Second
	)

	// List the pods, making sure we observe all the replicas.
	selector := labels.SelectorFromSet(labels.Set(j.Labels))
	e2elog.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)

	for start := time.Now(); time.Since(start) < timeout; time.Sleep(interval) {
		podList, err := j.Client.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
		if err != nil {
			return nil, err
		}

		live := []string{}
		for idx := range podList.Items {
			if podList.Items[idx].DeletionTimestamp == nil {
				live = append(live, podList.Items[idx].Name)
			}
		}

		if len(live) != replicas {
			e2elog.Logf("Found %d/%d pods - will retry", len(live), replicas)
			continue
		}
		e2elog.Logf("Found all %d pods", replicas)
		return live, nil
	}
	return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
}
// waitForPodsReady gives the named pods 2 minutes to pass
// CheckPodsRunningReady and returns a timeout error when that check reports
// false.
func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
	const timeout = 2 * time.Minute
	if CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
		return nil
	}
	return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
}
// newNetexecPodSpec builds a pod with a single "netexec" container that is
// started with the given HTTP and UDP ports and declares both as container
// ports named "http" and "udp". hostNetwork controls whether the pod uses the
// host's network namespace.
func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
	netexec := v1.Container{
		Name:  "netexec",
		Image: netexecImageName,
		Command: []string{
			"/netexec",
			fmt.Sprintf("--http-port=%d", httpPort),
			fmt.Sprintf("--udp-port=%d", udpPort),
		},
		Ports: []v1.ContainerPort{
			{Name: "http", ContainerPort: httpPort},
			{Name: "udp", ContainerPort: udpPort},
		},
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: podName},
		Spec: v1.PodSpec{
			Containers:  []v1.Container{netexec},
			HostNetwork: hostNetwork,
		},
	}
}
// LaunchNetexecPodOnNode creates a netexec pod with the jig's labels in the
// framework's namespace, assigned to the given node, and waits for it to be
// running. Creation and wait errors are passed to ExpectNoError.
func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
	ns := f.Namespace.Name
	e2elog.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, ns)

	pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
	pod.Spec.NodeName = nodeName
	pod.ObjectMeta.Labels = j.Labels

	_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod)
	ExpectNoError(err)
	ExpectNoError(f.WaitForPodRunning(podName))
	e2elog.Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}
// newEchoServerPodSpec builds a pod with a single "echoserver" container
// that declares container port 8080. The pod's restart policy is Never.
func newEchoServerPodSpec(podName string) *v1.Pod {
	const echoPort int32 = 8080

	echo := v1.Container{
		Name:  "echoserver",
		Image: imageutils.GetE2EImage(imageutils.EchoServer),
		Ports: []v1.ContainerPort{{ContainerPort: echoPort}},
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: podName},
		Spec: v1.PodSpec{
			Containers:    []v1.Container{echo},
			RestartPolicy: v1.RestartPolicyNever,
		},
	}
}
// LaunchEchoserverPodOnNode creates an echo server pod with the jig's labels
// in the framework's namespace, assigned to the given node, and waits for it
// to be running. The pod serves HTTP on port 8080 and acts as the target for
// the source IP preservation test: the web server echoes back the client's
// source IP. Creation and wait errors are passed to ExpectNoError.
func (j *ServiceTestJig) LaunchEchoserverPodOnNode(f *Framework, nodeName, podName string) {
	e2elog.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)

	pod := newEchoServerPodSpec(podName)
	pod.Spec.NodeName = nodeName
	pod.ObjectMeta.Labels = j.Labels

	_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod)
	ExpectNoError(err)
	ExpectNoError(f.WaitForPodRunning(podName))
	e2elog.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}
// TestReachableHTTP checks that the given host serves HTTP on the given port
// within timeout. It is TestReachableHTTPWithRetriableErrorCodes with an
// empty list of retriable codes.
func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
	noRetriableCodes := []int{}
	j.TestReachableHTTPWithRetriableErrorCodes(host, port, noRetriableCodes, timeout)
}
// TestReachableHTTPWithRetriableErrorCodes polls host:port with an HTTP
// request for "/echo?msg=hello" until PokeHTTP reports success for a response
// body containing "hello", or timeout elapses. retriableErrCodes is handed to
// PokeHTTP as the retriable codes. A timeout and any other poll error are
// reported through Failf with distinct messages.
func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
	probe := func() (bool, error) {
		outcome := PokeHTTP(host, port, "/echo?msg=hello",
			&HTTPPokeParams{
				BodyContains:   "hello",
				RetriableCodes: retriableErrCodes,
			})
		// Anything short of success is "not yet", so polling continues.
		return outcome.Status == HTTPSuccess, nil
	}

	err := wait.PollImmediate(Poll, timeout, probe)
	switch err {
	case nil:
		return
	case wait.ErrWaitTimeout:
		Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
	default:
		Failf("Failed to reach HTTP service through %v:%v: %v", host, port, err)
	}
}
// TestNotReachableHTTP polls host:port with an HTTP request for "/" until
// PokeHTTP returns a result whose Code is 0, or timeout elapses. A poll
// failure is reported through Failf.
func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
	probe := func() (bool, error) {
		outcome := PokeHTTP(host, port, "/", nil)
		// A non-zero code means we got a response; keep polling.
		return outcome.Code == 0, nil
	}

	err := wait.PollImmediate(Poll, timeout, probe)
	if err != nil {
		Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
	}
}
// TestRejectedHTTP verifies that host rejects HTTP requests on port. It polls
// until a poke reports HTTPRefused and fails the test if that status is not
// observed within timeout.
func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) {
	refused := func() (bool, error) {
		res := PokeHTTP(host, port, "/", nil)
		// Anything other than a refusal is retried until the timeout.
		return res.Status == HTTPRefused, nil
	}

	err := wait.PollImmediate(Poll, timeout, refused)
	if err == nil {
		return
	}
	Failf("HTTP service %v:%v not rejected: %v", host, port, err)
}
// TestReachableUDP verifies that host serves UDP on port. Each attempt sends
// "echo hello" with a 3 second poke timeout and expects "hello" back; the test
// is failed if no attempt succeeds within timeout.
func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
	reachable := func() (bool, error) {
		res := PokeUDP(host, port, "echo hello", &UDPPokeParams{
			Timeout:  3 * time.Second,
			Response: "hello",
		})
		// Failed pokes are retried rather than reported as errors.
		return res.Status == UDPSuccess, nil
	}

	err := wait.PollImmediate(Poll, timeout, reachable)
	if err == nil {
		return
	}
	Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
// TestNotReachableUDP verifies that host does not serve UDP on port. It polls
// until a poke ends with a status other than UDPSuccess or UDPError, and fails
// the test if that does not happen within timeout.
func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
	unreachable := func() (bool, error) {
		res := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
		switch res.Status {
		case UDPSuccess, UDPError:
			// Either the service answered or the poke was inconclusive; retry.
			return false, nil
		default:
			return true, nil
		}
	}

	err := wait.PollImmediate(Poll, timeout, unreachable)
	if err == nil {
		return
	}
	Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
// TestRejectedUDP verifies that host rejects UDP requests on port. It polls
// until a poke reports UDPRefused and fails the test if that status is not
// observed within timeout.
func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) {
	refused := func() (bool, error) {
		res := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
		// Anything other than a refusal is retried until the timeout.
		return res.Status == UDPRefused, nil
	}

	err := wait.PollImmediate(Poll, timeout, refused)
	if err == nil {
		return
	}
	Failf("UDP service %v:%v not rejected: %v", host, port, err)
}
// GetHTTPContent fetches url from host:port over HTTP, retrying until a poke
// succeeds or timeout elapses, and returns the body of the successful
// response. If no poke succeeds in time the test is failed via Failf.
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
	var content bytes.Buffer

	fetch := func() (bool, error) {
		res := PokeHTTP(host, port, url, nil)
		if res.Status != HTTPSuccess {
			return false, nil
		}
		content.Write(res.Body)
		return true, nil
	}

	if err := wait.PollImmediate(Poll, timeout, fetch); err != nil {
		Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err)
	}
	return content
}
// testHTTPHealthCheckNodePort issues a single HTTP GET (5 second timeout) for
// request against the health check responder at ip:port.
//
// It returns (true, nil) when the responder answers 200 (non-zero local
// endpoints), (false, nil) when it answers 503 (no local endpoints), and
// (false, err) when the request fails or any other status code is returned.
// An empty ip or a zero port fails the test via Failf.
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
	ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
	url := fmt.Sprintf("http://%s%s", ipPort, request)
	if ip == "" || port == 0 {
		Failf("Got empty IP for reachability check (%s)", url)
		return false, fmt.Errorf("invalid input ip or port")
	}

	e2elog.Logf("Testing HTTP health check on %v", url)
	resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
	if err != nil {
		e2elog.Logf("Got error testing for reachability of %s: %v", url, err)
		return false, err
	}
	// The body is never read; only the status code matters below.
	defer resp.Body.Close()

	switch resp.StatusCode {
	case 503:
		// HealthCheck responder returns 503 for no local endpoints
		return false, nil
	case 200:
		// HealthCheck responder returns 200 for non-zero local endpoints
		return true, nil
	default:
		return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
	}
}
// TestHTTPHealthCheckNodePort probes the health check node port at host:port
// with request once per second until timeout. A probe "matches" when its
// outcome equals expectSucceed; matches are counted cumulatively (they need
// not be consecutive). It returns nil once threshold matches have been seen,
// or an error describing the shortfall if the timeout elapses first.
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
	count := 0
	condition := func() (bool, error) {
		// The probe error is deliberately ignored: a failed request reports
		// success == false, which is exactly what counts as a match when the
		// caller expects the health check to fail.
		success, _ := testHTTPHealthCheckNodePort(host, port, request)
		if success == expectSucceed {
			count++
		}
		return count >= threshold, nil
	}

	if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
		// Join host and port properly so the target is readable (and valid
		// for IPv6) instead of being printed as one run-together token.
		target := net.JoinHostPort(host, strconv.Itoa(port))
		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v, got %d", threshold, expectSucceed, target, count)
	}
	return nil
}
// ServiceTestFixture is a simple helper class to avoid too much boilerplate in tests.
// It remembers the replication controllers and services created through it so
// that Cleanup can remove them afterwards. Use NewServerTest to construct one;
// the zero value has nil bookkeeping maps.
type ServiceTestFixture struct {
// ServiceName is the name given to services built by BuildServiceSpec.
ServiceName string
// Namespace is the namespace in which RCs and services are created and deleted.
Namespace string
// Client is the clientset used for all API calls made by the fixture.
Client clientset.Interface
// TestID is ServiceName plus a "-<uuid>" suffix, as assigned by NewServerTest.
TestID string
// Labels holds {"testid": TestID}; BuildServiceSpec uses it as the service selector.
Labels map[string]string
// rcs is the set of RC names recorded by CreateRC for Cleanup.
rcs map[string]bool
// services is the set of service names recorded by CreateService for Cleanup.
services map[string]bool
// Name is set to "webserver" by NewServerTest.
// NOTE(review): presumably the container/pod name used by callers; confirm.
Name string
// Image is set to the test webserver e2e image by NewServerTest.
Image string
}
// NewServerTest returns a ServiceTestFixture bound to client, namespace and
// serviceName. The fixture gets a unique TestID (serviceName plus a UUID
// suffix), a matching "testid" label, empty RC/service bookkeeping maps, and
// the default "webserver" name and test webserver image.
func NewServerTest(client clientset.Interface, namespace string, serviceName string) *ServiceTestFixture {
	testID := serviceName + "-" + string(uuid.NewUUID())
	return &ServiceTestFixture{
		ServiceName: serviceName,
		Namespace:   namespace,
		Client:      client,
		TestID:      testID,
		Labels:      map[string]string{"testid": testID},
		rcs:         make(map[string]bool),
		services:    make(map[string]bool),
		Name:        "webserver",
		Image:       imageutils.GetE2EImage(imageutils.TestWebserver),
	}
}
// BuildServiceSpec returns a default Service object for the fixture, which the
// caller may modify before creating it. The service is named t.ServiceName in
// t.Namespace, selects pods by t.Labels, and exposes port 80 targeting port 80.
func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
	httpPort := v1.ServicePort{
		Port:       80,
		TargetPort: intstr.FromInt(80),
	}
	meta := metav1.ObjectMeta{
		Name:      t.ServiceName,
		Namespace: t.Namespace,
	}
	return &v1.Service{
		ObjectMeta: meta,
		Spec: v1.ServiceSpec{
			Selector: t.Labels,
			Ports:    []v1.ServicePort{httpPort},
		},
	}
}
// CreateRC creates rc in the fixture's namespace. On success the created RC's
// name is recorded so that Cleanup removes it later. The API result and error
// are returned unchanged.
func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.ReplicationController, error) {
	created, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Create(rc)
	if err != nil {
		return created, err
	}
	t.rcs[created.Name] = true
	return created, nil
}
// CreateService creates service in the fixture's namespace and, on success,
// records it for Cleanup. The API result and error are returned unchanged.
//
// The name recorded is the one on the object returned by the API server, not
// the one on the input: the two are identical when the caller sets Name, but
// only the returned object carries the real name when GenerateName is used.
// This mirrors how CreateRC records the created RC.
func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
	result, err := t.Client.CoreV1().Services(t.Namespace).Create(service)
	if err == nil {
		t.services[result.Name] = true
	}
	return result, err
}
// DeleteService deletes the named service from the fixture's namespace. When
// the deletion succeeds the service is also dropped from the cleanup list; on
// failure the list is left untouched and the error is returned.
func (t *ServiceTestFixture) DeleteService(serviceName string) error {
	if err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil); err != nil {
		return err
	}
	delete(t.services, serviceName)
	return nil
}
// Cleanup removes every ReplicationController and Service recorded by this
// fixture. Each RC is first scaled to zero replicas (retrying on update
// conflicts) and then deleted; each service is deleted directly. "Not found"
// responses are treated as success. All other errors are collected and
// returned; cleanup continues past individual failures.
func (t *ServiceTestFixture) Cleanup() []error {
	var errs []error

	for rcName := range t.rcs {
		ginkgo.By("stopping RC " + rcName + " in namespace " + t.Namespace)

		// First, resize the RC to 0.
		scaleToZero := func() error {
			rc, getErr := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
			if errors.IsNotFound(getErr) {
				return nil
			}
			if getErr != nil {
				return getErr
			}

			zero := int32(0)
			rc.Spec.Replicas = &zero
			_, updateErr := t.Client.CoreV1().ReplicationControllers(t.Namespace).Update(rc)
			if updateErr == nil || errors.IsNotFound(updateErr) {
				return nil
			}
			return updateErr
		}
		if scaleErr := retry.RetryOnConflict(retry.DefaultRetry, scaleToZero); scaleErr != nil {
			errs = append(errs, scaleErr)
		}

		// TODO(mikedanese): Wait.
		// Then, delete the RC altogether.
		deleteErr := t.Client.CoreV1().ReplicationControllers(t.Namespace).Delete(rcName, nil)
		if deleteErr != nil && !errors.IsNotFound(deleteErr) {
			errs = append(errs, deleteErr)
		}
	}

	for serviceName := range t.services {
		ginkgo.By("deleting service " + serviceName + " in namespace " + t.Namespace)
		deleteErr := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
		if deleteErr != nil && !errors.IsNotFound(deleteErr) {
			errs = append(errs, deleteErr)
		}
	}

	return errs
}
// GetIngressPoint returns the host on which the given load balancer ingress
// serves: its IP when set, otherwise its hostname.
func GetIngressPoint(ing *v1.LoadBalancerIngress) string {
	if ing.IP != "" {
		return ing.IP
	}
	return ing.Hostname
}
// UpdateService fetches the named service, applies update to it, and sends the
// modified object back to the API server. When the update is rejected with a
// conflict or server timeout the whole fetch/modify/update cycle is repeated,
// for at most three attempts in total (i.e. up to two retries). A failed fetch
// or any other update outcome is returned immediately.
func UpdateService(c clientset.Interface, namespace, serviceName string, update func(*v1.Service)) (*v1.Service, error) {
	const maxAttempts = 3

	var (
		svc *v1.Service
		err error
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if svc, err = c.CoreV1().Services(namespace).Get(serviceName, metav1.GetOptions{}); err != nil {
			return svc, err
		}

		update(svc)
		svc, err = c.CoreV1().Services(namespace).Update(svc)

		retriable := errors.IsConflict(err) || errors.IsServerTimeout(err)
		if !retriable {
			return svc, err
		}
	}
	return svc, err
}
// StartServeHostnameService creates svc in namespace ns and then runs a
// replication controller of the same name with the requested number of
// replicas serving their hostname. It returns the sorted names of the created
// pods and the service's cluster IP. On any failure it returns the
// (possibly only partially filled) pod name slice, an empty IP, and the error.
func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
	podNames := make([]string, replicas)
	name := svc.ObjectMeta.Name

	ginkgo.By("creating service " + name + " in namespace " + ns)
	if _, err := c.CoreV1().Services(ns).Create(svc); err != nil {
		return podNames, "", err
	}

	var createdPods []*v1.Pod
	maxContainerFailures := 0
	rcConfig := testutils.RCConfig{
		Client:               c,
		Image:                ServeHostnameImage,
		Name:                 name,
		Namespace:            ns,
		PollInterval:         3 * time.Second,
		Timeout:              PodReadyBeforeTimeout,
		Replicas:             replicas,
		CreatedPods:          &createdPods,
		MaxContainerFailures: &maxContainerFailures,
	}
	if err := RunRC(rcConfig); err != nil {
		return podNames, "", err
	}

	if got := len(createdPods); got != replicas {
		return podNames, "", fmt.Errorf("incorrect number of running pods: %v", got)
	}
	for i, pod := range createdPods {
		podNames[i] = pod.ObjectMeta.Name
	}
	sort.Strings(podNames)

	// Re-read the service to learn the cluster IP it was assigned.
	service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
	if err != nil {
		return podNames, "", err
	}
	clusterIP := service.Spec.ClusterIP
	if clusterIP == "" {
		return podNames, "", fmt.Errorf("service IP is blank for %v", name)
	}
	return podNames, clusterIP, nil
}
// StopServeHostnameService tears down the named serve-hostname service in ns:
// it deletes the replication controller (waiting for garbage collection) and
// then the service itself. The first error encountered is returned; if the RC
// deletion fails the service is not deleted.
func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
	if err := DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
		return err
	}
	return clientset.CoreV1().Services(ns).Delete(name, nil)
}
// VerifyServeHostnameServiceUp wgets serviceIP:servicePort both from the given
// host (over SSH) and from inside a freshly created exec pod. The host is
// expected to be an SSH-able node in the cluster, and every pod behind the
// service is expected to echo its own name. For each vantage point the
// collected names are accumulated across retries (for up to
// KubeProxyLagTimeout) until they cover expectedPods; an error is returned if
// either vantage point never sees all expected pods. The exec pod is deleted
// before returning.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
	execPodName := CreateExecPodOrFail(c, ns, "execpod-", nil)
	defer DeletePodOrFail(c, ns, execPodName)

	// Loop a bunch of times - the proxy is randomized, so we want a good
	// chance of hitting each backend at least once.
	buildCommand := func(wget string) string {
		target := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
		iterations := 50 * len(expectedPods)
		return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
			iterations, wget, target)
	}

	// fromNode verifies the service from the node, over SSH.
	fromNode := func() string {
		cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
		e2elog.Logf("Executing cmd %q on host %v", cmd, host)
		result, err := e2essh.SSH(cmd, host, TestContext.Provider)
		if err != nil || result.Code != 0 {
			e2essh.LogResult(result)
			e2elog.Logf("error while SSH-ing to node: %v", err)
		}
		return result.Stdout
	}

	// fromPod verifies the service from inside the exec pod.
	fromPod := func() string {
		cmd := buildCommand("wget -q -T 1 -O -")
		e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
		// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
		output, err := RunHostCmd(ns, execPodName, cmd)
		if err != nil {
			e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
		}
		return output
	}

	expectedEndpoints := sets.NewString(expectedPods...)
	ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))

	for _, fetch := range []func() string{fromNode, fromPod} {
		gotEndpoints := sets.NewString()
		verified := false

		// Retry the fetch for a while, accumulating every name seen so far.
		for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
			for _, line := range strings.Split(fetch(), "\n") {
				if ep := strings.TrimSpace(line); ep != "" {
					gotEndpoints.Insert(ep)
				}
			}

			if !gotEndpoints.IsSuperset(expectedEndpoints) {
				e2elog.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
				continue
			}

			// TODO: simply checking that the retrieved endpoints is a superset
			// of the expected allows us to ignore intermittent network flakes that
			// result in output like "wget timed out", but these should be rare
			// and we need a better way to track how often it occurs.
			if !gotEndpoints.Equal(expectedEndpoints) {
				e2elog.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
			}
			verified = true
			break
		}

		if verified {
			continue
		}

		// Sort the lists so they're easier to visually diff.
		exp := expectedEndpoints.List()
		got := gotEndpoints.List()
		sort.Strings(exp)
		sort.Strings(got)
		return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
	}
	return nil
}
// VerifyServeHostnameServiceDown verifies that serviceIP:servicePort is no
// longer served, as seen from host over SSH. It runs curl against the service
// every 5 seconds for up to a minute; the remote command exits with code 99
// only when curl succeeds, so any other exit code is taken to mean the service
// is down. An error is returned if the service is still answering when the
// minute is up.
func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
	const (
		pollTimeout      = time.Minute
		pollInterval     = 5 * time.Second
		stillServingCode = 99
	)

	target := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
	// The current versions of curl included in CentOS and RHEL distros
	// misinterpret square brackets around IPv6 as globbing, so use the -g
	// argument to disable globbing to handle the IPv6 case.
	command := fmt.Sprintf(
		"curl -g -s --connect-timeout 2 http://%s && exit 99", target)

	for start := time.Now(); time.Since(start) < pollTimeout; time.Sleep(pollInterval) {
		result, err := e2essh.SSH(command, host, TestContext.Provider)
		if err != nil {
			// NOTE(review): a failed SSH call is only logged; unless its result
			// code happens to be 99 the service is then reported as down.
			// Confirm this is intended.
			e2essh.LogResult(result)
			e2elog.Logf("error while SSH-ing to node: %v", err)
		}
		if result.Code == stillServingCode {
			e2elog.Logf("service still alive - still waiting")
			continue
		}
		return nil
	}
	return fmt.Errorf("waiting for service to be down timed out")
}
// CleanupServiceResources cleans up the resources of a Type=LoadBalancer
// service by delegating to the cloud provider configured in TestContext.
func CleanupServiceResources(c clientset.Interface, loadBalancerName, region, zone string) {
	provider := TestContext.CloudConfig.Provider
	provider.CleanupServiceResources(c, loadBalancerName, region, zone)
}
// DescribeSvc logs the output of `kubectl describe svc` for the given
// namespace. It is a best-effort debugging aid: a kubectl failure is logged
// rather than failing the test, and whatever output was produced is still
// printed.
func DescribeSvc(ns string) {
	e2elog.Logf("\nOutput of kubectl describe svc:\n")
	desc, err := RunKubectl(
		"describe", "svc", fmt.Sprintf("--namespace=%v", ns))
	if err != nil {
		e2elog.Logf("kubectl describe svc failed: %v", err)
	}
	// Never use command output as a format string: any '%' in it would be
	// interpreted as a verb and garble the log.
	e2elog.Logf("%s", desc)
}
// CreateServiceSpec returns a Service object for testing, named serviceName
// and selecting pods by selector. With a non-empty externalName the service is
// of type ExternalName pointing at that name; otherwise it exposes a single
// TCP port 80 named "http". When isHeadless is set the cluster IP is "None".
func CreateServiceSpec(serviceName, externalName string, isHeadless bool, selector map[string]string) *v1.Service {
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: serviceName},
		Spec:       v1.ServiceSpec{Selector: selector},
	}
	spec := &svc.Spec

	if externalName == "" {
		spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
		}
	} else {
		spec.Type = v1.ServiceTypeExternalName
		spec.ExternalName = externalName
	}

	if isHeadless {
		spec.ClusterIP = "None"
	}
	return svc
}
// EnableAndDisableInternalLB returns two functions that respectively enable
// and disable the internal load balancer setting on a service. Both come from
// the cloud provider configured in TestContext, so what they actually do
// (including doing nothing) is decided by that provider.
func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
	enable, disable = TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
	return enable, disable
}
// GetServiceLoadBalancerCreationTimeout returns how long to wait for a
// service's load balancer to be created: LoadBalancerCreateTimeoutLarge when
// the cluster has more than LargeClusterMinNodesNumber ready schedulable
// nodes, LoadBalancerCreateTimeoutDefault otherwise.
func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration {
	nodes := GetReadySchedulableNodesOrDie(cs)
	if len(nodes.Items) <= LargeClusterMinNodesNumber {
		return LoadBalancerCreateTimeoutDefault
	}
	return LoadBalancerCreateTimeoutLarge
}
// affinityTracker tracks the destination of a request for the affinity tests.
type affinityTracker struct {
// hostTrace lists the responding hosts in the order they were recorded;
// recordHost appends to it and checkHostTrace inspects its tail.
hostTrace []string
}
// recordHost appends host to the trace of responding hosts and logs it.
func (at *affinityTracker) recordHost(host string) {
	trace := append(at.hostTrace, host)
	at.hostTrace = trace
	e2elog.Logf("Received response from host: %s", host)
}
// checkHostTrace inspects the most recent responses in the trace.
//
// fulfilled reports whether at least count responses have been recorded.
// affinityHolds reports whether the last count responses (or all of them, if
// fewer than count were recorded) came from the same host as the most recent
// one. An empty trace trivially holds.
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
	total := len(at.hostTrace)
	fulfilled = total >= count
	if total == 0 {
		return fulfilled, true
	}

	// Look only at the trailing window of count entries when there are enough.
	window := at.hostTrace
	if total-count >= 0 {
		window = at.hostTrace[total-count:]
	}

	latest := at.hostTrace[total-1]
	for i := range window {
		if window[i] != latest {
			return fulfilled, false
		}
	}
	return fulfilled, true
}
// checkAffinityFailed logs the hosts recorded by tracker and then fails the
// test with the message err.
func checkAffinityFailed(tracker affinityTracker, err string) {
	e2elog.Logf("%v", tracker.hostTrace)
	// err is an arbitrary message (it may be the text of a Go error), so it
	// must be passed as an argument, never as the format string itself.
	Failf("%s", err)
}
// CheckAffinity tests whether service affinity behaves as expected for
// targetIP:targetPort. Requests are issued from execPod when it is non-nil
// (polling for ServiceTestTimeout), otherwise directly through the jig
// (polling for LoadBalancerPollTimeout), and the responding host of every
// request is recorded.
//
// If shouldHold is true the check succeeds once AffinityConfirmCount
// consecutive responses came from the same host; if it is false the check
// succeeds as soon as differing hosts are observed. On timeout or error the
// failure is reported through checkAffinityFailed. The function returns false
// only when polling ended with an error other than a timeout; in every other
// case it returns true.
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool {
	targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
	cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort)

	timeout := ServiceTestTimeout
	if execPod == nil {
		timeout = LoadBalancerPollTimeout
	}

	var tracker affinityTracker
	probe := func() (bool, error) {
		if execPod == nil {
			body := jig.GetHTTPContent(targetIP, targetPort, timeout, "")
			tracker.recordHost(body.String())
		} else {
			stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd)
			if err != nil {
				e2elog.Logf("Failed to get response from %s. Retry until timeout", targetIPPort)
				return false, nil
			}
			tracker.recordHost(stdout)
		}

		enough, holds := tracker.checkHostTrace(AffinityConfirmCount)
		// Affinity is disproved by a single differing host, but only
		// confirmed once enough identical responses have been seen in a row.
		done := (!shouldHold && !holds) || (shouldHold && enough && holds)
		return done, nil
	}

	pollErr := wait.PollImmediate(Poll, timeout, probe)
	if pollErr == nil {
		return true
	}

	enough, _ := tracker.checkHostTrace(AffinityConfirmCount)
	if pollErr != wait.ErrWaitTimeout {
		checkAffinityFailed(tracker, pollErr.Error())
		return false
	}
	if !enough {
		checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIPPort))
	}

	verdict := "Affinity shouldn't hold but did."
	if shouldHold {
		verdict = "Affinity should hold but didn't."
	}
	checkAffinityFailed(tracker, verdict)
	return true
}