mirror of https://github.com/k3s-io/k3s
Use pagination when retrieving etcd snapshot list
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit c2216a62ad
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/10674/head
parent
95b2dec026
commit
c936fc02d0
|
@ -32,13 +32,16 @@ import (
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/pager"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
errorTTL = 24 * time.Hour
|
errorTTL = 24 * time.Hour
|
||||||
|
snapshotListPageSize = 20
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -720,18 +723,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// List all snapshots matching the selector
|
|
||||||
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
||||||
esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
|
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
|
||||||
if err != nil {
|
snapshotPager.PageSize = snapshotListPageSize
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// List all snapshots matching the selector
|
||||||
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
||||||
// If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes.
|
// If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes.
|
||||||
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
|
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
|
||||||
for _, esf := range esfList.Items {
|
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
|
||||||
sfKey := generateETCDSnapshotFileConfigMapKey(esf)
|
esf, ok := obj.(*k3s.ETCDSnapshotFile)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("failed to convert object to ETCDSnapshotFile")
|
||||||
|
}
|
||||||
|
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
|
||||||
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
|
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
|
||||||
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
|
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
|
||||||
// exists in both and names match, don't need to sync
|
// exists in both and names match, don't need to sync
|
||||||
|
@ -741,7 +746,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
||||||
expires := esf.Status.Error.Time.Add(errorTTL)
|
expires := esf.Status.Error.Time.Add(errorTTL)
|
||||||
if time.Now().Before(expires) {
|
if time.Now().Before(expires) {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -754,6 +759,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
|
// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
|
||||||
|
@ -794,15 +802,18 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// List and remove all snapshots stored on nodes that do not match the selector
|
// List and remove all snapshots stored on nodes that do not match the selector
|
||||||
esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
|
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
|
||||||
if err != nil {
|
esf, ok := obj.(*k3s.ETCDSnapshotFile)
|
||||||
return err
|
if !ok {
|
||||||
}
|
return errors.New("failed to convert object to ETCDSnapshotFile")
|
||||||
|
}
|
||||||
|
|
||||||
for _, esf := range esfList.Items {
|
|
||||||
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
||||||
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
|
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
||||||
|
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
|
||||||
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
|
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
|
||||||
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
|
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
|
||||||
"github.com/k3s-io/k3s/pkg/util"
|
"github.com/k3s-io/k3s/pkg/util"
|
||||||
|
@ -20,7 +21,9 @@ import (
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/pager"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -216,20 +219,25 @@ func (e *etcdSnapshotHandler) reconcile() error {
|
||||||
logrus.Infof("Reconciling snapshot ConfigMap data")
|
logrus.Infof("Reconciling snapshot ConfigMap data")
|
||||||
|
|
||||||
// Get a list of existing snapshots
|
// Get a list of existing snapshots
|
||||||
snapshotList, err := e.snapshots.List(metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots := map[string]*apisv1.ETCDSnapshotFile{}
|
snapshots := map[string]*apisv1.ETCDSnapshotFile{}
|
||||||
for i := range snapshotList.Items {
|
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return e.snapshots.List(opts) }))
|
||||||
esf := &snapshotList.Items[i]
|
snapshotPager.PageSize = snapshotListPageSize
|
||||||
|
|
||||||
|
if err := snapshotPager.EachListItem(e.ctx, metav1.ListOptions{}, func(obj k8sruntime.Object) error {
|
||||||
|
esf, ok := obj.(*k3s.ETCDSnapshotFile)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("failed to convert object to ETCDSnapshotFile")
|
||||||
|
}
|
||||||
|
|
||||||
// Do not create entries for snapshots that have been deleted or do not have extra metadata
|
// Do not create entries for snapshots that have been deleted or do not have extra metadata
|
||||||
if !esf.DeletionTimestamp.IsZero() || len(esf.Spec.Metadata) == 0 {
|
if !esf.DeletionTimestamp.IsZero() || len(esf.Spec.Metadata) == 0 {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
|
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
|
||||||
snapshots[sfKey] = esf
|
snapshots[sfKey] = esf
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
|
snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
|
||||||
|
|
Loading…
Reference in New Issue