diff --git a/examples/spark/spark-worker-controller.yaml b/examples/spark/spark-worker-controller.yaml index 33d04b6e50..7587560be5 100644 --- a/examples/spark/spark-worker-controller.yaml +++ b/examples/spark/spark-worker-controller.yaml @@ -3,7 +3,7 @@ apiVersion: v1 metadata: name: spark-worker-controller spec: - replicas: 3 + replicas: 2 selector: component: spark-worker template: @@ -20,3 +20,4 @@ spec: resources: requests: cpu: 100m + diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 7b33f8ebd1..dbaecdbf35 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -156,34 +156,48 @@ var _ = Describe("[Feature:Example]", func() { mkpath := func(file string) string { return filepath.Join(testContext.RepoRoot, "examples", "spark", file) } - serviceJson := mkpath("spark-master-service.json") - masterJson := mkpath("spark-master.json") - driverJson := mkpath("spark-driver.json") - workerControllerJson := mkpath("spark-worker-controller.json") + + // TODO: Add Zepplin and Web UI to this example. + serviceYaml := mkpath("spark-master-service.yaml") + masterYaml := mkpath("spark-master-controller.yaml") + workerControllerYaml := mkpath("spark-worker-controller.yaml") nsFlag := fmt.Sprintf("--namespace=%v", ns) - By("starting master") - runKubectlOrDie("create", "-f", serviceJson, nsFlag) - runKubectlOrDie("create", "-f", masterJson, nsFlag) - runKubectlOrDie("create", "-f", driverJson, nsFlag) - err := waitForPodRunningInNamespace(c, "spark-master", ns) - Expect(err).NotTo(HaveOccurred()) - _, err = lookForStringInLog(ns, "spark-master", "spark-master", "Starting Spark master at", serverStartTimeout) - Expect(err).NotTo(HaveOccurred()) - _, err = lookForStringInLog(ns, "spark-driver", "spark-driver", "Use kubectl exec", serverStartTimeout) - Expect(err).NotTo(HaveOccurred()) + master := func() { + By("starting master") + runKubectlOrDie("create", "-f", serviceYaml, nsFlag) + runKubectlOrDie("create", "-f", masterYaml, nsFlag) - By("waiting for master endpoint") - err = waitForEndpoint(c, ns, "spark-master") - Expect(err).NotTo(HaveOccurred()) + Logf("Now polling for Master startup...") - By("starting workers") - runKubectlOrDie("create", "-f", workerControllerJson, nsFlag) - ScaleRC(c, ns, "spark-worker-controller", 2, true) - forEachPod(c, ns, "name", "spark-worker", func(pod api.Pod) { - _, err := lookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout) + // Only one master pod: But its a natural way to look up pod names. + forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) { + Logf("Now waiting for master to startup in %v", pod.Name) + _, err := lookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout) + Expect(err).NotTo(HaveOccurred()) + }) + + By("waiting for master endpoint") + err := waitForEndpoint(c, ns, "spark-master") Expect(err).NotTo(HaveOccurred()) - }) + } + worker := func() { + By("starting workers") + Logf("Now starting Workers") + runKubectlOrDie("create", "-f", workerControllerYaml, nsFlag) + + // For now, scaling is orthogonal to the core test. + // ScaleRC(c, ns, "spark-worker-controller", 2, true) + + Logf("Now polling for worker startup...") + forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) { + _, err := lookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout) + Expect(err).NotTo(HaveOccurred()) + }) + } + // Run the worker verification after we turn up the master. + defer worker() + master() }) })