mirror of https://github.com/k3s-io/k3s
Merge pull request #48394 from smarterclayton/must_serialize_if_data_differs
Automatic merge from submit-queue (batch tested with PRs 48439, 48440, 48394) GuaranteedUpdate must write if stored data is not canonical An optimization added to the GuaranteedUpdate loop changed the comparison of the current objects serialization against the stored data, instead comparing to the in memory object, which defeated the mechanism we use to migrate stored data (GET then PUT should update the version stored in etcd if the canonical serialization has changed) This commit preserves that optimization but correctly verifies the in memory serialization against the on disk serialization by fetching the latest serialized data. Since most updates are not no-ops, this should not regress the performance of the normal path. Fixes #48393 ```release-note When performing a GET then PUT, the kube-apiserver must write the canonical representation of the object to etcd if the current value does not match. That allows external agents to migrate content in etcd from one API version to another, across different storage types, or across varying encryption levels. This fixes a bug introduced in 1.5 where we unintentionally stopped writing the newest data. ```pull/6/head
commit
4ae3b032f4
|
@ -264,11 +264,13 @@ func (s *store) GuaranteedUpdate(
|
|||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
var origState *objState
|
||||
var mustCheckData bool
|
||||
if len(suggestion) == 1 && suggestion[0] != nil {
|
||||
origState, err = s.getStateFromObject(suggestion[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mustCheckData = true
|
||||
} else {
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
|
@ -297,6 +299,21 @@ func (s *store) GuaranteedUpdate(
|
|||
return err
|
||||
}
|
||||
if !origState.stale && bytes.Equal(data, origState.data) {
|
||||
// if we skipped the original Get in this loop, we must refresh from
|
||||
// etcd in order to be sure the data in the store is equivalent to
|
||||
// our desired serialization
|
||||
if mustCheckData {
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mustCheckData = false
|
||||
continue
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
}
|
||||
|
||||
|
@ -330,6 +347,7 @@ func (s *store) GuaranteedUpdate(
|
|||
return err
|
||||
}
|
||||
trace.Step("Retry value restored")
|
||||
mustCheckData = false
|
||||
continue
|
||||
}
|
||||
putResp := txnResp.Responses[0].GetResponsePut()
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
|
@ -459,6 +460,41 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
|||
testCheckEventType(t, watch.Deleted, w)
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
|
||||
// serialize input into etcd with data that would be normalized by a write - in this case, leading
|
||||
// and trailing whitespace
|
||||
codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion)
|
||||
data, err := runtime.Encode(codec, input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// this update should write the canonical value to etcd because the new serialization differs
|
||||
// from the stored serialization
|
||||
input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10)
|
||||
out := &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return input, nil, nil
|
||||
}, input)
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) {
|
||||
t.Errorf("guaranteed update should have updated the serialized data, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
@ -569,10 +605,10 @@ func TestTransformationFailure(t *testing.T) {
|
|||
}); !storage.IsInternalError(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
// GuaranteedUpdate with suggestion should not return an error if we don't change the object
|
||||
// GuaranteedUpdate with suggestion should return an error if we don't change the object
|
||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||
return input, nil, nil
|
||||
}, preset[1].obj); err != nil {
|
||||
}, preset[1].obj); err == nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue