mirror of https://github.com/k3s-io/k3s
Collect and Store etcd Snapshots and Metadata (#3239)
* Add the ability to record both local etcd snapshots and etcd snapshots stored in an S3-compatible object store in a ConfigMap. (pull/3257/head)
parent
fceb20fe0c
commit
c5ad71ce0b
|
@ -0,0 +1,2 @@
|
||||||
|
FROM k3s:issue-3234
|
||||||
|
COPY . /go/src/github.com/rancher/k3s/
|
|
@ -0,0 +1,53 @@
|
||||||
|
ARG GOLANG=golang:1.16.2-alpine3.12
|
||||||
|
FROM ${GOLANG}
|
||||||
|
|
||||||
|
ARG http_proxy=$http_proxy
|
||||||
|
ARG https_proxy=$https_proxy
|
||||||
|
ARG no_proxy=$no_proxy
|
||||||
|
ENV http_proxy=$http_proxy
|
||||||
|
ENV https_proxy=$https_proxy
|
||||||
|
ENV no_proxy=$no_proxy
|
||||||
|
|
||||||
|
RUN apk -U --no-cache add bash git gcc musl-dev docker vim less file curl wget ca-certificates jq linux-headers \
|
||||||
|
zlib-dev tar zip squashfs-tools npm coreutils python2 openssl-dev libffi-dev libseccomp libseccomp-dev make \
|
||||||
|
libuv-static sqlite-dev sqlite-static libselinux libselinux-dev zlib-dev zlib-static zstd gzip alpine-sdk binutils-gold
|
||||||
|
RUN if [ "$(go env GOARCH)" = "arm64" ]; then \
|
||||||
|
wget https://github.com/aquasecurity/trivy/releases/download/v0.16.0/trivy_0.16.0_Linux-ARM64.tar.gz && \
|
||||||
|
tar -zxvf trivy_0.16.0_Linux-ARM64.tar.gz && \
|
||||||
|
mv trivy /usr/local/bin; \
|
||||||
|
elif [ "$(go env GOARCH)" = "arm" ]; then \
|
||||||
|
wget https://github.com/aquasecurity/trivy/releases/download/v0.16.0/trivy_0.16.0_Linux-ARM.tar.gz && \
|
||||||
|
tar -zxvf trivy_0.16.0_Linux-ARM.tar.gz && \
|
||||||
|
mv trivy /usr/local/bin; \
|
||||||
|
else \
|
||||||
|
wget https://github.com/aquasecurity/trivy/releases/download/v0.16.0/trivy_0.16.0_Linux-64bit.tar.gz && \
|
||||||
|
tar -zxvf trivy_0.16.0_Linux-64bit.tar.gz && \
|
||||||
|
mv trivy /usr/local/bin; \
|
||||||
|
fi
|
||||||
|
# this works for both go 1.15 and 1.16
|
||||||
|
RUN GO111MODULE=on GOPROXY=direct go get golang.org/x/tools/cmd/goimports@gopls/v0.6.9
|
||||||
|
RUN rm -rf /go/src /go/pkg
|
||||||
|
|
||||||
|
RUN if [ "$(go env GOARCH)" = "amd64" ]; then \
|
||||||
|
curl -sL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s v1.38.0; \
|
||||||
|
fi
|
||||||
|
|
||||||
|
ENV YQ_URL=https://github.com/mikefarah/yq/releases/download/v4.6.2/yq_linux
|
||||||
|
RUN wget -O - ${YQ_URL}_$(go env GOARCH) > /usr/bin/yq && chmod +x /usr/bin/yq
|
||||||
|
|
||||||
|
ARG SELINUX=true
|
||||||
|
ENV SELINUX $SELINUX
|
||||||
|
|
||||||
|
ENV GO111MODULE off
|
||||||
|
ENV DAPPER_RUN_ARGS --privileged -v k3s-cache:/go/src/github.com/rancher/k3s/.cache -v trivy-cache:/root/.cache/trivy
|
||||||
|
ENV DAPPER_ENV REPO TAG DRONE_TAG IMAGE_NAME SKIP_VALIDATE GCLOUD_AUTH GITHUB_TOKEN GOLANG
|
||||||
|
ENV DAPPER_SOURCE /go/src/github.com/rancher/k3s/
|
||||||
|
ENV DAPPER_OUTPUT ./bin ./dist ./build/out
|
||||||
|
ENV DAPPER_DOCKER_SOCKET true
|
||||||
|
ENV HOME ${DAPPER_SOURCE}
|
||||||
|
ENV CROSS true
|
||||||
|
ENV STATIC_BUILD true
|
||||||
|
WORKDIR ${DAPPER_SOURCE}
|
||||||
|
|
||||||
|
ENTRYPOINT ["./scripts/entry.sh"]
|
||||||
|
CMD ["ci"]
|
|
@ -18,6 +18,12 @@ func NewEtcdSnapshotCommand(action func(*cli.Context) error) cli.Command {
|
||||||
DebugFlag,
|
DebugFlag,
|
||||||
LogFile,
|
LogFile,
|
||||||
AlsoLogToStderr,
|
AlsoLogToStderr,
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "node-name",
|
||||||
|
Usage: "(agent/node) Node name",
|
||||||
|
EnvVar: version.ProgramUpper + "_NODE_NAME",
|
||||||
|
Destination: &AgentConfig.NodeName,
|
||||||
|
},
|
||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
Name: "data-dir,d",
|
Name: "data-dir,d",
|
||||||
Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root",
|
Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root",
|
||||||
|
|
|
@ -31,6 +31,17 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nodeName := app.String("node-name")
|
||||||
|
if nodeName == "" {
|
||||||
|
h, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nodeName = h
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Setenv("NODE_NAME", nodeName)
|
||||||
|
|
||||||
var serverConfig server.Config
|
var serverConfig server.Config
|
||||||
serverConfig.DisableAgent = true
|
serverConfig.DisableAgent = true
|
||||||
serverConfig.ControlConfig.DataDir = dataDir
|
serverConfig.ControlConfig.DataDir = dataDir
|
||||||
|
@ -50,6 +61,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
||||||
serverConfig.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
|
serverConfig.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt")
|
||||||
serverConfig.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
|
serverConfig.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt")
|
||||||
serverConfig.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
|
serverConfig.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
|
||||||
|
serverConfig.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")
|
||||||
|
|
||||||
ctx := signals.SetupSignalHandler(context.Background())
|
ctx := signals.SetupSignalHandler(context.Background())
|
||||||
|
|
||||||
|
@ -67,5 +79,11 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
serverConfig.ControlConfig.Runtime.Core = sc.Core
|
||||||
|
|
||||||
return cluster.Snapshot(ctx, &serverConfig.ControlConfig)
|
return cluster.Snapshot(ctx, &serverConfig.ControlConfig)
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,10 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.managedDB.StoreSnapshotData(ctx); err != nil {
|
||||||
|
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ type Driver interface {
|
||||||
Restore(ctx context.Context) error
|
Restore(ctx context.Context) error
|
||||||
EndpointName() string
|
EndpointName() string
|
||||||
Snapshot(ctx context.Context, config *config.Control) error
|
Snapshot(ctx context.Context, config *config.Control) error
|
||||||
|
StoreSnapshotData(ctx context.Context) error
|
||||||
GetMembersClientURLs(ctx context.Context) ([]string, error)
|
GetMembersClientURLs(ctx context.Context) ([]string, error)
|
||||||
RemoveSelf(ctx context.Context) error
|
RemoveSelf(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
257
pkg/etcd/etcd.go
257
pkg/etcd/etcd.go
|
@ -11,6 +11,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -18,6 +19,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/minio/minio-go/v7"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
certutil "github.com/rancher/dynamiclistener/cert"
|
certutil "github.com/rancher/dynamiclistener/cert"
|
||||||
"github.com/rancher/k3s/pkg/clientaccess"
|
"github.com/rancher/k3s/pkg/clientaccess"
|
||||||
|
@ -31,39 +33,11 @@ import (
|
||||||
"go.etcd.io/etcd/clientv3/snapshot"
|
"go.etcd.io/etcd/clientv3/snapshot"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
"go.etcd.io/etcd/etcdserver/etcdserverpb"
|
"go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
)
|
"k8s.io/client-go/util/retry"
|
||||||
|
|
||||||
type ETCD struct {
|
|
||||||
client *etcd.Client
|
|
||||||
config *config.Control
|
|
||||||
name string
|
|
||||||
runtime *config.ControlRuntime
|
|
||||||
address string
|
|
||||||
cron *cron.Cron
|
|
||||||
s3 *s3
|
|
||||||
}
|
|
||||||
|
|
||||||
type learnerProgress struct {
|
|
||||||
ID uint64 `json:"id,omitempty"`
|
|
||||||
Name string `json:"name,omitempty"`
|
|
||||||
RaftAppliedIndex uint64 `json:"raftAppliedIndex,omitempty"`
|
|
||||||
LastProgress metav1.Time `json:"lastProgress,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewETCD creates a new value of type
|
|
||||||
// ETCD with an initialized cron value.
|
|
||||||
func NewETCD() *ETCD {
|
|
||||||
return &ETCD{
|
|
||||||
cron: cron.New(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
learnerProgressKey = version.Program + "/etcd/learnerProgress"
|
|
||||||
// AddressKey will contain the value of api addresses list
|
|
||||||
AddressKey = version.Program + "/apiaddresses"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -82,12 +56,47 @@ const (
|
||||||
maxBackupRetention = 5
|
maxBackupRetention = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
learnerProgressKey = version.Program + "/etcd/learnerProgress"
|
||||||
|
// AddressKey will contain the value of api addresses list
|
||||||
|
AddressKey = version.Program + "/apiaddresses"
|
||||||
|
|
||||||
|
snapshotConfigMapName = version.Program + "-etcd-snapshots"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ETCD struct {
|
||||||
|
client *etcd.Client
|
||||||
|
config *config.Control
|
||||||
|
name string
|
||||||
|
runtime *config.ControlRuntime
|
||||||
|
address string
|
||||||
|
cron *cron.Cron
|
||||||
|
s3 *s3
|
||||||
|
nodeName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type learnerProgress struct {
|
||||||
|
ID uint64 `json:"id,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
RaftAppliedIndex uint64 `json:"raftAppliedIndex,omitempty"`
|
||||||
|
LastProgress metav1.Time `json:"lastProgress,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// Members contains a slice that holds all
|
// Members contains a slice that holds all
|
||||||
// members of the cluster.
|
// members of the cluster.
|
||||||
type Members struct {
|
type Members struct {
|
||||||
Members []*etcdserverpb.Member `json:"members"`
|
Members []*etcdserverpb.Member `json:"members"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewETCD creates a new value of type
|
||||||
|
// ETCD with an initialized cron value.
|
||||||
|
func NewETCD() *ETCD {
|
||||||
|
return &ETCD{
|
||||||
|
cron: cron.New(),
|
||||||
|
nodeName: os.Getenv("NODE_NAME"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// EndpointName returns the name of the endpoint.
|
// EndpointName returns the name of the endpoint.
|
||||||
func (e *ETCD) EndpointName() string {
|
func (e *ETCD) EndpointName() string {
|
||||||
return "etcd"
|
return "etcd"
|
||||||
|
@ -266,6 +275,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
||||||
if clientAccessInfo == nil {
|
if clientAccessInfo == nil {
|
||||||
return e.newCluster(ctx, false)
|
return e.newCluster(ctx, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = e.join(ctx, clientAccessInfo)
|
err = e.join(ctx, clientAccessInfo)
|
||||||
return errors.Wrap(err, "joining etcd cluster")
|
return errors.Wrap(err, "joining etcd cluster")
|
||||||
}
|
}
|
||||||
|
@ -816,7 +826,8 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
return errors.Wrap(err, "failed to get config for etcd snapshot")
|
return errors.Wrap(err, "failed to get config for etcd snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotName := fmt.Sprintf("%s-%d", e.config.EtcdSnapshotName, time.Now().Unix())
|
nodeName := os.Getenv("NODE_NAME")
|
||||||
|
snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, time.Now().Unix())
|
||||||
snapshotPath := filepath.Join(snapshotDir, snapshotName)
|
snapshotPath := filepath.Join(snapshotDir, snapshotName)
|
||||||
|
|
||||||
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
|
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
|
||||||
|
@ -853,10 +864,186 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return e.StoreSnapshotData(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
type s3Config struct {
|
||||||
|
Endpoint string `json:"endpoint,omitempty"`
|
||||||
|
EndpointCA string `json:"endpointCA,omitempty"`
|
||||||
|
SkipSSLVerify bool `json:"skipSSLVerify,omitempty"`
|
||||||
|
Bucket string `json:"bucket,omitempty"`
|
||||||
|
Region string `json:"region,omitempty"`
|
||||||
|
Folder string `json:"folder,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshotFile represents a single snapshot and its
|
||||||
|
// metadata.
|
||||||
|
type snapshotFile struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
// Location contains the full path of the snapshot. For
|
||||||
|
// local paths, the location will be prefixed with "file://".
|
||||||
|
Location string `json:"location,omitempty"`
|
||||||
|
NodeName string `json:"nodeName,omitempty"`
|
||||||
|
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
|
||||||
|
Size int64 `json:"size,omitempty"`
|
||||||
|
S3 *s3Config `json:"s3Config,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// listSnapshots provides a list of the currently stored
|
||||||
|
// snapshots on disk or in S3 along with their relevant
|
||||||
|
// metadata.
|
||||||
|
func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]snapshotFile, error) {
|
||||||
|
var snapshots []snapshotFile
|
||||||
|
|
||||||
|
if e.config.EtcdS3 {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if e.s3 == nil {
|
||||||
|
s3, err := newS3(ctx, e.config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
e.s3 = s3
|
||||||
|
}
|
||||||
|
|
||||||
|
objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, minio.ListObjectsOptions{})
|
||||||
|
|
||||||
|
for obj := range objects {
|
||||||
|
if obj.Err != nil {
|
||||||
|
return nil, obj.Err
|
||||||
|
}
|
||||||
|
if obj.Size == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ca, err := time.Parse(time.RFC3339, obj.LastModified.Format(time.RFC3339))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshots = append(snapshots, snapshotFile{
|
||||||
|
Name: filepath.Base(obj.Key),
|
||||||
|
NodeName: "s3",
|
||||||
|
CreatedAt: &metav1.Time{
|
||||||
|
Time: ca,
|
||||||
|
},
|
||||||
|
Size: obj.Size,
|
||||||
|
S3: &s3Config{
|
||||||
|
Endpoint: e.config.EtcdS3Endpoint,
|
||||||
|
EndpointCA: e.config.EtcdS3EndpointCA,
|
||||||
|
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
|
||||||
|
Bucket: e.config.EtcdS3BucketName,
|
||||||
|
Region: e.config.EtcdS3Region,
|
||||||
|
Folder: e.config.EtcdS3Folder,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshots, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := ioutil.ReadDir(snapshotDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
snapshots = append(snapshots, snapshotFile{
|
||||||
|
Name: f.Name(),
|
||||||
|
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
|
||||||
|
NodeName: e.nodeName,
|
||||||
|
CreatedAt: &metav1.Time{
|
||||||
|
Time: f.ModTime(),
|
||||||
|
},
|
||||||
|
Size: f.Size(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshots, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateSnapshotData populates the given map with the contents of the given slice.
|
||||||
|
func updateSnapshotData(data map[string]string, snapshotFiles []snapshotFile) error {
|
||||||
|
for _, v := range snapshotFiles {
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
data[v.Name] = string(b)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// setSnapshotFunction schedules snapshots at the configured interval
|
// StoreSnapshotData stores the current set of snapshot metadata in the etcd snapshots ConfigMap.
|
||||||
|
func (e *ETCD) StoreSnapshotData(ctx context.Context) error {
|
||||||
|
logrus.Infof("Saving current etcd snapshot set to %s ConfigMap", snapshotConfigMapName)
|
||||||
|
|
||||||
|
snapshotDir, err := snapshotDir(e.config)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to get the snapshot dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
||||||
|
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
|
||||||
|
}, func() error {
|
||||||
|
// make sure the core.Factory is initialized. There can
|
||||||
|
// be a race between this call and the core code startup.
|
||||||
|
for e.config.Runtime.Core == nil {
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
|
||||||
|
|
||||||
|
snapshotFiles, err := e.listSnapshots(ctx, snapshotDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make(map[string]string, len(snapshotFiles))
|
||||||
|
if err := updateSnapshotData(data, snapshotFiles); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if apierrors.IsNotFound(getErr) {
|
||||||
|
cm := v1.ConfigMap{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: snapshotConfigMapName,
|
||||||
|
Namespace: metav1.NamespaceSystem,
|
||||||
|
},
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
_, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshotConfigMap.Data == nil {
|
||||||
|
snapshotConfigMap.Data = make(map[string]string, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove existing entries for this node and for "s3" only
|
||||||
|
for k, v := range snapshotConfigMap.Data {
|
||||||
|
var sf snapshotFile
|
||||||
|
if err := json.Unmarshal([]byte(v), &sf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if sf.NodeName == e.nodeName || sf.NodeName == "s3" {
|
||||||
|
delete(snapshotConfigMap.Data, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add this node's entries to the ConfigMap
|
||||||
|
for k, v := range data {
|
||||||
|
snapshotConfigMap.Data[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// setSnapshotFunction schedules snapshots at the configured interval.
|
||||||
func (e *ETCD) setSnapshotFunction(ctx context.Context) {
|
func (e *ETCD) setSnapshotFunction(ctx context.Context) {
|
||||||
e.cron.AddFunc(e.config.EtcdSnapshotCron, func() {
|
e.cron.AddFunc(e.config.EtcdSnapshotCron, func() {
|
||||||
if err := e.Snapshot(ctx, e.config); err != nil {
|
if err := e.Snapshot(ctx, e.config); err != nil {
|
||||||
|
@ -900,12 +1087,14 @@ func (e *ETCD) Restore(ctx context.Context) error {
|
||||||
// snapshotRetention iterates through the snapshots and removes the oldest
|
// snapshotRetention iterates through the snapshots and removes the oldest
|
||||||
// leaving the desired number of snapshots.
|
// leaving the desired number of snapshots.
|
||||||
func snapshotRetention(retention int, snapshotDir string) error {
|
func snapshotRetention(retention int, snapshotDir string) error {
|
||||||
|
nodeName := os.Getenv("NODE_NAME")
|
||||||
|
|
||||||
var snapshotFiles []os.FileInfo
|
var snapshotFiles []os.FileInfo
|
||||||
if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
|
if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(info.Name(), snapshotPrefix) {
|
if strings.HasPrefix(info.Name(), snapshotPrefix+nodeName) {
|
||||||
snapshotFiles = append(snapshotFiles, info)
|
snapshotFiles = append(snapshotFiles, info)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -141,21 +141,28 @@ func (s *s3) download(ctx context.Context) error {
|
||||||
return os.Chmod(fullSnapshotPath, 0600)
|
return os.Chmod(fullSnapshotPath, 0600)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// snapshotPrefix returns the prefix used in the
|
||||||
|
// naming of the snapshots.
|
||||||
|
func (s *s3) snapshotPrefix() string {
|
||||||
|
nodeName := os.Getenv("NODE_NAME")
|
||||||
|
fullSnapshotPrefix := snapshotPrefix + nodeName
|
||||||
|
var prefix string
|
||||||
|
if s.config.EtcdS3Folder != "" {
|
||||||
|
prefix = filepath.Join(s.config.EtcdS3Folder, fullSnapshotPrefix)
|
||||||
|
} else {
|
||||||
|
prefix = fullSnapshotPrefix
|
||||||
|
}
|
||||||
|
return prefix
|
||||||
|
}
|
||||||
|
|
||||||
// snapshotRetention deletes the given snapshot from the configured S3
|
// snapshotRetention deletes the given snapshot from the configured S3
|
||||||
// compatible backend.
|
// compatible backend.
|
||||||
func (s *s3) snapshotRetention(ctx context.Context) error {
|
func (s *s3) snapshotRetention(ctx context.Context) error {
|
||||||
var snapshotFiles []minio.ObjectInfo
|
var snapshotFiles []minio.ObjectInfo
|
||||||
|
|
||||||
var prefix string
|
|
||||||
if s.config.EtcdS3Folder != "" {
|
|
||||||
prefix = filepath.Join(s.config.EtcdS3Folder, snapshotPrefix)
|
|
||||||
} else {
|
|
||||||
prefix = snapshotPrefix
|
|
||||||
}
|
|
||||||
|
|
||||||
loo := minio.ListObjectsOptions{
|
loo := minio.ListObjectsOptions{
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
Prefix: prefix,
|
Prefix: s.snapshotPrefix(),
|
||||||
}
|
}
|
||||||
for info := range s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) {
|
for info := range s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) {
|
||||||
if info.Err != nil {
|
if info.Err != nil {
|
||||||
|
|
|
@ -32,7 +32,7 @@ func (c *Context) Start(ctx context.Context) error {
|
||||||
return start.All(ctx, 5, c.K3s, c.Helm, c.Apps, c.Auth, c.Batch, c.Core)
|
return start.All(ctx, 5, c.K3s, c.Helm, c.Apps, c.Auth, c.Batch, c.Core)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newContext(ctx context.Context, cfg string) (*Context, error) {
|
func NewContext(ctx context.Context, cfg string) (*Context, error) {
|
||||||
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
|
restConfig, err := clientcmd.BuildConfigFromFlags("", cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -22,7 +22,7 @@ func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error {
|
||||||
for range t.C {
|
for range t.C {
|
||||||
controlConfig := &config.ControlConfig
|
controlConfig := &config.ControlConfig
|
||||||
|
|
||||||
sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
|
sc, err := NewContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Infof("Failed to set etcd role label: %v", err)
|
logrus.Infof("Failed to set etcd role label: %v", err)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -108,7 +108,7 @@ func startOnAPIServerReady(ctx context.Context, config *Config) {
|
||||||
func runControllers(ctx context.Context, config *Config) error {
|
func runControllers(ctx context.Context, config *Config) error {
|
||||||
controlConfig := &config.ControlConfig
|
controlConfig := &config.ControlConfig
|
||||||
|
|
||||||
sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
|
sc, err := NewContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue