|
|
@ -32,13 +32,16 @@ import (
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
|
|
|
|
|
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
|
|
|
"k8s.io/apimachinery/pkg/types"
|
|
|
|
"k8s.io/apimachinery/pkg/types"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
|
|
|
|
"k8s.io/client-go/tools/pager"
|
|
|
|
"k8s.io/client-go/util/retry"
|
|
|
|
"k8s.io/client-go/util/retry"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
const (
|
|
|
|
errorTTL = 24 * time.Hour
|
|
|
|
errorTTL = 24 * time.Hour
|
|
|
|
|
|
|
|
snapshotListPageSize = 20
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
var (
|
|
|
@ -720,18 +723,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// List all snapshots matching the selector
|
|
|
|
|
|
|
|
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
|
|
|
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
|
|
|
esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
|
|
|
|
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
|
|
|
|
if err != nil {
|
|
|
|
snapshotPager.PageSize = snapshotListPageSize
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// List all snapshots matching the selector
|
|
|
|
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
|
|
|
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
|
|
|
// If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes.
|
|
|
|
// If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes.
|
|
|
|
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
|
|
|
|
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
|
|
|
|
for _, esf := range esfList.Items {
|
|
|
|
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
|
|
|
|
sfKey := generateETCDSnapshotFileConfigMapKey(esf)
|
|
|
|
esf, ok := obj.(*k3s.ETCDSnapshotFile)
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
|
|
return errors.New("failed to convert object to ETCDSnapshotFile")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
|
|
|
|
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
|
|
|
|
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
|
|
|
|
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
|
|
|
|
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
|
|
|
|
// exists in both and names match, don't need to sync
|
|
|
|
// exists in both and names match, don't need to sync
|
|
|
@ -741,7 +746,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|
|
|
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
|
|
|
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
|
|
|
expires := esf.Status.Error.Time.Add(errorTTL)
|
|
|
|
expires := esf.Status.Error.Time.Add(errorTTL)
|
|
|
|
if time.Now().Before(expires) {
|
|
|
|
if time.Now().Before(expires) {
|
|
|
|
continue
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if ok {
|
|
|
|
if ok {
|
|
|
@ -754,6 +759,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|
|
|
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
|
|
|
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}); err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
|
|
|
|
// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
|
|
|
@ -794,15 +802,18 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// List and remove all snapshots stored on nodes that do not match the selector
|
|
|
|
// List and remove all snapshots stored on nodes that do not match the selector
|
|
|
|
esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
|
|
|
|
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
|
|
|
|
if err != nil {
|
|
|
|
esf, ok := obj.(*k3s.ETCDSnapshotFile)
|
|
|
|
return err
|
|
|
|
if !ok {
|
|
|
|
}
|
|
|
|
return errors.New("failed to convert object to ETCDSnapshotFile")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, esf := range esfList.Items {
|
|
|
|
|
|
|
|
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
|
|
|
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
|
|
|
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
|
|
|
|
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}); err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
|
|
|
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
|
|
|