Merge pull request #17614 from bprashanth/IngressE2E

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-11-30 20:16:17 -08:00
commit 53a544d980
5 changed files with 319 additions and 46 deletions

View File

@ -5,7 +5,7 @@ metadata:
namespace: kube-system
labels:
k8s-app: glbc
version: v0.5
version: v0.5.1
kubernetes.io/cluster-service: "true"
kubernetes.io/name: "GLBC"
spec:
@ -13,12 +13,12 @@ spec:
replicas: 1
selector:
k8s-app: glbc
version: v0.5
version: v0.5.1
template:
metadata:
labels:
k8s-app: glbc
version: v0.5
version: v0.5.1
name: glbc
kubernetes.io/cluster-service: "true"
spec:
@ -45,13 +45,15 @@ spec:
requests:
cpu: 10m
memory: 20Mi
- image: gcr.io/google_containers/glbc:0.5
- image: gcr.io/google_containers/glbc:0.5.1
livenessProbe:
httpGet:
path: /healthz
port: 8081
scheme: HTTP
initialDelaySeconds: 30
# healthz reaches out to GCE
periodSeconds: 30
timeoutSeconds: 5
name: l7-lb-controller
resources:

View File

@ -365,7 +365,7 @@ GCE_SLOW_TESTS=(
# Tests which are not able to be run in parallel.
GCE_PARALLEL_SKIP_TESTS=(
"GCE\sL7\sLoadBalancer\sController" # TODO: This cannot run in parallel with other L4 tests till quota has been bumped up.
"GCE\sL7\sLoadBalancer\sController" # namespaced watch flakes, issue: #17805
"Nodes\sNetwork"
"MaxPods"
"Resource\susage\sof\ssystem\scontainers"

View File

@ -112,7 +112,7 @@ readonly KUBE_CLIENT_PLATFORMS=(
# arbitrary, but is a reasonable splitting point for 2015
# laptops-versus-not.
#
# If you are using boot2docker, the following seems to work (note
# If you are using boot2docker, the following seems to work (note
# that 12000 rounds to 11G):
# boot2docker down
# VBoxManage modifyvm boot2docker-vm --memory 12000

View File

@ -21,7 +21,9 @@ import (
"fmt"
"net/http"
"os/exec"
"path/filepath"
"sort"
"strings"
"time"
compute "google.golang.org/api/compute/v1"
@ -54,7 +56,7 @@ import (
// - Mismatched service/container port, or endpoints are dead.
var (
appPrefix = "foo-app-"
appPrefix = "app"
pathPrefix = "foo"
testImage = "gcr.io/google_containers/n-way-http:1.0"
httpContainerPort = 8080
@ -62,6 +64,9 @@ var (
expectedLBCreationTime = 7 * time.Minute
expectedLBHealthCheckTime = 7 * time.Minute
// Labels applied throughout: to the RC, the default backend, as a selector, etc.
controllerLabels = map[string]string{"name": "glbc"}
// Name of the loadbalancer controller within the cluster addon
lbContainerName = "l7-lb-controller"
@ -77,13 +82,19 @@ var (
lbPollTimeout = 15 * time.Minute
lbPollInterval = 30 * time.Second
// Time required by the loadbalancer to clean up, proportional to numApps/Ing.
lbCleanupTimeout = 5 * time.Minute
// One can scale this test by tweaking numApps and numIng, the former will
// create more RCs/Services and add them to a single Ingress, while the latter
// will create smaller, more fragmented Ingresses. The numbers 4, 2 are chosen
// arbitrarily, we want to test more than a single Ingress, and it should have
// more than 1 url endpoint going to a service.
numApps = 4
numIng = 2
// will create smaller, more fragmented Ingresses. The numbers 2, 1 are chosen
// arbitrarily, we want to test more than a single endpoint (see the sketch below).
numApps = 2
numIng = 1
// GCE only allows names < 64 characters, and the loadbalancer controller inserts
// a single character of padding.
nameLenLimit = 62
)
// timeSlice allows sorting of time.Duration
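For orientation, a minimal sketch (illustrative only, not part of this change) of how the numApps/numIng knobs fan out into backends and Ingress paths. The prefixes and container port come from the vars above; the index scheme is an assumption, not the test's actual naming:

package main

import "fmt"

// Sketch: numApps backends hang off each Ingress, so numApps = 2, numIng = 1
// yields a single Ingress routing two paths to two RC/Service pairs.
func main() {
	numApps, numIng := 2, 1
	appPrefix, pathPrefix := "app", "foo"
	httpContainerPort := 8080
	for ing := 0; ing < numIng; ing++ {
		fmt.Printf("Ingress %d:\n", ing)
		for app := 0; app < numApps; app++ {
			i := ing*numApps + app
			// Each backend is an RC + Service plus one path rule on the Ingress.
			fmt.Printf("  /%v%d -> service %v%d:%d\n", pathPrefix, i, appPrefix, i, httpContainerPort)
		}
	}
}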
@ -184,35 +195,22 @@ func gcloudUnmarshal(resource, regex string, out interface{}) {
Failf("Error unmarshalling gcloud output: %v", err)
}
if err := json.Unmarshal([]byte(output), out); err != nil {
Failf("Error unmarshalling gcloud output: %v", err)
Failf("Error unmarshalling gcloud output for %v: %v", resource, err)
}
}
func checkLeakedResources() error {
msg := ""
// Check all resources #16636.
beList := []compute.BackendService{}
gcloudUnmarshal("backend-services", "k8s-be-[0-9]+", &beList)
if len(beList) != 0 {
for _, b := range beList {
msg += fmt.Sprintf("%v\n", b.Name)
}
return fmt.Errorf("Found backend services:\n%v", msg)
func gcloudDelete(resource, name string) {
Logf("Deleting %v: %v", resource, name)
output, err := exec.Command("gcloud", "compute", resource, "delete",
name, fmt.Sprintf("--project=%v", testContext.CloudConfig.ProjectID), "-q").CombinedOutput()
if err != nil {
Logf("Error deleting %v, output: %v\nerror: %+v", resource, string(output), err)
}
fwList := []compute.ForwardingRule{}
gcloudUnmarshal("forwarding-rules", "k8s-fw-.*", &fwList)
if len(fwList) != 0 {
for _, f := range fwList {
msg += fmt.Sprintf("%v\n", f.Name)
}
return fmt.Errorf("Found forwarding rules:\n%v", msg)
}
return nil
}
// kubectlLogLBController logs kubectl debug output for the L7 controller pod.
func kubectlLogLBController(c *client.Client) {
selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": "glbc"}))
func kubectlLogLBController(c *client.Client, ns string) {
selector := labels.SelectorFromSet(labels.Set(controllerLabels))
podList, err := c.Pods(api.NamespaceAll).List(selector, fields.Everything())
if err != nil {
Logf("Cannot log L7 controller output, error listing pods %v", err)
@ -224,16 +222,166 @@ func kubectlLogLBController(c *client.Client) {
}
for _, p := range podList.Items {
Logf("\nLast 100 log lines of %v\n", p.Name)
l, _ := runKubectl("logs", p.Name, fmt.Sprintf("--namespace=%v", api.NamespaceSystem), "-c", lbContainerName, "--tail=100")
l, _ := runKubectl("logs", p.Name, fmt.Sprintf("--namespace=%v", ns), "-c", lbContainerName, "--tail=100")
Logf(l)
}
}
type ingressController struct {
ns string
rcPath string
defaultSvcPath string
UID string
rc *api.ReplicationController
svc *api.Service
c *client.Client
}
func (cont *ingressController) create() {
// TODO: This cop-out is because it would be *more* brittle to duplicate all
// the name construction logic from the controller cross-repo. We will not
// need to be so paranoid about leaked resources once we figure out a solution
// for issues like #16337. Currently, all names should fall within 63 chars.
testName := fmt.Sprintf("k8s-fw-foo-app-X-%v--%v", cont.ns, cont.UID)
if len(testName) > nameLenLimit {
Failf("Cannot reliably test the given namespace(%v)/uid(%v), too close to GCE limit of %v",
cont.ns, cont.UID, nameLenLimit)
}
if cont.defaultSvcPath != "" {
svc := svcFromManifest(cont.defaultSvcPath)
svc.Namespace = cont.ns
svc.Labels = controllerLabels
svc.Spec.Selector = controllerLabels
cont.svc = svc
_, err := cont.c.Services(cont.ns).Create(cont.svc)
Expect(err).NotTo(HaveOccurred())
}
rc := rcFromManifest(cont.rcPath)
existingRc, err := cont.c.ReplicationControllers(api.NamespaceSystem).Get(lbContainerName)
Expect(err).NotTo(HaveOccurred())
// Merge the existing spec and new spec. The modifications should not
// manifest as functional changes to the controller. Most importantly, the
// podTemplate shouldn't change (but for the additional test cmd line flags)
// to ensure we test actual cluster functionality across upgrades.
rc.Spec = existingRc.Spec
rc.Name = "glbc"
rc.Namespace = cont.ns
rc.Labels = controllerLabels
rc.Spec.Selector = controllerLabels
rc.Spec.Template.Labels = controllerLabels
rc.Spec.Replicas = 1
// These command line params are only recognized by v0.5.1 and above; the resulting arg list is sketched after this function.
testArgs := []string{
// Pass namespace uid so the controller will tag resources with it.
fmt.Sprintf("--cluster-uid=%v", cont.UID),
// Tell the controller to delete all resources as it quits.
fmt.Sprintf("--delete-all-on-quit=true"),
// Don't use the default Service from kube-system.
fmt.Sprintf("--default-backend-service=%v/%v", cont.svc.Namespace, cont.svc.Name),
}
for i, c := range rc.Spec.Template.Spec.Containers {
if c.Name == lbContainerName {
rc.Spec.Template.Spec.Containers[i].Args = append(c.Args, testArgs...)
}
}
cont.rc = rc
_, err = cont.c.ReplicationControllers(cont.ns).Create(cont.rc)
Expect(err).NotTo(HaveOccurred())
Expect(waitForRCPodsRunning(cont.c, cont.ns, cont.rc.Name)).NotTo(HaveOccurred())
}
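To make the flag handling in create() concrete, a sketch of what the l7-lb-controller container args end up looking like after testArgs are appended. The pre-existing arg and the UID are invented examples; only the three appended flags come from the code above:

package main

import "fmt"

func main() {
	// What the cluster-addon RC might already carry (assumed example values).
	existing := []string{"--default-backend-service=kube-system/default-http-backend"}
	// The three flags create() appends, shown with an example namespace/UID.
	// glbc only recognizes these from v0.5.1 on.
	testArgs := []string{
		"--cluster-uid=8f0d9cbe",
		"--delete-all-on-quit=true",
		"--default-backend-service=e2e-tests-glbc-8f0d9cbe/default-http-backend",
	}
	fmt.Println(append(existing, testArgs...))
}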
func (cont *ingressController) cleanup(del bool) error {
errMsg := ""
// Ordering is important here because we cannot delete resources that other
// resources hold references to.
fwList := []compute.ForwardingRule{}
gcloudUnmarshal("forwarding-rules", fmt.Sprintf("k8s-fw-.*--%v", cont.UID), &fwList)
if len(fwList) != 0 {
msg := ""
for _, f := range fwList {
msg += fmt.Sprintf("%v\n", f.Name)
if del {
Logf("Deleting forwarding-rule: %v", f.Name)
output, err := exec.Command("gcloud", "compute", "forwarding-rules", "delete",
f.Name, fmt.Sprintf("--project=%v", testContext.CloudConfig.ProjectID), "-q", "--global").CombinedOutput()
if err != nil {
Logf("Error deleting forwarding rules, output: %v\nerror:%v", string(output), err)
}
}
}
errMsg += fmt.Sprintf("\nFound forwarding rules:\n%v", msg)
}
tpList := []compute.TargetHttpProxy{}
gcloudUnmarshal("target-http-proxies", fmt.Sprintf("k8s-tp-.*--%v", cont.UID), &tpList)
if len(tpList) != 0 {
msg := ""
for _, t := range tpList {
msg += fmt.Sprintf("%v\n", t.Name)
if del {
gcloudDelete("target-http-proxies", t.Name)
}
}
errMsg += fmt.Sprintf("Found target proxies:\n%v", msg)
}
umList := []compute.UrlMap{}
gcloudUnmarshal("url-maps", fmt.Sprintf("k8s-um-.*--%v", cont.UID), &umList)
if len(umList) != 0 {
msg := ""
for _, u := range umList {
msg += fmt.Sprintf("%v\n", u.Name)
if del {
gcloudDelete("url-maps", u.Name)
}
}
errMsg += fmt.Sprintf("Found url maps:\n%v", msg)
}
beList := []compute.BackendService{}
gcloudUnmarshal("backend-services", fmt.Sprintf("k8s-be-[0-9]+--%v", cont.UID), &beList)
if len(beList) != 0 {
msg := ""
for _, b := range beList {
msg += fmt.Sprintf("%v\n", b.Name)
if del {
gcloudDelete("backend-services", b.Name)
}
}
errMsg += fmt.Sprintf("Found backend services:\n%v", msg)
}
hcList := []compute.HttpHealthCheck{}
gcloudUnmarshal("http-health-checks", fmt.Sprintf("k8s-be-[0-9]+--%v", cont.UID), &hcList)
if len(hcList) != 0 {
msg := ""
for _, h := range hcList {
msg += fmt.Sprintf("%v\n", h.Name)
if del {
gcloudDelete("http-health-checks", h.Name)
}
}
errMsg += fmt.Sprintf("Found health check:\n%v", msg)
}
// TODO: Verify instance-groups, issue #16636. Gcloud mysteriously barfs when told
// to unmarshal instance groups into the current vendored gce-client's understanding
// of the struct.
if errMsg == "" {
return nil
}
return fmt.Errorf(errMsg)
}
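For reference, the order cleanup() walks GCE resources in, collected in one place. The resource types and name regexes are the ones used above, with <uid> standing in for the namespace UID; the snippet itself is an illustrative sketch:

package main

import "fmt"

// Leaf-first deletion order: each layer holds a reference to the next, so a
// forwarding rule must go before its target proxy, the proxy before its url map,
// and so on. Only forwarding-rules also need --global, which is why cleanup()
// does not route them through gcloudDelete.
var l7DeletionOrder = []struct{ resource, regex string }{
	{"forwarding-rules", "k8s-fw-.*--<uid>"},
	{"target-http-proxies", "k8s-tp-.*--<uid>"},
	{"url-maps", "k8s-um-.*--<uid>"},
	{"backend-services", "k8s-be-[0-9]+--<uid>"},
	{"http-health-checks", "k8s-be-[0-9]+--<uid>"},
}

func main() {
	for _, r := range l7DeletionOrder {
		fmt.Printf("gcloud compute %v delete <names matching %v> -q\n", r.resource, r.regex)
	}
}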
var _ = Describe("GCE L7 LoadBalancer Controller", func() {
// These variables are initialized after framework's beforeEach.
var ns string
var addonDir string
var client *client.Client
var responseTimes, creationTimes []time.Duration
var ingController *ingressController
framework := Framework{BaseName: "glbc"}
@ -243,31 +391,71 @@ var _ = Describe("GCE L7 LoadBalancer Controller", func() {
framework.beforeEach()
client = framework.Client
ns = framework.Namespace.Name
Expect(waitForRCPodsRunning(client, "kube-system", "glbc")).NotTo(HaveOccurred())
Expect(checkLeakedResources()).NotTo(HaveOccurred())
// Scale down the existing Ingress controller so it doesn't interfere with the test.
Expect(scaleRCByName(client, api.NamespaceSystem, lbContainerName, 0)).NotTo(HaveOccurred())
addonDir = filepath.Join(
testContext.RepoRoot, "cluster", "addons", "cluster-loadbalancing", "glbc")
nsParts := strings.Split(ns, "-")
ingController = &ingressController{
ns: ns,
// The UID in the namespace was generated by the master, so it's
// global to the cluster.
UID: nsParts[len(nsParts)-1],
rcPath: filepath.Join(addonDir, "glbc-controller.yaml"),
defaultSvcPath: filepath.Join(addonDir, "default-svc.yaml"),
c: client,
}
ingController.create()
// If we somehow get the same namespace uid as someone else in this
// gce project, just back off.
Expect(ingController.cleanup(false)).NotTo(HaveOccurred())
responseTimes = []time.Duration{}
creationTimes = []time.Duration{}
})
AfterEach(func() {
Logf("Average creation time %+v, health check time %+v", creationTimes, responseTimes)
if CurrentGinkgoTestDescription().Failed {
kubectlLogLBController(client)
kubectlLogLBController(client, ns)
Logf("\nOutput of kubectl describe ing:\n")
desc, _ := runKubectl("describe", "ing", fmt.Sprintf("--namespace=%v", ns))
Logf(desc)
}
framework.afterEach()
err := wait.Poll(lbPollInterval, lbPollTimeout, func() (bool, error) {
if err := checkLeakedResources(); err != nil {
// Delete all Ingresses, then wait for the controller to clean up.
ings, err := client.Extensions().Ingress(ns).List(
labels.Everything(), fields.Everything())
if err != nil {
Logf("WARNING: Failed to list ingress: %+v", err)
} else {
for _, ing := range ings.Items {
Logf("Deleting ingress %v/%v", ing.Namespace, ing.Name)
if err := client.Extensions().Ingress(ns).Delete(ing.Name, nil); err != nil {
Logf("WARNING: Failed to delete ingress %v: %v", ing.Name, err)
}
}
}
pollErr := wait.Poll(5*time.Second, lbCleanupTimeout, func() (bool, error) {
if err := ingController.cleanup(false); err != nil {
Logf("Still waiting for glbc to cleanup: %v", err)
return false, nil
}
return true, nil
})
Logf("Average creation time %+v, health check time %+v", creationTimes, responseTimes)
if err != nil {
// TODO: Remove this once issue #17802 is fixed
Expect(scaleRCByName(client, ingController.rc.Namespace, ingController.rc.Name, 0)).NotTo(HaveOccurred())
// If the controller failed to clean up, the test will fail, but we want to clean
// up resources before that.
if pollErr != nil {
if cleanupErr := ingController.cleanup(true); cleanupErr != nil {
Logf("WARNING: Failed to cleanup resources %v", cleanupErr)
}
Failf("Failed to cleanup GCE L7 resources.")
}
// Restore the cluster Addon.
Expect(scaleRCByName(client, api.NamespaceSystem, lbContainerName, 1)).NotTo(HaveOccurred())
framework.afterEach()
Logf("Successfully verified GCE L7 loadbalancer via Ingress.")
})
@ -324,6 +512,9 @@ var _ = Describe("GCE L7 LoadBalancer Controller", func() {
route := fmt.Sprintf("http://%v%v", address, p.Path)
Logf("Testing route %v host %v with simple GET", route, rules.Host)
// Make sure the service node port is reachable
Expect(curlServiceNodePort(client, ns, p.Backend.ServiceName, int(p.Backend.ServicePort.IntVal))).NotTo(HaveOccurred())
GETStart := time.Now()
var lastBody string
pollErr := wait.Poll(lbPollInterval, lbPollTimeout, func() (bool, error) {
@ -336,8 +527,14 @@ var _ = Describe("GCE L7 LoadBalancer Controller", func() {
return true, nil
})
if pollErr != nil {
Failf("Failed to execute a successful GET within %v, Last response body for %v, host %v:\n%v\n\n%v",
msg := fmt.Sprintf("Failed to execute a successful GET within %v, Last response body for %v, host %v:\n%v\n\n%v\n",
lbPollTimeout, route, rules.Host, lastBody, pollErr)
// Make sure the service node port is still reachable
if err := curlServiceNodePort(client, ns, p.Backend.ServiceName, int(p.Backend.ServicePort.IntVal)); err != nil {
msg += fmt.Sprintf("Also unable to curl service node port: %v", err)
}
Failf(msg)
}
rt := time.Since(GETStart)
By(fmt.Sprintf("Route %v host %v took %v to respond", route, rules.Host, rt))
@ -351,7 +548,7 @@ var _ = Describe("GCE L7 LoadBalancer Controller", func() {
sort.Sort(timeSlice(creationTimes))
perc50 := creationTimes[len(creationTimes)/2]
if perc50 > expectedLBCreationTime {
Failf("Average creation time is too high: %+v", creationTimes)
Logf("WARNING: Average creation time is too high: %+v", creationTimes)
}
if !verifyHTTPGET {
return
@ -359,7 +556,21 @@ var _ = Describe("GCE L7 LoadBalancer Controller", func() {
sort.Sort(timeSlice(responseTimes))
perc50 = responseTimes[len(responseTimes)/2]
if perc50 > expectedLBHealthCheckTime {
Failf("Average startup time is too high: %+v", responseTimes)
Logf("WARNING: Average startup time is too high: %+v", responseTimes)
}
})
})
func curlServiceNodePort(client *client.Client, ns, name string, port int) error {
// TODO: Curl all nodes?
u, err := getNodePortURL(client, ns, name, port)
if err != nil {
return err
}
svcCurlBody, err := simpleGET(http.DefaultClient, u, "")
if err != nil {
return fmt.Errorf("Failed to curl service node port, body: %v\nerror %v", svcCurlBody, err)
}
Logf("Successfully curled service node port, body: %v", svcCurlBody)
return nil
}
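One possible shape for the "Curl all nodes?" TODO above, assuming getSvcNodePort, simpleGET and Logf from this e2e package and the 2015-era client import path; a sketch, not part of the change:

package e2e

import (
	"fmt"
	"net/http"

	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
)

// curlServiceNodePortAllNodes (hypothetical) hits the node port on every node
// that reports an external IP instead of stopping at the first one.
func curlServiceNodePortAllNodes(c *client.Client, ns, name string, port int) error {
	nodePort, err := getSvcNodePort(c, ns, name, port)
	if err != nil {
		return err
	}
	nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
	if err != nil {
		return err
	}
	var lastErr error
	for _, node := range nodes.Items {
		for _, addr := range node.Status.Addresses {
			if addr.Type != api.NodeExternalIP || addr.Address == "" {
				continue
			}
			u := fmt.Sprintf("http://%v:%v", addr.Address, nodePort)
			body, err := simpleGET(http.DefaultClient, u, "")
			if err != nil {
				lastErr = fmt.Errorf("failed to curl %v, body: %v, error: %v", u, body, err)
				continue
			}
			Logf("Successfully curled %v", u)
		}
	}
	// Returns the last failure seen, or nil if every node with an external IP responded.
	return lastErr
}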

View File

@ -2525,3 +2525,63 @@ func lookForString(expectedString string, timeout time.Duration, fn func() strin
err = fmt.Errorf("Failed to find \"%s\", last result: \"%s\"", expectedString, result)
return
}
// getSvcNodePort returns the node port for the given service:port.
func getSvcNodePort(client *client.Client, ns, name string, svcPort int) (int, error) {
svc, err := client.Services(ns).Get(name)
if err != nil {
return 0, err
}
for _, p := range svc.Spec.Ports {
if p.Port == svcPort {
if p.NodePort != 0 {
return p.NodePort, nil
}
}
}
return 0, fmt.Errorf(
"No node port found for service %v, port %v", name, svcPort)
}
// getNodePortURL returns the url to a nodeport Service.
func getNodePortURL(client *client.Client, ns, name string, svcPort int) (string, error) {
nodePort, err := getSvcNodePort(client, ns, name, svcPort)
if err != nil {
return "", err
}
nodes, err := client.Nodes().List(labels.Everything(), fields.Everything())
if err != nil {
return "", err
}
if len(nodes.Items) == 0 {
return "", fmt.Errorf("Unable to list nodes in cluster.")
}
for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
if address.Type == api.NodeExternalIP {
if address.Address != "" {
return fmt.Sprintf("http://%v:%v", address.Address, nodePort), nil
}
}
}
}
return "", fmt.Errorf("Failed to find external address for service %v", name)
}
// scaleRCByName scales an RC via ns/name lookup. If replicas == 0 it waits till
// none are running, otherwise it does what a synchronous scale operation would do.
func scaleRCByName(client *client.Client, ns, name string, replicas uint) error {
if err := ScaleRC(client, ns, name, replicas, false); err != nil {
return err
}
rc, err := client.ReplicationControllers(ns).Get(name)
if err != nil {
return err
}
if replicas == 0 {
return waitForRCPodsGone(client, rc)
} else {
return waitForPodsWithLabelRunning(
client, ns, labels.SelectorFromSet(labels.Set(rc.Spec.Selector)))
}
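A minimal usage sketch of scaleRCByName, mirroring how the Ingress test above parks the kube-system l7-lb-controller during a run and restores it afterwards. The wrapper and its name are hypothetical, and the import paths are the ones this package used at the time (an assumption):

package e2e

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// pauseClusterAddon (hypothetical) scales a cluster-addon RC to zero while a
// test runs its own namespaced copy, then brings it back to one replica.
func pauseClusterAddon(c *client.Client, name string, run func()) error {
	if err := scaleRCByName(c, api.NamespaceSystem, name, 0); err != nil {
		return err
	}
	run() // e.g. the GCE L7 LoadBalancer Controller test body
	return scaleRCByName(c, api.NamespaceSystem, name, 1)
}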
}