Reimplement migrate-if-needed.sh in go

pull/8/head
Joe Betz 2018-03-10 00:03:43 -08:00
parent 7fed970f36
commit dc4d92e154
29 changed files with 1962 additions and 651 deletions

View File

@ -16,8 +16,7 @@ filegroup(
"//cluster/addons:all-srcs",
"//cluster/gce:all-srcs",
"//cluster/images/etcd-version-monitor:all-srcs",
"//cluster/images/etcd/attachlease:all-srcs",
"//cluster/images/etcd/rollback:all-srcs",
"//cluster/images/etcd/migrate:all-srcs",
"//cluster/images/hyperkube:all-srcs",
"//cluster/images/kubemark:all-srcs",
],

View File

@ -158,7 +158,7 @@ METADATA_AGENT_VERSION="${KUBE_METADATA_AGENT_VERSION:-0.2-0.0.16-1}"
# Useful for scheduling heapster in large clusters with nodes of small size.
HEAPSTER_MACHINE_TYPE="${HEAPSTER_MACHINE_TYPE:-}"
# Set etcd image (e.g. k8s.gcr.io/etcd) and version (e.g. 3.1.12) if you need
# Set etcd image (e.g. k8s.gcr.io/etcd) and version (e.g. 3.1.12-1) if you need
# non-default version.
ETCD_IMAGE="${TEST_ETCD_IMAGE:-}"
ETCD_DOCKER_REPOSITORY="${TEST_ETCD_DOCKER_REPOSITORY:-}"

View File

