/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
	"bytes"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/api/v1/service"
	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util/intstr"
	utilnet "k8s.io/kubernetes/pkg/util/net"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/uuid"
	"k8s.io/kubernetes/pkg/util/wait"
	testutils "k8s.io/kubernetes/test/utils"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

const (
	// KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
	// to not notice a Service update, such as type=NodePort.
	// TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
	// liberal. Fix tracked in #20567.
	KubeProxyLagTimeout = 5 * time.Minute

	// LoadBalancerLagTimeoutDefault is the maximum time a load balancer is allowed to
	// not respond after creation.
	LoadBalancerLagTimeoutDefault = 2 * time.Minute

	// LoadBalancerLagTimeoutAWS is the delay between ELB creation and serving traffic
	// on AWS. A few minutes is typical, so use 10m.
	LoadBalancerLagTimeoutAWS = 10 * time.Minute

	// How long to wait for a load balancer to be created/modified.
	// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable.
	LoadBalancerCreateTimeoutDefault = 20 * time.Minute
	LoadBalancerCreateTimeoutLarge   = 2 * time.Hour

	// Time required by the load balancer to clean up, proportional to numApps/Ing.
	// Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
	LoadBalancerCleanupTimeout = 15 * time.Minute

	// On average it takes ~6 minutes for a single backend to come online in GCE.
	LoadBalancerPollTimeout  = 15 * time.Minute
	LoadBalancerPollInterval = 30 * time.Second

	LargeClusterMinNodesNumber = 100

	// Don't test with more than 3 nodes.
	// Many tests create an endpoint per node; in large clusters this is
	// resource- and time-intensive.
	MaxNodesForEndpointsTests = 3

	// ServiceTestTimeout is used for most polling/waiting activities.
	ServiceTestTimeout = 60 * time.Second
)

// This should match whatever the default/configured range is.
var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}

// A test jig to help service testing.
type ServiceTestJig struct {
	ID     string
	Name   string
	Client clientset.Interface
	Labels map[string]string
}

// NewServiceTestJig allocates and inits a new ServiceTestJig.
func NewServiceTestJig(client clientset.Interface, name string) *ServiceTestJig {
	j := &ServiceTestJig{}
	j.Client = client
	j.Name = name
	j.ID = j.Name + "-" + string(uuid.NewUUID())
	j.Labels = map[string]string{"testid": j.ID}

	return j
}

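// A minimal usage sketch of the jig (illustrative only; the clientset `cs`, the
// namespace `ns`, and the choice of NodePort are assumptions of this example,
// not requirements of the jig):
//
//	jig := NewServiceTestJig(cs, "my-service")
//	svc := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
//		svc.Spec.Type = v1.ServiceTypeNodePort
//	})
//	jig.RunOrFail(ns, nil)
//	jig.SanityCheckService(svc, v1.ServiceTypeNodePort)
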
// newServiceTemplate returns the default v1.Service template for this jig, but
// does not actually create the Service. The default Service has the same name
// as the jig and exposes the given port.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
	service := &v1.Service{
		ObjectMeta: v1.ObjectMeta{
			Namespace: namespace,
			Name:      j.Name,
			Labels:    j.Labels,
		},
		Spec: v1.ServiceSpec{
			Selector: j.Labels,
			Ports: []v1.ServicePort{
				{
					Protocol: proto,
					Port:     port,
				},
			},
		},
	}
	return service
}

// CreateTCPServiceWithPort creates a new TCP Service with the given port based on the
// jig's defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
	svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
	if tweak != nil {
		tweak(svc)
	}
	result, err := j.Client.Core().Services(namespace).Create(svc)
	if err != nil {
		Failf("Failed to create TCP Service %q: %v", svc.Name, err)
	}
	return result
}

// CreateTCPServiceOrFail creates a new TCP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
	svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
	if tweak != nil {
		tweak(svc)
	}
	result, err := j.Client.Core().Services(namespace).Create(svc)
	if err != nil {
		Failf("Failed to create TCP Service %q: %v", svc.Name, err)
	}
	return result
}

// CreateUDPServiceOrFail creates a new UDP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
	svc := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
	if tweak != nil {
		tweak(svc)
	}
	result, err := j.Client.Core().Services(namespace).Create(svc)
	if err != nil {
		Failf("Failed to create UDP Service %q: %v", svc.Name, err)
	}
	return result
}

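// ChangeServiceType updates the named service to newType and clears its first
// NodePort. If the service previously reported a load balancer ingress IP, it
// also waits for that load balancer to be destroyed.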
func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
	ingressIP := ""
	svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
		for _, ing := range s.Status.LoadBalancer.Ingress {
			if ing.IP != "" {
				ingressIP = ing.IP
			}
		}
		s.Spec.Type = newType
		s.Spec.Ports[0].NodePort = 0
	})
	if ingressIP != "" {
		j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
	}
}

// CreateOnlyLocalNodePortService creates a NodePort service and sanity checks its
// nodePort. If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
	By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and annotation for local-traffic-only")
	svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeNodePort
		svc.ObjectMeta.Annotations = map[string]string{
			service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
		svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: 80}}
	})

	if createPod {
		By("creating a pod to be part of the service " + serviceName)
		j.RunOrFail(namespace, nil)
	}
	j.SanityCheckService(svc, v1.ServiceTypeNodePort)
	return svc
}

// CreateOnlyLocalLoadBalancerService creates a loadbalancer service and waits for it to
// acquire an ingress IP. If createPod is true, it also creates an RC with 1
// replica of the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
	tweak func(svc *v1.Service)) *v1.Service {
	By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and annotation for local-traffic-only")
	svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeLoadBalancer
		// We need to turn affinity off for our LB distribution tests
		svc.Spec.SessionAffinity = v1.ServiceAffinityNone
		svc.ObjectMeta.Annotations = map[string]string{
			service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
		if tweak != nil {
			tweak(svc)
		}
	})

	if createPod {
		By("creating a pod to be part of the service " + serviceName)
		j.RunOrFail(namespace, nil)
	}
	By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
	svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
	j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
	return svc
}

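// GetNodeAddresses returns the addresses of the given type reported in the node's status.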
func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
	for j := range node.Status.Addresses {
		nodeAddress := &node.Status.Addresses[j]
		if nodeAddress.Type == addressType {
			ips = append(ips, nodeAddress.Address)
		}
	}
	return
}

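// CollectAddresses returns the addresses of the given type across all nodes in the list.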
func CollectAddresses(nodes *v1.NodeList, addressType v1.NodeAddressType) []string {
	ips := []string{}
	for i := range nodes.Items {
		ips = append(ips, GetNodeAddresses(&nodes.Items[i], addressType)...)
	}
	return ips
}

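// GetNodePublicIps returns the external IPs of all ready, schedulable nodes,
// falling back to legacy host IPs when no external IPs are reported.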
func GetNodePublicIps(c clientset.Interface) ([]string, error) {
	nodes := GetReadySchedulableNodesOrDie(c)

	ips := CollectAddresses(nodes, v1.NodeExternalIP)
	if len(ips) == 0 {
		ips = CollectAddresses(nodes, v1.NodeLegacyHostIP)
	}
	return ips, nil
}

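// PickNodeIP returns a public IP of one ready, schedulable node, failing the test if none is found.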
func PickNodeIP(c clientset.Interface) string {
	publicIps, err := GetNodePublicIps(c)
	Expect(err).NotTo(HaveOccurred())
	if len(publicIps) == 0 {
		Failf("got unexpected number (%d) of public IPs", len(publicIps))
	}
	ip := publicIps[0]
	return ip
}

// GetEndpointNodes returns a map of nodenames:external-ip on which the
// endpoints of the given Service are running.
func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
	nodes := j.GetNodes(MaxNodesForEndpointsTests)
	endpoints, err := j.Client.Core().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
	if err != nil {
		Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
	}
	if len(endpoints.Subsets) == 0 {
		Failf("Endpoint has no subsets, cannot determine node addresses.")
	}
	epNodes := sets.NewString()
	for _, ss := range endpoints.Subsets {
		for _, e := range ss.Addresses {
			if e.NodeName != nil {
				epNodes.Insert(*e.NodeName)
			}
		}
	}
	nodeMap := map[string][]string{}
	for _, n := range nodes.Items {
		if epNodes.Has(n.Name) {
			nodeMap[n.Name] = GetNodeAddresses(&n, v1.NodeExternalIP)
		}
	}
	return nodeMap
}

// GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
// where we don't, e.g., want to create an endpoint per node.
func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
	nodes = GetReadySchedulableNodesOrDie(j.Client)
	if len(nodes.Items) <= maxNodesForTest {
		maxNodesForTest = len(nodes.Items)
	}
	nodes.Items = nodes.Items[:maxNodesForTest]
	return nodes
}

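// GetNodesNames returns the names of up to maxNodesForTest ready, schedulable nodes.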
func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
	nodes := j.GetNodes(maxNodesForTest)
	nodesNames := []string{}
	for _, node := range nodes.Items {
		nodesNames = append(nodesNames, node.Name)
	}
	return nodesNames
}

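// WaitForEndpointOnNode waits until the named service has a ready endpoint
// whose pod is scheduled on the named node.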
func (j *ServiceTestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
	err := wait.PollImmediate(Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
		endpoints, err := j.Client.Core().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
		if err != nil {
			Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
			return false, nil
		}
		// TODO: Handle multiple endpoints
		if len(endpoints.Subsets[0].Addresses) == 0 {
			Logf("Expected Ready endpoints - found none")
			return false, nil
		}
		epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
		Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
		if epHostName != nodeName {
			Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
			return false, nil
		}
		return true, nil
	})
	ExpectNoError(err)
}

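// SanityCheckService verifies that the service has the expected type and that
// its NodePorts and load balancer ingress status are consistent with that type.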
func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
	if svc.Spec.Type != svcType {
		Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
	}
	expectNodePorts := false
	if svcType != v1.ServiceTypeClusterIP {
		expectNodePorts = true
	}
	for i, port := range svc.Spec.Ports {
		hasNodePort := (port.NodePort != 0)
		if hasNodePort != expectNodePorts {
			Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
		}
		if hasNodePort {
			if !ServiceNodePortRange.Contains(int(port.NodePort)) {
				Failf("out-of-range nodePort (%d) for service", port.NodePort)
			}
		}
	}
	expectIngress := false
	if svcType == v1.ServiceTypeLoadBalancer {
		expectIngress = true
	}
	hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
	if hasIngress != expectIngress {
		Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
	}
	if hasIngress {
		for i, ing := range svc.Status.LoadBalancer.Ingress {
			if ing.IP == "" && ing.Hostname == "" {
				Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
			}
		}
	}
}

// UpdateService fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
	for i := 0; i < 3; i++ {
		service, err := j.Client.Core().Services(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("Failed to get Service %q: %v", name, err)
		}
		update(service)
		service, err = j.Client.Core().Services(namespace).Update(service)
		if err == nil {
			return service, nil
		}
		if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
			return nil, fmt.Errorf("Failed to update Service %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("Too many retries updating Service %q", name)
}

// UpdateServiceOrFail fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
	svc, err := j.UpdateService(namespace, name, update)
	if err != nil {
		Failf(err.Error())
	}
	return svc
}

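// ChangeServiceNodePortOrFail changes the service's first NodePort to a
// different value within the service node port range, trying further ports if
// the chosen one is already allocated.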
func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
	var err error
	var service *v1.Service
	for i := 1; i < ServiceNodePortRange.Size; i++ {
		offs1 := initial - ServiceNodePortRange.Base
		offs2 := (offs1 + i) % ServiceNodePortRange.Size
		newPort := ServiceNodePortRange.Base + offs2
		service, err = j.UpdateService(namespace, name, func(s *v1.Service) {
			s.Spec.Ports[0].NodePort = int32(newPort)
		})
		if err != nil && strings.Contains(err.Error(), "provided port is already allocated") {
			Logf("tried nodePort %d, but it is in use, will try another", newPort)
			continue
		}
		// Otherwise err was nil or err was a real error
		break
	}
	if err != nil {
		Failf("Could not change the nodePort: %v", err)
	}
	return service
}

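// WaitForLoadBalancerOrFail waits until the named service reports at least one
// load balancer ingress, failing the test on timeout.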
func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
	var service *v1.Service
	Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
	pollFunc := func() (bool, error) {
		svc, err := j.Client.Core().Services(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(svc.Status.LoadBalancer.Ingress) > 0 {
			service = svc
			return true, nil
		}
		return false, nil
	}
	if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
		Failf("Timeout waiting for service %q to have a load balancer", name)
	}
	return service
}

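// WaitForLoadBalancerDestroyOrFail waits until the named service no longer
// reports any load balancer ingress, and ensures the cloud resources behind
// ip:port are cleaned up before returning.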
func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
	// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
	defer func() {
		if err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
			Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
		}
	}()

	var service *v1.Service
	Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
	pollFunc := func() (bool, error) {
		svc, err := j.Client.Core().Services(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(svc.Status.LoadBalancer.Ingress) == 0 {
			service = svc
			return true, nil
		}
		return false, nil
	}
	if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
		Failf("Timeout waiting for service %q to have no load balancer", name)
	}
	return service
}

// newRCTemplate returns the default v1.ReplicationController object for
// this jig, but does not actually create the RC. The default RC has the same
// name as the jig and runs the "netexec" container.
func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
	rc := &v1.ReplicationController{
		ObjectMeta: v1.ObjectMeta{
			Namespace: namespace,
			Name:      j.Name,
			Labels:    j.Labels,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: func(i int) *int32 { x := int32(i); return &x }(1),
			Selector: j.Labels,
			Template: &v1.PodTemplateSpec{
				ObjectMeta: v1.ObjectMeta{
					Labels: j.Labels,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "netexec",
							Image: "gcr.io/google_containers/netexec:1.7",
							Args:  []string{"--http-port=80", "--udp-port=80"},
							ReadinessProbe: &v1.Probe{
								PeriodSeconds: 3,
								Handler: v1.Handler{
									HTTPGet: &v1.HTTPGetAction{
										Port: intstr.FromInt(80),
										Path: "/hostName",
									},
								},
							},
						},
					},
					TerminationGracePeriodSeconds: new(int64),
				},
			},
		},
	}
	return rc
}

// RunOrFail creates a ReplicationController and Pod(s) and waits for the
// Pod(s) to be running. Callers can provide a function to tweak the RC object
// before it is created.
func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
	rc := j.newRCTemplate(namespace)
	if tweak != nil {
		tweak(rc)
	}
	result, err := j.Client.Core().ReplicationControllers(namespace).Create(rc)
	if err != nil {
		Failf("Failed to create RC %q: %v", rc.Name, err)
	}
	pods, err := j.waitForPodsCreated(namespace, int(*(rc.Spec.Replicas)))
	if err != nil {
		Failf("Failed to create pods: %v", err)
	}
	if err := j.waitForPodsReady(namespace, pods); err != nil {
		Failf("Failed waiting for pods to be running: %v", err)
	}
	return result
}

func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
	timeout := 2 * time.Minute
	// List the pods, making sure we observe all the replicas.
	label := labels.SelectorFromSet(labels.Set(j.Labels))
	Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
		options := v1.ListOptions{LabelSelector: label.String()}
		pods, err := j.Client.Core().Pods(namespace).List(options)
		if err != nil {
			return nil, err
		}

		found := []string{}
		for _, pod := range pods.Items {
			if pod.DeletionTimestamp != nil {
				continue
			}
			found = append(found, pod.Name)
		}
		if len(found) == replicas {
			Logf("Found all %d pods", replicas)
			return found, nil
		}
		Logf("Found %d/%d pods - will retry", len(found), replicas)
	}
	return nil, fmt.Errorf("Timeout waiting for %d pods to be created", replicas)
}

func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
	timeout := 2 * time.Minute
	if !CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
		return fmt.Errorf("Timeout waiting for %d pods to be ready", len(pods))
	}
	return nil
}

// newNetexecPodSpec returns the pod spec of netexec pod
func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: v1.ObjectMeta{
			Name: podName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "netexec",
					Image: NetexecImageName,
					Command: []string{
						"/netexec",
						fmt.Sprintf("--http-port=%d", httpPort),
						fmt.Sprintf("--udp-port=%d", udpPort),
					},
					Ports: []v1.ContainerPort{
						{
							Name:          "http",
							ContainerPort: httpPort,
						},
						{
							Name:          "udp",
							ContainerPort: udpPort,
						},
					},
				},
			},
			HostNetwork: hostNetwork,
		},
	}
	return pod
}

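// LaunchNetexecPodOnNode launches a netexec pod with the jig's labels on the
// given node and waits for it to be running.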
func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
	Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
	pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
	pod.Spec.NodeName = nodeName
	pod.ObjectMeta.Labels = j.Labels
	podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
	_, err := podClient.Create(pod)
	ExpectNoError(err)
	ExpectNoError(f.WaitForPodRunning(podName))
	Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}

// newEchoServerPodSpec returns the pod spec of echo server pod
func newEchoServerPodSpec(podName string) *v1.Pod {
	port := 8080
	pod := &v1.Pod{
		ObjectMeta: v1.ObjectMeta{
			Name: podName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "echoserver",
					Image: "gcr.io/google_containers/echoserver:1.4",
					Ports: []v1.ContainerPort{{ContainerPort: int32(port)}},
				},
			},
			RestartPolicy: v1.RestartPolicyNever,
		},
	}
	return pod
}

// LaunchEchoserverPodOnNode launches a pod serving http on port 8080 to act
// as the target for the source IP preservation test. The client's source IP
// is echoed back by the web server.
func (j *ServiceTestJig) LaunchEchoserverPodOnNode(f *Framework, nodeName, podName string) {
	Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
	pod := newEchoServerPodSpec(podName)
	pod.Spec.NodeName = nodeName
	pod.ObjectMeta.Labels = j.Labels
	podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
	_, err := podClient.Create(pod)
	ExpectNoError(err)
	ExpectNoError(f.WaitForPodRunning(podName))
	Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}

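// TestReachableHTTP repeatedly polls host:port over HTTP until the echo
// endpoint responds or the timeout expires, failing the test in the latter case.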
func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
	if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableHTTP(host, port, "/echo?msg=hello", "hello") }); err != nil {
		Failf("Could not reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
	}
}

func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
	if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil {
		Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
	}
}

func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
	if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil {
		Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
	}
}

func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
	if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil {
		Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
	}
}

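// GetHTTPContent polls host:port until the given URL serves content and
// returns the response body, failing the test on timeout.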
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
	var body bytes.Buffer
	if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
		result, err := TestReachableHTTPWithContent(host, port, url, "", &body)
		if err != nil {
			Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err)
			return false, nil
		}
		return result, nil
	}); pollErr != nil {
		Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
	}
	return body
}

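// testHTTPHealthCheckNodePort performs a single GET against the health check
// node port and reports whether the responder indicated local endpoints (200)
// or none (503).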
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
	url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
	if ip == "" || port == 0 {
		Failf("Got empty IP for reachability check (%s)", url)
		return false, fmt.Errorf("Invalid input ip or port")
	}
	Logf("Testing HTTP health check on %v", url)
	resp, err := httpGetNoConnectionPool(url)
	if err != nil {
		Logf("Got error testing for reachability of %s: %v", url, err)
		return false, err
	}
	defer resp.Body.Close()
	// HealthCheck responder returns 503 for no local endpoints
	if resp.StatusCode == 503 {
		return false, nil
	}
	// HealthCheck responder returns 200 for non-zero local endpoints
	if resp.StatusCode == 200 {
		return true, nil
	}
	return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}

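// TestHTTPHealthCheckNodePort hits the health check node port the given number
// of times and returns how many attempts passed and failed, plus a status
// message describing any errors.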
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, tries int) (pass, fail int, statusMsg string) {
	for i := 0; i < tries; i++ {
		success, err := testHTTPHealthCheckNodePort(host, port, request)
		if success {
			pass++
		} else {
			fail++
		}
		statusMsg += fmt.Sprintf("\nAttempt %d Error %v", i, err)
		time.Sleep(1 * time.Second)
	}
	return pass, fail, statusMsg
}

// Simple helper class to avoid too much boilerplate in tests
type ServiceTestFixture struct {
	ServiceName string
	Namespace   string
	Client      clientset.Interface

	TestId string
	Labels map[string]string

	rcs      map[string]bool
	services map[string]bool
	Name     string
	Image    string
}

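// NewServerTest creates a ServiceTestFixture for the given namespace and
// service name, using the well-known test-webserver image.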
func NewServerTest(client clientset.Interface, namespace string, serviceName string) *ServiceTestFixture {
	t := &ServiceTestFixture{}
	t.Client = client
	t.Namespace = namespace
	t.ServiceName = serviceName
	t.TestId = t.ServiceName + "-" + string(uuid.NewUUID())
	t.Labels = map[string]string{
		"testid": t.TestId,
	}

	t.rcs = make(map[string]bool)
	t.services = make(map[string]bool)

	t.Name = "webserver"
	t.Image = "gcr.io/google_containers/test-webserver:e2e"

	return t
}

// Build default config for a service (which can then be changed)
func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
	service := &v1.Service{
		ObjectMeta: v1.ObjectMeta{
			Name:      t.ServiceName,
			Namespace: t.Namespace,
		},
		Spec: v1.ServiceSpec{
			Selector: t.Labels,
			Ports: []v1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt(80),
			}},
		},
	}
	return service
}

// CreateWebserverRC creates rc-backed pods with the well-known webserver
// configuration and records it for cleanup.
func (t *ServiceTestFixture) CreateWebserverRC(replicas int32) *v1.ReplicationController {
	rcSpec := RcByNamePort(t.Name, replicas, t.Image, 80, v1.ProtocolTCP, t.Labels, nil)
	rcAct, err := t.CreateRC(rcSpec)
	if err != nil {
		Failf("Failed to create rc %s: %v", rcSpec.Name, err)
	}
	if err := VerifyPods(t.Client, t.Namespace, t.Name, false, replicas); err != nil {
		Failf("Failed to create %d pods with name %s: %v", replicas, t.Name, err)
	}
	return rcAct
}

// CreateRC creates a replication controller and records it for cleanup.
func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.ReplicationController, error) {
	rc, err := t.Client.Core().ReplicationControllers(t.Namespace).Create(rc)
	if err == nil {
		t.rcs[rc.Name] = true
	}
	return rc, err
}

// Create a service, and record it for cleanup
func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
	result, err := t.Client.Core().Services(t.Namespace).Create(service)
	if err == nil {
		t.services[service.Name] = true
	}
	return result, err
}

// Delete a service, and remove it from the cleanup list
func (t *ServiceTestFixture) DeleteService(serviceName string) error {
	err := t.Client.Core().Services(t.Namespace).Delete(serviceName, nil)
	if err == nil {
		delete(t.services, serviceName)
	}
	return err
}

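// Cleanup scales down and deletes every replication controller and deletes
// every service recorded by the fixture, returning any errors encountered.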
func (t *ServiceTestFixture) Cleanup() []error {
	var errs []error
	for rcName := range t.rcs {
		By("stopping RC " + rcName + " in namespace " + t.Namespace)
		// First, resize the RC to 0.
		old, err := t.Client.Core().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
		if err != nil {
			errs = append(errs, err)
		}
		x := int32(0)
		old.Spec.Replicas = &x
		if _, err := t.Client.Core().ReplicationControllers(t.Namespace).Update(old); err != nil {
			errs = append(errs, err)
		}
		// TODO(mikedanese): Wait.
		// Then, delete the RC altogether.
		if err := t.Client.Core().ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
			errs = append(errs, err)
		}
	}

	for serviceName := range t.services {
		By("deleting service " + serviceName + " in namespace " + t.Namespace)
		err := t.Client.Core().Services(t.Namespace).Delete(serviceName, nil)
		if err != nil {
			errs = append(errs, err)
		}
	}

	return errs
}

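// GetIngressPoint returns the IP of the given load balancer ingress, or its
// hostname if no IP is set.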
func GetIngressPoint(ing *v1.LoadBalancerIngress) string {
	host := ing.IP
	if host == "" {
		host = ing.Hostname
	}
	return host
}

// UpdateService fetches a service, calls the update function on it,
// and then attempts to send the updated service. It tries up to 3
// times in the face of timeouts and conflicts.
func UpdateService(c clientset.Interface, namespace, serviceName string, update func(*v1.Service)) (*v1.Service, error) {
	var service *v1.Service
	var err error
	for i := 0; i < 3; i++ {
		service, err = c.Core().Services(namespace).Get(serviceName, metav1.GetOptions{})
		if err != nil {
			return service, err
		}

		update(service)

		service, err = c.Core().Services(namespace).Update(service)

		if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
			return service, err
		}
	}
	return service, err
}

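// GetContainerPortsByPodUID maps each pod UID referenced by the endpoints
// object to the list of container ports exposed through it.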
func GetContainerPortsByPodUID(endpoints *v1.Endpoints) PortsByPodUID {
	m := PortsByPodUID{}
	for _, ss := range endpoints.Subsets {
		for _, port := range ss.Ports {
			for _, addr := range ss.Addresses {
				containerPort := port.Port
				hostPort := port.Port

				// use endpoint annotations to recover the container port in a Mesos setup
				// compare contrib/mesos/pkg/service/endpoints_controller.syncService
				key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort)
				mesosContainerPortString := endpoints.Annotations[key]
				if mesosContainerPortString != "" {
					mesosContainerPort, err := strconv.Atoi(mesosContainerPortString)
					if err != nil {
						continue
					}
					containerPort = int32(mesosContainerPort)
					Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, mesosContainerPortString)
				}

				// Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort)
				if _, ok := m[addr.TargetRef.UID]; !ok {
					m[addr.TargetRef.UID] = make([]int, 0)
				}
				m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
			}
		}
	}
	return m
}

type PortsByPodName map[string][]int
type PortsByPodUID map[types.UID][]int

func translatePodNameToUIDOrFail(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
	portsByUID := make(PortsByPodUID)

	for name, portList := range expectedEndpoints {
		pod, err := c.Core().Pods(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
		}
		portsByUID[pod.ObjectMeta.UID] = portList
	}
	// Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)
	return portsByUID
}

func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
	if len(endpoints) != len(expectedEndpoints) {
		// should not happen because we check this condition before
		Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
	}
	for podUID := range expectedEndpoints {
		if _, ok := endpoints[podUID]; !ok {
			Failf("endpoint %v not found", podUID)
		}
		if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
			Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
		}
		sort.Ints(endpoints[podUID])
		sort.Ints(expectedEndpoints[podUID])
		for index := range endpoints[podUID] {
			if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
				Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
			}
		}
	}
}

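// ValidateEndpointsOrFail waits until the named service exposes exactly the
// expected pod-name-to-ports endpoints, failing the test (and dumping pod
// state for debugging) on timeout.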
func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) {
	By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
	i := 1
	for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
		endpoints, err := c.Core().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
		if err != nil {
			Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
			continue
		}
		// Logf("Found endpoints %v", endpoints)

		portsByPodUID := GetContainerPortsByPodUID(endpoints)
		// Logf("Found port by pod UID %v", portsByPodUID)

		expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
		if len(portsByPodUID) == len(expectedEndpoints) {
			validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
			Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
				serviceName, namespace, expectedEndpoints, time.Since(start))
			return
		}

		if i%5 == 0 {
			Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
		}
		i++
	}

	if pods, err := c.Core().Pods(v1.NamespaceAll).List(v1.ListOptions{}); err == nil {
		for _, pod := range pods.Items {
			Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
		}
	} else {
		Logf("Can't list pod debug info: %v", err)
	}
	Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
}

// StartServeHostnameService creates a replication controller that serves its hostname and a service on top of it.
func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, port, replicas int) ([]string, string, error) {
	podNames := make([]string, replicas)

	By("creating service " + name + " in namespace " + ns)
	_, err := c.Core().Services(ns).Create(&v1.Service{
		ObjectMeta: v1.ObjectMeta{
			Name: name,
		},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{
				Port:       int32(port),
				TargetPort: intstr.FromInt(9376),
				Protocol:   "TCP",
			}},
			Selector: map[string]string{
				"name": name,
			},
		},
	})
	if err != nil {
		return podNames, "", err
	}

	var createdPods []*v1.Pod
	maxContainerFailures := 0
	config := testutils.RCConfig{
		Client:               c,
		InternalClient:       internalClient,
		Image:                "gcr.io/google_containers/serve_hostname:v1.4",
		Name:                 name,
		Namespace:            ns,
		PollInterval:         3 * time.Second,
		Timeout:              PodReadyBeforeTimeout,
		Replicas:             replicas,
		CreatedPods:          &createdPods,
		MaxContainerFailures: &maxContainerFailures,
	}
	err = RunRC(config)
	if err != nil {
		return podNames, "", err
	}

	if len(createdPods) != replicas {
		return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", len(createdPods))
	}

	for i := range createdPods {
		podNames[i] = createdPods[i].ObjectMeta.Name
	}
	sort.StringSlice(podNames).Sort()

	service, err := c.Core().Services(ns).Get(name, metav1.GetOptions{})
	if err != nil {
		return podNames, "", err
	}
	if service.Spec.ClusterIP == "" {
		return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
	}
	serviceIP := service.Spec.ClusterIP
	return podNames, serviceIP, nil
}

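// StopServeHostnameService deletes the replication controller, its pods, and
// the service created by StartServeHostnameService.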
func StopServeHostnameService(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error {
	if err := DeleteRCAndPods(clientset, internalClientset, ns, name); err != nil {
		return err
	}
	if err := clientset.Core().Services(ns).Delete(name, nil); err != nil {
		return err
	}
	return nil
}

// VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
// given host and from within a pod. The host is expected to be an SSH-able node
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
	execPodName := CreateExecPodOrFail(c, ns, "execpod-", nil)
	defer func() {
		DeletePodOrFail(c, ns, execPodName)
	}()

	// Loop a bunch of times - the proxy is randomized, so we want a good
	// chance of hitting each backend at least once.
	buildCommand := func(wget string) string {
		return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s:%d 2>&1 || true; echo; done",
			50*len(expectedPods), wget, serviceIP, servicePort)
	}
	commands := []func() string{
		// verify service from node
		func() string {
			cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
			Logf("Executing cmd %q on host %v", cmd, host)
			result, err := SSH(cmd, host, TestContext.Provider)
			if err != nil || result.Code != 0 {
				LogSSHResult(result)
				Logf("error while SSH-ing to node: %v", err)
			}
			return result.Stdout
		},
		// verify service from pod
		func() string {
			cmd := buildCommand("wget -q -T 1 -O -")
			Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
			// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
			output, err := RunHostCmd(ns, execPodName, cmd)
			if err != nil {
				Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
			}
			return output
		},
	}

	expectedEndpoints := sets.NewString(expectedPods...)
	By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
	for _, cmdFunc := range commands {
		passed := false
		gotEndpoints := sets.NewString()

		// Retry cmdFunc for a while
		for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
			for _, endpoint := range strings.Split(cmdFunc(), "\n") {
				trimmedEp := strings.TrimSpace(endpoint)
				if trimmedEp != "" {
					gotEndpoints.Insert(trimmedEp)
				}
			}
			// TODO: simply checking that the retrieved endpoints is a superset
			// of the expected allows us to ignore intermittent network flakes that
			// result in output like "wget timed out", but these should be rare
			// and we need a better way to track how often it occurs.
			if gotEndpoints.IsSuperset(expectedEndpoints) {
				if !gotEndpoints.Equal(expectedEndpoints) {
					Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
				}
				passed = true
				break
			}
			Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
		}
		if !passed {
			// Sort the lists so they're easier to visually diff.
			exp := expectedEndpoints.List()
			got := gotEndpoints.List()
			sort.StringSlice(exp).Sort()
			sort.StringSlice(got).Sort()
			return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
		}
	}
	return nil
}

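// VerifyServeHostnameServiceDown verifies, from the given SSH-able host, that
// serviceIP:servicePort no longer accepts connections.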
func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
	command := fmt.Sprintf(
		"curl -s --connect-timeout 2 http://%s:%d && exit 99", serviceIP, servicePort)

	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		result, err := SSH(command, host, TestContext.Provider)
		if err != nil {
			LogSSHResult(result)
			Logf("error while SSH-ing to node: %v", err)
		}
		if result.Code != 99 {
			return nil
		}
		Logf("service still alive - still waiting")
	}
	return fmt.Errorf("waiting for service to be down timed out")
}

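// CleanupServiceGCEResources polls until the GCE resources behind the given
// load balancer name have been cleaned up, failing the test on timeout.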
func CleanupServiceGCEResources(loadBalancerName string) {
	if pollErr := wait.Poll(5*time.Second, LoadBalancerCleanupTimeout, func() (bool, error) {
		if err := CleanupGCEResources(loadBalancerName); err != nil {
			Logf("Still waiting for glbc to cleanup: %v", err)
			return false, nil
		}
		return true, nil
	}); pollErr != nil {
		Failf("Failed to cleanup service GCE resources.")
	}
}

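// DescribeSvc logs the output of "kubectl describe svc" for the given namespace.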
func DescribeSvc(ns string) {
	Logf("\nOutput of kubectl describe svc:\n")
	desc, _ := RunKubectl(
		"describe", "svc", fmt.Sprintf("--namespace=%v", ns))
	Logf(desc)
}