Merge pull request #48374 from mml/etcd-node-test

Automatic merge from submit-queue (batch tested with PRs 48374, 48524, 48519, 42548, 48615)

Add a node upgrade test for etcd statefulset with a PDB.

Tests for #38336
Kubernetes Submit Queue 2017-07-07 14:48:27 -07:00 committed by GitHub
commit 4f8d9af1db
6 changed files with 431 additions and 0 deletions

test/e2e/testing-manifests/statefulset/etcd/pdb.yaml Normal file

@@ -0,0 +1,11 @@
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: etcd-pdb
  labels:
    pdb: etcd
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: etcd
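Given the three-replica StatefulSet below, minAvailable: 2 means the eviction API will allow at most one etcd pod to be drained at a time, so a Raft write quorum (2 of 3) survives every step of a node-by-node upgrade. A quick sketch of the arithmetic, in Go since the rest of the test is Go:

package main

import "fmt"

// For an n-member etcd cluster, writes need a Raft quorum of n/2+1 votes.
// Setting the PDB's minAvailable to exactly that quorum lets voluntary
// evictions proceed one pod at a time without ever losing writability.
func quorum(n int) int { return n/2 + 1 }

func main() {
    n := 3 // replicas in the StatefulSet below
    // Prints: quorum=2, evictable at once=1
    fmt.Printf("quorum=%d, evictable at once=%d\n", quorum(n), n-quorum(n))
}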


@@ -0,0 +1,18 @@
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  name: etcd
  labels:
    app: etcd
spec:
  ports:
  - port: 2380
    name: etcd-server
  - port: 2379
    name: etcd-client
  clusterIP: None
  selector:
    app: etcd
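Because clusterIP is None, this is a headless service: each StatefulSet pod gets a stable DNS name of the form etcd-<ordinal>.etcd, and the tolerate-unready-endpoints annotation keeps those records published while peers are still bootstrapping. The StatefulSet's startup script builds its --initial-cluster peer list from those names; a Go sketch of the same construction:

package main

import (
    "fmt"
    "strings"
)

// peerList mirrors the PEERS loop in the statefulset's bootstrap script:
// each member is addressed by its stable headless-service DNS name.
func peerList(setName string, size int) string {
    var peers []string
    for i := 0; i < size; i++ {
        peers = append(peers, fmt.Sprintf("%s-%d=http://%s-%d.%s:2380",
            setName, i, setName, i, setName))
    }
    return strings.Join(peers, ",")
}

func main() {
    // Prints: etcd-0=http://etcd-0.etcd:2380,etcd-1=http://etcd-1.etcd:2380,etcd-2=http://etcd-2.etcd:2380
    fmt.Println(peerList("etcd", 3))
}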


@@ -0,0 +1,178 @@
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: etcd
  labels:
    app: etcd
spec:
  serviceName: etcd
  replicas: 3
  template:
    metadata:
      name: etcd
      labels:
        app: etcd
    spec:
      containers:
      - name: etcd
        image: gcr.io/google_containers/etcd-amd64:2.2.5
        imagePullPolicy: Always
        ports:
        - containerPort: 2380
          name: peer
        - containerPort: 2379
          name: client
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
        env:
        - name: INITIAL_CLUSTER_SIZE
          value: "3"
        - name: SET_NAME
          value: etcd
        volumeMounts:
        - name: datadir
          mountPath: /var/run/etcd
        lifecycle:
          preStop:
            exec:
              command:
                - "/bin/sh"
                - "-ec"
                - |
                  EPS=""
                  for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do
                      EPS="${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}:2379"
                  done
                  HOSTNAME=$(hostname)
                  member_hash() {
                      etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1
                  }
                  echo "Removing ${HOSTNAME} from etcd cluster"
                  ETCDCTL_ENDPOINT=${EPS} etcdctl member remove $(member_hash)
                  if [ $? -eq 0 ]; then
                      # Remove everything, otherwise the cluster will no longer scale up
                      rm -rf /var/run/etcd/*
                  fi
        command:
          - "/bin/sh"
          - "-ec"
          - |
            HOSTNAME=$(hostname)

            # store member id into PVC for later member replacement
            collect_member() {
                while ! etcdctl member list &>/dev/null; do sleep 1; done
                etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1 > /var/run/etcd/member_id
                exit 0
            }

            eps() {
                EPS=""
                for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do
                    EPS="${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}:2379"
                done
                echo ${EPS}
            }

            member_hash() {
                etcdctl member list | grep http://${HOSTNAME}.${SET_NAME}:2380 | cut -d':' -f1 | cut -d'[' -f1
            }

            # re-joining after failure?
            if [ -e /var/run/etcd/default.etcd ]; then
                echo "Re-joining etcd member"
                member_id=$(cat /var/run/etcd/member_id)

                # re-join member
                ETCDCTL_ENDPOINT=$(eps) etcdctl member update ${member_id} http://${HOSTNAME}.${SET_NAME}:2380

                exec etcd --name ${HOSTNAME} \
                    --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
                    --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \
                    --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \
                    --data-dir /var/run/etcd/default.etcd
            fi

            # etcd-SET_ID
            SET_ID=${HOSTNAME:5:${#HOSTNAME}}

            # adding a new member to an existing cluster (assuming all initial pods are available)
            if [ "${SET_ID}" -ge ${INITIAL_CLUSTER_SIZE} ]; then
                export ETCDCTL_ENDPOINT=$(eps)

                # member already added?
                MEMBER_HASH=$(member_hash)
                if [ -n "${MEMBER_HASH}" ]; then
                    # the member hash exists but for some reason etcd failed;
                    # as the datadir has not been created, we can remove the member
                    # and retrieve a new hash
                    etcdctl member remove ${MEMBER_HASH}
                fi

                echo "Adding new member"
                etcdctl member add ${HOSTNAME} http://${HOSTNAME}.${SET_NAME}:2380 | grep "^ETCD_" > /var/run/etcd/new_member_envs
                if [ $? -ne 0 ]; then
                    echo "Exiting"
                    rm -f /var/run/etcd/new_member_envs
                    exit 1
                fi

                cat /var/run/etcd/new_member_envs
                source /var/run/etcd/new_member_envs

                collect_member &

                exec etcd --name ${HOSTNAME} \
                    --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
                    --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \
                    --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \
                    --data-dir /var/run/etcd/default.etcd \
                    --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
                    --initial-cluster ${ETCD_INITIAL_CLUSTER} \
                    --initial-cluster-state ${ETCD_INITIAL_CLUSTER_STATE}
            fi

            for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do
                while true; do
                    echo "Waiting for ${SET_NAME}-${i}.${SET_NAME} to come up"
                    ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME} > /dev/null && break
                    sleep 1s
                done
            done

            PEERS=""
            for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do
                PEERS="${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}:2380"
            done

            collect_member &

            # join member
            exec etcd --name ${HOSTNAME} \
                --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
                --listen-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
                --listen-client-urls http://${HOSTNAME}.${SET_NAME}:2379,http://127.0.0.1:2379 \
                --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \
                --initial-cluster-token etcd-cluster-1 \
                --initial-cluster ${PEERS} \
                --initial-cluster-state new \
                --data-dir /var/run/etcd/default.etcd
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.alpha.kubernetes.io/storage-class: anything
    spec:
      accessModes:
        - "ReadWriteOnce"
      resources:
        requests:
          # upstream recommended max is 700M
          storage: 1Gi
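The startup script above takes one of three paths: re-join when a data dir already exists, register as a brand-new member when the pod's ordinal is at or beyond INITIAL_CLUSTER_SIZE (a scale-up), or wait for all peers and bootstrap a fresh cluster. A Go sketch of that decision, with hypothetical names chosen here purely for illustration:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

type startupMode int

const (
    rejoinExisting   startupMode = iota // data dir present: update peer URL and restart
    joinAsNewMember                     // ordinal >= initial size: etcdctl member add, state "existing"
    bootstrapInitial                    // otherwise: wait for peers, --initial-cluster-state new
)

// decideMode mirrors the branching in the statefulset's command script.
// hostname is e.g. "etcd-3"; SET_ID=${HOSTNAME:5:...} strips the "etcd-" prefix.
func decideMode(hostname string, dataDirExists bool, initialClusterSize int) (startupMode, error) {
    if dataDirExists {
        return rejoinExisting, nil
    }
    ordinal, err := strconv.Atoi(strings.TrimPrefix(hostname, "etcd-"))
    if err != nil {
        return 0, err
    }
    if ordinal >= initialClusterSize {
        return joinAsNewMember, nil
    }
    return bootstrapInitial, nil
}

func main() {
    m, _ := decideMode("etcd-3", false, 3)
    fmt.Println(m == joinAsNewMember) // true: pod 3 scales the cluster up
}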

test/e2e/testing-manifests/statefulset/etcd/tester.yaml Normal file

@@ -0,0 +1,24 @@
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: etcd-test-server
spec:
  replicas: 3
  template:
    metadata:
      labels:
        app: test-server
    spec:
      containers:
      - name: test-server
        image: gcr.io/google-containers/etcd-statefulset-e2e-test:0.0
        imagePullPolicy: Always
        ports:
        - containerPort: 8080
        readinessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 2
          periodSeconds: 2
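The test-server image's source is not part of this diff; the contract etcd.go relies on is a /healthz readiness probe, POST /add taking a name form value, and GET /list returning a JSON array of stored names. A minimal in-memory sketch of that contract follows (the real image presumably reads and writes through the etcd service rather than keeping names in memory):

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "sync"
)

// A stand-in for the test-server endpoints exercised by etcd.go.
func main() {
    var (
        mu    sync.Mutex
        names = []string{}
    )
    // Readiness probe target.
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    // POST /add stores the "name" form value.
    http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
        mu.Lock()
        defer mu.Unlock()
        names = append(names, r.FormValue("name"))
    })
    // GET /list returns all stored names as a JSON array.
    http.HandleFunc("/list", func(w http.ResponseWriter, r *http.Request) {
        mu.Lock()
        defer mu.Unlock()
        json.NewEncoder(w).Encode(names)
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}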

test/e2e/upgrades/BUILD

@@ -14,6 +14,7 @@ go_library(
"configmaps.go",
"daemonsets.go",
"deployments.go",
"etcd.go",
"horizontal_pod_autoscalers.go",
"ingress.go",
"job.go",

test/e2e/upgrades/etcd.go Normal file

@@ -0,0 +1,199 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrades

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "path/filepath"
    "sync"
    "time"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/version"
    "k8s.io/kubernetes/test/e2e/framework"
)
const manifestPath = "test/e2e/testing-manifests/statefulset/etcd"

// EtcdUpgradeTest verifies that an etcd StatefulSet behind a PDB stays
// available and writable while the cluster goes through an upgrade.
type EtcdUpgradeTest struct {
    ip               string
    successfulWrites int
    ssTester         *framework.StatefulSetTester
}

// Name returns the tracking name of the test.
func (EtcdUpgradeTest) Name() string { return "etcd-upgrade" }

// Skip returns true when any version involved in the upgrade is older than 1.6.0.
func (EtcdUpgradeTest) Skip(upgCtx UpgradeContext) bool {
    minVersion := version.MustParseSemantic("1.6.0")
    for _, vCtx := range upgCtx.Versions {
        if vCtx.Version.LessThan(minVersion) {
            return true
        }
    }
    return false
}

func kubectlCreate(ns, file string) {
    path := filepath.Join(framework.TestContext.RepoRoot, manifestPath, file)
    framework.RunKubectlOrDie("create", "-f", path, fmt.Sprintf("--namespace=%s", ns))
}
// Setup creates the PDB, the etcd StatefulSet, and the test-server
// deployment, then seeds the database with two users.
func (t *EtcdUpgradeTest) Setup(f *framework.Framework) {
    ns := f.Namespace.Name
    statefulsetPoll := 30 * time.Second
    statefulsetTimeout := 10 * time.Minute
    t.ssTester = framework.NewStatefulSetTester(f.ClientSet)

    By("Creating a PDB")
    kubectlCreate(ns, "pdb.yaml")

    By("Creating an etcd StatefulSet")
    t.ssTester.CreateStatefulSet(manifestPath, ns)

    By("Creating an etcd test-server deployment")
    kubectlCreate(ns, "tester.yaml")

    By("Getting the ingress IPs from the services")
    err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
        if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" {
            return false, nil
        }
        if _, err := t.listUsers(); err != nil {
            framework.Logf("Service endpoint is up but isn't responding")
            return false, nil
        }
        return true, nil
    })
    Expect(err).NotTo(HaveOccurred())
    framework.Logf("Service endpoint is up")

    By("Adding 2 dummy users")
    Expect(t.addUser("Alice")).NotTo(HaveOccurred())
    Expect(t.addUser("Bob")).NotTo(HaveOccurred())
    t.successfulWrites = 2

    By("Verifying that the users exist")
    users, err := t.listUsers()
    Expect(err).NotTo(HaveOccurred())
    Expect(len(users)).To(Equal(2))
}
func (t *EtcdUpgradeTest) listUsers() ([]string, error) {
    r, err := http.Get(fmt.Sprintf("http://%s:8080/list", t.ip))
    if err != nil {
        return nil, err
    }
    defer r.Body.Close()
    if r.StatusCode != http.StatusOK {
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
            return nil, err
        }
        return nil, fmt.Errorf(string(b))
    }
    var names []string
    if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
        return nil, err
    }
    return names, nil
}

func (t *EtcdUpgradeTest) addUser(name string) error {
    val := map[string][]string{"name": {name}}
    r, err := http.PostForm(fmt.Sprintf("http://%s:8080/add", t.ip), val)
    if err != nil {
        return err
    }
    defer r.Body.Close()
    if r.StatusCode != http.StatusOK {
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
            return err
        }
        return fmt.Errorf(string(b))
    }
    return nil
}

func (t *EtcdUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string {
    svc, err := f.ClientSet.CoreV1().Services(ns).Get(svcName, metav1.GetOptions{})
    Expect(err).NotTo(HaveOccurred())
    ingress := svc.Status.LoadBalancer.Ingress
    if len(ingress) == 0 {
        return ""
    }
    return ingress[0].IP
}
// Test polls the database continuously during the upgrade, recording the
// success rates of reads and writes.
func (t *EtcdUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
    By("Continuously polling the database during upgrade.")
    var (
        success, failures, writeAttempts, lastUserCount int
        mu                                              sync.Mutex
        errors                                          = map[string]int{}
    )
    // Write loop.
    go wait.Until(func() {
        writeAttempts++
        if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
            framework.Logf("Unable to add user: %v", err)
            mu.Lock()
            errors[err.Error()]++
            mu.Unlock()
            return
        }
        t.successfulWrites++
    }, 10*time.Millisecond, done)
    // Read loop.
    wait.Until(func() {
        users, err := t.listUsers()
        if err != nil {
            framework.Logf("Could not retrieve users: %v", err)
            failures++
            mu.Lock()
            errors[err.Error()]++
            mu.Unlock()
            return
        }
        success++
        lastUserCount = len(users)
    }, 10*time.Millisecond, done)
    framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
    Expect(lastUserCount >= t.successfulWrites).To(BeTrue())
    ratio := float64(success) / float64(success+failures)
    framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
    // ratio is reused below, so the final assertion checks write success.
    ratio = float64(t.successfulWrites) / float64(writeAttempts)
    framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
    framework.Logf("Errors: %v", errors)
    // TODO(maisem): tweak this value once we have a few test runs.
    Expect(ratio > 0.75).To(BeTrue())
}
// Teardown does one final check of the data's availability.
func (t *EtcdUpgradeTest) Teardown(f *framework.Framework) {
    users, err := t.listUsers()
    Expect(err).NotTo(HaveOccurred())
    Expect(len(users) >= t.successfulWrites).To(BeTrue())
}
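For reference, the upgrade suite discovers tests through the package's Test interface; assuming the interface definition elsewhere in test/e2e/upgrades matches the methods implemented above, conformance can be asserted at compile time:

// Assumption: the Test interface is defined elsewhere in this package (not
// in this diff) and covers Name, Skip, Setup, Test, and Teardown as above.
var _ Test = &EtcdUpgradeTest{}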