From 69bcdc20ffe93d0f4b285570ebf12752276b86a3 Mon Sep 17 00:00:00 2001 From: AdoHe Date: Thu, 9 Jun 2016 00:14:17 -0400 Subject: [PATCH] kubectl apply retry stale resource version --- pkg/kubectl/apply.go | 6 +- pkg/kubectl/cmd/apply.go | 120 +++++++++++++++++++++++++--------- pkg/kubectl/cmd/apply_test.go | 58 ++++++++++++++++ 3 files changed, 149 insertions(+), 35 deletions(-) diff --git a/pkg/kubectl/apply.go b/pkg/kubectl/apply.go index c75c4f8a01..1836fc2507 100644 --- a/pkg/kubectl/apply.go +++ b/pkg/kubectl/apply.go @@ -31,8 +31,8 @@ type debugError interface { // GetOriginalConfiguration retrieves the original configuration of the object // from the annotation, or nil if no annotation was found. -func GetOriginalConfiguration(info *resource.Info) ([]byte, error) { - annots, err := info.Mapping.MetadataAccessor.Annotations(info.Object) +func GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) { + annots, err := mapping.MetadataAccessor.Annotations(obj) if err != nil { return nil, err } @@ -168,7 +168,7 @@ func GetModifiedConfiguration(info *resource.Info, annotate bool, codec runtime. // UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied // configuration annotation is already present. Otherwise, it does nothing. func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error { - if original, err := GetOriginalConfiguration(info); err != nil || len(original) <= 0 { + if original, err := GetOriginalConfiguration(info.Mapping, info.Object); err != nil || len(original) <= 0 { return err } return CreateApplyAnnotation(info, codec) diff --git a/pkg/kubectl/cmd/apply.go b/pkg/kubectl/cmd/apply.go index d63b79200c..8236662d27 100644 --- a/pkg/kubectl/cmd/apply.go +++ b/pkg/kubectl/cmd/apply.go @@ -19,11 +19,14 @@ package cmd import ( "fmt" "io" + "time" + "github.com/jonboulle/clockwork" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" @@ -38,6 +41,15 @@ type ApplyOptions struct { Recursive bool } +const ( + // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure + maxPatchRetry = 5 + // backOffPeriod is the period to back off when apply patch resutls in error. + backOffPeriod = 1 * time.Second + // how many times we can retry before back off + triesBeforeBackOff = 1 +) + const ( apply_long = `Apply a configuration to a resource by filename or stdin. The resource will be created if it doesn't exist yet. @@ -154,43 +166,16 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap return nil } - // Serialize the current configuration of the object from the server. - current, err := runtime.Encode(encoder, info.Object) - if err != nil { - return cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", info), info.Source, err) - } - - // Retrieve the original configuration of the object from the annotation. - original, err := kubectl.GetOriginalConfiguration(info) - if err != nil { - return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", info), info.Source, err) - } - - // Create the versioned struct from the original from the server for - // strategic patch. - // TODO: Move all structs in apply to use raw data. Can be done once - // builder has a RawResult method which delivers raw data instead of - // internal objects. - versionedObject, _, err := decoder.Decode(current, nil, nil) - if err != nil { - return cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", info), info.Source, err) - } - - // Compute a three way strategic merge patch to send to server. - patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true) - if err != nil { - format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfrom:\n%v\nfor:" - return cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current, info), info.Source, err) - } - helper := resource.NewHelper(info.Client, info.Mapping) - _, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch) + patcher := NewPatcher(encoder, decoder, info.Mapping, helper) + + patchBytes, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name) if err != nil { - return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patch, info), info.Source, err) + return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err) } if cmdutil.ShouldRecord(cmd, info) { - patch, err = cmdutil.ChangeResourcePatch(info, f.Command()) + patch, err := cmdutil.ChangeResourcePatch(info, f.Command()) if err != nil { return err } @@ -215,3 +200,74 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap return nil } + +type patcher struct { + encoder runtime.Encoder + decoder runtime.Decoder + + mapping *meta.RESTMapping + helper *resource.Helper + + backOff clockwork.Clock +} + +func NewPatcher(encoder runtime.Encoder, decoder runtime.Decoder, mapping *meta.RESTMapping, helper *resource.Helper) *patcher { + return &patcher{ + encoder: encoder, + decoder: decoder, + mapping: mapping, + helper: helper, + backOff: clockwork.NewRealClock(), + } +} + +func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) { + // Serialize the current configuration of the object from the server. + current, err := runtime.Encode(p.encoder, obj) + if err != nil { + return nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err) + } + + // Retrieve the original configuration of the object from the annotation. + original, err := kubectl.GetOriginalConfiguration(p.mapping, obj) + if err != nil { + return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err) + } + + // Create the versioned struct from the original from the server for + // strategic patch. + // TODO: Move all structs in apply to use raw data. Can be done once + // builder has a RawResult method which delivers raw data instead of + // internal objects. + versionedObject, _, err := p.decoder.Decode(current, nil, nil) + if err != nil { + return nil, cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", obj), source, err) + } + + // Compute a three way strategic merge patch to send to server. + patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true) + if err != nil { + format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:" + return nil, cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current), source, err) + } + + _, err = p.helper.Patch(namespace, name, api.StrategicMergePatchType, patch) + return patch, err +} + +func (p *patcher) patch(current runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) { + var getErr error + patchBytes, err := p.patchSimple(current, modified, source, namespace, name) + for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { + if i > triesBeforeBackOff { + p.backOff.Sleep(backOffPeriod) + } + current, getErr = p.helper.Get(namespace, name, false) + if getErr != nil { + return nil, getErr + } + patchBytes, err = p.patchSimple(current, modified, source, namespace, name) + } + + return patchBytes, err +} diff --git a/pkg/kubectl/cmd/apply_test.go b/pkg/kubectl/cmd/apply_test.go index a711802d7b..dc1217bc9d 100644 --- a/pkg/kubectl/cmd/apply_test.go +++ b/pkg/kubectl/cmd/apply_test.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "os" @@ -29,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/annotations" + kubeerr "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/fake" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/runtime" @@ -213,6 +216,61 @@ func TestApplyObject(t *testing.T) { } } +func TestApplyRetry(t *testing.T) { + initTestErrorHandler(t) + nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC) + pathRC := "/namespaces/test/replicationcontrollers/" + nameRC + + firstPatch := true + retry := false + getCount := 0 + f, tf, codec := NewAPIFactory() + tf.Printer = &testPrinter{} + tf.Client = &fake.RESTClient{ + Codec: codec, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch p, m := req.URL.Path, req.Method; { + case p == pathRC && m == "GET": + getCount++ + bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC)) + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil + case p == pathRC && m == "PATCH": + if firstPatch { + firstPatch = false + statusErr := kubeerr.NewConflict(unversioned.GroupResource{Group: "", Resource: "rc"}, "test-rc", fmt.Errorf("the object has been modified. Please apply at first.")) + bodyBytes, _ := json.Marshal(statusErr) + bodyErr := ioutil.NopCloser(bytes.NewReader(bodyBytes)) + return &http.Response{StatusCode: http.StatusConflict, Header: defaultHeader(), Body: bodyErr}, nil + } + retry = true + validatePatchApplication(t, req) + bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC)) + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil + default: + t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) + return nil, nil + } + }), + } + tf.Namespace = "test" + buf := bytes.NewBuffer([]byte{}) + + cmd := NewCmdApply(f, buf) + cmd.Flags().Set("filename", filenameRC) + cmd.Flags().Set("output", "name") + cmd.Run(cmd, []string{}) + + if !retry || getCount != 2 { + t.Fatalf("apply didn't retry when get conflict error") + } + + // uses the name from the file, not the response + expectRC := "replicationcontroller/" + nameRC + "\n" + if buf.String() != expectRC { + t.Fatalf("unexpected output: %s\nexpected: %s", buf.String(), expectRC) + } +} + func TestApplyNonExistObject(t *testing.T) { nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC) pathRC := "/namespaces/test/replicationcontrollers"