Merge pull request #41221 from kow3ns/ss-upgrade-test

Automatic merge from submit-queue (batch tested with PRs 41548, 41221)

StatefulSet Upgrade Test

Adds StatefulSet upgrade tests and moves common functionality into the framework package. This removes the potential for cyclic dependencies while allowing for code reuse.
```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-02-17 04:34:35 -08:00 committed by GitHub
commit 078238a461
8 changed files with 749 additions and 560 deletions

View File

@ -32,6 +32,7 @@ import (
var upgradeTests = []upgrades.Test{
&upgrades.ServiceUpgradeTest{},
&upgrades.SecretUpgradeTest{},
&upgrades.StatefulSetUpgradeTest{},
&upgrades.DeploymentUpgradeTest{},
&upgrades.ConfigMapUpgradeTest{},
&upgrades.HPAUpgradeTest{},

View File

@ -330,7 +330,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
}
})
// using out of statefulset e2e as deleting pvc is a pain
deleteAllStatefulSets(c, ns)
framework.DeleteAllStatefulSets(c, ns)
})
})

View File

@ -27,6 +27,7 @@ go_library(
"pods.go",
"resource_usage_gatherer.go",
"service_util.go",
"statefulset_utils.go",
"test_context.go",
"util.go",
],
@ -86,6 +87,7 @@ go_library(
"//vendor:google.golang.org/api/compute/v1",
"//vendor:google.golang.org/api/googleapi",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1/unstructured",
"//vendor:k8s.io/apimachinery/pkg/fields",
@ -102,6 +104,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/apimachinery/pkg/util/validation",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/util/yaml",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/discovery",
"//vendor:k8s.io/client-go/dynamic",

View File

@ -0,0 +1,558 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"time"
. "github.com/onsi/gomega"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
const (
// Poll interval for StatefulSet tests
StatefulSetPoll = 10 * time.Second
// Timeout interval for StatefulSet operations
StatefulSetTimeout = 10 * time.Minute
// Timeout for stateful pods to change state
StatefulPodTimeout = 5 * time.Minute
)
// CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
func CreateStatefulSetService(name string, labels map[string]string) *v1.Service {
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ServiceSpec{
Selector: labels,
},
}
headlessService.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
headlessService.Spec.ClusterIP = "None"
return headlessService
}
// StatefulSetFromManifest returns a StatefulSet from a manifest stored in fileName in the Namespace indicated by ns.
func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet {
var ss apps.StatefulSet
Logf("Parsing statefulset from %v", fileName)
data, err := ioutil.ReadFile(fileName)
Expect(err).NotTo(HaveOccurred())
json, err := utilyaml.ToJSON(data)
Expect(err).NotTo(HaveOccurred())
Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred())
ss.Namespace = ns
if ss.Spec.Selector == nil {
ss.Spec.Selector = &metav1.LabelSelector{
MatchLabels: ss.Spec.Template.Labels,
}
}
return &ss
}
// StatefulSetTester is a struct that contains utility methods for testing StatefulSet related functionality. It uses a
// clientset.Interface to communicate with the API server.
type StatefulSetTester struct {
c clientset.Interface
}
// NewStatefulSetTester creates a StatefulSetTester that uses c to interact with the API server.
func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester {
return &StatefulSetTester{c}
}
// CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet {
mkpath := func(file string) string {
return filepath.Join(TestContext.RepoRoot, manifestPath, file)
}
ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)
Logf(fmt.Sprintf("creating " + ss.Name + " service"))
RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns))
Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns))
s.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
return ss
}
// CheckMount checks that the mount at mountPath is valid for all Pods in ss.
func (s *StatefulSetTester) CheckMount(ss *apps.StatefulSet, mountPath string) error {
for _, cmd := range []string{
// Print inode, size etc
fmt.Sprintf("ls -idlh %v", mountPath),
// Print subdirs
fmt.Sprintf("find %v", mountPath),
// Try writing
fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
} {
if err := s.ExecInStatefulPods(ss, cmd); err != nil {
return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
}
}
return nil
}
// ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods.
func (s *StatefulSetTester) ExecInStatefulPods(ss *apps.StatefulSet, cmd string) error {
podList := s.GetPodList(ss)
for _, statefulPod := range podList.Items {
stdout, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
if err != nil {
return err
}
}
return nil
}
// CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil than verification failed.
func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
cmd := "printf $(hostname)"
podList := s.GetPodList(ss)
for _, statefulPod := range podList.Items {
hostname, err := RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
if err != nil {
return err
}
if hostname != statefulPod.Name {
return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
}
}
return nil
}
// Saturate waits for all Pods in ss to become Running and Ready.
func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) {
var i int32
for i = 0; i < *(ss.Spec.Replicas); i++ {
Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
s.WaitForRunningAndReady(i+1, ss)
Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy")
s.SetHealthy(ss)
}
}
// DeleteStatefulPodAtIndex deletes the Pod with ordinal index in ss.
func (s *StatefulSetTester) DeleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) {
name := getStatefulSetPodNameAtIndex(index, ss)
noGrace := int64(0)
if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
}
}
// VerifyStatefulPodFunc is a func that examines a StatefulSetPod.
type VerifyStatefulPodFunc func(*v1.Pod)
// VerifyPodAtIndex applies a visitor patter to the Pod at index in ss. verify is is applied to the Pod to "visit" it.
func (s *StatefulSetTester) VerifyPodAtIndex(index int, ss *apps.StatefulSet, verify VerifyStatefulPodFunc) {
name := getStatefulSetPodNameAtIndex(index, ss)
pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
verify(pod)
}
func getStatefulSetPodNameAtIndex(index int, ss *apps.StatefulSet) string {
// TODO: we won't use "-index" as the name strategy forever,
// pull the name out from an identity mapper.
return fmt.Sprintf("%v-%v", ss.Name, index)
}
// Scale scales ss to count replicas.
func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) error {
name := ss.Name
ns := ss.Namespace
s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })
var statefulPodList *v1.PodList
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
statefulPodList = s.GetPodList(ss)
if int32(len(statefulPodList.Items)) == count {
return true, nil
}
return false, nil
})
if pollErr != nil {
unhealthy := []string{}
for _, statefulPod := range statefulPodList.Items {
delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod)
if delTs != nil || phase != v1.PodRunning || !readiness {
unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
}
}
return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
}
return nil
}
// UpdateReplicas updates the replicas of ss to count.
func (s *StatefulSetTester) UpdateReplicas(ss *apps.StatefulSet, count int32) {
s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count })
}
// Restart scales ss to 0 and then back to its previous number of replicas.
func (s *StatefulSetTester) Restart(ss *apps.StatefulSet) {
oldReplicas := *(ss.Spec.Replicas)
ExpectNoError(s.Scale(ss, 0))
s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
}
func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) {
for i := 0; i < 3; i++ {
ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
Failf("failed to get statefulset %q: %v", name, err)
}
update(ss)
ss, err = s.c.Apps().StatefulSets(ns).Update(ss)
if err == nil {
return
}
if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
Failf("failed to update statefulset %q: %v", name, err)
}
}
Failf("too many retries draining statefulset %q", name)
}
// GetPodList gets the current Pods in ss.
func (s *StatefulSetTester) GetPodList(ss *apps.StatefulSet) *v1.PodList {
selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
ExpectNoError(err)
podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
ExpectNoError(err)
return podList
}
// ConfirmStatefulPodCount asserts that the current number of Pods in ss is count waiting up to timeout for ss to
// to scale to count.
func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) {
start := time.Now()
deadline := start.Add(timeout)
for t := time.Now(); t.Before(deadline); t = time.Now() {
podList := s.GetPodList(ss)
statefulPodCount := len(podList.Items)
if statefulPodCount != count {
Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList)
}
Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t))
time.Sleep(1 * time.Second)
}
}
func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) {
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) {
podList := s.GetPodList(ss)
if int32(len(podList.Items)) < numStatefulPods {
Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods)
return false, nil
}
if int32(len(podList.Items)) > numStatefulPods {
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items))
}
for _, p := range podList.Items {
isReady := v1.IsPodReady(&p)
desiredReadiness := shouldBeReady == isReady
Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
if p.Status.Phase != v1.PodRunning || !desiredReadiness {
return false, nil
}
}
return true, nil
})
if pollErr != nil {
Failf("Failed waiting for pods to enter running: %v", pollErr)
}
}
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, true)
}
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and not Ready.
func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, false)
}
// BreakProbe breaks the readiness probe for Nginx StatefulSet containers.
func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
return s.ExecInStatefulPods(ss, cmd)
}
// RestoreProbe restores the readiness probe for Nginx StatefulSet containers.
func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
return s.ExecInStatefulPods(ss, cmd)
}
// SetHealthy updates the StatefulSet InitAnnotation to true in order to set a StatefulSet Pod to be Running and Ready.
func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) {
podList := s.GetPodList(ss)
markedHealthyPod := ""
for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
Failf("Found pod in %v cannot set health", pod.Status.Phase)
}
if IsStatefulSetPodInitialized(pod) {
continue
}
if markedHealthyPod != "" {
Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod)
}
p, err := UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) {
update.Annotations[apps.StatefulSetInitAnnotation] = "true"
})
ExpectNoError(err)
Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name)
markedHealthyPod = pod.Name
}
}
func (s *StatefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) {
Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
ns, name := ss.Namespace, ss.Name
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) {
ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ssGet.Status.Replicas != expectedReplicas {
Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
return false, nil
}
return true, nil
})
if pollErr != nil {
Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
}
}
// CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName.
func (p *StatefulSetTester) CheckServiceName(ss *apps.StatefulSet, expectedServiceName string) error {
Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
if expectedServiceName != ss.Spec.ServiceName {
return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s",
expectedServiceName, ss.Spec.ServiceName)
}
return nil
}
// DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
func DeleteAllStatefulSets(c clientset.Interface, ns string) {
sst := &StatefulSetTester{c: c}
ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
ExpectNoError(err)
// Scale down each statefulset, then delete it completely.
// Deleting a pvc without doing this will leak volumes, #25101.
errList := []string{}
for _, ss := range ssList.Items {
Logf("Scaling statefulset %v to 0", ss.Name)
if err := sst.Scale(&ss, 0); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
sst.waitForStatus(&ss, 0)
Logf("Deleting statefulset %v", ss.Name)
if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
}
// pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
pvNames := sets.NewString()
// TODO: Don't assume all pvcs in the ns belong to a statefulset
pvcPollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
if err != nil {
Logf("WARNING: Failed to list pvcs, retrying %v", err)
return false, nil
}
for _, pvc := range pvcList.Items {
pvNames.Insert(pvc.Spec.VolumeName)
// TODO: Double check that there are no pods referencing the pvc
Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil {
return false, nil
}
}
return true, nil
})
if pvcPollErr != nil {
errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
}
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
if err != nil {
Logf("WARNING: Failed to list pvs, retrying %v", err)
return false, nil
}
waitingFor := []string{}
for _, pv := range pvList.Items {
if pvNames.Has(pv.Name) {
waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
}
}
if len(waitingFor) == 0 {
return true, nil
}
Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
return false, nil
})
if pollErr != nil {
errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
}
if len(errList) != 0 {
ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
}
}
// IsStatefulSetPodInitialized returns true if pod's StatefulSetInitAnnotation exists and is set to true.
func IsStatefulSetPodInitialized(pod v1.Pod) bool {
initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
if !ok {
return false
}
inited, err := strconv.ParseBool(initialized)
if err != nil {
Failf("Couldn't parse statefulset init annotations %v", initialized)
}
return inited
}
// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
"volume.alpha.kubernetes.io/storage-class": "anything",
},
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
},
},
},
}
}
// NewStatefulSet creates a new NGINX StatefulSet for testing. The StatefulSet is named name, is in namespace ns,
// statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly
// to the Pod. labels are the labels that will be usd for the StatefulSet selector.
func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet {
mounts := append(statefulPodMounts, podMounts...)
claims := []v1.PersistentVolumeClaim{}
for _, m := range statefulPodMounts {
claims = append(claims, NewStatefulSetPVC(m.Name))
}
vols := []v1.Volume{}
for _, m := range podMounts {
vols = append(vols, v1.Volume{
Name: m.Name,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: fmt.Sprintf("/tmp/%v", m.Name),
},
},
})
}
return &apps.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Replicas: func(i int32) *int32 { return &i }(replicas),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "gcr.io/google_containers/nginx-slim:0.7",
VolumeMounts: mounts,
},
},
Volumes: vols,
},
},
VolumeClaimTemplates: claims,
ServiceName: governingSvcName,
},
}
}
// SetStatefulSetInitializedAnnotation sets teh StatefulSetInitAnnotation to value.
func SetStatefulSetInitializedAnnotation(ss *apps.StatefulSet, value string) {
ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
}

View File

@ -382,17 +382,17 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
dumpDebugInfo(c, ns)
}
framework.Logf("Deleting all stateful set in ns %v", ns)
deleteAllStatefulSets(c, ns)
framework.DeleteAllStatefulSets(c, ns)
})
It("should come back up if node goes down [Slow] [Disruptive]", func() {
petMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
ps := newStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
_, err := c.Apps().StatefulSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pst := statefulSetTester{c: c}
pst := framework.NewStatefulSetTester(c)
nn := framework.TestContext.CloudConfig.NumNodes
nodeNames, err := framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, nn)
@ -400,18 +400,18 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
restartNodes(f, nodeNames)
By("waiting for pods to be running again")
pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
})
It("should not reschedule stateful pods if there is a network partition [Slow] [Disruptive]", func() {
ps := newStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels)
ps := framework.NewStatefulSet(psName, ns, headlessSvcName, 3, []v1.VolumeMount{}, []v1.VolumeMount{}, labels)
_, err := c.Apps().StatefulSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pst := statefulSetTester{c: c}
pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
pst := framework.NewStatefulSetTester(c)
pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
pod := pst.getPodList(ps).Items[0]
pod := pst.GetPodList(ps).Items[0]
node, err := c.Core().Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
@ -430,7 +430,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
}
By("waiting for pods to be running again")
pst.waitForRunningAndReady(*ps.Spec.Replicas, ps)
pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)
})
})

View File

@ -18,27 +18,18 @@ package e2e
import (
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -46,11 +37,6 @@ import (
)
const (
statefulsetPoll = 10 * time.Second
// Some statefulPods install base packages via wget
statefulsetTimeout = 10 * time.Minute
// Timeout for stateful pods to change state
statefulPodTimeout = 5 * time.Minute
zookeeperManifestPath = "test/e2e/testing-manifests/statefulset/zookeeper"
mysqlGaleraManifestPath = "test/e2e/testing-manifests/statefulset/mysql-galera"
redisManifestPath = "test/e2e/testing-manifests/statefulset/redis"
@ -87,7 +73,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
BeforeEach(func() {
statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts = []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
ss = newStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
By("Creating service " + headlessSvcName + " in namespace " + ns)
headlessService := createServiceSpec(headlessSvcName, "", true, labels)
@ -100,80 +86,80 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
dumpDebugInfo(c, ns)
}
framework.Logf("Deleting all statefulset in ns %v", ns)
deleteAllStatefulSets(c, ns)
framework.DeleteAllStatefulSets(c, ns)
})
It("should provide basic identity", func() {
By("Creating statefulset " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 3
setInitializedAnnotation(ss, "false")
framework.SetStatefulSetInitializedAnnotation(ss, "false")
_, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
sst := statefulSetTester{c: c}
sst := framework.NewStatefulSetTester(c)
By("Saturating stateful set " + ss.Name)
sst.saturate(ss)
sst.Saturate(ss)
By("Verifying statefulset mounted data directory is usable")
framework.ExpectNoError(sst.checkMount(ss, "/data"))
framework.ExpectNoError(sst.CheckMount(ss, "/data"))
By("Verifying statefulset provides a stable hostname for each pod")
framework.ExpectNoError(sst.checkHostname(ss))
framework.ExpectNoError(sst.CheckHostname(ss))
By("Verifying statefulset set proper service name")
framework.ExpectNoError(sst.checkServiceName(ss, headlessSvcName))
framework.ExpectNoError(sst.CheckServiceName(ss, headlessSvcName))
cmd := "echo $(hostname) > /data/hostname; sync;"
By("Running " + cmd + " in all stateful pods")
framework.ExpectNoError(sst.execInStatefulPods(ss, cmd))
framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd))
By("Restarting statefulset " + ss.Name)
sst.restart(ss)
sst.saturate(ss)
sst.Restart(ss)
sst.Saturate(ss)
By("Verifying statefulset mounted data directory is usable")
framework.ExpectNoError(sst.checkMount(ss, "/data"))
framework.ExpectNoError(sst.CheckMount(ss, "/data"))
cmd = "if [ \"$(cat /data/hostname)\" = \"$(hostname)\" ]; then exit 0; else exit 1; fi"
By("Running " + cmd + " in all stateful pods")
framework.ExpectNoError(sst.execInStatefulPods(ss, cmd))
framework.ExpectNoError(sst.ExecInStatefulPods(ss, cmd))
})
It("should not deadlock when a pod's predecessor fails", func() {
By("Creating statefulset " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 2
setInitializedAnnotation(ss, "false")
framework.SetStatefulSetInitializedAnnotation(ss, "false")
_, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
sst := statefulSetTester{c: c}
sst := framework.NewStatefulSetTester(c)
sst.waitForRunningAndReady(1, ss)
sst.WaitForRunningAndReady(1, ss)
By("Marking stateful pod at index 0 as healthy.")
sst.setHealthy(ss)
sst.SetHealthy(ss)
By("Waiting for stateful pod at index 1 to enter running.")
sst.waitForRunningAndReady(2, ss)
sst.WaitForRunningAndReady(2, ss)
// Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not*
// create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till
// we set the healthy bit.
By("Deleting healthy stateful pod at index 0.")
sst.deleteStatefulPodAtIndex(0, ss)
sst.DeleteStatefulPodAtIndex(0, ss)
By("Confirming stateful pod at index 0 is recreated.")
sst.waitForRunningAndReady(2, ss)
sst.WaitForRunningAndReady(2, ss)
By("Deleting unhealthy stateful pod at index 1.")
sst.deleteStatefulPodAtIndex(1, ss)
sst.DeleteStatefulPodAtIndex(1, ss)
By("Confirming all stateful pods in statefulset are created.")
sst.saturate(ss)
sst.Saturate(ss)
})
It("should allow template updates", func() {
@ -183,9 +169,9 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
ss, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
sst := statefulSetTester{c: c}
sst := framework.NewStatefulSetTester(c)
sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
newImage := newNginxImage
oldImage := ss.Spec.Template.Spec.Containers[0].Image
@ -198,17 +184,17 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
updateIndex := 0
By(fmt.Sprintf("Deleting stateful pod at index %d", updateIndex))
sst.deleteStatefulPodAtIndex(updateIndex, ss)
sst.DeleteStatefulPodAtIndex(updateIndex, ss)
By("Waiting for all stateful pods to be running again")
sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex))
verify := func(pod *v1.Pod) {
podImage := pod.Spec.Containers[0].Image
Expect(podImage).To(Equal(newImage), fmt.Sprintf("Expected stateful pod image %s updated to %s", podImage, newImage))
}
sst.verifyPodAtIndex(updateIndex, ss, verify)
sst.VerifyPodAtIndex(updateIndex, ss, verify)
})
It("Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed", func() {
@ -216,29 +202,29 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
setInitializedAnnotation(ss, "false")
framework.SetStatefulSetInitializedAnnotation(ss, "false")
ss, err := c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
sst := &statefulSetTester{c: c}
sst.waitForRunningAndReady(1, ss)
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(1, ss)
By("Scaling up stateful set " + ssName + " to 3 replicas and pausing after 2nd pod")
sst.setHealthy(ss)
sst.updateReplicas(ss, 3)
sst.waitForRunningAndReady(2, ss)
sst.SetHealthy(ss)
sst.UpdateReplicas(ss, 3)
sst.WaitForRunningAndReady(2, ss)
By("Before scale up finished setting 2nd pod to be not ready by breaking readiness probe")
sst.breakProbe(ss, testProbe)
sst.waitForRunningAndNotReady(2, ss)
sst.BreakProbe(ss, testProbe)
sst.WaitForRunningAndNotReady(2, ss)
By("Continue scale operation after the 2nd pod, and scaling down to 1 replica")
sst.setHealthy(ss)
sst.updateReplicas(ss, 1)
sst.SetHealthy(ss)
sst.UpdateReplicas(ss, 1)
By("Verifying that the 2nd pod wont be removed if it is not running and ready")
sst.confirmStatefulPodCount(2, ss, 10*time.Second)
sst.ConfirmStatefulPodCount(2, ss, 10*time.Second)
expectedPodName := ss.Name + "-1"
expectedPod, err := f.ClientSet.Core().Pods(ns).Get(expectedPodName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
@ -251,8 +237,8 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
Expect(err).NotTo(HaveOccurred())
By("Verifying the 2nd pod is removed only when it becomes running and ready")
sst.restoreProbe(ss, testProbe)
_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
sst.RestoreProbe(ss, testProbe)
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
if event.Type == watch.Deleted && pod.Name == expectedPodName {
return false, fmt.Errorf("Pod %v was deleted before enter running", pod.Name)
@ -282,28 +268,28 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := newStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
ss, err = c.Apps().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred())
By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
sst := &statefulSetTester{c: c}
sst.waitForRunningAndReady(*ss.Spec.Replicas, ss)
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
sst.breakProbe(ss, testProbe)
sst.waitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.updateReplicas(ss, 3)
sst.confirmStatefulPodCount(1, ss, 10*time.Second)
sst.BreakProbe(ss, testProbe)
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.UpdateReplicas(ss, 3)
sst.ConfirmStatefulPodCount(1, ss, 10*time.Second)
By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
sst.restoreProbe(ss, testProbe)
sst.waitForRunningAndReady(3, ss)
sst.RestoreProbe(ss, testProbe)
sst.WaitForRunningAndReady(3, ss)
By("Verifying that stateful set " + ssName + " was scaled up in order")
expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
@ -322,18 +308,18 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
})
Expect(err).NotTo(HaveOccurred())
sst.breakProbe(ss, testProbe)
sst.waitForRunningAndNotReady(3, ss)
sst.updateReplicas(ss, 0)
sst.confirmStatefulPodCount(3, ss, 10*time.Second)
sst.BreakProbe(ss, testProbe)
sst.WaitForRunningAndNotReady(3, ss)
sst.UpdateReplicas(ss, 0)
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second)
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
sst.restoreProbe(ss, testProbe)
sst.scale(ss, 0)
sst.RestoreProbe(ss, testProbe)
sst.Scale(ss, 0)
By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Deleted {
return false, nil
}
@ -375,7 +361,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
framework.ExpectNoError(err)
By("Creating statefulset with conflicting port in namespace " + f.Namespace.Name)
ss := newStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels)
ss := framework.NewStatefulSet(ssName, f.Namespace.Name, headlessSvcName, 1, nil, nil, labels)
statefulPodContainer := &ss.Spec.Template.Spec.Containers[0]
statefulPodContainer.Ports = append(statefulPodContainer.Ports, conflictingPort)
ss.Spec.Template.Spec.NodeName = node.Name
@ -392,7 +378,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
w, err := f.ClientSet.Core().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
framework.ExpectNoError(err)
// we need to get UID from pod in any state and wait until stateful set controller will remove pod atleast once
_, err = watch.Until(statefulPodTimeout, w, func(event watch.Event) (bool, error) {
_, err = watch.Until(framework.StatefulPodTimeout, w, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Deleted:
@ -428,16 +414,16 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
return fmt.Errorf("Pod %v wasn't recreated: %v == %v", statefulPod.Name, statefulPod.UID, initialStatefulPodUID)
}
return nil
}, statefulPodTimeout, 2*time.Second).Should(BeNil())
}, framework.StatefulPodTimeout, 2*time.Second).Should(BeNil())
})
})
framework.KubeDescribe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() {
var sst *statefulSetTester
var sst *framework.StatefulSetTester
var appTester *clusterAppTester
BeforeEach(func() {
sst = &statefulSetTester{c: c}
sst = framework.NewStatefulSetTester(c)
appTester = &clusterAppTester{tester: sst, ns: ns}
})
@ -446,7 +432,7 @@ var _ = framework.KubeDescribe("StatefulSet", func() {
dumpDebugInfo(c, ns)
}
framework.Logf("Deleting all statefulset in ns %v", ns)
deleteAllStatefulSets(c, ns)
framework.DeleteAllStatefulSets(c, ns)
})
It("should creating a working zookeeper cluster", func() {
@ -504,7 +490,7 @@ type statefulPodTester interface {
type clusterAppTester struct {
ns string
statefulPod statefulPodTester
tester *statefulSetTester
tester *framework.StatefulSetTester
}
func (c *clusterAppTester) run() {
@ -520,8 +506,8 @@ func (c *clusterAppTester) run() {
default:
if restartCluster {
By("Restarting stateful set " + ss.Name)
c.tester.restart(ss)
c.tester.waitForRunningAndReady(*ss.Spec.Replicas, ss)
c.tester.Restart(ss)
c.tester.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
}
}
@ -533,7 +519,7 @@ func (c *clusterAppTester) run() {
type zookeeperTester struct {
ss *apps.StatefulSet
tester *statefulSetTester
tester *framework.StatefulSetTester
}
func (z *zookeeperTester) name() string {
@ -541,7 +527,7 @@ func (z *zookeeperTester) name() string {
}
func (z *zookeeperTester) deploy(ns string) *apps.StatefulSet {
z.ss = z.tester.createStatefulSet(zookeeperManifestPath, ns)
z.ss = z.tester.CreateStatefulSet(zookeeperManifestPath, ns)
return z.ss
}
@ -563,7 +549,7 @@ func (z *zookeeperTester) read(statefulPodIndex int, key string) string {
type mysqlGaleraTester struct {
ss *apps.StatefulSet
tester *statefulSetTester
tester *framework.StatefulSetTester
}
func (m *mysqlGaleraTester) name() string {
@ -579,7 +565,7 @@ func (m *mysqlGaleraTester) mysqlExec(cmd, ns, podName string) string {
}
func (m *mysqlGaleraTester) deploy(ns string) *apps.StatefulSet {
m.ss = m.tester.createStatefulSet(mysqlGaleraManifestPath, ns)
m.ss = m.tester.CreateStatefulSet(mysqlGaleraManifestPath, ns)
framework.Logf("Deployed statefulset %v, initializing database", m.ss.Name)
for _, cmd := range []string{
@ -606,7 +592,7 @@ func (m *mysqlGaleraTester) read(statefulPodIndex int, key string) string {
type redisTester struct {
ss *apps.StatefulSet
tester *statefulSetTester
tester *framework.StatefulSetTester
}
func (m *redisTester) name() string {
@ -619,7 +605,7 @@ func (m *redisTester) redisExec(cmd, ns, podName string) string {
}
func (m *redisTester) deploy(ns string) *apps.StatefulSet {
m.ss = m.tester.createStatefulSet(redisManifestPath, ns)
m.ss = m.tester.CreateStatefulSet(redisManifestPath, ns)
return m.ss
}
@ -637,7 +623,7 @@ func (m *redisTester) read(statefulPodIndex int, key string) string {
type cockroachDBTester struct {
ss *apps.StatefulSet
tester *statefulSetTester
tester *framework.StatefulSetTester
}
func (c *cockroachDBTester) name() string {
@ -650,7 +636,7 @@ func (c *cockroachDBTester) cockroachDBExec(cmd, ns, podName string) string {
}
func (c *cockroachDBTester) deploy(ns string) *apps.StatefulSet {
c.ss = c.tester.createStatefulSet(cockroachDBManifestPath, ns)
c.ss = c.tester.CreateStatefulSet(cockroachDBManifestPath, ns)
framework.Logf("Deployed statefulset %v, initializing database", c.ss.Name)
for _, cmd := range []string{
"CREATE DATABASE IF NOT EXISTS foo;",
@ -678,370 +664,6 @@ func lastLine(out string) string {
return outLines[len(outLines)-1]
}
func statefulSetFromManifest(fileName, ns string) *apps.StatefulSet {
var ss apps.StatefulSet
framework.Logf("Parsing statefulset from %v", fileName)
data, err := ioutil.ReadFile(fileName)
Expect(err).NotTo(HaveOccurred())
json, err := utilyaml.ToJSON(data)
Expect(err).NotTo(HaveOccurred())
Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &ss)).NotTo(HaveOccurred())
ss.Namespace = ns
if ss.Spec.Selector == nil {
ss.Spec.Selector = &metav1.LabelSelector{
MatchLabels: ss.Spec.Template.Labels,
}
}
return &ss
}
// statefulSetTester has all methods required to test a single statefulset.
type statefulSetTester struct {
c clientset.Interface
}
func (s *statefulSetTester) createStatefulSet(manifestPath, ns string) *apps.StatefulSet {
mkpath := func(file string) string {
return filepath.Join(framework.TestContext.RepoRoot, manifestPath, file)
}
ss := statefulSetFromManifest(mkpath("statefulset.yaml"), ns)
framework.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
framework.RunKubectlOrDie("create", "-f", mkpath("service.yaml"), fmt.Sprintf("--namespace=%v", ns))
framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
framework.RunKubectlOrDie("create", "-f", mkpath("statefulset.yaml"), fmt.Sprintf("--namespace=%v", ns))
s.waitForRunningAndReady(*ss.Spec.Replicas, ss)
return ss
}
func (s *statefulSetTester) checkMount(ss *apps.StatefulSet, mountPath string) error {
for _, cmd := range []string{
// Print inode, size etc
fmt.Sprintf("ls -idlhZ %v", mountPath),
// Print subdirs
fmt.Sprintf("find %v", mountPath),
// Try writing
fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
} {
if err := s.execInStatefulPods(ss, cmd); err != nil {
return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
}
}
return nil
}
func (s *statefulSetTester) execInStatefulPods(ss *apps.StatefulSet, cmd string) error {
podList := s.getPodList(ss)
for _, statefulPod := range podList.Items {
stdout, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
if err != nil {
return err
}
}
return nil
}
func (s *statefulSetTester) checkHostname(ss *apps.StatefulSet) error {
cmd := "printf $(hostname)"
podList := s.getPodList(ss)
for _, statefulPod := range podList.Items {
hostname, err := framework.RunHostCmd(statefulPod.Namespace, statefulPod.Name, cmd)
if err != nil {
return err
}
if hostname != statefulPod.Name {
return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
}
}
return nil
}
func (s *statefulSetTester) saturate(ss *apps.StatefulSet) {
// TODO: Watch events and check that creation timestamss don't overlap
var i int32
for i = 0; i < *(ss.Spec.Replicas); i++ {
framework.Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
s.waitForRunningAndReady(i+1, ss)
framework.Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy")
s.setHealthy(ss)
}
}
func (s *statefulSetTester) deleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) {
name := getPodNameAtIndex(index, ss)
noGrace := int64(0)
if err := s.c.Core().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
framework.Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
}
}
type verifyPodFunc func(*v1.Pod)
func (s *statefulSetTester) verifyPodAtIndex(index int, ss *apps.StatefulSet, verify verifyPodFunc) {
name := getPodNameAtIndex(index, ss)
pod, err := s.c.Core().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
verify(pod)
}
func getPodNameAtIndex(index int, ss *apps.StatefulSet) string {
// TODO: we won't use "-index" as the name strategy forever,
// pull the name out from an identity mapper.
return fmt.Sprintf("%v-%v", ss.Name, index)
}
func (s *statefulSetTester) scale(ss *apps.StatefulSet, count int32) error {
name := ss.Name
ns := ss.Namespace
s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })
var statefulPodList *v1.PodList
pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
statefulPodList = s.getPodList(ss)
if int32(len(statefulPodList.Items)) == count {
return true, nil
}
return false, nil
})
if pollErr != nil {
unhealthy := []string{}
for _, statefulPod := range statefulPodList.Items {
delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, v1.IsPodReady(&statefulPod)
if delTs != nil || phase != v1.PodRunning || !readiness {
unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
}
}
return fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, statefulsetTimeout, unhealthy)
}
return nil
}
func (s *statefulSetTester) updateReplicas(ss *apps.StatefulSet, count int32) {
s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { ss.Spec.Replicas = &count })
}
func (s *statefulSetTester) restart(ss *apps.StatefulSet) {
oldReplicas := *(ss.Spec.Replicas)
framework.ExpectNoError(s.scale(ss, 0))
s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
}
func (s *statefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) {
for i := 0; i < 3; i++ {
ss, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
framework.Failf("failed to get statefulset %q: %v", name, err)
}
update(ss)
ss, err = s.c.Apps().StatefulSets(ns).Update(ss)
if err == nil {
return
}
if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
framework.Failf("failed to update statefulset %q: %v", name, err)
}
}
framework.Failf("too many retries draining statefulset %q", name)
}
func (s *statefulSetTester) getPodList(ss *apps.StatefulSet) *v1.PodList {
selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
framework.ExpectNoError(err)
podList, err := s.c.Core().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
framework.ExpectNoError(err)
return podList
}
func (s *statefulSetTester) confirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration) {
start := time.Now()
deadline := start.Add(timeout)
for t := time.Now(); t.Before(deadline); t = time.Now() {
podList := s.getPodList(ss)
statefulPodCount := len(podList.Items)
if statefulPodCount != count {
framework.Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ss.Name, count, len(podList.Items), podList)
}
framework.Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t))
time.Sleep(1 * time.Second)
}
}
func (s *statefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) {
pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
func() (bool, error) {
podList := s.getPodList(ss)
if int32(len(podList.Items)) < numStatefulPods {
framework.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods)
return false, nil
}
if int32(len(podList.Items)) > numStatefulPods {
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items))
}
for _, p := range podList.Items {
isReady := v1.IsPodReady(&p)
desiredReadiness := shouldBeReady == isReady
framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
if p.Status.Phase != v1.PodRunning || !desiredReadiness {
return false, nil
}
}
return true, nil
})
if pollErr != nil {
framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
}
}
func (s *statefulSetTester) waitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, true)
}
func (s *statefulSetTester) waitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, false)
}
func (s *statefulSetTester) breakProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
return s.execInStatefulPods(ss, cmd)
}
func (s *statefulSetTester) restoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
return s.execInStatefulPods(ss, cmd)
}
func (s *statefulSetTester) setHealthy(ss *apps.StatefulSet) {
podList := s.getPodList(ss)
markedHealthyPod := ""
for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
framework.Failf("Found pod in %v cannot set health", pod.Status.Phase)
}
if isInitialized(pod) {
continue
}
if markedHealthyPod != "" {
framework.Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod)
}
p, err := framework.UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) {
update.Annotations[apps.StatefulSetInitAnnotation] = "true"
})
framework.ExpectNoError(err)
framework.Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name)
markedHealthyPod = pod.Name
}
}
func (s *statefulSetTester) waitForStatus(ss *apps.StatefulSet, expectedReplicas int32) {
framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
ns, name := ss.Namespace, ss.Name
pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
func() (bool, error) {
ssGet, err := s.c.Apps().StatefulSets(ns).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ssGet.Status.Replicas != expectedReplicas {
framework.Logf("Waiting for stateful set status to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
return false, nil
}
return true, nil
})
if pollErr != nil {
framework.Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
}
}
func (p *statefulSetTester) checkServiceName(ps *apps.StatefulSet, expectedServiceName string) error {
framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
if expectedServiceName != ps.Spec.ServiceName {
return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s", expectedServiceName, ps.Spec.ServiceName)
}
return nil
}
func deleteAllStatefulSets(c clientset.Interface, ns string) {
sst := &statefulSetTester{c: c}
ssList, err := c.Apps().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
framework.ExpectNoError(err)
// Scale down each statefulset, then delete it completely.
// Deleting a pvc without doing this will leak volumes, #25101.
errList := []string{}
for _, ss := range ssList.Items {
framework.Logf("Scaling statefulset %v to 0", ss.Name)
if err := sst.scale(&ss, 0); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
sst.waitForStatus(&ss, 0)
framework.Logf("Deleting statefulset %v", ss.Name)
if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
}
// pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
pvNames := sets.NewString()
// TODO: Don't assume all pvcs in the ns belong to a statefulset
pvcPollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
pvcList, err := c.Core().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
if err != nil {
framework.Logf("WARNING: Failed to list pvcs, retrying %v", err)
return false, nil
}
for _, pvc := range pvcList.Items {
pvNames.Insert(pvc.Spec.VolumeName)
// TODO: Double check that there are no pods referencing the pvc
framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
if err := c.Core().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil {
return false, nil
}
}
return true, nil
})
if pvcPollErr != nil {
errList = append(errList, "Timeout waiting for pvc deletion.")
}
pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
pvList, err := c.Core().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
if err != nil {
framework.Logf("WARNING: Failed to list pvs, retrying %v", err)
return false, nil
}
waitingFor := []string{}
for _, pv := range pvList.Items {
if pvNames.Has(pv.Name) {
waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
}
}
if len(waitingFor) == 0 {
return true, nil
}
framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
return false, nil
})
if pollErr != nil {
errList = append(errList, "Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs.")
}
if len(errList) != 0 {
framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
}
}
func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, key, expectedVal string) error {
err := wait.PollImmediate(time.Second, readTimeout, func() (bool, error) {
val := statefulPod.read(statefulPodNumber, key)
@ -1058,100 +680,3 @@ func pollReadWithTimeout(statefulPod statefulPodTester, statefulPodNumber int, k
}
return err
}
func isInitialized(pod v1.Pod) bool {
initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
if !ok {
return false
}
inited, err := strconv.ParseBool(initialized)
if err != nil {
framework.Failf("Couldn't parse statefulset init annotations %v", initialized)
}
return inited
}
func newPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
"volume.alpha.kubernetes.io/storage-class": "anything",
},
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
},
},
},
}
}
func newStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet {
mounts := append(statefulPodMounts, podMounts...)
claims := []v1.PersistentVolumeClaim{}
for _, m := range statefulPodMounts {
claims = append(claims, newPVC(m.Name))
}
vols := []v1.Volume{}
for _, m := range podMounts {
vols = append(vols, v1.Volume{
Name: m.Name,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: fmt.Sprintf("/tmp/%v", m.Name),
},
},
})
}
privileged := true
return &apps.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: apps.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Replicas: func(i int32) *int32 { return &i }(replicas),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: map[string]string{},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: nginxImage,
VolumeMounts: mounts,
SecurityContext: &v1.SecurityContext{
Privileged: &privileged,
},
},
},
Volumes: vols,
},
},
VolumeClaimTemplates: claims,
ServiceName: governingSvcName,
},
}
}
func setInitializedAnnotation(ss *apps.StatefulSet, value string) {
ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
}

View File

@ -15,16 +15,19 @@ go_library(
"horizontal_pod_autoscalers.go",
"secrets.go",
"services.go",
"statefulset.go",
"upgrade.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//vendor:github.com/onsi/ginkgo",
"//vendor:github.com/onsi/gomega",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -0,0 +1,99 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrades
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/test/e2e/framework"
)
// StatefulSetUpgradeTest implements an upgrade test harness for StatefulSet upgrade testing.
type StatefulSetUpgradeTest struct {
tester *framework.StatefulSetTester
service *v1.Service
set *apps.StatefulSet
}
// Setup creates a StatefulSet and a HeadlessService. It verifies the basic SatefulSet properties
func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
ssName := "ss"
labels := map[string]string{
"foo": "bar",
"baz": "blah",
}
headlessSvcName := "test"
statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
ns := f.Namespace.Name
t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
t.service = framework.CreateStatefulSetService(ssName, labels)
*(t.set.Spec.Replicas) = 3
framework.SetStatefulSetInitializedAnnotation(t.set, "false")
By("Creating service " + headlessSvcName + " in namespace " + ns)
_, err := f.ClientSet.Core().Services(ns).Create(t.service)
Expect(err).NotTo(HaveOccurred())
t.tester = framework.NewStatefulSetTester(f.ClientSet)
By("Creating statefulset " + ssName + " in namespace " + ns)
*(t.set.Spec.Replicas) = 3
_, err = f.ClientSet.Apps().StatefulSets(ns).Create(t.set)
Expect(err).NotTo(HaveOccurred())
By("Saturating stateful set " + t.set.Name)
t.tester.Saturate(t.set)
t.verify()
t.restart()
t.verify()
}
// Waits for the upgrade to complete and verifies the StatefulSet basic functionality
func (t *StatefulSetUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
<-done
t.verify()
}
// Deletes all StatefulSets
func (t *StatefulSetUpgradeTest) Teardown(f *framework.Framework) {
framework.DeleteAllStatefulSets(f.ClientSet, t.set.Name)
}
func (t *StatefulSetUpgradeTest) verify() {
By("Verifying statefulset mounted data directory is usable")
framework.ExpectNoError(t.tester.CheckMount(t.set, "/data"))
By("Verifying statefulset provides a stable hostname for each pod")
framework.ExpectNoError(t.tester.CheckHostname(t.set))
By("Verifying statefulset set proper service name")
framework.ExpectNoError(t.tester.CheckServiceName(t.set, t.set.Spec.ServiceName))
cmd := "echo $(hostname) > /data/hostname; sync;"
By("Running " + cmd + " in all stateful pods")
framework.ExpectNoError(t.tester.ExecInStatefulPods(t.set, cmd))
}
func (t *StatefulSetUpgradeTest) restart() {
By("Restarting statefulset " + t.set.Name)
t.tester.Restart(t.set)
t.tester.Saturate(t.set)
}