From 4506744def258670a0c1eb469806cf3fa4f75596 Mon Sep 17 00:00:00 2001 From: Kenneth Owens Date: Thu, 9 Feb 2017 14:05:47 -0800 Subject: [PATCH] Adds StatefulSet upgrade tests and moves common functionality into the framework package. This removes the potential for cyclic dependencies while allowing for code reuse. --- test/e2e/cluster_upgrade.go | 1 + test/e2e/examples.go | 2 +- test/e2e/framework/BUILD | 3 + test/e2e/framework/statefulset_utils.go | 558 +++++++++++++++++++++ test/e2e/network_partition.go | 18 +- test/e2e/statefulset.go | 625 +++--------------------- test/e2e/upgrades/BUILD | 3 + test/e2e/upgrades/statefulset.go | 99 ++++ 8 files changed, 749 insertions(+), 560 deletions(-) create mode 100644 test/e2e/framework/statefulset_utils.go create mode 100644 test/e2e/upgrades/statefulset.go diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 479c0f9640..8c7a666211 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -32,6 +32,7 @@ import ( var upgradeTests = []upgrades.Test{ &upgrades.ServiceUpgradeTest{}, &upgrades.SecretUpgradeTest{}, + &upgrades.StatefulSetUpgradeTest{}, &upgrades.DeploymentUpgradeTest{}, } diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 66a8aab305..c44f9df0d4 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -330,7 +330,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { } }) // using out of statefulset e2e as deleting pvc is a pain - deleteAllStatefulSets(c, ns) + framework.DeleteAllStatefulSets(c, ns) }) }) diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 835a7a8fb6..44a6610a18 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -27,6 +27,7 @@ go_library( "pods.go", "resource_usage_gatherer.go", "service_util.go", + "statefulset_utils.go", "test_context.go", "util.go", ], @@ -86,6 +87,7 @@ go_library( "//vendor:google.golang.org/api/compute/v1", "//vendor:google.golang.org/api/googleapi", "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1/unstructured", "//vendor:k8s.io/apimachinery/pkg/fields", @@ -102,6 +104,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/apimachinery/pkg/util/validation", "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/apimachinery/pkg/util/yaml", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/discovery", "//vendor:k8s.io/client-go/dynamic", diff --git a/test/e2e/framework/statefulset_utils.go b/test/e2e/framework/statefulset_utils.go new file mode 100644 index 0000000000..3efaa01bc2 --- /dev/null +++ b/test/e2e/framework/statefulset_utils.go @@ -0,0 +1,558 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "strconv" + "strings" + "time" + + . "github.com/onsi/gomega" + + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + utilyaml "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + // Poll interval for StatefulSet tests + StatefulSetPoll = 10 * time.Second + // Timeout interval for StatefulSet operations + StatefulSetTimeout = 10 * time.Minute + // Timeout for stateful pods to change state + StatefulPodTimeout = 5 * time.Minute +) + +// CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels. +func CreateStatefulSetService(name string, labels map[string]string) *v1.Service { + headlessService := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.ServiceSpec{ + Selector: labels, + }, + } + headlessService.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: "TCP"}, + } + headlessService.Spec.ClusterIP = "None" + return headlessService +} + +// StatefulSetFromManifest returns a StatefulSet from a manifest stored in fileName in the Namespace indicated by ns. +func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet { + var ss apps.StatefulSet + Logf("Parsing statefulset from %v", fileName) + data, err := ioutil.ReadFile(fileName) + Expect(err).NotTo(HaveOccurred()) + json, err := utilyaml.ToJSON(data) + Expect(err).NotTo(HaveOccurred()) + + Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred()) + ss.Namespace = ns + if ss.Spec.Selector == nil { + ss.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: ss.Spec.Template.Labels, + } + } + return &ss +} + +// StatefulSetTester is a struct that contains utility methods for testing StatefulSet related functionality. It uses a +// clientset.Interface to communicate with the API server. +type StatefulSetTester struct { + c clientset.Interface +} + +// NewStatefulSetTester creates a StatefulSetTester that uses c to interact with the API server. +func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester { + return &StatefulSetTester{c} +} + +// CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create. +func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet { + mkpath := func(file string) string { + return filepath.Join(TestContext.RepoRoot, manifestPath, file) + } + ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns) + + Logf(fmt.Sprintf("creating " + ss.Name + " service")) + RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns)) + + Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector)) + RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns)) + s.WaitForRunningAndReady(*ss.Spec.Replicas, ss) + return ss +} + +// CheckMount checks that the mount at mountPath is valid for all Pods in ss. +func (s *StatefulSetTester) CheckMount(ss *apps.StatefulSet, mountPath string) error { + for _, cmd := range []string{ + // Print inode, size etc + fmt.Sprintf("ls -idlh %v", mountPath), + // Print subdirs + fmt.Sprintf("find %v", mountPath), + // Try writing + fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))), + } { + if err := s.ExecInStatefulPods(ss, cmd); err != nil { + return fmt.Errorf("failed to execute %v, error: %v", cmd, err) + } + } + return nil +} + +// ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods. +func (s *StatefulSetTester) ExecInStatefulPods(ss *apps.StatefulSet, cmd string) error { + podList := s.GetPodList(ss) + for _, statefulPod := range podList.Items { + stdout, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd) + Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout) + if err != nil { + return err + } + } + return nil +} + +// CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil than verification failed. +func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error { + cmd := "printf $(hostname)" + podList := s.GetPodList(ss) + for _, statefulPod := range podList.Items { + hostname, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd) + if err != nil { + return err + } + if hostname != statefulPod.Name { + return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name) + } + } + return nil +} + +// Saturate waits for all Pods in ss to become Running and Ready. +func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) { + var i int32 + for i = 0; i < *(ss.Spec.Replicas); i++ { + Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running") + s.WaitForRunningAndReady(i+1, ss) + Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy") + s.SetHealthy(ss) + } +} + +// DeleteStatefulPodAtIndex deletes the Pod with ordinal index in ss. +func (s *StatefulSetTester) DeleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) { + name := getStatefulSetPodNameAtIndex(index, ss) + noGrace := int64(0) + if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil { + Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err) + } +} + +// VerifyStatefulPodFunc is a func that examines a StatefulSetPod. +type VerifyStatefulPodFunc func(*v1.Pod) + +// VerifyPodAtIndex applies a visitor patter to the Pod at index in ss. verify is is applied to the Pod to "visit" it. +func (s *StatefulSetTester) VerifyPodAtIndex(index int, ss *apps.StatefulSet, verify VerifyStatefulPodFunc) { + name := getStatefulSetPodNameAtIndex(index, ss) + pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name)) + verify(pod) +} + +func getStatefulSetPodNameAtIndex(index int, ss *apps.StatefulSet) string { + // TODO: we won't use "-index" as the name strategy forever, + // pull the name out from an identity mapper. + return fmt.Sprintf("%v-%v", ss.Name, index) +} + +// Scale scales ss to count replicas. +func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) error { + name := ss.Name + ns := ss.Namespace + s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count }) + + var statefulPodList *v1.PodList + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) { + statefulPodList = s.GetPodList(ss) + if int32(len(statefulPodList.Items)) == count { + return true, nil + } + return false, nil + }) + if pollErr != nil { + unhealthy := []string{} + for _, statefulPod := range statefulPodList.Items { + delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod) + if delTs != nil || phase != v1.PodRunning || !readiness { + unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness)) + } + } + return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy) + } + return nil +} + +// UpdateReplicas updates the replicas of ss to count. +func (s *StatefulSetTester) UpdateReplicas(ss *apps.StatefulSet, count int32) { + s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count }) +} + +// Restart scales ss to 0 and then back to its previous number of replicas. +func (s *StatefulSetTester) Restart(ss *apps.StatefulSet) { + oldReplicas := *(ss.Spec.Replicas) + ExpectNoError(s.Scale(ss, 0)) + s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas }) +} + +func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) { + for i := 0; i < 3; i++ { + ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{}) + if err != nil { + Failf("failed to get statefulset %q: %v", name, err) + } + update(ss) + ss, err = s.c.Apps().StatefulSets(ns).Update(ss) + if err == nil { + return + } + if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) { + Failf("failed to update statefulset %q: %v", name, err) + } + } + Failf("too many retries draining statefulset %q", name) +} + +// GetPodList gets the current Pods in ss. +func (s *StatefulSetTester) GetPodList(ss *apps.StatefulSet) *v1.PodList { + selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector) + ExpectNoError(err) + podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + ExpectNoError(err) + return podList +} + +// ConfirmStatefulPodCount asserts that the current number of Pods in ss is count waiting up to timeout for ss to +// to scale to count. +func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) { + start := time.Now() + deadline := start.Add(timeout) + for t := time.Now(); t.Before(deadline); t = time.Now() { + podList := s.GetPodList(ss) + statefulPodCount := len(podList.Items) + if statefulPodCount != count { + Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList) + } + Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t)) + time.Sleep(1 * time.Second) + } +} + +func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) { + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, + func() (bool, error) { + podList := s.GetPodList(ss) + if int32(len(podList.Items)) < numStatefulPods { + Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods) + return false, nil + } + if int32(len(podList.Items)) > numStatefulPods { + return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items)) + } + for _, p := range podList.Items { + isReady := v1.IsPodReady(&p) + desiredReadiness := shouldBeReady == isReady + Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady) + if p.Status.Phase != v1.PodRunning || !desiredReadiness { + return false, nil + } + } + return true, nil + }) + if pollErr != nil { + Failf("Failed waiting for pods to enter running: %v", pollErr) + } +} + +// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready. +func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) { + s.waitForRunning(numStatefulPods, ss, true) +} + +// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and not Ready. +func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) { + s.waitForRunning(numStatefulPods, ss, false) +} + +// BreakProbe breaks the readiness probe for Nginx StatefulSet containers. +func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) error { + path := probe.HTTPGet.Path + if path == "" { + return fmt.Errorf("Path expected to be not empty: %v", path) + } + cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path) + return s.ExecInStatefulPods(ss, cmd) +} + +// RestoreProbe restores the readiness probe for Nginx StatefulSet containers. +func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error { + path := probe.HTTPGet.Path + if path == "" { + return fmt.Errorf("Path expected to be not empty: %v", path) + } + cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path) + return s.ExecInStatefulPods(ss, cmd) +} + +// SetHealthy updates the StatefulSet InitAnnotation to true in order to set a StatefulSet Pod to be Running and Ready. +func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) { + podList := s.GetPodList(ss) + markedHealthyPod := "" + for _, pod := range podList.Items { + if pod.Status.Phase != v1.PodRunning { + Failf("Found pod in %v cannot set health", pod.Status.Phase) + } + if IsStatefulSetPodInitialized(pod) { + continue + } + if markedHealthyPod != "" { + Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod) + } + p, err := UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) { + update.Annotations[apps.StatefulSetInitAnnotation] = "true" + }) + ExpectNoError(err) + Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name) + markedHealthyPod = pod.Name + } +} + +func (s *StatefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) { + Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas) + + ns, name := ss.Namespace, ss.Name + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, + func() (bool, error) { + ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if ssGet.Status.Replicas != expectedReplicas { + Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas) + return false, nil + } + return true, nil + }) + if pollErr != nil { + Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr) + } +} + +// CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName. +func (p *StatefulSetTester) CheckServiceName(ss *apps.StatefulSet, expectedServiceName string) error { + Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName) + + if expectedServiceName != ss.Spec.ServiceName { + return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s", + expectedServiceName, ss.Spec.ServiceName) + } + + return nil +} + +// DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns. +func DeleteAllStatefulSets(c clientset.Interface, ns string) { + sst := &StatefulSetTester{c: c} + ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) + ExpectNoError(err) + + // Scale down each statefulset, then delete it completely. + // Deleting a pvc without doing this will leak volumes, #25101. + errList := []string{} + for _, ss := range ssList.Items { + Logf("Scaling statefulset %v to 0", ss.Name) + if err := sst.Scale(&ss, 0); err != nil { + errList = append(errList, fmt.Sprintf("%v", err)) + } + sst.waitForStatus(&ss, 0) + Logf("Deleting statefulset %v", ss.Name) + if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil { + errList = append(errList, fmt.Sprintf("%v", err)) + } + } + + // pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs. + pvNames := sets.NewString() + // TODO: Don't assume all pvcs in the ns belong to a statefulset + pvcPollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) { + pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) + if err != nil { + Logf("WARNING: Failed to list pvcs, retrying %v", err) + return false, nil + } + for _, pvc := range pvcList.Items { + pvNames.Insert(pvc.Spec.VolumeName) + // TODO: Double check that there are no pods referencing the pvc + Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName) + if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil { + return false, nil + } + } + return true, nil + }) + if pvcPollErr != nil { + errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion.")) + } + + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) { + pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) + if err != nil { + Logf("WARNING: Failed to list pvs, retrying %v", err) + return false, nil + } + waitingFor := []string{} + for _, pv := range pvList.Items { + if pvNames.Has(pv.Name) { + waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status)) + } + } + if len(waitingFor) == 0 { + return true, nil + } + Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n")) + return false, nil + }) + if pollErr != nil { + errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs.")) + } + if len(errList) != 0 { + ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n"))) + } +} + +// IsStatefulSetPodInitialized returns true if pod's StatefulSetInitAnnotation exists and is set to true. +func IsStatefulSetPodInitialized(pod v1.Pod) bool { + initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation] + if !ok { + return false + } + inited, err := strconv.ParseBool(initialized) + if err != nil { + Failf("Couldn't parse statefulset init annotations %v", initialized) + } + return inited +} + +// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets. +func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim { + return v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + "volume.alpha.kubernetes.io/storage-class": "anything", + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + } +} + +// NewStatefulSet creates a new NGINX StatefulSet for testing. The StatefulSet is named name, is in namespace ns, +// statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly +// to the Pod. labels are the labels that will be usd for the StatefulSet selector. +func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet { + mounts := append(statefulPodMounts, podMounts...) + claims := []v1.PersistentVolumeClaim{} + for _, m := range statefulPodMounts { + claims = append(claims, NewStatefulSetPVC(m.Name)) + } + + vols := []v1.Volume{} + for _, m := range podMounts { + vols = append(vols, v1.Volume{ + Name: m.Name, + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: fmt.Sprintf("/tmp/%v", m.Name), + }, + }, + }) + } + + return &apps.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: apps.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Replicas: func(i int32) *int32 { return &i }(replicas), + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Annotations: map[string]string{}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "gcr.io/google_containers/nginx-slim:0.7", + VolumeMounts: mounts, + }, + }, + Volumes: vols, + }, + }, + VolumeClaimTemplates: claims, + ServiceName: governingSvcName, + }, + } +} + +// SetStatefulSetInitializedAnnotation sets teh StatefulSetInitAnnotation to value. +func SetStatefulSetInitializedAnnotation(ss *apps.StatefulSet, value string) { + ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value +} diff --git a/test/e2e/network_partition.go b/test/e2e/network_partition.go index 2bf9dc6975..421fe43186 100644 --- a/test/e2e/network_partition.go +++ b/test/e2e/network_partition.go @@ -382,17 +382,17 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() { dumpDebugInfo(c, ns) } framework.Logf("Deleting all stateful set in ns %v", ns) - deleteAllStatefulSets(c, ns) + framework.DeleteAllStatefulSets(c, ns) }) It("should come back up if node goes down [Slow] [Disruptive]", func() { petMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}} - ps := newStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels) + ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels) _, err := c.Apps().StatefulSets(ns).Create(ps) Expect(err).NotTo(HaveOccurred()) - pst := statefulSetTester{c: c} + pst := framework.NewStatefulSetTester(c) nn := framework.TestContext.CloudConfig.NumNodes nodeNames, err := framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, nn) @@ -400,18 +400,18 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() { restartNodes(f, nodeNames) By("waiting for pods to be running again") - pst.waitForRunningAndReady(*ps.Spec.Replicas, ps) + pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps) }) It("should not reschedule stateful pods if there is a network partition [Slow] [Disruptive]", func() { - ps := newStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels) + ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels) _, err := c.Apps().StatefulSets(ns).Create(ps) Expect(err).NotTo(HaveOccurred()) - pst := statefulSetTester{c: c} - pst.waitForRunningAndReady(*ps.Spec.Replicas, ps) + pst := framework.NewStatefulSetTester(c) + pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps) - pod := pst.getPodList(ps).Items[0] + pod := pst.GetPodList(ps).Items[0] node, err := c.Core().Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{}) framework.ExpectNoError(err) @@ -430,7 +430,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() { } By("waiting for pods to be running again") - pst.waitForRunningAndReady(*ps.Spec.Replicas, ps) + pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps) }) }) diff --git a/test/e2e/statefulset.go b/test/e2e/statefulset.go index af2dfa00c5..c4debb0b08 100644 --- a/test/e2e/statefulset.go +++ b/test/e2e/statefulset.go @@ -18,27 +18,18 @@ package e2e import ( "fmt" - "io/ioutil" - "path/filepath" - "strconv" "strings" "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - apierrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" klabels "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/apimachinery/pkg/watch" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -46,11 +37,6 @@ import ( ) const ( - statefulsetPoll = 10 * time.Second - // Some statefulPods install base packages via wget - statefulsetTimeout = 10 * time.Minute - // Timeout for stateful pods to change state - statefulPodTimeout = 5 * time.Minute zookeeperManifestPath = "test/e2e/testing-manifests/statefulset/zookeeper" mysqlGaleraManifestPath = "test/e2e/testing-manifests/statefulset/mysql-galera" redisManifestPath = "test/e2e/testing-manifests/statefulset/redis" @@ -87,7 +73,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() { BeforeEach(func() { statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} podMounts = []v1.VolumeMount{{Name: "home", MountPath: "/home"}} - ss = newStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels) + ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels) By("Creating service " + headlessSvcName + " in namespace " + ns) headlessService := createServiceSpec(headlessSvcName, "", true, labels) @@ -100,80 +86,80 @@ var _ = framework.KubeDescribe("StatefulSet", func() { dumpDebugInfo(c, ns) } framework.Logf("Deleting all statefulset in ns %v", ns) - deleteAllStatefulSets(c, ns) + framework.DeleteAllStatefulSets(c, ns) }) It("should provide basic identity", func() { By("Creating statefulset " + ssName + " in namespace " + ns) *(ss.Spec.Replicas) = 3 - setInitializedAnnotation(ss, "false") + framework.SetStatefulSetInitializedAnnotation(ss, "false") _, err := c.Apps().StatefulSets(ns).Create(ss) Expect(err).NotTo(HaveOccurred()) - sst := statefulSetTester{c: c} + sst := framework.NewStatefulSetTester(c) By("Saturating stateful set " + ss.Name) - sst.saturate(ss) + sst.Saturate(ss) By("Verifying statefulset mounted data directory is usable") - framework.ExpectNoError(sst.checkMount(ss, "/data")) + framework.ExpectNoError(sst.CheckMount(ss, "/data")) By("Verifying statefulset provides a stable hostname for each pod") - framework.ExpectNoError(sst.checkHostname(ss)) + framework.ExpectNoError(sst.CheckHostname(ss)) By("Verifying statefulset set proper service name") - framework.ExpectNoError(sst.checkServiceName(ss, headlessSvcName)) + framework.ExpectNoError(sst.CheckServiceName(ss, headlessSvcName)) cmd := "echo $(hostname) > /data/hostname; sync;" By("Running " + cmd + " in all stateful pods") - framework.ExpectNoError(sst.execInStatefulPods(ss, cmd)) + framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd)) By("Restarting statefulset " + ss.Name) - sst.restart(ss) - sst.saturate(ss) + sst.Restart(ss) + sst.Saturate(ss) By("Verifying statefulset mounted data directory is usable") - framework.ExpectNoError(sst.checkMount(ss, "/data")) + framework.ExpectNoError(sst.CheckMount(ss, "/data")) cmd = "if [ \"$(cat /data/hostname)\" = \"$(hostname)\" ]; then exit 0; else exit 1; fi" By("Running " + cmd + " in all stateful pods") - framework.ExpectNoError(sst.execInStatefulPods(ss, cmd)) + framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd)) }) It("should not deadlock when a pod's predecessor fails", func() { By("Creating statefulset " + ssName + " in namespace " + ns) *(ss.Spec.Replicas) = 2 - setInitializedAnnotation(ss, "false") + framework.SetStatefulSetInitializedAnnotation(ss, "false") _, err := c.Apps().StatefulSets(ns).Create(ss) Expect(err).NotTo(HaveOccurred()) - sst := statefulSetTester{c: c} + sst := framework.NewStatefulSetTester(c) - sst.waitForRunningAndReady(1, ss) + sst.WaitForRunningAndReady(1, ss) By("Marking stateful pod at index 0 as healthy.") - sst.setHealthy(ss) + sst.SetHealthy(ss) By("Waiting for stateful pod at index 1 to enter running.") - sst.waitForRunningAndReady(2, ss) + sst.WaitForRunningAndReady(2, ss) // Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not* // create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till // we set the healthy bit. By("Deleting healthy stateful pod at index 0.") - sst.deleteStatefulPodAtIndex(0, ss) + sst.DeleteStatefulPodAtIndex(0, ss) By("Confirming stateful pod at index 0 is recreated.") - sst.waitForRunningAndReady(2, ss) + sst.WaitForRunningAndReady(2, ss) By("Deleting unhealthy stateful pod at index 1.") - sst.deleteStatefulPodAtIndex(1, ss) + sst.DeleteStatefulPodAtIndex(1, ss) By("Confirming all stateful pods in statefulset are created.") - sst.saturate(ss) + sst.Saturate(ss) }) It("should allow template updates", func() { @@ -183,9 +169,9 @@ var _ = framework.KubeDescribe("StatefulSet", func() { ss, err := c.Apps().StatefulSets(ns).Create(ss) Expect(err).NotTo(HaveOccurred()) - sst := statefulSetTester{c: c} + sst := framework.NewStatefulSetTester(c) - sst.waitForRunningAndReady(*ss.Spec.Replicas, ss) + sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) newImage := newNginxImage oldImage := ss.Spec.Template.Spec.Containers[0].Image @@ -198,17 +184,17 @@ var _ = framework.KubeDescribe("StatefulSet", func() { updateIndex := 0 By(fmt.Sprintf("Deleting stateful pod at index %d", updateIndex)) - sst.deleteStatefulPodAtIndex(updateIndex, ss) + sst.DeleteStatefulPodAtIndex(updateIndex, ss) By("Waiting for all stateful pods to be running again") - sst.waitForRunningAndReady(*ss.Spec.Replicas, ss) + sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex)) verify := func(pod *v1.Pod) { podImage := pod.Spec.Containers[0].Image Expect(podImage).To(Equal(newImage), fmt.Sprintf("Expected stateful pod image %s updated to %s", podImage, newImage)) } - sst.verifyPodAtIndex(updateIndex, ss, verify) + sst.VerifyPodAtIndex(updateIndex, ss, verify) }) It("Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed", func() { @@ -216,29 +202,29 @@ var _ = framework.KubeDescribe("StatefulSet", func() { testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{ Path: "/index.html", Port: intstr.IntOrString{IntVal: 80}}}} - ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels) + ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels) ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe - setInitializedAnnotation(ss, "false") + framework.SetStatefulSetInitializedAnnotation(ss, "false") ss, err := c.Apps().StatefulSets(ns).Create(ss) Expect(err).NotTo(HaveOccurred()) - sst := &statefulSetTester{c: c} - sst.waitForRunningAndReady(1, ss) + sst := framework.NewStatefulSetTester(c) + sst.WaitForRunningAndReady(1, ss) By("Scaling up stateful set " + ssName + " to 3 replicas and pausing after 2nd pod") - sst.setHealthy(ss) - sst.updateReplicas(ss, 3) - sst.waitForRunningAndReady(2, ss) + sst.SetHealthy(ss) + sst.UpdateReplicas(ss, 3) + sst.WaitForRunningAndReady(2, ss) By("Before scale up finished setting 2nd pod to be not ready by breaking readiness probe") - sst.breakProbe(ss, testProbe) - sst.waitForRunningAndNotReady(2, ss) + sst.BreakProbe(ss, testProbe) + sst.WaitForRunningAndNotReady(2, ss) By("Continue scale operation after the 2nd pod, and scaling down to 1 replica") - sst.setHealthy(ss) - sst.updateReplicas(ss, 1) + sst.SetHealthy(ss) + sst.UpdateReplicas(ss, 1) By("Verifying that the 2nd pod wont be removed if it is not running and ready") - sst.confirmStatefulPodCount(2, ss, 10*time.Second) + sst.ConfirmStatefulPodCount(2, ss, 10*time.Second) expectedPodName := ss.Name + "-1" expectedPod, err := f.ClientSet.Core().Pods(ns).Get(expectedPodName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) @@ -251,8 +237,8 @@ var _ = framework.KubeDescribe("StatefulSet", func() { Expect(err).NotTo(HaveOccurred()) By("Verifying the 2nd pod is removed only when it becomes running and ready") - sst.restoreProbe(ss, testProbe) - _, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) { + sst.RestoreProbe(ss, testProbe) + _, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) if event.Type == watch.Deleted && pod.Name == expectedPodName { return false, fmt.Errorf("Pod %v was deleted before enter running", pod.Name) @@ -282,28 +268,28 @@ var _ = framework.KubeDescribe("StatefulSet", func() { testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{ Path: "/index.html", Port: intstr.IntOrString{IntVal: 80}}}} - ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels) + ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels) ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe ss, err = c.Apps().StatefulSets(ns).Create(ss) Expect(err).NotTo(HaveOccurred()) By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns) - sst := &statefulSetTester{c: c} - sst.waitForRunningAndReady(*ss.Spec.Replicas, ss) + sst := framework.NewStatefulSetTester(c) + sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) By("Confirming that stateful set scale up will halt with unhealthy stateful pod") - sst.breakProbe(ss, testProbe) - sst.waitForRunningAndNotReady(*ss.Spec.Replicas, ss) - sst.updateReplicas(ss, 3) - sst.confirmStatefulPodCount(1, ss, 10*time.Second) + sst.BreakProbe(ss, testProbe) + sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss) + sst.UpdateReplicas(ss, 3) + sst.ConfirmStatefulPodCount(1, ss, 10*time.Second) By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns) - sst.restoreProbe(ss, testProbe) - sst.waitForRunningAndReady(3, ss) + sst.RestoreProbe(ss, testProbe) + sst.WaitForRunningAndReady(3, ss) By("Verifying that stateful set " + ssName + " was scaled up in order") expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"} - _, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) { + _, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) { if event.Type != watch.Added { return false, nil } @@ -322,18 +308,18 @@ var _ = framework.KubeDescribe("StatefulSet", func() { }) Expect(err).NotTo(HaveOccurred()) - sst.breakProbe(ss, testProbe) - sst.waitForRunningAndNotReady(3, ss) - sst.updateReplicas(ss, 0) - sst.confirmStatefulPodCount(3, ss, 10*time.Second) + sst.BreakProbe(ss, testProbe) + sst.WaitForRunningAndNotReady(3, ss) + sst.UpdateReplicas(ss, 0) + sst.ConfirmStatefulPodCount(3, ss, 10*time.Second) By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns) - sst.restoreProbe(ss, testProbe) - sst.scale(ss, 0) + sst.RestoreProbe(ss, testProbe) + sst.Scale(ss, 0) By("Verifying that stateful set " + ssName + " was scaled down in reverse order") expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"} - _, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) { + _, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) { if event.Type != watch.Deleted { return false, nil } @@ -375,7 +361,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() { framework.ExpectNoError(err) By("Creating statefulset with conflicting port in namespace " + f.Namespace.Name) - ss := newStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels) + ss := framework.NewStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels) statefulPodContainer := &ss.Spec.Template.Spec.Containers[0] statefulPodContainer.Ports = append(statefulPodContainer.Ports, conflictingPort) ss.Spec.Template.Spec.NodeName = node.Name @@ -392,7 +378,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() { w, err := f.ClientSet.Core().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName})) framework.ExpectNoError(err) // we need to get UID from pod in any state and wait until stateful set controller will remove pod atleast once - _, err = watch.Until(statefulPodTimeout, w, func(event watch.Event) (bool, error) { + _, err = watch.Until(framework.StatefulPodTimeout, w, func(event watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) switch event.Type { case watch.Deleted: @@ -428,16 +414,16 @@ var _ = framework.KubeDescribe("StatefulSet", func() { return fmt.Errorf("Pod %v wasn't recreated: %v == %v", statefulPod.Name, statefulPod.UID, initialStatefulPodUID) } return nil - }, statefulPodTimeout, 2*time.Second).Should(BeNil()) + }, framework.StatefulPodTimeout, 2*time.Second).Should(BeNil()) }) }) framework.KubeDescribe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() { - var sst *statefulSetTester + var sst *framework.StatefulSetTester var appTester *clusterAppTester BeforeEach(func() { - sst = &statefulSetTester{c: c} + sst = framework.NewStatefulSetTester(c) appTester = &clusterAppTester{tester: sst, ns: ns} }) @@ -446,7 +432,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() { dumpDebugInfo(c, ns) } framework.Logf("Deleting all statefulset in ns %v", ns) - deleteAllStatefulSets(c, ns) + framework.DeleteAllStatefulSets(c, ns) }) It("should creating a working zookeeper cluster", func() { @@ -504,7 +490,7 @@ type statefulPodTester interface { type clusterAppTester struct { ns string statefulPod statefulPodTester - tester *statefulSetTester + tester *framework.StatefulSetTester } func (c *clusterAppTester) run() { @@ -520,8 +506,8 @@ func (c *clusterAppTester) run() { default: if restartCluster { By("Restarting stateful set " + ss.Name) - c.tester.restart(ss) - c.tester.waitForRunningAndReady(*ss.Spec.Replicas, ss) + c.tester.Restart(ss) + c.tester.WaitForRunningAndReady(*ss.Spec.Replicas, ss) } } @@ -533,7 +519,7 @@ func (c *clusterAppTester) run() { type zookeeperTester struct { ss *apps.StatefulSet - tester *statefulSetTester + tester *framework.StatefulSetTester } func (z *zookeeperTester) name() string { @@ -541,7 +527,7 @@ func (z *zookeeperTester) name() string { } func (z *zookeeperTester) deploy(ns string) *apps.StatefulSet { - z.ss = z.tester.createStatefulSet(zookeeperManifestPath, ns) + z.ss = z.tester.CreateStatefulSet(zookeeperManifestPath, ns) return z.ss } @@ -563,7 +549,7 @@ func (z *zookeeperTester) read(statefulPodIndex int, key string) string { type mysqlGaleraTester struct { ss *apps.StatefulSet - tester *statefulSetTester + tester *framework.StatefulSetTester } func (m *mysqlGaleraTester) name() string { @@ -579,7 +565,7 @@ func (m *mysqlGaleraTester) mysqlExec(cmd, ns, podName string) string { } func (m *mysqlGaleraTester) deploy(ns string) *apps.StatefulSet { - m.ss = m.tester.createStatefulSet(mysqlGaleraManifestPath, ns) + m.ss = m.tester.CreateStatefulSet(mysqlGaleraManifestPath, ns) framework.Logf("Deployed statefulset %v, initializing database", m.ss.Name) for _, cmd := range []string{ @@ -606,7 +592,7 @@ func (m *mysqlGaleraTester) read(statefulPodIndex int, key string) string { type redisTester struct { ss *apps.StatefulSet - tester *statefulSetTester + tester *framework.StatefulSetTester } func (m *redisTester) name() string { @@ -619,7 +605,7 @@ func (m *redisTester) redisExec(cmd, ns, podName string) string { } func (m *redisTester) deploy(ns string) *apps.StatefulSet { - m.ss = m.tester.createStatefulSet(redisManifestPath, ns) + m.ss = m.tester.CreateStatefulSet(redisManifestPath, ns) return m.ss } @@ -637,7 +623,7 @@ func (m *redisTester) read(statefulPodIndex int, key string) string { type cockroachDBTester struct { ss *apps.StatefulSet - tester *statefulSetTester + tester *framework.StatefulSetTester } func (c *cockroachDBTester) name() string { @@ -650,7 +636,7 @@ func (c *cockroachDBTester) cockroachDBExec(cmd, ns, podName string) string { } func (c *cockroachDBTester) deploy(ns string) *apps.StatefulSet { - c.ss = c.tester.createStatefulSet(cockroachDBManifestPath, ns) + c.ss = c.tester.CreateStatefulSet(cockroachDBManifestPath, ns) framework.Logf("Deployed statefulset %v, initializing database", c.ss.Name) for _, cmd := range []string{ "CREATE DATABASE IF NOT EXISTS foo;", @@ -678,370 +664,6 @@ func lastLine(out string) string { return outLines[len(outLines)-1] } -func statefulSetFromManifest(fileName, ns string) *apps.StatefulSet { - var ss apps.StatefulSet - framework.Logf("Parsing statefulset from %v", fileName) - data, err := ioutil.ReadFile(fileName) - Expect(err).NotTo(HaveOccurred()) - json, err := utilyaml.ToJSON(data) - Expect(err).NotTo(HaveOccurred()) - - Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred()) - ss.Namespace = ns - if ss.Spec.Selector == nil { - ss.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: ss.Spec.Template.Labels, - } - } - return &ss -} - -// statefulSetTester has all methods required to test a single statefulset. -type statefulSetTester struct { - c clientset.Interface -} - -func (s *statefulSetTester) createStatefulSet(manifestPath, ns string) *apps.StatefulSet { - mkpath := func(file string) string { - return filepath.Join(framework.TestContext.RepoRoot, manifestPath, file) - } - ss := statefulSetFromManifest(mkpath("statefulset.yaml"), ns) - - framework.Logf(fmt.Sprintf("creating " + ss.Name + " service")) - framework.RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns)) - - framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector)) - framework.RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns)) - s.waitForRunningAndReady(*ss.Spec.Replicas, ss) - return ss -} - -func (s *statefulSetTester) checkMount(ss *apps.StatefulSet, mountPath string) error { - for _, cmd := range []string{ - // Print inode, size etc - fmt.Sprintf("ls -idlhZ %v", mountPath), - // Print subdirs - fmt.Sprintf("find %v", mountPath), - // Try writing - fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))), - } { - if err := s.execInStatefulPods(ss, cmd); err != nil { - return fmt.Errorf("failed to execute %v, error: %v", cmd, err) - } - } - return nil -} - -func (s *statefulSetTester) execInStatefulPods(ss *apps.StatefulSet, cmd string) error { - podList := s.getPodList(ss) - for _, statefulPod := range podList.Items { - stdout, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd) - framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout) - if err != nil { - return err - } - } - return nil -} - -func (s *statefulSetTester) checkHostname(ss *apps.StatefulSet) error { - cmd := "printf $(hostname)" - podList := s.getPodList(ss) - for _, statefulPod := range podList.Items { - hostname, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd) - if err != nil { - return err - } - if hostname != statefulPod.Name { - return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name) - } - } - return nil -} -func (s *statefulSetTester) saturate(ss *apps.StatefulSet) { - // TODO: Watch events and check that creation timestamss don't overlap - var i int32 - for i = 0; i < *(ss.Spec.Replicas); i++ { - framework.Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running") - s.waitForRunningAndReady(i+1, ss) - framework.Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy") - s.setHealthy(ss) - } -} - -func (s *statefulSetTester) deleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) { - name := getPodNameAtIndex(index, ss) - noGrace := int64(0) - if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil { - framework.Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err) - } -} - -type verifyPodFunc func(*v1.Pod) - -func (s *statefulSetTester) verifyPodAtIndex(index int, ss *apps.StatefulSet, verify verifyPodFunc) { - name := getPodNameAtIndex(index, ss) - pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name)) - verify(pod) -} - -func getPodNameAtIndex(index int, ss *apps.StatefulSet) string { - // TODO: we won't use "-index" as the name strategy forever, - // pull the name out from an identity mapper. - return fmt.Sprintf("%v-%v", ss.Name, index) -} - -func (s *statefulSetTester) scale(ss *apps.StatefulSet, count int32) error { - name := ss.Name - ns := ss.Namespace - s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count }) - - var statefulPodList *v1.PodList - pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) { - statefulPodList = s.getPodList(ss) - if int32(len(statefulPodList.Items)) == count { - return true, nil - } - return false, nil - }) - if pollErr != nil { - unhealthy := []string{} - for _, statefulPod := range statefulPodList.Items { - delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod) - if delTs != nil || phase != v1.PodRunning || !readiness { - unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness)) - } - } - return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, statefulsetTimeout, unhealthy) - } - return nil -} - -func (s *statefulSetTester) updateReplicas(ss *apps.StatefulSet, count int32) { - s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count }) -} - -func (s *statefulSetTester) restart(ss *apps.StatefulSet) { - oldReplicas := *(ss.Spec.Replicas) - framework.ExpectNoError(s.scale(ss, 0)) - s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas }) -} - -func (s *statefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) { - for i := 0; i < 3; i++ { - ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{}) - if err != nil { - framework.Failf("failed to get statefulset %q: %v", name, err) - } - update(ss) - ss, err = s.c.Apps().StatefulSets(ns).Update(ss) - if err == nil { - return - } - if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) { - framework.Failf("failed to update statefulset %q: %v", name, err) - } - } - framework.Failf("too many retries draining statefulset %q", name) -} - -func (s *statefulSetTester) getPodList(ss *apps.StatefulSet) *v1.PodList { - selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector) - framework.ExpectNoError(err) - podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) - framework.ExpectNoError(err) - return podList -} - -func (s *statefulSetTester) confirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) { - start := time.Now() - deadline := start.Add(timeout) - for t := time.Now(); t.Before(deadline); t = time.Now() { - podList := s.getPodList(ss) - statefulPodCount := len(podList.Items) - if statefulPodCount != count { - framework.Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList) - } - framework.Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t)) - time.Sleep(1 * time.Second) - } -} - -func (s *statefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) { - pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, - func() (bool, error) { - podList := s.getPodList(ss) - if int32(len(podList.Items)) < numStatefulPods { - framework.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods) - return false, nil - } - if int32(len(podList.Items)) > numStatefulPods { - return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items)) - } - for _, p := range podList.Items { - isReady := v1.IsPodReady(&p) - desiredReadiness := shouldBeReady == isReady - framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady) - if p.Status.Phase != v1.PodRunning || !desiredReadiness { - return false, nil - } - } - return true, nil - }) - if pollErr != nil { - framework.Failf("Failed waiting for pods to enter running: %v", pollErr) - } -} - -func (s *statefulSetTester) waitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) { - s.waitForRunning(numStatefulPods, ss, true) -} - -func (s *statefulSetTester) waitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) { - s.waitForRunning(numStatefulPods, ss, false) -} - -func (s *statefulSetTester) breakProbe(ss *apps.StatefulSet, probe *v1.Probe) error { - path := probe.HTTPGet.Path - if path == "" { - return fmt.Errorf("Path expected to be not empty: %v", path) - } - cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path) - return s.execInStatefulPods(ss, cmd) -} - -func (s *statefulSetTester) restoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error { - path := probe.HTTPGet.Path - if path == "" { - return fmt.Errorf("Path expected to be not empty: %v", path) - } - cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path) - return s.execInStatefulPods(ss, cmd) -} - -func (s *statefulSetTester) setHealthy(ss *apps.StatefulSet) { - podList := s.getPodList(ss) - markedHealthyPod := "" - for _, pod := range podList.Items { - if pod.Status.Phase != v1.PodRunning { - framework.Failf("Found pod in %v cannot set health", pod.Status.Phase) - } - if isInitialized(pod) { - continue - } - if markedHealthyPod != "" { - framework.Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod) - } - p, err := framework.UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) { - update.Annotations[apps.StatefulSetInitAnnotation] = "true" - }) - framework.ExpectNoError(err) - framework.Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name) - markedHealthyPod = pod.Name - } -} - -func (s *statefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) { - framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas) - - ns, name := ss.Namespace, ss.Name - pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, - func() (bool, error) { - ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{}) - if err != nil { - return false, err - } - if ssGet.Status.Replicas != expectedReplicas { - framework.Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas) - return false, nil - } - return true, nil - }) - if pollErr != nil { - framework.Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr) - } -} - -func (p *statefulSetTester) checkServiceName(ps *apps.StatefulSet, expectedServiceName string) error { - framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName) - - if expectedServiceName != ps.Spec.ServiceName { - return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s", expectedServiceName, ps.Spec.ServiceName) - } - - return nil -} - -func deleteAllStatefulSets(c clientset.Interface, ns string) { - sst := &statefulSetTester{c: c} - ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) - framework.ExpectNoError(err) - - // Scale down each statefulset, then delete it completely. - // Deleting a pvc without doing this will leak volumes, #25101. - errList := []string{} - for _, ss := range ssList.Items { - framework.Logf("Scaling statefulset %v to 0", ss.Name) - if err := sst.scale(&ss, 0); err != nil { - errList = append(errList, fmt.Sprintf("%v", err)) - } - sst.waitForStatus(&ss, 0) - framework.Logf("Deleting statefulset %v", ss.Name) - if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil { - errList = append(errList, fmt.Sprintf("%v", err)) - } - } - - // pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs. - pvNames := sets.NewString() - // TODO: Don't assume all pvcs in the ns belong to a statefulset - pvcPollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) { - pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) - if err != nil { - framework.Logf("WARNING: Failed to list pvcs, retrying %v", err) - return false, nil - } - for _, pvc := range pvcList.Items { - pvNames.Insert(pvc.Spec.VolumeName) - // TODO: Double check that there are no pods referencing the pvc - framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName) - if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil { - return false, nil - } - } - return true, nil - }) - if pvcPollErr != nil { - errList = append(errList, "Timeout waiting for pvc deletion.") - } - - pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) { - pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) - if err != nil { - framework.Logf("WARNING: Failed to list pvs, retrying %v", err) - return false, nil - } - waitingFor := []string{} - for _, pv := range pvList.Items { - if pvNames.Has(pv.Name) { - waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status)) - } - } - if len(waitingFor) == 0 { - return true, nil - } - framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n")) - return false, nil - }) - if pollErr != nil { - errList = append(errList, "Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs.") - } - if len(errList) != 0 { - framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n"))) - } -} - func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, key, expectedVal string) error { err := wait.PollImmediate(time.Second, readTimeout, func() (bool, error) { val := statefulPod.read(statefulPodNumber, key) @@ -1058,100 +680,3 @@ func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, k } return err } - -func isInitialized(pod v1.Pod) bool { - initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation] - if !ok { - return false - } - inited, err := strconv.ParseBool(initialized) - if err != nil { - framework.Failf("Couldn't parse statefulset init annotations %v", initialized) - } - return inited -} - -func newPVC(name string) v1.PersistentVolumeClaim { - return v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Annotations: map[string]string{ - "volume.alpha.kubernetes.io/storage-class": "anything", - }, - }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), - }, - }, - }, - } -} - -func newStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet { - mounts := append(statefulPodMounts, podMounts...) - claims := []v1.PersistentVolumeClaim{} - for _, m := range statefulPodMounts { - claims = append(claims, newPVC(m.Name)) - } - - vols := []v1.Volume{} - for _, m := range podMounts { - vols = append(vols, v1.Volume{ - Name: m.Name, - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: fmt.Sprintf("/tmp/%v", m.Name), - }, - }, - }) - } - - privileged := true - - return &apps.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1beta1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: ns, - }, - Spec: apps.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - Replicas: func(i int32) *int32 { return &i }(replicas), - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - Annotations: map[string]string{}, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "nginx", - Image: nginxImage, - VolumeMounts: mounts, - SecurityContext: &v1.SecurityContext{ - Privileged: &privileged, - }, - }, - }, - Volumes: vols, - }, - }, - VolumeClaimTemplates: claims, - ServiceName: governingSvcName, - }, - } -} - -func setInitializedAnnotation(ss *apps.StatefulSet, value string) { - ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value -} diff --git a/test/e2e/upgrades/BUILD b/test/e2e/upgrades/BUILD index c48ae41e5b..1d72e72dbb 100644 --- a/test/e2e/upgrades/BUILD +++ b/test/e2e/upgrades/BUILD @@ -13,15 +13,18 @@ go_library( "deployments.go", "secrets.go", "services.go", + "statefulset.go", "upgrade.go", ], tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/controller/deployment/util:go_default_library", "//test/e2e/framework:go_default_library", "//vendor:github.com/onsi/ginkgo", + "//vendor:github.com/onsi/gomega", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/test/e2e/upgrades/statefulset.go b/test/e2e/upgrades/statefulset.go new file mode 100644 index 0000000000..32a469a2ba --- /dev/null +++ b/test/e2e/upgrades/statefulset.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrades + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/kubernetes/pkg/api/v1" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + + "k8s.io/kubernetes/test/e2e/framework" +) + +// StatefulSetUpgradeTest implements an upgrade test harness for StatefulSet upgrade testing. +type StatefulSetUpgradeTest struct { + tester *framework.StatefulSetTester + service *v1.Service + set *apps.StatefulSet +} + +// Setup creates a StatefulSet and a HeadlessService. It verifies the basic SatefulSet properties +func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) { + ssName := "ss" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", + } + headlessSvcName := "test" + statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}} + ns := f.Namespace.Name + t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels) + t.service = framework.CreateStatefulSetService(ssName, labels) + *(t.set.Spec.Replicas) = 3 + framework.SetStatefulSetInitializedAnnotation(t.set, "false") + + By("Creating service " + headlessSvcName + " in namespace " + ns) + _, err := f.ClientSet.Core().Services(ns).Create(t.service) + Expect(err).NotTo(HaveOccurred()) + t.tester = framework.NewStatefulSetTester(f.ClientSet) + + By("Creating statefulset " + ssName + " in namespace " + ns) + *(t.set.Spec.Replicas) = 3 + _, err = f.ClientSet.Apps().StatefulSets(ns).Create(t.set) + Expect(err).NotTo(HaveOccurred()) + + By("Saturating stateful set " + t.set.Name) + t.tester.Saturate(t.set) + t.verify() + t.restart() + t.verify() +} + +// Waits for the upgrade to complete and verifies the StatefulSet basic functionality +func (t *StatefulSetUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) { + <-done + t.verify() +} + +// Deletes all StatefulSets +func (t *StatefulSetUpgradeTest) Teardown(f *framework.Framework) { + framework.DeleteAllStatefulSets(f.ClientSet, t.set.Name) +} + +func (t *StatefulSetUpgradeTest) verify() { + By("Verifying statefulset mounted data directory is usable") + framework.ExpectNoError(t.tester.CheckMount(t.set, "/data")) + + By("Verifying statefulset provides a stable hostname for each pod") + framework.ExpectNoError(t.tester.CheckHostname(t.set)) + + By("Verifying statefulset set proper service name") + framework.ExpectNoError(t.tester.CheckServiceName(t.set, t.set.Spec.ServiceName)) + + cmd := "echo $(hostname) > /data/hostname; sync;" + By("Running " + cmd + " in all stateful pods") + framework.ExpectNoError(t.tester.ExecInStatefulPods(t.set, cmd)) +} + +func (t *StatefulSetUpgradeTest) restart() { + By("Restarting statefulset " + t.set.Name) + t.tester.Restart(t.set) + t.tester.Saturate(t.set) +}