@ -13,7 +13,7 @@
"containers":[
{
"name": "etcd-container",
"image": "{{ pillar.get('etcd_docker_repository', 'k8s.gcr.io/etcd') }}:{{ pillar.get('etcd_docker_tag', '3.1.12') }}",
"image": "{{ pillar.get('etcd_docker_repository', 'k8s.gcr.io/etcd') }}:{{ pillar.get('etcd_docker_tag', '3.1.12-1') }}",
"resources": {
"requests": {
"cpu": {{ cpulimit }}
@ -37,6 +37,12 @@
{ "name": "INITIAL_CLUSTER",
"value": "{{ etcd_cluster }}"
},
{ "name": "LISTEN_PEER_URLS",
"value": "{{ etcd_protocol }}://{{ host_ip }}:{{ server_port }}"
},
{ "name": "INITIAL_ADVERTISE_PEER_URLS",
"value": "{{ etcd_protocol }}://{{ hostname }}:{{ server_port }}"
},
{ "name": "ETCD_CREDS",
"value": "{{ etcd_creds }}"
}

View File

@ -161,7 +161,7 @@ export KUBE_GCE_ENABLE_IP_ALIASES=true
export SECONDARY_RANGE_NAME="pods-default"
export STORAGE_BACKEND="etcd3"
export STORAGE_MEDIA_TYPE="application/vnd.kubernetes.protobuf"
export ETCD_IMAGE=3.1.12
export ETCD_IMAGE=3.1.12-1
export ETCD_VERSION=3.1.12
# Upgrade master with updated kube envs

View File

@ -16,4 +16,4 @@ FROM BASEIMAGE
EXPOSE 2379 2380 4001 7001
COPY etcd* etcdctl* /usr/local/bin/
COPY migrate-if-needed.sh start-stop-etcd.sh attachlease rollback /usr/local/bin/
COPY migrate-if-needed.sh migrate /usr/local/bin/

View File

@ -15,29 +15,38 @@
# Build the etcd image
#
# Usage:
# [TAGS=2.2.1 2.3.7 3.0.17 3.1.12] [REGISTRY=k8s.gcr.io] [ARCH=amd64] [BASEIMAGE=busybox] make (build|push)
# [BUNDLED_ETCD_VERSIONS=2.2.1 2.3.7 3.0.17 3.1.12] [REGISTRY=k8s.gcr.io] [ARCH=amd64] [BASEIMAGE=busybox] make (build|push)
# The image contains different etcd versions to simplify
# upgrades. Thus be careful when removing any tag from here.
# upgrades. Thus be careful when removing any versions from here.
#
# NOTE: The etcd upgrade rules are that you can upgrade only 1 minor
# version at a time, and patch release don't matter.
#
# Except from etcd-$(tag) and etcdctl-$(tag) binaries, we also
# Except from etcd-$(version) and etcdctl-$(version) binaries, we also
# need etcd and etcdctl binaries for backward compatibility reasons.
# That binary will be set to the last tag from $(TAGS).
TAGS?=2.2.1 2.3.7 3.0.17 3.1.12
REGISTRY_TAG?=3.1.12
# ROLLBACK_REGISTRY_TAG specified the tag that REGISTRY_TAG may be rolled back to.
ROLLBACK_REGISTRY_TAG?=3.1.12
# That binary will be set to the last version from $(BUNDLED_ETCD_VERSIONS).
BUNDLED_ETCD_VERSIONS?=2.2.1 2.3.7 3.0.17 3.1.12
# LATEST_ETCD_VERSION identifies the most recent etcd version available.
LATEST_ETCD_VERSION?=3.1.12
# REVISION provides a version number fo this image and all it's bundled
# artifacts. It should start at zero for each LATEST_ETCD_VERSION and increment
# for each revision of this image at that etcd version.
REVISION?=1
# IMAGE_TAG Uniquely identifies k8s.gcr.io/etcd docker image with a tag of the form "<etcd-version>-<revision>".
IMAGE_TAG=$(LATEST_ETCD_VERSION)-$(REVISION)
ARCH?=amd64
# Image should be pulled from k8s.gcr.io, which will auto-detect
# region (us, eu, asia, ...) and pull from the closest.
REGISTRY?=k8s.gcr.io
# Images should be pushed to staging-k8s.gcr.io.
PUSH_REGISTRY?=staging-k8s.gcr.io
# golang version should match the golang version from https://github.com/coreos/etcd/releases for REGISTRY_TAG version of etcd.
GOLANG_VERSION?=1.8.5
# golang version should match the golang version from https://github.com/coreos/etcd/releases for the current ETCD_VERSION.
GOLANG_VERSION?=1.8.7
GOARM=7
TEMP_DIR:=$(shell mktemp -d)
@ -62,40 +71,37 @@ build:
# without copying the subdirectories.
find ./ -maxdepth 1 -type f | xargs -I {} cp {} $(TEMP_DIR)
# Compile attachlease
# Compile migrate
docker run --interactive -v $(shell pwd)/../../../:/go/src/k8s.io/kubernetes -v $(TEMP_DIR):/build -e GOARCH=$(ARCH) golang:$(GOLANG_VERSION) \
/bin/bash -c "CGO_ENABLED=0 go build -o /build/attachlease k8s.io/kubernetes/cluster/images/etcd/attachlease"
# Compile rollback
docker run --interactive -v $(shell pwd)/../../../:/go/src/k8s.io/kubernetes -v $(TEMP_DIR):/build -e GOARCH=$(ARCH) golang:$(GOLANG_VERSION) \
/bin/bash -c "CGO_ENABLED=0 go build -o /build/rollback k8s.io/kubernetes/cluster/images/etcd/rollback"
/bin/bash -c "CGO_ENABLED=0 go build -o /build/migrate k8s.io/kubernetes/cluster/images/etcd/migrate"
ifeq ($(ARCH),amd64)
# Do not compile if we should make an image for amd64, use the official etcd binaries instead
# For each release create a tmp dir 'etcd_release_tmp_dir' and unpack the release tar there.
for tag in $(TAGS); do \
for version in $(BUNDLED_ETCD_VERSIONS); do \
etcd_release_tmp_dir=$(shell mktemp -d); \
curl -sSL --retry 5 https://github.com/coreos/etcd/releases/download/v$$tag/etcd-v$$tag-linux-amd64.tar.gz | tar -xz -C $$etcd_release_tmp_dir --strip-components=1; \
curl -sSL --retry 5 https://github.com/coreos/etcd/releases/download/v$$version/etcd-v$$version-linux-amd64.tar.gz | tar -xz -C $$etcd_release_tmp_dir --strip-components=1; \
cp $$etcd_release_tmp_dir/etcd $$etcd_release_tmp_dir/etcdctl $(TEMP_DIR)/; \
cp $(TEMP_DIR)/etcd $(TEMP_DIR)/etcd-$$tag; \
cp $(TEMP_DIR)/etcdctl $(TEMP_DIR)/etcdctl-$$tag; \
cp $(TEMP_DIR)/etcd $(TEMP_DIR)/etcd-$$version; \
cp $(TEMP_DIR)/etcdctl $(TEMP_DIR)/etcdctl-$$version; \
done
else
# Download etcd in a golang container and cross-compile it statically
# For each release create a tmp dir 'etcd_release_tmp_dir' and unpack the release tar there.
for tag in $(TAGS); do \
for version in $(BUNDLED_ETCD_VERSIONS); do \
etcd_release_tmp_dir=$(shell mktemp -d); \
docker run --interactive -v $${etcd_release_tmp_dir}:/etcdbin golang:$(GOLANG_VERSION) /bin/bash -c \
"git clone https://github.com/coreos/etcd /go/src/github.com/coreos/etcd \
&& cd /go/src/github.com/coreos/etcd \
&& git checkout v$${tag} \
&& git checkout v$${version} \
&& GOARM=$(GOARM) GOARCH=$(ARCH) ./build \
&& cp -f bin/$(ARCH)/etcd* bin/etcd* /etcdbin; echo 'done'"; \
cp $$etcd_release_tmp_dir/etcd $$etcd_release_tmp_dir/etcdctl $(TEMP_DIR)/; \
cp $(TEMP_DIR)/etcd $(TEMP_DIR)/etcd-$$tag; \
cp $(TEMP_DIR)/etcdctl $(TEMP_DIR)/etcdctl-$$tag; \
cp $(TEMP_DIR)/etcd $(TEMP_DIR)/etcd-$$version; \
cp $(TEMP_DIR)/etcdctl $(TEMP_DIR)/etcdctl-$$version; \
done
# Add this ENV variable in order to workaround an unsupported arch blocker
@ -109,139 +115,36 @@ endif
cd $(TEMP_DIR) && sed -i.bak 's|BASEIMAGE|$(BASEIMAGE)|g' Dockerfile
# And build the image
docker build --pull -t $(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) $(TEMP_DIR)
docker build --pull -t $(REGISTRY)/etcd-$(ARCH):$(IMAGE_TAG) $(TEMP_DIR)
push: build
docker tag $(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) $(PUSH_REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG)
docker push $(PUSH_REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG)
docker tag $(REGISTRY)/etcd-$(ARCH):$(IMAGE_TAG) $(PUSH_REGISTRY)/etcd-$(ARCH):$(IMAGE_TAG)
docker push $(PUSH_REGISTRY)/etcd-$(ARCH):$(IMAGE_TAG)
ifeq ($(ARCH),amd64)
# Backward compatibility. TODO: deprecate this image tag
docker tag $(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) $(REGISTRY)/etcd:$(REGISTRY_TAG)
docker push $(REGISTRY)/etcd:$(REGISTRY_TAG)
docker tag $(REGISTRY)/etcd-$(ARCH):$(IMAGE_TAG) $(PUSH_REGISTRY)/etcd:$(IMAGE_TAG)
docker push $(PUSH_REGISTRY)/etcd:$(IMAGE_TAG)
endif
ETCD2_ROLLBACK_NEW_TAG=3.0.17
ETCD2_ROLLBACK_OLD_TAG=2.2.1
unit-test:
docker run --interactive -v $(shell pwd)/../../../:/go/src/k8s.io/kubernetes -e GOARCH=$(ARCH) golang:$(GOLANG_VERSION) \
/bin/bash -c "CGO_ENABLED=0 go test -v k8s.io/kubernetes/cluster/images/etcd/migrate"
# Test a rollback to etcd2 from the earliest etcd3 version.
test-rollback-etcd2:
mkdir -p $(TEMP_DIR)/rollback-etcd2
cd $(TEMP_DIR)/rollback-etcd2
# Integration tests require both a golang build environment and all the etcd binaries from a `k8s.gcr.io/etcd` image (`/usr/local/bin/etcd-<version>`, ...).
# Since the `k8s.gcr.io/etcd` image is for runtime only and does not have a build golang environment, we create a new docker image to run integration tests
# with.
build-integration-test-image: build
cp -r $(TEMP_DIR) $(TEMP_DIR)_integration_test
cp Dockerfile $(TEMP_DIR)_integration_test/Dockerfile
cd $(TEMP_DIR)_integration_test && sed -i.bak 's|BASEIMAGE|golang:$(GOLANG_VERSION)|g' Dockerfile
docker build --pull -t etcd-integration-test $(TEMP_DIR)_integration_test
@echo "Starting $(ETCD2_ROLLBACK_NEW_TAG) etcd and writing some sample data."
docker run --tty --interactive -v $(TEMP_DIR)/rollback-etcd2:/var/etcd \
-e "TARGET_STORAGE=etcd3" \
-e "TARGET_VERSION=$(ETCD2_ROLLBACK_NEW_TAG)" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'INITIAL_CLUSTER=etcd-$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd3 START_VERSION=$(ETCD2_ROLLBACK_NEW_TAG) start_etcd && \
ETCDCTL_API=3 /usr/local/bin/etcdctl-$(ETCD2_ROLLBACK_NEW_TAG) --endpoints http://127.0.0.1:$${ETCD_PORT} put /registry/k1 value1 && \
stop_etcd && \
[ $$(cat /var/etcd/data/version.txt) = $(ETCD2_ROLLBACK_NEW_TAG)/etcd3 ]'
@echo "Rolling back to the previous version of etcd and recording keyspace to a flat file."
docker run --tty --interactive -v $(TEMP_DIR)/rollback-etcd2:/var/etcd \
-e "TARGET_STORAGE=etcd2" \
-e "TARGET_VERSION=$(ETCD2_ROLLBACK_OLD_TAG)" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'INITIAL_CLUSTER=etcd-$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd2 START_VERSION=$(ETCD2_ROLLBACK_OLD_TAG) start_etcd && \
/usr/local/bin/etcdctl-$(ETCD2_ROLLBACK_OLD_TAG) --endpoint 127.0.0.1:$${ETCD_PORT} get /registry/k1 > /var/etcd/keyspace.txt && \
stop_etcd'
@echo "Checking if rollback successfully downgraded etcd to $(ETCD2_ROLLBACK_OLD_TAG)"
docker run --tty --interactive -v $(TEMP_DIR)/rollback-etcd2:/var/etcd \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'[ $$(cat /var/etcd/data/version.txt) = $(ETCD2_ROLLBACK_OLD_TAG)/etcd2 ] && \
grep -q value1 /var/etcd/keyspace.txt'
# Test a rollback from the latest version to the previous version.
test-rollback:
mkdir -p $(TEMP_DIR)/rollback-test
cd $(TEMP_DIR)/rollback-test
@echo "Starting $(REGISTRY_TAG) etcd and writing some sample data."
docker run --tty --interactive -v $(TEMP_DIR)/rollback-test:/var/etcd \
-e "TARGET_STORAGE=etcd3" \
-e "TARGET_VERSION=$(REGISTRY_TAG)" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'INITIAL_CLUSTER=etcd-$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd3 START_VERSION=$(REGISTRY_TAG) start_etcd && \
ETCDCTL_API=3 /usr/local/bin/etcdctl --endpoints http://127.0.0.1:$${ETCD_PORT} put /registry/k1 value1 && \
stop_etcd'
@echo "Rolling back to the previous version of etcd and recording keyspace to a flat file."
docker run --tty --interactive -v $(TEMP_DIR)/rollback-test:/var/etcd \
-e "TARGET_STORAGE=etcd3" \
-e "TARGET_VERSION=$(ROLLBACK_REGISTRY_TAG)" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'INITIAL_CLUSTER=etcd-$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd3 START_VERSION=$(ROLLBACK_REGISTRY_TAG) start_etcd && \
ETCDCTL_API=3 /usr/local/bin/etcdctl --endpoints http://127.0.0.1:$${ETCD_PORT} get --prefix / > /var/etcd/keyspace.txt && \
stop_etcd'
@echo "Checking if rollback successfully downgraded etcd to $(ROLLBACK_REGISTRY_TAG)"
docker run --tty --interactive -v $(TEMP_DIR)/rollback-test:/var/etcd \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'[ $$(cat /var/etcd/data/version.txt) = $(ROLLBACK_REGISTRY_TAG)/etcd3 ] && \
grep -q value1 /var/etcd/keyspace.txt'
# Test migrating from each supported versions to the latest version.
test-migrate:
for tag in $(TAGS); do \
echo "Testing migration from $${tag} to $(REGISTRY_TAG)" && \
mkdir -p $(TEMP_DIR)/migrate-$${tag} && \
cd $(TEMP_DIR)/migrate-$${tag} && \
MAJOR_VERSION=$$(echo $${tag} | cut -c 1) && \
echo "Starting etcd $${tag} and writing sample data to keyspace" && \
docker run --tty --interactive -v $(TEMP_DIR)/migrate-$${tag}:/var/etcd \
-e "TARGET_STORAGE=etcd$${MAJOR_VERSION}" \
-e "TARGET_VERSION=$${tag}" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
"INITIAL_CLUSTER=etcd-\$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd$${MAJOR_VERSION} START_VERSION=$${tag} start_etcd && \
if [ $${MAJOR_VERSION} == 2 ]; then \
/usr/local/bin/etcdctl --endpoint http://127.0.0.1:\$${ETCD_PORT} set /registry/k1 value1; \
else \
ETCDCTL_API=3 /usr/local/bin/etcdctl --endpoints http://127.0.0.1:\$${ETCD_PORT} put /registry/k1 value1; \
fi && \
stop_etcd" && \
echo " Migrating from $${tag} to $(REGISTRY_TAG) and capturing keyspace" && \
docker run --tty --interactive -v $(TEMP_DIR)/migrate-$${tag}:/var/etcd \
-e "TARGET_STORAGE=etcd3" \
-e "TARGET_VERSION=$(REGISTRY_TAG)" \
-e "DATA_DIRECTORY=/var/etcd/data" \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'INITIAL_CLUSTER=etcd-$$(hostname)=http://localhost:2380 \
/usr/local/bin/migrate-if-needed.sh && \
source /usr/local/bin/start-stop-etcd.sh && \
START_STORAGE=etcd3 START_VERSION=$(REGISTRY_TAG) start_etcd && \
ETCDCTL_API=3 /usr/local/bin/etcdctl --endpoints http://127.0.0.1:$${ETCD_PORT} get --prefix / > /var/etcd/keyspace.txt && \
stop_etcd' && \
echo "Checking if migrate from $${tag} successfully upgraded etcd to $(REGISTRY_TAG)" && \
docker run --tty --interactive -v $(TEMP_DIR)/migrate-$${tag}:/var/etcd \
$(REGISTRY)/etcd-$(ARCH):$(REGISTRY_TAG) /bin/sh -c \
'[ $$(cat /var/etcd/data/version.txt) = $(REGISTRY_TAG)/etcd3 ] && \
grep -q value1 /var/etcd/keyspace.txt'; \
done
test: test-rollback test-rollback-etcd2 test-migrate
integration-test:
docker run --interactive -v $(shell pwd)/../../../:/go/src/k8s.io/kubernetes -e GOARCH=$(ARCH) etcd-integration-test \
/bin/bash -c "CGO_ENABLED=0 go test -tags=integration -v k8s.io/kubernetes/cluster/images/etcd/migrate -args -v 10 -logtostderr true"
integration-build-test: build-integration-test-image integration-test
test: unit-test integration-build-test
all: build test
.PHONY: build push test-rollback test-rollback-etcd2 test-migrate test
.PHONY: build push unit-test build-integration-test-image integration-test integration-build-test test

View File

@ -1,17 +1,30 @@
### etcd
### k8s.gcr.io/etcd docker image
This is a small etcd image used in Kubernetes setups where `etcd` is deployed as a docker image.
Provides docker images containing etcd and etcdctl binaries for multiple etcd
version as well as a migration operator utility for upgrading and downgrading
etcd--it's data directory in particular--to a target version.
For `amd64`, official `etcd` and `etcdctl` binaries are downloaded from Github to maintain official support.
For other architectures, `etcd` is cross-compiled from source. Arch-specific `busybox` images serve as base images.
#### Versioning
### Upgrading and Downgrading
Each `k8s.gcr.io/etcd` docker image is tagged with an version string of the form
`<etcd-version>-<image-revision>`, e.g. `3.0.17-0`. The etcd version is the
SemVer of latest etcd version available in the image. The image revision
distinguishes between docker images with the same lastest etcd version but
changes (bug fixes and backward compatible improvements) to the migration
utility bundled with the image.
To upgrade to a newer etcd version, or to downgrade to the previous minor
version, always run `/usr/local/bin/migrate-if-needed.sh` before starting the
etcd server.
In addition to the latest etcd version, each `k8s.gcr.io/etcd` image contains
etcd and etcdctl binaries for older versions of etcd. These are used by the
migration operator utility when performing downgrades and multi-step upgrades,
but can also be used as the etcd target version.
`migrate-if-needed.sh` writes a `version.txt` file to track the "current" version
#### Usage
Always run `/usr/local/bin/migrate` (or the
`/usr/local/bin/migrate-if-needed.sh` wrapper script) before starting the etcd
server.
`migrate` writes a `version.txt` file to track the "current" version
of etcd that was used to persist data to disk. A "target" version may also be provided
by the `TARGET_STORAGE` (e.g. "etcd3") and `TARGET_VERSION` (e.g. "3.2.11" )
environment variables. If the persisted version differs from the target version,
@ -23,15 +36,30 @@ in steps to each minor version until the target version is reached.
Downgrades to the previous minor version of the 3.x series and from 3.0 to 2.3.7 are supported.
#### Permissions
By default, `migrate` will write data directory files with default permissions
according to the umask it is run with. When run in the published
`k8s.gcr.io/etcd` images the default umask is 0022 which will result in 0755
directory permissions and 0644 file permissions.
#### Cross building
For `amd64`, official `etcd` and `etcdctl` binaries are downloaded from Github
to maintain official support. For other architectures, `etcd` is cross-compiled
from source. Arch-specific `busybox` images serve as base images.
#### How to release
First, run the migration and rollback tests.
First, update `ETCD_VERSION` and `REVSION` in the `Makefile`.
Next, build and test the image:
```console
$ make build test
```
Next, build and push the docker images for all supported architectures.
Last, build and push the docker images for all supported architectures.
```console
# Build for linux/amd64 (default)

View File

@ -1,35 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
)
go_binary(
name = "attachlease",
embed = [":go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["attachlease.go"],
importpath = "k8s.io/kubernetes/cluster/images/etcd/attachlease",
deps = [
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,70 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"flag"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
)
var (
etcdAddress = flag.String("etcd-address", "", "Etcd address")
ttlKeysPrefix = flag.String("ttl-keys-prefix", "", "Prefix for TTL keys")
leaseDuration = flag.Duration("lease-duration", time.Hour, "Lease duration (seconds granularity)")
)
func main() {
flag.Parse()
if *etcdAddress == "" {
glog.Fatalf("--etcd-address flag is required")
}
client, err := clientv3.New(clientv3.Config{Endpoints: []string{*etcdAddress}})
if err != nil {
glog.Fatalf("Error while creating etcd client: %v", err)
}
// Make sure that ttlKeysPrefix is ended with "/" so that we only get children "directories".
if !strings.HasSuffix(*ttlKeysPrefix, "/") {
*ttlKeysPrefix += "/"
}
ctx := context.Background()
objectsResp, err := client.KV.Get(ctx, *ttlKeysPrefix, clientv3.WithPrefix())
if err != nil {
glog.Fatalf("Error while getting objects to attach to the lease")
}
lease, err := client.Lease.Grant(ctx, int64(*leaseDuration/time.Second))
if err != nil {
glog.Fatalf("Error while creating lease: %v", err)
}
glog.Infof("Lease with TTL: %v created", lease.TTL)
glog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
for _, kv := range objectsResp.Kvs {
_, err := client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID))
if err != nil {
glog.Errorf("Error while attaching lease to: %s", string(kv.Key))
}
}
}

View File

@ -39,95 +39,49 @@
set -o errexit
set -o nounset
source $(dirname "$0")/start-stop-etcd.sh
# NOTE: BUNDLED_VERSION has to match release binaries present in the
# etcd image (to make this script work correctly).
BUNDLED_VERSIONS="2.2.1, 2.3.7, 3.0.17, 3.1.12"
# Rollback to previous minor version of etcd 3.x, if needed.
#
# Warning: For HA etcd clusters (any cluster with more than one member), all members must be stopped before rolling back, zero
# downtime rollbacks are not supported.
rollback_etcd3_minor_version() {
if [ ${TARGET_MINOR_VERSION} != $((${CURRENT_MINOR_VERSION}-1)) ]; then
echo "Rollback from ${CURRENT_VERSION} to ${TARGET_VERSION} not supported, only rollbacks to the previous minor version are supported."
exit 1
fi
echo "Performing etcd ${CURRENT_VERSION} -> ${TARGET_VERSION} rollback"
ROLLBACK_BACKUP_DIR="${DATA_DIRECTORY}.bak"
rm -rf "${ROLLBACK_BACKUP_DIR}"
SNAPSHOT_FILE="${DATA_DIRECTORY}.snapshot.db"
rm -rf "${SNAPSHOT_FILE}"
ETCD_CMD="/usr/local/bin/etcd-${CURRENT_VERSION}"
ETCDCTL_CMD="/usr/local/bin/etcdctl-${CURRENT_VERSION}"
# Start CURRENT_VERSION of etcd.
START_VERSION="${CURRENT_VERSION}"
START_STORAGE="${CURRENT_STORAGE}"
echo "Starting etcd version ${START_VERSION} to capture rollback snapshot."
if ! start_etcd; then
echo "Unable to automatically downgrade etcd: starting etcd version ${START_VERSION} to capture rollback snapshot failed."
echo "See https://coreos.com/etcd/docs/latest/op-guide/recovery.html for manual downgrade options."
exit 1
else
ETCDCTL_API=3 ${ETCDCTL_CMD} snapshot --endpoints "http://127.0.0.1:${ETCD_PORT}" save "${SNAPSHOT_FILE}"
fi
stop_etcd
# Backup the data before rolling back.
mv "${DATA_DIRECTORY}" "${ROLLBACK_BACKUP_DIR}"
ETCDCTL_CMD="/usr/local/bin/etcdctl-${TARGET_VERSION}"
NAME="etcd-$(hostname)"
ETCDCTL_API=3 ${ETCDCTL_CMD} snapshot restore "${SNAPSHOT_FILE}" \
--data-dir "${DATA_DIRECTORY}" --name "${NAME}" --initial-cluster "${INITIAL_CLUSTER}"
CURRENT_VERSION="${TARGET_VERSION}"
echo "${CURRENT_VERSION}/${CURRENT_STORAGE}" > "${DATA_DIRECTORY}/${VERSION_FILE}"
}
# Rollback from "3.0.x" version in 'etcd3' mode to "2.2.1" version in 'etcd2' mode, if needed.
rollback_to_etcd2() {
if [ "$(echo ${CURRENT_VERSION} | cut -c1-4)" != "3.0." -o "${TARGET_VERSION}" != "2.2.1" ]; then
echo "etcd3 -> etcd2 downgrade is supported only between 3.0.x and 2.2.1"
return 0
fi
echo "Backup and remove all existing v2 data"
ROLLBACK_BACKUP_DIR="${DATA_DIRECTORY}.bak"
rm -rf "${ROLLBACK_BACKUP_DIR}"
mkdir -p "${ROLLBACK_BACKUP_DIR}"
cp -r "${DATA_DIRECTORY}" "${ROLLBACK_BACKUP_DIR}"
echo "Performing etcd3 -> etcd2 rollback"
${ROLLBACK} --data-dir "${DATA_DIRECTORY}"
if [ "$?" -ne "0" ]; then
echo "Rollback to etcd2 failed"
exit 1
fi
CURRENT_STORAGE="etcd2"
CURRENT_VERSION="2.2.1"
echo "${CURRENT_VERSION}/${CURRENT_STORAGE}" > "${DATA_DIRECTORY}/${VERSION_FILE}"
}
if [ -z "${TARGET_STORAGE:-}" ]; then
echo "TARGET_STORAGE variable unset - unexpected failure"
ETCD_NAME="${ETCD_NAME:-etcd-$(hostname)}"
if [ -z "${DATA_DIRECTORY:-}" ]; then
echo "DATA_DIRECTORY variable unset - unexpected failure"
exit 1
fi
case "${DATA_DIRECTORY}" in
*event*)
ETCD_PEER_PORT=2381
ETCD_CLIENT_PORT=18631
;;
*)
ETCD_PEER_PORT=2380
ETCD_CLIENT_PORT=18629
;;
esac
if [ -z "${INITIAL_CLUSTER:-}" ]; then
echo "Warn: INITIAL_CLUSTER variable unset - defaulting to ${ETCD_NAME}=http://localhost:${ETCD_PEER_PORT}"
INITIAL_CLUSTER="${ETCD_NAME}=http://localhost:${ETCD_PEER_PORT}"
fi
if [ -z "${LISTEN_PEER_URLS:-}" ]; then
echo "Warn: LISTEN_PEER_URLS variable unset - defaulting to http://localhost:${ETCD_PEER_PORT}"
LISTEN_PEER_URLS="http://localhost:${ETCD_PEER_PORT}"
fi
if [ -z "${INITIAL_ADVERTISE_PEER_URLS:-}" ]; then
echo "Warn: INITIAL_ADVERTISE_PEER_URLS variable unset - defaulting to http://localhost:${ETCD_PEER_PORT}"
INITIAL_ADVERTISE_PEER_URLS="http://localhost:${ETCD_PEER_PORT}"
fi
if [ -z "${TARGET_VERSION:-}" ]; then
echo "TARGET_VERSION variable unset - unexpected failure"
exit 1
fi
if [ -z "${DATA_DIRECTORY:-}" ]; then
echo "DATA_DIRECTORY variable unset - unexpected failure"
exit 1
fi
if [ -z "${INITIAL_CLUSTER:-}" ]; then
echo "Warn: INITIAL_CLUSTER variable unset - defaulting to etcd-$(hostname)=http://localhost:2380"
INITIAL_CLUSTER="etcd-$(hostname)=http://localhost:2380"
fi
echo "$(date +'%Y-%m-%d %H:%M:%S') Detecting if migration is needed"
if [ "${TARGET_STORAGE}" != "etcd2" -a "${TARGET_STORAGE}" != "etcd3" ]; then
echo "Not supported version of storage: ${TARGET_STORAGE}"
if [ -z "${TARGET_STORAGE:-}" ]; then
echo "TARGET_STORAGE variable unset - unexpected failure"
exit 1
fi
ETCD_DATA_PREFIX="${ETCD_DATA_PREFIX:-/registry}"
ETCD_CREDS="${ETCD_CREDS:-}"
# Correctly support upgrade and rollback to non-default version.
if [ "${DO_NOT_MOVE_BINARIES:-}" != "true" ]; then
@ -135,144 +89,16 @@ if [ "${DO_NOT_MOVE_BINARIES:-}" != "true" ]; then
cp "/usr/local/bin/etcdctl-${TARGET_VERSION}" "/usr/local/bin/etcdctl"
fi
# NOTE: SUPPORTED_VERSION has to match release binaries present in the
# etcd image (to make this script work correctly).
# We cannot use array since sh doesn't support it.
SUPPORTED_VERSIONS_STRING="2.2.1 2.3.7 3.0.17 3.1.12"
SUPPORTED_VERSIONS=$(echo "${SUPPORTED_VERSIONS_STRING}" | tr " " "\n")
VERSION_FILE="version.txt"
CURRENT_STORAGE="etcd2"
CURRENT_VERSION="2.2.1"
if [ -e "${DATA_DIRECTORY}/${VERSION_FILE}" ]; then
VERSION_CONTENTS="$(cat ${DATA_DIRECTORY}/${VERSION_FILE})"
# Example usage: if contents of VERSION_FILE is 2.3.7/etcd2, then
# - CURRENT_VERSION would be '2.3.7'
# - CURRENT_STORAGE would be 'etcd2'
CURRENT_VERSION="$(echo $VERSION_CONTENTS | cut -d '/' -f 1)"
CURRENT_STORAGE="$(echo $VERSION_CONTENTS | cut -d '/' -f 2)"
fi
ETCD_DATA_PREFIX="${ETCD_DATA_PREFIX:-/registry}"
# If there is no data in DATA_DIRECTORY, this means that we are
# starting etcd from scratch. In that case, we don't need to do
# any migration.
if [ ! -d "${DATA_DIRECTORY}" ]; then
mkdir -p "${DATA_DIRECTORY}"
fi
if [ -z "$(ls -A ${DATA_DIRECTORY})" ]; then
echo "${DATA_DIRECTORY} is empty - skipping migration"
echo "${TARGET_VERSION}/${TARGET_STORAGE}" > "${DATA_DIRECTORY}/${VERSION_FILE}"
exit 0
fi
ATTACHLEASE="${ATTACHLEASE:-/usr/local/bin/attachlease}"
ROLLBACK="${ROLLBACK:-/usr/local/bin/rollback}"
# If we are upgrading from 2.2.1 and this is the first try for upgrade,
# do the backup to allow restoring from it in case of failed upgrade.
BACKUP_DIR="${DATA_DIRECTORY}/migration-backup"
if [ "${CURRENT_VERSION}" = "2.2.1" -a "${CURRENT_VERSION}" != "${TARGET_VERSION}" -a ! -d "${BACKUP_DIR}" ]; then
echo "Backup etcd before starting migration"
mkdir ${BACKUP_DIR}
ETCDCTL_CMD="/usr/local/bin/etcdctl-2.2.1"
ETCDCTL_API=2 ${ETCDCTL_CMD} --debug backup --data-dir=${DATA_DIRECTORY} \
--backup-dir=${BACKUP_DIR}
echo "Backup done in ${BACKUP_DIR}"
fi
CURRENT_MINOR_VERSION="$(echo ${CURRENT_VERSION} | awk -F'.' '{print $2}')"
TARGET_MINOR_VERSION="$(echo ${TARGET_VERSION} | awk -F'.' '{print $2}')"
# "rollback-if-needed"
case "${CURRENT_STORAGE}-${TARGET_STORAGE}" in
"etcd3-etcd3")
[ ${TARGET_MINOR_VERSION} -lt ${CURRENT_MINOR_VERSION} ] && rollback_etcd3_minor_version
break
;;
"etcd3-etcd2")
rollback_to_etcd2
break
;;
*)
break
;;
esac
# Do the roll-forward migration if needed.
# The migration goes as following:
# 1. for all versions starting one after the current version of etcd
# we do "start, wait until healthy and stop etcd". This is the
# procedure that etcd documentation suggests for upgrading binaries.
# 2. For the first 3.0.x version that we encounter, if we are still in
# v2 API, we do upgrade to v3 API using the "etcdct migrate" and
# attachlease commands.
SKIP_STEP=true
for step in ${SUPPORTED_VERSIONS}; do
if [ "${step}" = "${CURRENT_VERSION}" ]; then
SKIP_STEP=false
elif [ "${SKIP_STEP}" != "true" ]; then
# Do the migration step, by just starting etcd in this version.
START_VERSION="${step}"
START_STORAGE="${CURRENT_STORAGE}"
if ! start_etcd; then
# Migration failed.
echo "Starting etcd ${step} failed"
exit 1
fi
# Kill etcd and wait until this is down.
stop_etcd
CURRENT_VERSION=${step}
echo "${CURRENT_VERSION}/${CURRENT_STORAGE}" > "${DATA_DIRECTORY}/${VERSION_FILE}"
fi
if [ "$(echo ${CURRENT_VERSION} | cut -c1-2)" = "3." -a "${CURRENT_VERSION}" = "${step}" -a "${CURRENT_STORAGE}" = "etcd2" -a "${TARGET_STORAGE}" = "etcd3" ]; then
# If it is the first 3.x release in the list and we are migrating
# also from 'etcd2' to 'etcd3', do the migration now.
echo "Performing etcd2 -> etcd3 migration"
START_VERSION="${step}"
START_STORAGE="etcd3"
ETCDCTL_CMD="${ETCDCTL:-/usr/local/bin/etcdctl-${START_VERSION}}"
ETCDCTL_API=3 ${ETCDCTL_CMD} migrate --data-dir=${DATA_DIRECTORY}
echo "Attaching leases to TTL entries"
# Now attach lease to all keys.
# To do it, we temporarily start etcd on a random port (so that
# apiserver actually cannot access it).
if ! start_etcd; then
echo "Starting etcd ${step} in v3 mode failed"
exit 1
fi
# Create a lease and attach all keys to it.
${ATTACHLEASE} \
--etcd-address http://127.0.0.1:${ETCD_PORT} \
--ttl-keys-prefix "${TTL_KEYS_DIRECTORY:-${ETCD_DATA_PREFIX}/events}" \
--lease-duration 1h
# Kill etcd and wait until this is down.
stop_etcd
CURRENT_STORAGE="etcd3"
echo "${CURRENT_VERSION}/${CURRENT_STORAGE}" > "${DATA_DIRECTORY}/${VERSION_FILE}"
fi
if [ "$(echo ${CURRENT_VERSION} | cut -c1-4)" = "3.1." -a "${CURRENT_VERSION}" = "${step}" -a "${CURRENT_STORAGE}" = "etcd3" ]; then
# If we are upgrading to 3.1.* release, if the cluster was migrated
# from v2 version, the v2 data may still be around. So now is the
# time to actually remove them.
echo "Remove stale v2 data"
START_VERSION="${step}"
START_STORAGE="etcd3"
ETCDCTL_CMD="${ETCDCTL:-/usr/local/bin/etcdctl-${START_VERSION}}"
if ! start_etcd; then
echo "Starting etcd ${step} in v3 mode failed"
exit 1
fi
${ETCDCTL_CMD} --endpoints "http://127.0.0.1:${ETCD_PORT}" rm --recursive "${ETCD_DATA_PREFIX}"
# Kill etcd and wait until this is down.
stop_etcd
echo "Successfully remove v2 data"
# Also remove backup from v2->v3 migration.
rm -rf "${BACKUP_DIR}"
fi
if [ "${CURRENT_VERSION}" = "${TARGET_VERSION}" -a "${CURRENT_STORAGE}" = "${TARGET_STORAGE}" ]; then
break
fi
done
echo "$(date +'%Y-%m-%d %H:%M:%S') Migration finished"
/usr/local/bin/migrate \
--name "${ETCD_NAME}" \
--port "${ETCD_CLIENT_PORT}" \
--listen-peer-urls "${LISTEN_PEER_URLS}" \
--initial-advertise-peer-urls "${INITIAL_ADVERTISE_PEER_URLS}" \
--data-dir "${DATA_DIRECTORY}" \
--bundled-versions "${BUNDLED_VERSIONS}" \
--initial-cluster "${INITIAL_CLUSTER}" \
--target-version "${TARGET_VERSION}" \
--target-storage "${TARGET_STORAGE}" \
--etcd-data-prefix "${ETCD_DATA_PREFIX}" \
--ttl-keys-directory "${TTL_KEYS_DIRECTORY:-${ETCD_DATA_PREFIX}/events}" \
--etcd-server-extra-args "${ETCD_CREDS}"

View File

@ -4,19 +4,31 @@ load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
)
go_binary(
name = "rollback",
name = "migrate",
embed = [":go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["rollback.go"],
importpath = "k8s.io/kubernetes/cluster/images/etcd/rollback",
srcs = [
"data_dir.go",
"migrate.go",
"migrate_client.go",
"migrate_server.go",
"migrator.go",
"rollback_v2.go",
"versions.go",
],
importpath = "k8s.io/kubernetes/cluster/images/etcd/migrate",
deps = [
"//third_party/forked/etcd221/wal:go_default_library",
"//vendor/github.com/blang/semver:go_default_library",
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/etcdserverpb:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/membership:go_default_library",
@ -31,6 +43,7 @@ go_library(
"//vendor/github.com/coreos/etcd/wal/walpb:go_default_library",
"//vendor/github.com/coreos/go-semver/semver:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
],
)
@ -46,3 +59,14 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = [
"data_dir_test.go",
"versions_test.go",
],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
deps = ["//vendor/github.com/blang/semver:go_default_library"],
)

View File

@ -0,0 +1,157 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/golang/glog"
)
// DataDirectory provides utilities for initializing and backing up an
// etcd "data-dir" as well as managing a version.txt file to track the
// etcd server version and storage verion of the etcd data in the
// directory.
type DataDirectory struct {
path string
versionFile *VersionFile
}
// OpenOrCreateDataDirectory opens a data directory, creating the directory
// if it doesn't not already exist.
func OpenOrCreateDataDirectory(path string) (*DataDirectory, error) {
exists, err := exists(path)
if err != nil {
return nil, err
}
if !exists {
glog.Infof("data directory '%s' does not exist, creating it", path)
err := os.MkdirAll(path, 0777)
if err != nil {
return nil, fmt.Errorf("failed to create data directory %s: %v", path, err)
}
}
versionFile := &VersionFile{
path: filepath.Join(path, versionFilename),
}
return &DataDirectory{path, versionFile}, nil
}
// Initialize set the version.txt to the target version if the data
// directory is empty. If the data directory is non-empty, no
// version.txt file will be written since the actual version of etcd
// used to create the data is unknown.
func (d *DataDirectory) Initialize(target *EtcdVersionPair) error {
isEmpty, err := d.IsEmpty()
if err != nil {
return err
}
if isEmpty {
glog.Infof("data directory '%s' is empty, writing target version '%s' to version.txt", d.path, target)
err = d.versionFile.Write(target)
if err != nil {
return fmt.Errorf("failed to write version.txt to '%s': %v", d.path, err)
}
return nil
}
return nil
}
// Backup creates a backup copy of data directory.
func (d *DataDirectory) Backup() error {
backupDir := fmt.Sprintf("%s.bak", d.path)
err := os.RemoveAll(backupDir)
if err != nil {
return err
}
err = os.MkdirAll(backupDir, 0777)
if err != nil {
return err
}
err = exec.Command("cp", "-r", d.path, backupDir).Run()
if err != nil {
return err
}
return nil
}
// IsEmpty returns true if the data directory is entirely empty.
func (d *DataDirectory) IsEmpty() (bool, error) {
dir, err := os.Open(d.path)
if err != nil {
return false, fmt.Errorf("failed to open data directory %s: %v", d.path, err)
}
defer dir.Close()
_, err = dir.Readdirnames(1)
if err == io.EOF {
return true, nil
}
return false, err
}
// String returns the data directory path.
func (d *DataDirectory) String() string {
return d.path
}
// VersionFile provides utilities for reading and writing version.txt files
// to etcd "data-dir" for tracking the etcd server and storage verions
// of the data in the directory.
type VersionFile struct {
path string
}
// Exists returns true if a version.txt file exists on the filesystem.
func (v *VersionFile) Exists() (bool, error) {
return exists(v.path)
}
// Read parses the version.txt file and returns it's contents.
func (v *VersionFile) Read() (*EtcdVersionPair, error) {
data, err := ioutil.ReadFile(v.path)
if err != nil {
return nil, fmt.Errorf("failed to read version file %s: %v", v.path, err)
}
txt := strings.TrimSpace(string(data))
vp, err := ParseEtcdVersionPair(txt)
if err != nil {
return nil, fmt.Errorf("failed to parse etcd '<version>/<storage-version>' string from version.txt file contents '%s': %v", txt, err)
}
return vp, nil
}
// Write creates or overwrites the contents of the version.txt file with the given EtcdVersionPair.
func (v *VersionFile) Write(vp *EtcdVersionPair) error {
data := []byte(fmt.Sprintf("%s/%s", vp.version, vp.storageVersion))
return ioutil.WriteFile(v.path, data, 0666)
}
func exists(path string) (bool, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}

View File

@ -0,0 +1,159 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/blang/semver"
)
var (
latestVersion = semver.MustParse("3.1.12")
)
func TestExistingDataDirWithVersionFile(t *testing.T) {
d, err := OpenOrCreateDataDirectory("testdata/datadir_with_version")
if err != nil {
t.Fatalf("Failed to open data dir: %v", err)
}
isEmpty, err := d.IsEmpty()
if err != nil {
t.Fatalf("Failed to check if data dir is empty: %v", err)
}
if isEmpty {
t.Errorf("Data directory is non-empty")
}
exists, err := d.versionFile.Exists()
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("Expected version file %s to exist", d.versionFile.path)
}
vp, err := d.versionFile.Read()
if err != nil {
t.Fatalf("Failed to read version file %s: %v", d.versionFile.path, err)
}
expectedVersion := &EtcdVersionPair{&EtcdVersion{latestVersion}, storageEtcd3}
if !vp.Equals(expectedVersion) {
t.Errorf("Expected version file to contain %s, but got %s", expectedVersion, vp)
}
}
func TestExistingDataDirWithoutVersionFile(t *testing.T) {
targetVersion := &EtcdVersionPair{&EtcdVersion{latestVersion}, storageEtcd3}
d, err := OpenOrCreateDataDirectory("testdata/datadir_without_version")
if err != nil {
t.Fatalf("Failed to open data dir: %v", err)
}
exists, err := d.versionFile.Exists()
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("Expected version file %s not to exist", d.versionFile.path)
}
err = d.Initialize(targetVersion)
if err != nil {
t.Fatalf("Failed initialize data directory %s: %v", d.path, err)
}
exists, err = d.versionFile.Exists()
if err != nil {
t.Fatal(err)
}
if exists {
t.Fatalf("Expected version file %s not to exist after initializing non-empty data-dir", d.versionFile.path)
}
}
func TestNonexistingDataDir(t *testing.T) {
targetVersion := &EtcdVersionPair{&EtcdVersion{latestVersion}, storageEtcd3}
path := newTestPath(t)
d, err := OpenOrCreateDataDirectory(filepath.Join(path, "data-dir"))
if err != nil {
t.Fatalf("Failed to open data dir: %v", err)
}
isEmpty, err := d.IsEmpty()
if err != nil {
t.Fatalf("Failed to check if data dir is empty: %v", err)
}
if !isEmpty {
t.Errorf("Data directory is empty")
}
err = d.Initialize(targetVersion)
if err != nil {
t.Fatalf("Failed initialize data directory %s: %v", d.path, err)
}
exists, err := d.versionFile.Exists()
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("Expected version file %s to exist", d.versionFile.path)
}
isEmpty, err = d.IsEmpty()
if err != nil {
t.Fatalf("Failed to check if data dir is empty: %v", err)
}
if isEmpty {
t.Errorf("Data directory is non-empty")
}
vp, err := d.versionFile.Read()
if err != nil {
t.Fatalf("Failed to read version file %s: %v", d.versionFile.path, err)
}
if !vp.Equals(targetVersion) {
t.Errorf("Expected version file to contain %s, but got %s", targetVersion, vp)
}
}
func TestBackup(t *testing.T) {
path := newTestPath(t)
d, err := OpenOrCreateDataDirectory(filepath.Join(path, "data-dir"))
if err != nil {
t.Fatalf("Failed to open data dir: %v", err)
}
err = d.Backup()
if err != nil {
t.Fatalf("Failed to backup data directory %s: %v", d.path, err)
}
bak, err := OpenOrCreateDataDirectory(filepath.Join(path, "data-dir.bak"))
if err != nil {
t.Fatalf("Failed to open backup data dir: %v", err)
}
isEmpty, err := bak.IsEmpty()
if err != nil {
t.Fatal(err)
}
if isEmpty {
t.Errorf("Expected non-empty backup directory afer Backup()")
}
}
func newTestPath(t *testing.T) string {
path, err := ioutil.TempDir("", "etcd-migrate-test-")
os.Chmod(path, 0777)
if err != nil {
t.Fatalf("Failed to create tmp dir for test: %v", err)
}
return path
}

View File

@ -0,0 +1,356 @@
// +build integration
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io/ioutil"
"math/big"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/blang/semver"
)
var (
testSupportedVersions = MustParseSupportedVersions("2.2.1, 2.3.7, 3.0.17, 3.1.12")
testVersionOldest = &EtcdVersion{semver.MustParse("2.2.1")}
testVersionPrevious = &EtcdVersion{semver.MustParse("3.0.17")}
testVersionLatest = &EtcdVersion{semver.MustParse("3.1.12")}
)
func TestMigrate(t *testing.T) {
migrations := []struct {
title string
memberCount int
startVersion string
endVersion string
protocol string
}{
// upgrades
{"v2-v3-up", 1, "2.2.1/etcd2", "3.0.17/etcd3", "https"},
{"v3-v3-up", 1, "3.0.17/etcd3", "3.1.12/etcd3", "https"},
{"oldest-newest-up", 1, "2.2.1/etcd2", "3.1.12/etcd3", "https"},
// warning: v2->v3 ha upgrades not currently supported.
{"ha-v3-v3-up", 3, "3.0.17/etcd3", "3.1.12/etcd3", "https"},
// downgrades
{"v3-v2-down", 1, "3.0.17/etcd3", "2.2.1/etcd2", "https"},
{"v3-v3-down", 1, "3.1.12/etcd3", "3.0.17/etcd3", "https"},
// warning: ha downgrades not yet supported.
}
for _, m := range migrations {
t.Run(m.title, func(t *testing.T) {
start := MustParseEtcdVersionPair(m.startVersion)
end := MustParseEtcdVersionPair(m.endVersion)
testCfgs := clusterConfig(t, m.title, m.memberCount, m.protocol)
servers := []*EtcdMigrateServer{}
for _, cfg := range testCfgs {
client, err := NewEtcdMigrateClient(cfg)
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
server := NewEtcdMigrateServer(cfg, client)
servers = append(servers, server)
}
// Start the servers.
parallel(servers, func(server *EtcdMigrateServer) {
dataDir, err := OpenOrCreateDataDirectory(server.cfg.dataDirectory)
if err != nil {
t.Fatalf("Error opening or creating data directory %s: %v", server.cfg.dataDirectory, err)
}
migrator := &Migrator{server.cfg, dataDir, server.client}
err = migrator.MigrateIfNeeded(start)
if err != nil {
t.Fatalf("Migration failed: %v", err)
}
err = server.Start(start.version)
if err != nil {
t.Fatalf("Failed to start server: %v", err)
}
})
// Write a value to each server, read it back.
parallel(servers, func(server *EtcdMigrateServer) {
key := fmt.Sprintf("/registry/%s", server.cfg.name)
value := fmt.Sprintf("value-%s", server.cfg.name)
err := server.client.Put(start.version, key, value)
if err != nil {
t.Fatalf("failed to write text value: %v", err)
}
checkVal, err := server.client.Get(start.version, key)
if err != nil {
t.Errorf("Error getting %s for validation: %v", key, err)
}
if checkVal != value {
t.Errorf("Expected %s from %s but got %s", value, key, checkVal)
}
})
// Migrate the servers in series.
serial(servers, func(server *EtcdMigrateServer) {
err := server.Stop()
if err != nil {
t.Fatalf("Stop server failed: %v", err)
}
dataDir, err := OpenOrCreateDataDirectory(server.cfg.dataDirectory)
if err != nil {
t.Fatalf("Error opening or creating data directory %s: %v", server.cfg.dataDirectory, err)
}
migrator := &Migrator{server.cfg, dataDir, server.client}
err = migrator.MigrateIfNeeded(end)
if err != nil {
t.Fatalf("Migration failed: %v", err)
}
err = server.Start(end.version)
if err != nil {
t.Fatalf("Start server failed: %v", err)
}
})
// Check that all test values can be read back from all the servers.
parallel(servers, func(server *EtcdMigrateServer) {
for _, s := range servers {
key := fmt.Sprintf("/registry/%s", s.cfg.name)
value := fmt.Sprintf("value-%s", s.cfg.name)
checkVal, err := server.client.Get(end.version, key)
if err != nil {
t.Errorf("Error getting %s from etcd 2.x after rollback from 3.x: %v", key, err)
}
if checkVal != value {
t.Errorf("Expected %s from %s but got %s when reading after rollback from %s to %s", value, key, checkVal, start, end)
}
}
})
// Stop the servers.
parallel(servers, func(server *EtcdMigrateServer) {
err := server.Stop()
if err != nil {
t.Fatalf("Failed to stop server: %v", err)
}
})
// Check that version.txt contains the correct end version.
parallel(servers, func(server *EtcdMigrateServer) {
dataDir, err := OpenOrCreateDataDirectory(server.cfg.dataDirectory)
v, err := dataDir.versionFile.Read()
if err != nil {
t.Fatalf("Failed to read version.txt file: %v", err)
}
if !v.Equals(end) {
t.Errorf("Expected version.txt to contain %s but got %s", end, v)
}
// Integration tests are run in a docker container with umask of 0022.
checkPermissions(t, server.cfg.dataDirectory, 0755|os.ModeDir)
checkPermissions(t, dataDir.versionFile.path, 0644)
})
})
}
}
func parallel(servers []*EtcdMigrateServer, fn func(server *EtcdMigrateServer)) {
var wg sync.WaitGroup
wg.Add(len(servers))
for _, server := range servers {
go func(s *EtcdMigrateServer) {
defer wg.Done()
fn(s)
}(server)
}
wg.Wait()
}
func serial(servers []*EtcdMigrateServer, fn func(server *EtcdMigrateServer)) {
for _, server := range servers {
fn(server)
}
}
func checkPermissions(t *testing.T, path string, expected os.FileMode) {
info, err := os.Stat(path)
if err != nil {
t.Fatalf("Failed to stat file %s: %v", path, err)
}
if info.Mode() != expected {
t.Errorf("Expected permissions for file %s of %s, but got %s", path, expected, info.Mode())
}
}
func clusterConfig(t *testing.T, name string, memberCount int, protocol string) []*EtcdMigrateCfg {
peers := []string{}
for i := 0; i < memberCount; i++ {
memberName := fmt.Sprintf("%s-%d", name, i)
peerPort := uint64(2380 + i*10000)
peer := fmt.Sprintf("%s=%s://127.0.0.1:%d", memberName, protocol, peerPort)
peers = append(peers, peer)
}
initialCluster := strings.Join(peers, ",")
extraArgs := ""
if protocol == "https" {
extraArgs = getOrCreateTLSPeerCertArgs(t)
}
cfgs := []*EtcdMigrateCfg{}
for i := 0; i < memberCount; i++ {
memberName := fmt.Sprintf("%s-%d", name, i)
peerURL := fmt.Sprintf("%s://127.0.0.1:%d", protocol, uint64(2380+i*10000))
cfg := &EtcdMigrateCfg{
binPath: "/usr/local/bin",
name: memberName,
initialCluster: initialCluster,
port: uint64(2379 + i*10000),
peerListenUrls: peerURL,
peerAdvertiseUrls: peerURL,
etcdDataPrefix: "/registry",
ttlKeysDirectory: "/registry/events",
supportedVersions: testSupportedVersions,
dataDirectory: fmt.Sprintf("/tmp/etcd-data-dir-%s", memberName),
etcdServerArgs: extraArgs,
}
cfgs = append(cfgs, cfg)
}
return cfgs
}
func getOrCreateTLSPeerCertArgs(t *testing.T) string {
spec := TestCertSpec{
host: "localhost",
ips: []string{"127.0.0.1"},
}
certDir := "/tmp/certs"
certFile := filepath.Join(certDir, "test.crt")
keyFile := filepath.Join(certDir, "test.key")
err := getOrCreateTestCertFiles(certFile, keyFile, spec)
if err != nil {
t.Fatalf("failed to create server cert: %v", err)
}
return fmt.Sprintf("--peer-client-cert-auth --peer-trusted-ca-file=%s --peer-cert-file=%s --peer-key-file=%s", certFile, certFile, keyFile)
}
type TestCertSpec struct {
host string
names, ips []string // in certificate
}
func getOrCreateTestCertFiles(certFileName, keyFileName string, spec TestCertSpec) (err error) {
if _, err := os.Stat(certFileName); err == nil {
if _, err := os.Stat(keyFileName); err == nil {
return nil
}
}
certPem, keyPem, err := generateSelfSignedCertKey(spec.host, parseIPList(spec.ips), spec.names)
if err != nil {
return err
}
os.MkdirAll(filepath.Dir(certFileName), os.FileMode(0777))
err = ioutil.WriteFile(certFileName, certPem, os.FileMode(0777))
if err != nil {
return err
}
os.MkdirAll(filepath.Dir(keyFileName), os.FileMode(0777))
err = ioutil.WriteFile(keyFileName, keyPem, os.FileMode(0777))
if err != nil {
return err
}
return nil
}
func parseIPList(ips []string) []net.IP {
var netIPs []net.IP
for _, ip := range ips {
netIPs = append(netIPs, net.ParseIP(ip))
}
return netIPs
}
// generateSelfSignedCertKey creates a self-signed certificate and key for the given host.
// Host may be an IP or a DNS name
// You may also specify additional subject alt names (either ip or dns names) for the certificate
func generateSelfSignedCertKey(host string, alternateIPs []net.IP, alternateDNS []string) ([]byte, []byte, error) {
priv, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
NotBefore: time.Unix(0, 0),
NotAfter: time.Now().Add(time.Hour * 24 * 365 * 100),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
BasicConstraintsValid: true,
IsCA: true,
}
if ip := net.ParseIP(host); ip != nil {
template.IPAddresses = append(template.IPAddresses, ip)
} else {
template.DNSNames = append(template.DNSNames, host)
}
template.IPAddresses = append(template.IPAddresses, alternateIPs...)
template.DNSNames = append(template.DNSNames, alternateDNS...)
derBytes, err := x509.CreateCertificate(cryptorand.Reader, &template, &template, &priv.PublicKey, priv)
if err != nil {
return nil, nil, err
}
// Generate cert
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
return nil, nil, err
}
// Generate key
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)}); err != nil {
return nil, nil, err
}
return certBuffer.Bytes(), keyBuffer.Bytes(), nil
}

View File

@ -0,0 +1,188 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/golang/glog"
"github.com/spf13/cobra"
)
const (
versionFilename = "version.txt"
defaultPort uint64 = 18629
)
var (
migrateCmd = &cobra.Command{
Short: "Upgrade/downgrade etcd data across multiple versions",
Long: `Upgrade or downgrade etcd data across multiple versions to the target version
Given a 'bin-dir' directory of etcd and etcdctl binaries, an etcd 'data-dir' with a 'version.txt' file and
a target etcd version, this tool will upgrade or downgrade the etcd data from the version specified in
'version.txt' to the target version.
`,
Run: func(cmd *cobra.Command, args []string) {
runMigrate()
},
}
opts = migrateOpts{}
)
type migrateOpts struct {
name string
port uint64
peerListenUrls string
peerAdvertiseUrls string
binDir string
dataDir string
bundledVersionString string
etcdDataPrefix string
ttlKeysDirectory string
initialCluster string
targetVersion string
targetStorage string
etcdServerArgs string
}
func main() {
flags := migrateCmd.Flags()
flags.StringVar(&opts.name, "name", "", "etcd cluster member name. Defaults to etcd-{hostname}")
flags.Uint64Var(&opts.port, "port", defaultPort, "etcd client port to use during migration operations. This should be a different port than typically used by etcd to avoid clients accidentally connecting during upgrade/downgrade operations.")
flags.StringVar(&opts.peerListenUrls, "listen-peer-urls", "", "etcd --listen-peer-urls flag, required for HA clusters")
flags.StringVar(&opts.peerAdvertiseUrls, "initial-advertise-peer-urls", "", "etcd --initial-advertise-peer-urls flag, required for HA clusters")
flags.StringVar(&opts.binDir, "bin-dir", "/usr/local/bin", "directory of etcd and etcdctl binaries, must contain etcd-<version> and etcdctl-<version> for each version listed in bindled-versions")
flags.StringVar(&opts.dataDir, "data-dir", "", "etcd data directory of etcd server to migrate")
flags.StringVar(&opts.bundledVersionString, "bundled-versions", "", "comma separated list of etcd binary versions present under the bin-dir")
flags.StringVar(&opts.etcdDataPrefix, "etcd-data-prefix", "/registry", "etcd key prefix under which all objects are kept")
flags.StringVar(&opts.ttlKeysDirectory, "ttl-keys-directory", "", "etcd key prefix under which all keys with TTLs are kept. Defaults to {etcd-data-prefix}/events")
flags.StringVar(&opts.initialCluster, "initial-cluster", "", "comma separated list of name=endpoint pairs. Defaults to etcd-{hostname}=http://localhost:2380")
flags.StringVar(&opts.targetVersion, "target-version", "", "version of etcd to migrate to. Format must be '<major>.<minor>.<patch>'")
flags.StringVar(&opts.targetStorage, "target-storage", "", "storage version of etcd to migrate to, one of: etcd2, etcd3")
flags.StringVar(&opts.etcdServerArgs, "etcd-server-extra-args", "", "additional etcd server args for starting etcd servers during migration steps, --peer-* TLS cert flags should be added for etcd clusters with more than 1 member that use mutual TLS for peer communication.")
migrateCmd.Execute()
}
// runMigrate validates the command line flags and starts the migration.
func runMigrate() {
if opts.name == "" {
hostname, err := os.Hostname()
if err != nil {
glog.Errorf("Error while getting hostname to supply default --name: %v", err)
os.Exit(1)
}
opts.name = fmt.Sprintf("etcd-%s", hostname)
}
if opts.ttlKeysDirectory == "" {
opts.ttlKeysDirectory = fmt.Sprintf("%s/events", opts.etcdDataPrefix)
}
if opts.initialCluster == "" {
opts.initialCluster = fmt.Sprintf("%s=http://localhost:2380", opts.name)
}
if opts.targetStorage == "" {
glog.Errorf("--target-storage is required")
os.Exit(1)
}
if opts.targetVersion == "" {
glog.Errorf("--target-version is required")
os.Exit(1)
}
if opts.dataDir == "" {
glog.Errorf("--data-dir is required")
os.Exit(1)
}
if opts.bundledVersionString == "" {
glog.Errorf("--bundled-versions is required")
os.Exit(1)
}
bundledVersions, err := ParseSupportedVersions(opts.bundledVersionString)
if err != nil {
glog.Errorf("Failed to parse --supported-versions: %v", err)
}
err = validateBundledVersions(bundledVersions, opts.binDir)
if err != nil {
glog.Errorf("Failed to validate that 'etcd-<version>' and 'etcdctl-<version>' binaries exist in --bin-dir '%s' for all --bundled-verions '%s': %v",
opts.binDir, opts.bundledVersionString, err)
os.Exit(1)
}
target := &EtcdVersionPair{
version: MustParseEtcdVersion(opts.targetVersion),
storageVersion: MustParseEtcdStorageVersion(opts.targetStorage),
}
migrate(opts.name, opts.port, opts.peerListenUrls, opts.peerAdvertiseUrls, opts.binDir, opts.dataDir, opts.etcdDataPrefix, opts.ttlKeysDirectory, opts.initialCluster, target, bundledVersions, opts.etcdServerArgs)
}
// migrate opens or initializes the etcd data directory, configures the migrator, and starts the migration.
func migrate(name string, port uint64, peerListenUrls string, peerAdvertiseUrls string, binPath string, dataDirPath string, etcdDataPrefix string, ttlKeysDirectory string,
initialCluster string, target *EtcdVersionPair, bundledVersions SupportedVersions, etcdServerArgs string) {
dataDir, err := OpenOrCreateDataDirectory(dataDirPath)
if err != nil {
glog.Errorf("Error opening or creating data directory %s: %v", dataDirPath, err)
os.Exit(1)
}
cfg := &EtcdMigrateCfg{
binPath: binPath,
name: name,
port: port,
peerListenUrls: peerListenUrls,
peerAdvertiseUrls: peerAdvertiseUrls,
etcdDataPrefix: etcdDataPrefix,
ttlKeysDirectory: ttlKeysDirectory,
initialCluster: initialCluster,
supportedVersions: bundledVersions,
dataDirectory: dataDirPath,
etcdServerArgs: etcdServerArgs,
}
client, err := NewEtcdMigrateClient(cfg)
if err != nil {
glog.Errorf("Migration failed: %v", err)
os.Exit(1)
}
defer client.Close()
migrator := &Migrator{cfg, dataDir, client}
err = migrator.MigrateIfNeeded(target)
if err != nil {
glog.Errorf("Migration failed: %v", err)
os.Exit(1)
}
}
// validateBundledVersions checks that 'etcd-<version>' and 'etcdctl-<version>' binaries exist in the binDir
// for each version in the bundledVersions list.
func validateBundledVersions(bundledVersions SupportedVersions, binDir string) error {
for _, v := range bundledVersions {
for _, binaryName := range []string{"etcd", "etcdctl"} {
fn := filepath.Join(binDir, fmt.Sprintf("%s-%s", binaryName, v))
if _, err := os.Stat(fn); err != nil {
return fmt.Errorf("failed to validate '%s' binary exists for bundled-version '%s': %v", fn, v, err)
}
}
}
return nil
}

View File

@ -0,0 +1,223 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"context"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
)
// CombinedEtcdClient provides an implementation of EtcdMigrateClient using a combination of the etcd v2 client, v3 client
// and etcdctl commands called via the shell.
type CombinedEtcdClient struct {
cfg *EtcdMigrateCfg
}
// NewEtcdMigrateClient creates a new EtcdMigrateClient from the given EtcdMigrateCfg.
func NewEtcdMigrateClient(cfg *EtcdMigrateCfg) (EtcdMigrateClient, error) {
return &CombinedEtcdClient{cfg}, nil
}
// Close closes the client and releases any resources it holds.
func (e *CombinedEtcdClient) Close() error {
return nil
}
// SetEtcdVersionKeyValue writes the given version to the etcd 'etcd_version' key.
// If no error is returned, the write was successful, indicating the etcd server is available
// and able to perform consensus writes.
func (e *CombinedEtcdClient) SetEtcdVersionKeyValue(version *EtcdVersion) error {
return e.Put(version, "etcd_version", version.String())
}
// Put write a single key value pair to etcd.
func (e *CombinedEtcdClient) Put(version *EtcdVersion, key, value string) error {
if version.Major == 2 {
v2client, err := e.clientV2()
if err != nil {
return err
}
_, err = v2client.Set(context.Background(), key, value, nil)
return err
}
v3client, err := e.clientV3()
if err != nil {
return err
}
defer v3client.Close()
_, err = v3client.KV.Put(context.Background(), key, value)
return err
}
// Get reads a single value for a given key.
func (e *CombinedEtcdClient) Get(version *EtcdVersion, key string) (string, error) {
if version.Major == 2 {
v2client, err := e.clientV2()
if err != nil {
return "", err
}
resp, err := v2client.Get(context.Background(), key, nil)
if err != nil {
return "", err
}
return resp.Node.Value, nil
}
v3client, err := e.clientV3()
if err != nil {
return "", err
}
defer v3client.Close()
resp, err := v3client.KV.Get(context.Background(), key)
if err != nil {
return "", err
}
kvs := resp.Kvs
if len(kvs) != 1 {
return "", fmt.Errorf("expected exactly one value for key %s but got %d", key, len(kvs))
}
return string(kvs[0].Value), nil
}
func (e *CombinedEtcdClient) clientV2() (clientv2.KeysAPI, error) {
v2client, err := clientv2.New(clientv2.Config{Endpoints: []string{e.endpoint()}})
if err != nil {
return nil, err
}
return clientv2.NewKeysAPI(v2client), nil
}
func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{Endpoints: []string{e.endpoint()}})
}
// Backup creates a backup of an etcd2 data directory at the given backupDir.
func (e *CombinedEtcdClient) Backup(version *EtcdVersion, backupDir string) error {
// We cannot use etcd/client (v2) to make this call. It is implemented in the etcdctl client code.
if version.Major != 2 {
return fmt.Errorf("etcd 2.x required but got version '%s'", version)
}
return e.runEtcdctlCommand(version,
"--debug",
"backup",
"--data-dir", e.cfg.dataDirectory,
"--backup-dir", backupDir,
)
}
// Snapshot captures a snapshot from a running etcd3 server and saves it to the given snapshotFile.
// We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
func (e *CombinedEtcdClient) Snapshot(version *EtcdVersion, snapshotFile string) error {
if version.Major != 3 {
return fmt.Errorf("etcd 3.x required but got version '%s'", version)
}
return e.runEtcdctlCommand(version,
"--endpoints", e.endpoint(),
"snapshot", "save", snapshotFile,
)
}
// Restore restores a given snapshotFile into the data directory specified this clients config.
func (e *CombinedEtcdClient) Restore(version *EtcdVersion, snapshotFile string) error {
// We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
if version.Major != 3 {
return fmt.Errorf("etcd 3.x required but got version '%s'", version)
}
return e.runEtcdctlCommand(version,
"snapshot", "restore", snapshotFile,
"--data-dir", e.cfg.dataDirectory,
"--name", e.cfg.name,
"--initial-advertise-peer-urls", e.cfg.peerAdvertiseUrls,
"--initial-cluster", e.cfg.initialCluster,
)
}
// Migrate upgrades a 'etcd2' storage version data directory to a 'etcd3' storage version
// data directory.
func (e *CombinedEtcdClient) Migrate(version *EtcdVersion) error {
// We cannot use etcd/clientv3 to make this call as it is implemented in etcd/etcdctl.
if version.Major != 3 {
return fmt.Errorf("etcd 3.x required but got version '%s'", version)
}
return e.runEtcdctlCommand(version,
"migrate",
"--data-dir", e.cfg.dataDirectory,
)
}
func (e *CombinedEtcdClient) runEtcdctlCommand(version *EtcdVersion, args ...string) error {
etcdctlCmd := exec.Command(filepath.Join(e.cfg.binPath, fmt.Sprintf("etcdctl-%s", version)), args...)
etcdctlCmd.Env = []string{fmt.Sprintf("ETCDCTL_API=%d", version.Major)}
etcdctlCmd.Stdout = os.Stdout
etcdctlCmd.Stderr = os.Stderr
return etcdctlCmd.Run()
}
// AttachLease attaches leases of the given leaseDuration to all the etcd objects under
// ttlKeysDirectory specified in this client's config.
func (e *CombinedEtcdClient) AttachLease(leaseDuration time.Duration) error {
ttlKeysPrefix := e.cfg.ttlKeysDirectory
// Make sure that ttlKeysPrefix is ended with "/" so that we only get children "directories".
if !strings.HasSuffix(ttlKeysPrefix, "/") {
ttlKeysPrefix += "/"
}
ctx := context.Background()
v3client, err := e.clientV3()
if err != nil {
return err
}
defer v3client.Close()
objectsResp, err := v3client.KV.Get(ctx, ttlKeysPrefix, clientv3.WithPrefix())
if err != nil {
return fmt.Errorf("Error while getting objects to attach to the lease")
}
lease, err := v3client.Lease.Grant(ctx, int64(leaseDuration/time.Second))
if err != nil {
return fmt.Errorf("Error while creating lease: %v", err)
}
glog.Infof("Lease with TTL: %v created", lease.TTL)
glog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
for _, kv := range objectsResp.Kvs {
putResp, err := v3client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID), clientv3.WithPrevKV())
if err != nil {
glog.Errorf("Error while attaching lease to: %s", string(kv.Key))
}
if bytes.Compare(putResp.PrevKv.Value, kv.Value) != 0 {
return fmt.Errorf("concurrent access to key detected when setting lease on %s, expected previous value of %s but got %s",
kv.Key, kv.Value, putResp.PrevKv.Value)
}
}
return nil
}
func (e *CombinedEtcdClient) endpoint() string {
return fmt.Sprintf("http://127.0.0.1:%d", e.cfg.port)
}

View File

@ -0,0 +1,132 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/golang/glog"
)
// EtcdMigrateServer manages starting and stopping a versioned etcd server binary.
type EtcdMigrateServer struct {
cfg *EtcdMigrateCfg
client EtcdMigrateClient
cmd *exec.Cmd
}
// NewEtcdMigrateServer creates a EtcdMigrateServer for starting and stopping a etcd server at the given version.
func NewEtcdMigrateServer(cfg *EtcdMigrateCfg, client EtcdMigrateClient) *EtcdMigrateServer {
return &EtcdMigrateServer{cfg: cfg, client: client}
}
// Start starts an etcd server as a separate process, waits until it has started, and returns a exec.Cmd.
func (r *EtcdMigrateServer) Start(version *EtcdVersion) error {
etcdCmd := exec.Command(
fmt.Sprintf("%s/etcd-%s", r.cfg.binPath, version),
"--name", r.cfg.name,
"--initial-cluster", r.cfg.initialCluster,
"--debug",
"--data-dir", r.cfg.dataDirectory,
"--listen-client-urls", fmt.Sprintf("http://127.0.0.1:%d", r.cfg.port),
"--advertise-client-urls", fmt.Sprintf("http://127.0.0.1:%d", r.cfg.port),
"--listen-peer-urls", r.cfg.peerListenUrls,
"--initial-advertise-peer-urls", r.cfg.peerAdvertiseUrls,
)
if r.cfg.etcdServerArgs != "" {
extraArgs := strings.Fields(r.cfg.etcdServerArgs)
etcdCmd.Args = append(etcdCmd.Args, extraArgs...)
}
fmt.Printf("Starting server %s: %+v\n", r.cfg.name, etcdCmd.Args)
etcdCmd.Stdout = os.Stdout
etcdCmd.Stderr = os.Stderr
err := etcdCmd.Start()
if err != nil {
return err
}
interval := time.NewTicker(time.Millisecond * 500)
defer interval.Stop()
done := make(chan bool)
go func() {
time.Sleep(time.Minute * 2)
done <- true
}()
for {
select {
case <-interval.C:
err := r.client.SetEtcdVersionKeyValue(version)
if err != nil {
glog.Infof("Still waiting for etcd to start, current error: %v", err)
// keep waiting
} else {
glog.Infof("Etcd on port %d is up.", r.cfg.port)
r.cmd = etcdCmd
return nil
}
case <-done:
err = etcdCmd.Process.Kill()
if err != nil {
return fmt.Errorf("error killing etcd: %v", err)
}
return fmt.Errorf("Timed out waiting for etcd on port %d", r.cfg.port)
}
}
}
// Stop terminates the etcd server process. If the etcd server process has not been started
// or is not still running, this returns an error.
func (r *EtcdMigrateServer) Stop() error {
if r.cmd == nil {
return fmt.Errorf("cannot stop EtcdMigrateServer that has not been started")
}
err := r.cmd.Process.Signal(os.Interrupt)
if err != nil {
return fmt.Errorf("error sending SIGINT to etcd for graceful shutdown: %v", err)
}
gracefulWait := time.Minute * 2
stopped := make(chan bool)
timedout := make(chan bool)
go func() {
time.Sleep(gracefulWait)
timedout <- true
}()
go func() {
select {
case <-stopped:
return
case <-timedout:
glog.Infof("etcd server has not terminated gracefully after %s, killing it.", gracefulWait)
r.cmd.Process.Kill()
return
}
}()
err = r.cmd.Wait()
stopped <- true
if exiterr, ok := err.(*exec.ExitError); ok {
glog.Infof("etcd server stopped (signal: %s)", exiterr.Error())
// stopped
} else if err != nil {
return fmt.Errorf("error waiting for etcd to stop: %v", err)
}
glog.Infof("Stopped etcd server %s", r.cfg.name)
return nil
}

View File

@ -0,0 +1,258 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"os/exec"
"time"
"github.com/blang/semver"
"github.com/golang/glog"
)
// EtcdMigrateCfg provides all configuration required to perform etcd data upgrade/downgrade migrations.
type EtcdMigrateCfg struct {
binPath string
name string
initialCluster string
port uint64
peerListenUrls string
peerAdvertiseUrls string
etcdDataPrefix string
ttlKeysDirectory string
supportedVersions SupportedVersions
dataDirectory string
etcdServerArgs string
}
// EtcdMigrateClient defines the etcd client operations required to perform migrations.
type EtcdMigrateClient interface {
SetEtcdVersionKeyValue(version *EtcdVersion) error
Get(version *EtcdVersion, key string) (string, error)
Put(version *EtcdVersion, key, value string) error
Backup(version *EtcdVersion, backupDir string) error
Snapshot(version *EtcdVersion, snapshotFile string) error
Restore(version *EtcdVersion, snapshotFile string) error
Migrate(version *EtcdVersion) error
AttachLease(leaseDuration time.Duration) error
Close() error
}
// Migrator manages etcd data migrations.
type Migrator struct {
cfg *EtcdMigrateCfg // TODO: don't wire this directly in
dataDirectory *DataDirectory
client EtcdMigrateClient
}
// MigrateIfNeeded upgrades or downgrades the etcd data directory to the given target version.
func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
glog.Infof("Starting migration to %s", target)
err := m.dataDirectory.Initialize(target)
if err != nil {
return fmt.Errorf("failed to initialize data directory %s: %v", m.dataDirectory.path, err)
}
var current *EtcdVersionPair
vfExists, err := m.dataDirectory.versionFile.Exists()
if err != nil {
return err
}
if vfExists {
current, err = m.dataDirectory.versionFile.Read()
if err != nil {
return err
}
} else {
return fmt.Errorf("existing data directory '%s' is missing version.txt file, unable to migrate", m.dataDirectory.path)
}
for {
glog.Infof("Converging current version '%s' to target version '%s'", current, target)
currentNextMinorVersion := &EtcdVersion{Version: semver.Version{Major: current.version.Major, Minor: current.version.Minor + 1}}
switch {
case current.version.MajorMinorEquals(target.version) || currentNextMinorVersion.MajorMinorEquals(target.version):
glog.Infof("current version '%s' equals or is one minor version previous of target version '%s' - migration complete", current, target)
err = m.dataDirectory.versionFile.Write(target)
if err != nil {
return fmt.Errorf("failed to write version.txt to '%s': %v", m.dataDirectory.path, err)
}
return nil
case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
glog.Infof("upgrading from etcd2 storage to etcd3 storage")
current, err = m.etcd2ToEtcd3Upgrade(current, target)
case current.version.Major == 3 && target.version.Major == 2:
glog.Infof("downgrading from etcd 3.x to 2.x")
current, err = m.rollbackToEtcd2(current, target)
case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
glog.Infof("upgrading etcd from %s to %s", current, stepVersion)
current, err = m.minorVersionUpgrade(current, stepVersion)
case current.version.Major == 3 && target.version.Major == 3 && current.version.Minor > target.version.Minor:
glog.Infof("rolling etcd back from %s to %s", current, target)
current, err = m.rollbackEtcd3MinorVersion(current, target)
}
if err != nil {
return err
}
}
}
func (m *Migrator) backupEtcd2(current *EtcdVersion) error {
backupDir := fmt.Sprintf("%s/%s", m.dataDirectory, "migration-backup")
glog.Infof("Backup etcd before starting migration")
err := os.Mkdir(backupDir, 0666)
if err != nil {
return fmt.Errorf("failed to create backup directory before starting migration: %v", err)
}
m.client.Backup(current, backupDir)
glog.Infof("Backup done in %s", backupDir)
return nil
}
func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
if target.version.Minor != current.version.Minor-1 {
return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
}
glog.Infof("Performing etcd %s -> %s rollback", current.version, target.version)
err := m.dataDirectory.Backup()
if err != nil {
return nil, err
}
snapshotFilename := fmt.Sprintf("%s.snapshot.db", m.dataDirectory.path)
err = os.Remove(snapshotFilename)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to clean snapshot file before rollback: %v", err)
}
// Start current version of etcd.
runner := m.newServer()
glog.Infof("Starting etcd version %s to capture rollback snapshot.", current.version)
err = runner.Start(current.version)
if err != nil {
glog.Fatalf("Unable to automatically downgrade etcd: starting etcd version %s to capture rollback snapshot failed: %v", current.version, err)
return nil, err
}
glog.Infof("Snapshotting etcd %s to %s", current.version, snapshotFilename)
err = m.client.Snapshot(current.version, snapshotFilename)
if err != nil {
return nil, err
}
err = runner.Stop()
if err != nil {
return nil, err
}
glog.Infof("Backing up data before rolling back")
backupDir := fmt.Sprintf("%s.bak", m.dataDirectory)
err = os.RemoveAll(backupDir)
if err != nil {
return nil, err
}
origInfo, err := os.Stat(m.dataDirectory.path)
if err != nil {
return nil, err
}
err = exec.Command("mv", m.dataDirectory.path, backupDir).Run()
if err != nil {
return nil, err
}
glog.Infof("Restoring etcd %s from %s", target.version, snapshotFilename)
err = m.client.Restore(target.version, snapshotFilename)
if err != nil {
return nil, err
}
err = os.Chmod(m.dataDirectory.path, origInfo.Mode())
if err != nil {
return nil, err
}
return target, nil
}
func (m *Migrator) rollbackToEtcd2(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
if !(current.version.Major == 3 && current.version.Minor == 0 && target.version.Major == 2 && target.version.Minor == 2) {
return nil, fmt.Errorf("etcd3 -> etcd2 downgrade is supported only between 3.0.x and 2.2.x, got current %s target %s", current, target)
}
glog.Infof("Backup and remove all existing v2 data")
err := m.dataDirectory.Backup()
if err != nil {
return nil, err
}
err = RollbackV3ToV2(m.dataDirectory.path, time.Hour)
if err != nil {
return nil, fmt.Errorf("rollback to etcd 2.x failed: %v", err)
}
return target, nil
}
func (m *Migrator) etcd2ToEtcd3Upgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
if current.storageVersion != storageEtcd2 || target.version.Major != 3 || target.storageVersion != storageEtcd3 {
return nil, fmt.Errorf("etcd2 to etcd3 upgrade is supported only for x.x.x/etcd2 to 3.0.x/etcd3, got current %s target %s", current, target)
}
runner := m.newServer()
glog.Infof("Performing etcd2 -> etcd3 migration")
err := m.client.Migrate(target.version)
if err != nil {
return nil, err
}
glog.Infof("Attaching leases to TTL entries")
// Now attach lease to all keys.
// To do it, we temporarily start etcd on a random port (so that
// apiserver actually cannot access it).
err = runner.Start(target.version)
if err != nil {
return nil, err
}
defer func() {
err = runner.Stop()
}()
// Create a lease and attach all keys to it.
err = m.client.AttachLease(1 * time.Hour)
if err != nil {
return nil, err
}
return target, err
}
func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
runner := m.newServer()
// Do the migration step, by just starting etcd in the target version.
err := runner.Start(target.version)
if err != nil {
return nil, err
}
err = runner.Stop()
return target, err
}
func (m *Migrator) newServer() *EtcdMigrateServer {
return NewEtcdMigrateServer(m.cfg, m.client)
}

View File

@ -18,8 +18,6 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"os"
"path"
"strconv"
@ -49,17 +47,10 @@ import (
const rollbackVersion = "2.2.0"
var (
migrateDatadir = flag.String("data-dir", "", "Path to the data directory")
ttl = flag.Duration("ttl", time.Hour, "TTL of event keys (default 1 hour)")
)
func main() {
flag.Parse()
if len(*migrateDatadir) == 0 {
glog.Fatal("need to set '--data-dir'")
}
dbpath := path.Join(*migrateDatadir, "member", "snap", "db")
// RollbackV3ToV2 rolls back an etcd 3.0.x data directory to the 2.x.x version specified by rollbackVersion.
func RollbackV3ToV2(migrateDatadir string, ttl time.Duration) error {
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
glog.Infof("Rolling db file %s back to etcd 2.x", dbpath)
// etcd3 store backend. We will use it to parse v3 data files and extract information.
be := backend.NewDefaultBackend(dbpath)
@ -67,7 +58,7 @@ func main() {
// etcd2 store backend. We will use v3 data to update this and then save snapshot to disk.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
expireTime := time.Now().Add(*ttl)
expireTime := time.Now().Add(ttl)
tx.Lock()
err := tx.UnsafeForEach([]byte("key"), func(k, v []byte) error {
@ -97,43 +88,43 @@ func main() {
return nil
})
if err != nil {
glog.Fatal(err)
return err
}
tx.Unlock()
if err := traverseAndDeleteEmptyDir(st, "/"); err != nil {
glog.Fatal(err)
return err
}
// rebuild cluster state.
metadata, hardstate, oldSt, err := rebuild(*migrateDatadir)
metadata, hardstate, oldSt, err := rebuild(migrateDatadir)
if err != nil {
glog.Fatal(err)
return err
}
// In the following, it's low level logic that saves metadata and data into v2 snapshot.
backupPath := *migrateDatadir + ".rollback.backup"
if err := os.Rename(*migrateDatadir, backupPath); err != nil {
glog.Fatal(err)
backupPath := migrateDatadir + ".rollback.backup"
if err := os.Rename(migrateDatadir, backupPath); err != nil {
return err
}
if err := os.MkdirAll(path.Join(*migrateDatadir, "member", "snap"), 0700); err != nil {
glog.Fatal(err)
if err := os.MkdirAll(path.Join(migrateDatadir, "member", "snap"), 0777); err != nil {
return err
}
walDir := path.Join(*migrateDatadir, "member", "wal")
walDir := path.Join(migrateDatadir, "member", "wal")
w, err := oldwal.Create(walDir, metadata)
if err != nil {
glog.Fatal(err)
return err
}
err = w.SaveSnapshot(walpb.Snapshot{Index: hardstate.Commit, Term: hardstate.Term})
if err != nil {
glog.Fatal(err)
}
w.Close()
if err != nil {
return err
}
event, err := oldSt.Get(etcdserver.StoreClusterPrefix, true, false)
if err != nil {
glog.Fatal(err)
return err
}
// nodes (members info) for ConfState
nodes := []uint64{}
@ -148,7 +139,7 @@ func main() {
v = rollbackVersion
}
if _, err := st.Set(n.Key, n.Dir, v, store.TTLOptionSet{}); err != nil {
glog.Fatal(err)
glog.Error(err)
}
// update nodes
@ -165,7 +156,7 @@ func main() {
data, err := st.Save()
if err != nil {
glog.Fatal(err)
return err
}
raftSnap := raftpb.Snapshot{
Data: data,
@ -177,11 +168,12 @@ func main() {
},
},
}
snapshotter := snap.New(path.Join(*migrateDatadir, "member", "snap"))
snapshotter := snap.New(path.Join(migrateDatadir, "member", "snap"))
if err := snapshotter.SaveSnap(raftSnap); err != nil {
glog.Fatal(err)
return err
}
fmt.Println("Finished successfully")
glog.Infof("Finished successfully")
return nil
}
func traverseMetadata(head *store.NodeExtern, handleFunc func(*store.NodeExtern)) {

View File

@ -0,0 +1 @@
3.1.12/etcd3

View File

@ -0,0 +1,198 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"strings"
"github.com/blang/semver"
)
// EtcdVersion specifies an etcd server binaries SemVer.
type EtcdVersion struct {
semver.Version
}
// ParseEtcdVersion parses a SemVer string to an EtcdVersion.
func ParseEtcdVersion(s string) (*EtcdVersion, error) {
v, err := semver.Make(s)
if err != nil {
return nil, err
}
return &EtcdVersion{v}, nil
}
// MustParseEtcdVersion parses a SemVer string to an EtcdVersion and panics if the parse fails.
func MustParseEtcdVersion(s string) *EtcdVersion {
return &EtcdVersion{semver.MustParse(s)}
}
// String returns the version in SemVer string format.
func (v *EtcdVersion) String() string {
return v.Version.String()
}
// Equals returns true if the versions are exactly equal.
func (v *EtcdVersion) Equals(o *EtcdVersion) bool {
return v.Version.Equals(o.Version)
}
// MajorMinorEquals returns true if the major and minor parts of the versions are equal;
// if only patch versions differ, this returns true.
func (v *EtcdVersion) MajorMinorEquals(o *EtcdVersion) bool {
return v.Major == o.Major && v.Minor == o.Minor
}
// EtcdStorageVersion identifies the storage version of an etcd data directory.
type EtcdStorageVersion int
const (
storageUnknown EtcdStorageVersion = iota
storageEtcd2
storageEtcd3
)
// ParseEtcdStorageVersion parses an etcd storage version string to an EtcdStorageVersion.
func ParseEtcdStorageVersion(s string) (EtcdStorageVersion, error) {
switch s {
case "etcd2":
return storageEtcd2, nil
case "etcd3":
return storageEtcd3, nil
default:
return storageUnknown, fmt.Errorf("unrecognized storage version: %s", s)
}
}
// MustParseEtcdStorageVersion parses an etcd storage version string to an EtcdStorageVersion and
// panics if the parse fails.
func MustParseEtcdStorageVersion(s string) EtcdStorageVersion {
version, err := ParseEtcdStorageVersion(s)
if err != nil {
panic(err)
}
return version
}
// String returns the text representation of the EtcdStorageVersion, 'etcd2' or 'etcd3'.
func (v EtcdStorageVersion) String() string {
switch v {
case storageEtcd2:
return "etcd2"
case storageEtcd3:
return "etcd3"
default:
panic(fmt.Sprintf("enum value %d missing from EtcdStorageVersion String() function", v))
}
}
// EtcdVersionPair is composed of an etcd version and storage version.
type EtcdVersionPair struct {
version *EtcdVersion
storageVersion EtcdStorageVersion
}
// ParseEtcdVersionPair parses a "<version>/<storage-version>" string to an EtcdVersionPair.
func ParseEtcdVersionPair(s string) (*EtcdVersionPair, error) {
parts := strings.Split(s, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("Malformed version file, expected <major>.<minor>.<patch>/<storage> but got %s", s)
}
version, err := ParseEtcdVersion(parts[0])
if err != nil {
return nil, err
}
storageVersion, err := ParseEtcdStorageVersion(parts[1])
if err != nil {
return nil, err
}
return &EtcdVersionPair{version, storageVersion}, nil
}
// MustParseEtcdVersionPair parses a "<version>/<storage-version>" string to an EtcdVersionPair
// or panics if the parse fails.
func MustParseEtcdVersionPair(s string) *EtcdVersionPair {
pair, err := ParseEtcdVersionPair(s)
if err != nil {
panic(err)
}
return pair
}
// String returns "<version>/<storage-version>" string of the EtcdVersionPair.
func (vp *EtcdVersionPair) String() string {
return fmt.Sprintf("%s/%s", vp.version, vp.storageVersion)
}
// Equals returns true if both the versions and storage versions are exactly equal.
func (vp *EtcdVersionPair) Equals(o *EtcdVersionPair) bool {
return vp.version.Equals(o.version) && vp.storageVersion == o.storageVersion
}
// SupportedVersions provides a list of etcd versions that are "supported" for some purpose.
// The list must be sorted from lowest semantic version to high.
type SupportedVersions []*EtcdVersion
// NextVersion returns the next supported version after the given current version, or nil if no
// next version exists.
func (sv SupportedVersions) NextVersion(current *EtcdVersion) *EtcdVersion {
var nextVersion *EtcdVersion
for i, supportedVersion := range sv {
if current.MajorMinorEquals(supportedVersion) && len(sv) > i+1 {
nextVersion = sv[i+1]
}
}
return nextVersion
}
// NextVersionPair returns the next supported version after the given current version and infers
// the storage version from the major version part of the next version.
func (sv SupportedVersions) NextVersionPair(current *EtcdVersionPair) *EtcdVersionPair {
nextVersion := sv.NextVersion(current.version)
if nextVersion == nil {
return nil
}
storageVersion := storageEtcd3
if nextVersion.Major == 2 {
storageVersion = storageEtcd2
}
return &EtcdVersionPair{version: nextVersion, storageVersion: storageVersion}
}
// ParseSupportedVersions parses a comma separated list of etcd versions.
func ParseSupportedVersions(s string) (SupportedVersions, error) {
var err error
list := strings.Split(s, ",")
versions := make(SupportedVersions, len(list))
for i, v := range list {
versions[i], err = ParseEtcdVersion(strings.TrimSpace(v))
if err != nil {
return nil, err
}
}
return versions, nil
}
// MustParseSupportedVersions parses a comma separated list of etcd versions or panics if the parse fails.
func MustParseSupportedVersions(s string) SupportedVersions {
versions, err := ParseSupportedVersions(s)
if err != nil {
panic(err)
}
return versions
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"testing"
"github.com/blang/semver"
)
func TestSerializeEtcdVersionPair(t *testing.T) {
cases := []struct {
versionTxt string
version *EtcdVersionPair
match bool
}{
{"3.1.2/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("3.1.2")}, storageEtcd3}, true},
{"2.2.1/etcd2", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, true},
{"1.1.1-rc.0/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("1.1.1-rc.0")}, storageEtcd3}, true},
{"10.100.1000/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("10.100.1000")}, storageEtcd3}, true},
{"2.2.2/etcd2", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, false},
{"2.2.1/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, false},
}
for _, c := range cases {
vp, err := ParseEtcdVersionPair(c.versionTxt)
if err != nil {
t.Errorf("Failed to parse '%s': %v", c.versionTxt, err)
}
if vp.Equals(c.version) != c.match {
t.Errorf("Expected '%s' to be parsed as '%+v', got '%+v'", c.versionTxt, c.version, vp)
}
if vp.String() != c.versionTxt {
t.Errorf("Expected round trip serialization back to '%s', got '%s'", c.versionTxt, vp.String())
}
}
unparsables := []string{
"1.1/etcd3",
"1.1.1.1/etcd3",
"1.1.1/etcd4",
}
for _, unparsable := range unparsables {
vp, err := ParseEtcdVersionPair(unparsable)
if err == nil {
t.Errorf("Should have failed to parse '%s' but got '%s'", unparsable, vp)
}
}
}
func TestMajorMinorEquals(t *testing.T) {
cases := []struct {
first *EtcdVersion
second *EtcdVersion
match bool
}{
{&EtcdVersion{semver.Version{Major: 3, Minor: 1, Patch: 2}}, &EtcdVersion{semver.Version{Major: 3, Minor: 1, Patch: 0}}, true},
{&EtcdVersion{semver.Version{Major: 3, Minor: 1, Patch: 2}}, &EtcdVersion{semver.Version{Major: 3, Minor: 1, Patch: 2}}, true},
{&EtcdVersion{semver.Version{Major: 3, Minor: 0, Patch: 0}}, &EtcdVersion{semver.Version{Major: 3, Minor: 1, Patch: 0}}, false},
{&EtcdVersion{semver.Version{Major: 2, Minor: 0, Patch: 0}}, &EtcdVersion{semver.Version{Major: 3, Minor: 0, Patch: 0}}, false},
}
for _, c := range cases {
if c.first.MajorMinorEquals(c.second) != c.match {
t.Errorf("Expected (%+v == %+v) == %t, got %t", c.first, c.second, c.match, !c.match)
}
}
}

View File

@ -1,45 +0,0 @@
# Rollback workflow
Build it in this directory.
Make sure you have etcd dependency ready. Last time we use etcd v3.0.7.
```
$ go build .
```
Run it:
```
$ ./rollback2 --data-dir $ETCD_DATA_DIR --ttl 1h
```
This will rollback KV pairs from v3 into v2.
If a key was attached to a lease before, it will be created with given TTL (default to 1h).
On success, it will print at the end:
```
Finished successfully
```
Repeat this on all etcd members.
You can do simple check on keys (if any exists):
```
etcdctl ls /
```
Important Note
------
This tool isn't recommended to use if any problem comes up in etcd3 backend.
Please report bugs and we will fix it soon.
If it's still preferred to run this tool, please backup all your data beforehand.
This tool will also back up datadir to same path with ".rollback.backup" suffix.
Caveats:
- The tool doesn't preserve versions of keys.
- If any v2 data exists before rollback, they will be wiped out.
- v3 data only exists in the backup after successful rollback.
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/cluster/images/etcd/rollback/README.md?pixel)]()

View File

@ -1,73 +0,0 @@
#!/bin/sh
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Starts 'etcd' version ${START_VERSION} and writes to it:
# 'etcd_version' -> "${START_VERSION}"
# ETCD_CREDS may optionally be set to provide flags for TLS credentials
# such as '--cert-file' and '--peer-cert-file'. For a complete list of
# flags, see https://coreos.com/etcd/docs/latest/op-guide/security.html.
# Successful write confirms that etcd is up and running.
# Sets ETCD_PID at the end.
# Returns 0 if etcd was successfully started, non-0 otherwise.
start_etcd() {
# Use random ports, so that apiserver cannot connect to etcd.
ETCD_PORT=18629
ETCD_PEER_PORT=2380
ETCD_CREDS="${ETCD_CREDS:-}"
# Avoid collisions between etcd and event-etcd.
case "${DATA_DIRECTORY}" in
*event*)
ETCD_PORT=18631
ETCD_PEER_PORT=2381
;;
esac
local ETCD_CMD="${ETCD:-/usr/local/bin/etcd-${START_VERSION}}"
local ETCDCTL_CMD="${ETCDCTL:-/usr/local/bin/etcdctl-${START_VERSION}}"
local API_VERSION="$(echo ${START_STORAGE} | cut -c5-5)"
if [ "${API_VERSION}" = "2" ]; then
ETCDCTL_CMD="${ETCDCTL_CMD} --debug --endpoint=http://127.0.0.1:${ETCD_PORT} set"
else
ETCDCTL_CMD="${ETCDCTL_CMD} --endpoints=http://127.0.0.1:${ETCD_PORT} put"
fi
${ETCD_CMD} \
--name="etcd-$(hostname)" \
--initial-cluster="etcd-$(hostname)=http://127.0.0.1:${ETCD_PEER_PORT}" \
--debug \
--data-dir=${DATA_DIRECTORY} \
--listen-client-urls http://127.0.0.1:${ETCD_PORT} \
--advertise-client-urls http://127.0.0.1:${ETCD_PORT} \
--listen-peer-urls http://127.0.0.1:${ETCD_PEER_PORT} \
--initial-advertise-peer-urls http://127.0.0.1:${ETCD_PEER_PORT} \
${ETCD_CREDS} &
ETCD_PID=$!
# Wait until we can write to etcd.
for i in $(seq 240); do
sleep 0.5
ETCDCTL_API="${API_VERSION}" ${ETCDCTL_CMD} 'etcd_version' ${START_VERSION}
if [ "$?" -eq "0" ]; then
echo "Etcd on port ${ETCD_PORT} is up."
return 0
fi
done
echo "Timeout while waiting for etcd on port ${ETCD_PORT}"
return 1
}
# Stops etcd with ${ETCD_PID} pid.
stop_etcd() {
kill "${ETCD_PID-}" >/dev/null 2>&1 || :
wait "${ETCD_PID-}" >/dev/null 2>&1 || :
}

View File

@ -62,7 +62,7 @@ TERMINATED_POD_GC_THRESHOLD=${TERMINATED_POD_GC_THRESHOLD:-100}
KUBE_APISERVER_REQUEST_TIMEOUT=300
ETCD_COMPACTION_INTERVAL_SEC="${KUBEMARK_ETCD_COMPACTION_INTERVAL_SEC:-}"
# Set etcd image (e.g. k8s.gcr.io/etcd) and version (e.g. 3.1.12) if you need
# Set etcd image (e.g. k8s.gcr.io/etcd) and version (e.g. 3.1.12-1) if you need
# non-default version.
ETCD_IMAGE="${TEST_ETCD_IMAGE:-}"
ETCD_VERSION="${TEST_ETCD_VERSION:-}"

View File

@ -97,7 +97,7 @@ function cleanup() {
trap cleanup EXIT SIGINT
make -C "${KUBE_ROOT}" WHAT=cmd/kube-apiserver
make -C "${KUBE_ROOT}" WHAT=cluster/images/etcd/attachlease
make -C "${KUBE_ROOT}" WHAT=cluster/images/etcd/migrate
kube::etcd::start
echo "${ETCD_VERSION}" > "${ETCD_DIR}/version.txt"

View File

@ -67,7 +67,7 @@ func etcdUpgradeGCE(target_storage, target_version string) error {
os.Environ(),
"TEST_ETCD_VERSION="+target_version,
"STORAGE_BACKEND="+target_storage,
"TEST_ETCD_IMAGE=3.1.12")
"TEST_ETCD_IMAGE=3.1.12-1")
_, _, err := RunCmdEnv(env, gceUpgradeScript(), "-l", "-M")
return err
@ -107,7 +107,7 @@ func masterUpgradeGCE(rawV string, enableKubeProxyDaemonSet bool) error {
env = append(env,
"TEST_ETCD_VERSION="+TestContext.EtcdUpgradeVersion,
"STORAGE_BACKEND="+TestContext.EtcdUpgradeStorage,
"TEST_ETCD_IMAGE=3.1.12")
"TEST_ETCD_IMAGE=3.1.12-1")
} else {
// In e2e tests, we skip the confirmation prompt about
// implicit etcd upgrades to simulate the user entering "y".

View File

@ -59,7 +59,7 @@ SERVICE_CLUSTER_IP_RANGE="${SERVICE_CLUSTER_IP_RANGE:-}"
EVENT_PD="${EVENT_PD:-}"
# Etcd related variables.
ETCD_IMAGE="${ETCD_IMAGE:-3.1.12}"
ETCD_IMAGE="${ETCD_IMAGE:-3.1.12-1}"
ETCD_VERSION="${ETCD_VERSION:-}"
# Controller-manager related variables.