diff --git a/test/e2e/networking.go b/test/e2e/networking.go index 127e513ec4..d54ee2b263 100644 --- a/test/e2e/networking.go +++ b/test/e2e/networking.go @@ -18,25 +18,27 @@ package e2e
 
 import (
 	"fmt"
 	"net/http"
+	"strconv"
 	"strings"
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 )
 
+//nettest image versions: ~1.3 was the original RO-based test; 1.6 uses the newer services/tokens.
+const nettestVersion = "1.6"
+
 var _ = Describe("Networking", func() {
 	f := NewFramework("nettest")
-
-	var svcname = "nettest"
-
 	BeforeEach(func() {
 		//Assert basic external connectivity.
 		//Since this is not really a test of kubernetes in any way, we
@@ -102,153 +102,221 @@ var _ = Describe("Networking", func() {
 		}
 	})
 
-	//Now we can proceed with the test.
-	It("should function for intra-pod communication [Conformance]", func() {
+	//Each entry in this table defines one test case: the number of services
+	//to create, and how long the whole run may take before timing out.
+	//For example, {1, 100 * time.Second} creates a single service and gives
+	//it 100 seconds to pass. Entries with several services exercise that
+	//many ports in parallel, one service per port.
+	serviceSoakTests := []struct {
+		service        int
+		timeoutSeconds time.Duration
+	}{
+		//These are very liberal; once this test is running regularly,
+		//we will DECREASE the timeout values.
+		//Replace this with --scale constants eventually https://github.com/kubernetes/kubernetes/issues/10479.
+		{1, 100 * time.Second},
+		{3, 200 * time.Second}, //Test that parallel nettests running on different ports complete.
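+		//For example, a heavier soak entry such as {5, 400 * time.Second}
+		//(not enabled here) would verify 5 services over 5 ports in parallel.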
+	}
 
-		By(fmt.Sprintf("Creating a service named %q in namespace %q", svcname, f.Namespace.Name))
-		svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
-			ObjectMeta: api.ObjectMeta{
-				Name: svcname,
-				Labels: map[string]string{
-					"name": svcname,
-				},
-			},
-			Spec: api.ServiceSpec{
-				Ports: []api.ServicePort{{
-					Protocol:   "TCP",
-					Port:       8080,
-					TargetPort: util.NewIntOrStringFromInt(8080),
-				}},
-				Selector: map[string]string{
-					"name": svcname,
-				},
-			},
-		})
-		if err != nil {
-			Failf("unable to create test service named [%s] %v", svc.Name, err)
-		}
+	for _, svcSoak := range serviceSoakTests {
+		//copy to locals, so that the range variable is not overwritten
+		timeoutSeconds := svcSoak.timeoutSeconds
+		serviceNum := svcSoak.service
+		It(fmt.Sprintf("should function for intra-pod communication between all hosts in %v parallel services [Conformance]", serviceNum),
+			func() {
+				Logf("Running service test with timeout = %v for %v services", timeoutSeconds, serviceNum)
+				runNetTest(timeoutSeconds, f, makePorts(serviceNum), nettestVersion)
+			})
+	}
+})
 
-		// Clean up service
-		defer func() {
-			By("Cleaning up the service")
-			if err = f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil {
-				Failf("unable to delete svc %v: %v", svc.Name, err)
-			}
-		}()
+//pollPeerStatus polls the service's status endpoint until the nettest peers
+//report "pass", failing the test on a "fail" response or on timeout.
+//When finished, it writes the service name to the channel provided, which
+//lets callers verify several services in parallel.
+func pollPeerStatus(serviceDoneChannel chan string, f *Framework, svc *api.Service, pollTimeoutSeconds time.Duration) {
 
-		By("Creating a webserver (pending) pod on each node")
+	Logf("Begin polling %v", svc.Name)
+	getDetails := func() ([]byte, error) {
+		return f.Client.Get().
+			Namespace(f.Namespace.Name).
+			Prefix("proxy").
+			Resource("services").
+			Name(svc.Name).
+			Suffix("read").
+			DoRaw()
+	}
 
-		nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything())
+	getStatus := func() ([]byte, error) {
+		return f.Client.Get().
+			Namespace(f.Namespace.Name).
+			Prefix("proxy").
+			Resource("services").
+			Name(svc.Name).
+			Suffix("status").
+			DoRaw()
+	}
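+
+	//Both helpers read through the apiserver proxy; e.g. the status check
+	//resolves to roughly GET /api/v1/proxy/namespaces/<ns>/services/<name>/status
+	//(path shape per the client request builder above).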
" + - "Rerun it with at least two nodes to get complete coverage.") - } - - podNames := LaunchNetTestPodPerNode(f, nodes, svcname, "1.6") - - // Clean up the pods - defer func() { - By("Cleaning up the webserver pods") - for _, podName := range podNames { - if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil { - Logf("Failed to delete pod %s: %v", podName, err) - } - } - }() - - By("Waiting for the webserver pods to transition to Running state") - for _, podName := range podNames { - err = f.WaitForPodRunning(podName) - Expect(err).NotTo(HaveOccurred()) - } - - By("Waiting for connectivity to be verified") - passed := false - - //once response OK, evaluate response body for pass/fail. - var body []byte - getDetails := func() ([]byte, error) { - return f.Client.Get(). - Namespace(f.Namespace.Name). - Prefix("proxy"). - Resource("services"). - Name(svc.Name). - Suffix("read"). - DoRaw() - } - - getStatus := func() ([]byte, error) { - return f.Client.Get(). - Namespace(f.Namespace.Name). - Prefix("proxy"). - Resource("services"). - Name(svc.Name). - Suffix("status"). - DoRaw() - } - - timeout := time.Now().Add(2 * time.Minute) - for i := 0; !passed && timeout.After(time.Now()); i++ { - time.Sleep(2 * time.Second) - Logf("About to make a proxy status call") - start := time.Now() - body, err = getStatus() - Logf("Proxy status call returned in %v", time.Since(start)) - if err != nil { - Logf("Attempt %v: service/pod still starting. (error: '%v')", i, err) - continue - } - // Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers. - switch { - case string(body) == "pass": - Logf("Passed on attempt %v. Cleaning up.", i) - passed = true - case string(body) == "running": - Logf("Attempt %v: test still running", i) - case string(body) == "fail": - if body, err = getDetails(); err != nil { - Failf("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err) - } else { - Failf("Failed on attempt %v. Cleaning up. Details:\n%s", i, string(body)) - } - case strings.Contains(string(body), "no endpoints available"): - Logf("Attempt %v: waiting on service/endpoints", i) - default: - Logf("Unexpected response:\n%s", body) - } - } - - if !passed { + // Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers. + switch { + case string(body) == "pass": + passed = true + return true, nil + case string(body) == "running": + case string(body) == "fail": if body, err = getDetails(); err != nil { - Failf("Timed out. Cleaning up. Error reading details: %v", err) + Failf("Failed on attempt. Error reading details: %v", err) + return false, err } else { - Failf("Timed out. Cleaning up. Details:\n%s", string(body)) + Failf("Failed on attempt. Details:\n%s", string(body)) + return false, nil } + case strings.Contains(string(body), "no endpoints available"): + Logf("Attempt: waiting on service/endpoints") + default: + Logf("Unexpected response:\n%s", body) } - Expect(string(body)).To(Equal("pass")) + return false, nil + })) + + if !passed { + if body, err := getDetails(); err != nil { + Failf("Timed out. Major error : Couldn't read service details: %v", err) + } else { + Failf("Timed out. Service details :\n%s", string(body)) + } + } + serviceDoneChannel <- svc.Name +} + +//runNetTest Creates a single pod on each host which serves +//on a unique port in the cluster. 
+//runNetTest creates a single pod on each schedulable node, each pod serving
+//on a unique port in the cluster. It then binds a service to
+//that port, so that there are "n" endpoints to balance traffic across;
+//finally, each pod reaches out through the service to ping every other
+//pod in the cluster on the given port.
+//The more ports given, the more services are spun up:
+//one service per port.
+//To test basic pod networking, pass a single port.
+//To soak test the services, pass a range of ports (e.g. 8000-9000).
+func runNetTest(timeoutSeconds time.Duration, f *Framework, ports []int, nettestVersion string) {
+	nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything())
+	expectNoError(err, "Failed to list nodes")
+
+	// previous tests may have caused failures of some nodes. Let's skip
+	// 'Not Ready' nodes, just in case (there is no need to fail the test).
+	filterNodes(nodes, func(node api.Node) bool {
+		return isNodeReadySetAsExpected(&node, true)
 	})
-})
+	if len(nodes.Items) == 0 {
+		Failf("No Ready nodes found.")
+	}
+	if len(nodes.Items) == 1 {
+		// in general, the test requires two nodes. But for local development, often a one node cluster
+		// is created, for simplicity and speed. (see issue #10012). We permit one-node test
+		// only in some cases
+		if !providerIs("local") {
+			Failf("The test requires two Ready nodes on %s, but found just one.", testContext.Provider)
+		}
+		Logf("Only one ready node is detected. The test has limited scope in such setting. " +
+			"Rerun it with at least two nodes to get complete coverage.")
+	}
-func LaunchNetTestPodPerNode(f *Framework, nodes *api.NodeList, name, version string) []string {
+
+	portCompleteChannel := make(chan string, len(ports))
+
+	for p := range ports {
+		go func(nodes *api.NodeList, port int) {
+			defer GinkgoRecover()
+
+			svcname := fmt.Sprintf("nettest-%v", port)
+
+			By(fmt.Sprintf("creating a service named %q in namespace %q", svcname, f.Namespace.Name))
+			svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
+				ObjectMeta: api.ObjectMeta{
+					Name: svcname,
+					Labels: map[string]string{
+						"name": svcname,
+					},
+				},
+				Spec: api.ServiceSpec{
+					Ports: []api.ServicePort{{
+						Protocol:   "TCP",
+						Port:       port,
+						TargetPort: util.NewIntOrStringFromInt(port),
+					}},
+					Selector: map[string]string{
+						"name": svcname,
+					},
+				},
+			})
+
+			if err != nil {
+				Failf("unable to create test service named [%s] on port %v: %v", svcname, port, err)
+			}
+			Logf("Created service successfully [%s]", svc.Name)
+
+			// Clean up the service
+			defer func() {
+				By("Cleaning up the service")
+				if err := f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil {
+					Failf("unable to delete svc %v: %v", svc.Name, err)
+				}
+			}()
+
+			Logf("Launching pod per node...")
+			podNames := launchNetTestPodPerNode(port, nettestVersion, f, nodes, svcname)
+			// Clean up the pods
+			defer func() {
+				By("Cleaning up the webserver pods")
+				for _, podName := range podNames {
+					if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil {
+						Logf("Failed to delete pod %s: %v", podName, err)
+					}
+				}
+			}()
+
+			Logf("Launched test pods for %v", port)
+			By("waiting for all of the webserver pods to transition to Running")
+			for _, podName := range podNames {
+				err = f.WaitForPodRunning(podName)
+				Expect(err).NotTo(HaveOccurred())
+			}
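+
+			//Note: a nettest pod reports "pass" only once it sees totalPods
+			//peers via the service endpoints, so every pod must be Running
+			//before the status poll below can succeed.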
+
+			By(fmt.Sprintf("waiting for connectivity to be verified [ port = %v ]", port))
+			//once the status response is OK, pollPeerStatus evaluates the body for pass/fail.
+			pollPeerStatus(portCompleteChannel, f, svc, timeoutSeconds)
+
+			Logf("Finished test pods for %v", port)
+		}(nodes, ports[p])
+	}
+
+	//now wait for all of the nettests to complete...
+	for pReturned := range ports {
+		Logf("Waiting on services to report back. So far %v have been discovered...", pReturned)
+		Logf("Another service has successfully been discovered: %v ( %v of %v )", <-portCompleteChannel, pReturned+1, len(ports))
+	}
+	Logf("Completed test on %v ports/services", len(ports))
+}
+
+//makePorts returns a slice of n sequential ports, 8080 through 8080+n-1.
+func makePorts(n int) []int {
+	m := make([]int, n)
+	for i := 0; i < n; i++ {
+		m[i] = 8080 + i
+	}
+	return m
+}
+
+//launchNetTestPodPerNode launches one nettest pod per node, and returns the pod names.
+func launchNetTestPodPerNode(port int, version string, f *Framework, nodes *api.NodeList, name string) []string {
 	podNames := []string{}
 	totalPods := len(nodes.Items)
@@ -269,12 +337,13 @@ func LaunchNetTestPodPerNode(f *Framework, nodes *api.NodeList, name, version st
 					Name:  "webserver",
 					Image: "gcr.io/google_containers/nettest:" + version,
 					Args: []string{
+						"-port=" + strconv.Itoa(port),
 						"-service=" + name,
 						//peers >= totalPods should be asserted by the container.
 						//the nettest container finds peers by looking up list of svc endpoints.
 						fmt.Sprintf("-peers=%d", totalPods),
 						"-namespace=" + f.Namespace.Name},
-					Ports: []api.ContainerPort{{ContainerPort: 8080}},
+					Ports: []api.ContainerPort{{ContainerPort: port}},
 				},
 			},
 			NodeName: node.Name,
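
Reviewer note: the concurrency in runNetTest is a plain fan-out/fan-in. Below is a minimal, self-contained sketch of the same channel accounting (plain Go, hypothetical names, no kubernetes or ginkgo dependencies), for anyone verifying that sends and receives balance:

package main

import (
	"fmt"
	"time"
)

//makePorts mirrors the helper above: n sequential ports starting at 8080.
func makePorts(n int) []int {
	m := make([]int, n)
	for i := 0; i < n; i++ {
		m[i] = 8080 + i
	}
	return m
}

func main() {
	ports := makePorts(3)
	//Buffered to len(ports) so each worker reports exactly once without
	//blocking, mirroring portCompleteChannel in runNetTest.
	done := make(chan string, len(ports))

	for _, port := range ports {
		go func(port int) {
			//Stand-in for: create service, launch pods, pollPeerStatus.
			time.Sleep(10 * time.Millisecond)
			done <- fmt.Sprintf("nettest-%v", port)
		}(port)
	}

	//Fan-in: exactly one receive per port, as in runNetTest.
	for i := range ports {
		fmt.Printf("service %v verified ( %v of %v )\n", <-done, i+1, len(ports))
	}
}

The key invariant, in the sketch and in the patch, is one channel send per service and one receive per port; pollPeerStatus must therefore be called exactly once per service, not once per pod.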