Merge pull request #21626 from jayunit100/examples-spark-fix

Fix broken spark tests to use yaml files, 1 worker, declare DNS depe…
pull/6/head
Alex Robinson 2016-02-24 16:30:19 -08:00
commit c031697848
2 changed files with 39 additions and 24 deletions

View File

@ -3,7 +3,7 @@ apiVersion: v1
metadata:
name: spark-worker-controller
spec:
replicas: 3
replicas: 2
selector:
component: spark-worker
template:
@ -20,3 +20,4 @@ spec:
resources:
requests:
cpu: 100m

View File

@ -156,34 +156,48 @@ var _ = Describe("[Feature:Example]", func() {
mkpath := func(file string) string {
return filepath.Join(testContext.RepoRoot, "examples", "spark", file)
}
serviceJson := mkpath("spark-master-service.json")
masterJson := mkpath("spark-master.json")
driverJson := mkpath("spark-driver.json")
workerControllerJson := mkpath("spark-worker-controller.json")
// TODO: Add Zepplin and Web UI to this example.
serviceYaml := mkpath("spark-master-service.yaml")
masterYaml := mkpath("spark-master-controller.yaml")
workerControllerYaml := mkpath("spark-worker-controller.yaml")
nsFlag := fmt.Sprintf("--namespace=%v", ns)
master := func() {
By("starting master")
runKubectlOrDie("create", "-f", serviceJson, nsFlag)
runKubectlOrDie("create", "-f", masterJson, nsFlag)
runKubectlOrDie("create", "-f", driverJson, nsFlag)
err := waitForPodRunningInNamespace(c, "spark-master", ns)
Expect(err).NotTo(HaveOccurred())
_, err = lookForStringInLog(ns, "spark-master", "spark-master", "Starting Spark master at", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
_, err = lookForStringInLog(ns, "spark-driver", "spark-driver", "Use kubectl exec", serverStartTimeout)
runKubectlOrDie("create", "-f", serviceYaml, nsFlag)
runKubectlOrDie("create", "-f", masterYaml, nsFlag)
Logf("Now polling for Master startup...")
// Only one master pod: But its a natural way to look up pod names.
forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) {
Logf("Now waiting for master to startup in %v", pod.Name)
_, err := lookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
By("waiting for master endpoint")
err = waitForEndpoint(c, ns, "spark-master")
err := waitForEndpoint(c, ns, "spark-master")
Expect(err).NotTo(HaveOccurred())
}
worker := func() {
By("starting workers")
runKubectlOrDie("create", "-f", workerControllerJson, nsFlag)
ScaleRC(c, ns, "spark-worker-controller", 2, true)
forEachPod(c, ns, "name", "spark-worker", func(pod api.Pod) {
Logf("Now starting Workers")
runKubectlOrDie("create", "-f", workerControllerYaml, nsFlag)
// For now, scaling is orthogonal to the core test.
// ScaleRC(c, ns, "spark-worker-controller", 2, true)
Logf("Now polling for worker startup...")
forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) {
_, err := lookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
}
// Run the worker verification after we turn up the master.
defer worker()
master()
})
})