Elide old snapshot data when apiserver rejects configmap with ErrRequestEntityTooLarge

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 2b0e2e8ada)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/8642/head
Brad Davidson 2023-09-29 02:28:11 +00:00 committed by Brad Davidson
parent eed767ea74
commit 7a0eecd601
2 changed files with 155 additions and 89 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -92,7 +93,7 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
// upload uploads the given snapshot to the configured S3
// compatible backend.
func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*snapshotFile, error) {
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
logrus.Infof("Uploading snapshot %s to S3", snapshot)
basename := filepath.Base(snapshot)
var snapshotFileName string
@ -115,7 +116,6 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
if err != nil {
sf = snapshotFile{
Name: filepath.Base(uploadInfo.Key),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
@ -132,6 +132,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
} else {
@ -142,7 +143,6 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
sf = snapshotFile{
Name: filepath.Base(uploadInfo.Key),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: ca,
@ -158,6 +158,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}
}
return &sf, nil

View File

@ -12,6 +12,7 @@ import (
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"time"
@ -35,6 +36,7 @@ import (
const (
maxConcurrentSnapshots = 1
pruneStepSize = 5
compressedExtension = ".zip"
)
@ -187,7 +189,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
defer e.snapshotSem.Release(maxConcurrentSnapshots)
// make sure the core.Factory is initialized before attempting to add snapshot metadata
var extraMetadata string
var extraMetadata *v1.ConfigMap
if e.config.Runtime.Core == nil {
logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshotExtraMetadataConfigMapName)
} else {
@ -195,13 +197,8 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
} else {
if m, err := json.Marshal(snapshotExtraMetadataConfigMap.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
} else {
logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
extraMetadata = base64.StdEncoding.EncodeToString(m)
}
logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
extraMetadata = snapshotExtraMetadataConfigMap
}
}
@ -259,15 +256,15 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
sf = &snapshotFile{
Name: snapshotName,
Location: "",
Metadata: extraMetadata,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: now,
},
Status: failedSnapshotStatus,
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Compressed: e.config.EtcdSnapshotCompress,
Status: failedSnapshotStatus,
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Compressed: e.config.EtcdSnapshotCompress,
metadataSource: extraMetadata,
}
logrus.Errorf("Failed to take etcd snapshot: %v", err)
if err := e.addSnapshotData(*sf); err != nil {
@ -295,15 +292,15 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
}
sf = &snapshotFile{
Name: f.Name(),
Metadata: extraMetadata,
Location: "file://" + snapshotPath,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: f.ModTime(),
},
Status: successfulSnapshotStatus,
Size: f.Size(),
Compressed: e.config.EtcdSnapshotCompress,
Status: successfulSnapshotStatus,
Size: f.Size(),
Compressed: e.config.EtcdSnapshotCompress,
metadataSource: extraMetadata,
}
if err := e.addSnapshotData(*sf); err != nil {
@ -321,7 +318,6 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
logrus.Warnf("Unable to initialize S3 client: %v", err)
sf = &snapshotFile{
Name: filepath.Base(snapshotPath),
Metadata: extraMetadata,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
@ -338,6 +334,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
Folder: e.config.EtcdS3Folder,
Insecure: e.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}
}
// sf should be nil if we were able to successfully initialize the S3 client.
@ -392,6 +389,8 @@ type snapshotFile struct {
Status snapshotStatus `json:"status,omitempty"`
S3 *s3Config `json:"s3Config,omitempty"`
Compressed bool `json:"compressed"`
metadataSource *v1.ConfigMap `json:"-"`
}
// listLocalSnapshots provides a list of the currently stored
@ -572,7 +571,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
for obj := range e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, opts) {
if obj.Err != nil {
logrus.Error(obj.Err)
logrus.Errorf("Failed to list snapshots from S3: %v", obj.Err)
return
}
@ -630,24 +629,40 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
return e.ReconcileSnapshotData(ctx)
}
func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
if sf.metadataSource != nil {
if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
} else {
logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
sf.Metadata = base64.StdEncoding.EncodeToString(m)
}
}
return json.Marshal(sf)
}
// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata
// available at the time.
func (e *ETCD) addSnapshotData(sf snapshotFile) error {
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}
sfKey := generateSnapshotConfigMapKey(sf)
marshalledSnapshotFile, err := marshalSnapshotFile(sf)
if err != nil {
return err
}
pruneCount := pruneStepSize
var lastErr error
return retry.OnError(snapshotDataBackoff, func(err error) bool {
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err)
}, func() error {
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}
snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
sfKey := generateSnapshotConfigMapKey(sf)
marshalledSnapshotFile, err := json.Marshal(sf)
if err != nil {
return err
}
if apierrors.IsNotFound(getErr) {
cm := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
@ -664,10 +679,21 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
snapshotConfigMap.Data = make(map[string]string)
}
// If the configmap update was rejected due to size, drop the oldest entries from the map.
// We will continue to remove an increasing number of old snapshots from the map until the request succeeds,
// or the number we would attempt to remove exceeds the number stored.
if isTooLargeError(lastErr) {
logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount)
if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil {
return err
}
pruneCount += pruneStepSize
}
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile)
_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return err
_, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return lastErr
})
}
@ -679,34 +705,68 @@ func generateSnapshotConfigMapKey(sf snapshotFile) string {
return "local-" + name
}
// pruneConfigMap drops the oldest entries from the configMap.
// Note that the actual snapshot files are not removed, just the entries that track them in the configmap.
func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error {
if pruneCount > len(snapshotConfigMap.Data) {
return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots")
}
var snapshotFiles []snapshotFile
retention := len(snapshotConfigMap.Data) - pruneCount
for name := range snapshotConfigMap.Data {
basename, compressed := strings.CutSuffix(name, compressedExtension)
ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
}
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
})
for _, snapshotFile := range snapshotFiles[retention:] {
delete(snapshotConfigMap.Data, snapshotFile.Name)
}
return nil
}
// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
// and reconcile snapshots from S3. Notably,
// and reconcile snapshots from S3.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}
logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName)
defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName)
pruneCount := pruneStepSize
var lastErr error
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err)
}, func() error {
// make sure the core.Factory is initialize. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
runtime.Gosched()
}
logrus.Debug("core.Factory is initialized")
snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
if apierrors.IsNotFound(getErr) {
// Can't reconcile what doesn't exist.
return errors.New("No snapshot configmap found")
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotConfigMapName,
Namespace: metav1.NamespaceSystem,
},
}
cm, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(cm)
if err != nil {
return err
}
snapshotConfigMap = cm
}
logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation)
// if the snapshot config map data is nil, no need to reconcile.
if snapshotConfigMap.Data == nil {
return nil
snapshotConfigMap.Data = map[string]string{}
}
snapshotFiles, err := e.listLocalSnapshots()
@ -716,11 +776,11 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental
// clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details
s3ListSuccessful := false
var s3ListSuccessful bool
if e.config.EtcdS3 {
if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
logrus.Errorf("error retrieving S3 snapshots for reconciliation: %v", err)
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
} else {
for k, v := range s3Snapshots {
snapshotFiles[k] = v
@ -764,21 +824,16 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// Apply the failed snapshot retention policy to locally failed snapshots
if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
// sort newest-first so we can record only the retention count
sort.Slice(failedSnapshots, func(i, j int) bool {
return failedSnapshots[i].Name > failedSnapshots[j].Name
return failedSnapshots[j].CreatedAt.Before(failedSnapshots[i].CreatedAt)
})
var keepCount int
if e.config.EtcdSnapshotRetention >= len(failedSnapshots) {
keepCount = len(failedSnapshots)
} else {
keepCount = e.config.EtcdSnapshotRetention
}
for _, dfs := range failedSnapshots[:keepCount] {
for _, dfs := range failedSnapshots[:e.config.EtcdSnapshotRetention] {
sfKey := generateSnapshotConfigMapKey(dfs)
marshalledSnapshot, err := json.Marshal(dfs)
marshalledSnapshot, err := marshalSnapshotFile(dfs)
if err != nil {
logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
@ -787,21 +842,16 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// Apply the failed snapshot retention policy to the S3 snapshots
if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
// sort newest-first so we can record only the retention count
sort.Slice(failedS3Snapshots, func(i, j int) bool {
return failedS3Snapshots[i].Name > failedS3Snapshots[j].Name
return failedS3Snapshots[j].CreatedAt.Before(failedS3Snapshots[i].CreatedAt)
})
var keepCount int
if e.config.EtcdSnapshotRetention >= len(failedS3Snapshots) {
keepCount = len(failedS3Snapshots)
} else {
keepCount = e.config.EtcdSnapshotRetention
}
for _, dfs := range failedS3Snapshots[:keepCount] {
for _, dfs := range failedS3Snapshots[:e.config.EtcdSnapshotRetention] {
sfKey := generateSnapshotConfigMapKey(dfs)
marshalledSnapshot, err := json.Marshal(dfs)
marshalledSnapshot, err := marshalSnapshotFile(dfs)
if err != nil {
logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
@ -815,7 +865,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
if v, ok := deletedSnapshots[sfKey]; ok {
// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
if err := json.Unmarshal([]byte(v), &sf); err != nil {
logrus.Errorf("error unmarshaling snapshot file: %v", err)
logrus.Errorf("Error unmarshaling snapshot file: %v", err)
// use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing)
sf = snapshot
}
@ -824,18 +874,28 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}
sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.
marshalledSnapshot, err := json.Marshal(sf)
marshalledSnapshot, err := marshalSnapshotFile(sf)
if err != nil {
logrus.Warnf("unable to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
logrus.Warnf("Failed to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
} else {
snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
}
}
// If the configmap update was rejected due to size, drop the oldest entries from the map.
// We will continue to remove an increasing number of old snapshots from the map until the request succeeds,
// or the number we would attempt to remove exceeds the number stored.
if isTooLargeError(lastErr) {
logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount)
if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil {
return err
}
pruneCount += pruneStepSize
}
logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data))
_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return err
_, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
return lastErr
})
}
@ -848,7 +908,7 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) {
// when updating the snapshot list configmap.
time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax)))
if err := e.Snapshot(ctx); err != nil {
logrus.Error(err)
logrus.Errorf("Failed to take scheduled snapshot: %v", err)
}
})))
}
@ -862,13 +922,15 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix, snapshotDir)
var snapshotFiles []os.FileInfo
var snapshotFiles []snapshotFile
if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if strings.HasPrefix(info.Name(), snapshotPrefix) {
snapshotFiles = append(snapshotFiles, info)
basename, compressed := strings.CutSuffix(info.Name(), compressedExtension)
ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
snapshotFiles = append(snapshotFiles, snapshotFile{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
}
return nil
}); err != nil {
@ -877,16 +939,14 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
if len(snapshotFiles) <= retention {
return nil
}
sort.Slice(snapshotFiles, func(firstSnapshot, secondSnapshot int) bool {
// it takes the name from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date
firstSnapshotName, secondSnapshotName := strings.Split(snapshotFiles[firstSnapshot].Name(), "-"), strings.Split(snapshotFiles[secondSnapshot].Name(), "-")
firstSnapshotDate, secondSnapshotDate := firstSnapshotName[len(firstSnapshotName)-1], secondSnapshotName[len(secondSnapshotName)-1]
return firstSnapshotDate < secondSnapshotDate
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
})
delCount := len(snapshotFiles) - retention
for _, df := range snapshotFiles[:delCount] {
snapshotPath := filepath.Join(snapshotDir, df.Name())
for _, df := range snapshotFiles[retention:] {
snapshotPath := filepath.Join(snapshotDir, df.Name)
logrus.Infof("Removing local snapshot %s", snapshotPath)
if err := os.Remove(snapshotPath); err != nil {
return err
@ -895,3 +955,8 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
return nil
}
func isTooLargeError(err error) bool {
// There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string.
return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long"))
}