mirror of https://github.com/k3s-io/k3s
kubectl apply retry stale resource version
parent
77de942e08
commit
69bcdc20ff
|
@ -31,8 +31,8 @@ type debugError interface {
|
|||
|
||||
// GetOriginalConfiguration retrieves the original configuration of the object
|
||||
// from the annotation, or nil if no annotation was found.
|
||||
func GetOriginalConfiguration(info *resource.Info) ([]byte, error) {
|
||||
annots, err := info.Mapping.MetadataAccessor.Annotations(info.Object)
|
||||
func GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) {
|
||||
annots, err := mapping.MetadataAccessor.Annotations(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ func GetModifiedConfiguration(info *resource.Info, annotate bool, codec runtime.
|
|||
// UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied
|
||||
// configuration annotation is already present. Otherwise, it does nothing.
|
||||
func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error {
|
||||
if original, err := GetOriginalConfiguration(info); err != nil || len(original) <= 0 {
|
||||
if original, err := GetOriginalConfiguration(info.Mapping, info.Object); err != nil || len(original) <= 0 {
|
||||
return err
|
||||
}
|
||||
return CreateApplyAnnotation(info, codec)
|
||||
|
|
|
@ -19,11 +19,14 @@ package cmd
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||
|
@ -38,6 +41,15 @@ type ApplyOptions struct {
|
|||
Recursive bool
|
||||
}
|
||||
|
||||
const (
|
||||
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
|
||||
maxPatchRetry = 5
|
||||
// backOffPeriod is the period to back off when apply patch resutls in error.
|
||||
backOffPeriod = 1 * time.Second
|
||||
// how many times we can retry before back off
|
||||
triesBeforeBackOff = 1
|
||||
)
|
||||
|
||||
const (
|
||||
apply_long = `Apply a configuration to a resource by filename or stdin.
|
||||
The resource will be created if it doesn't exist yet.
|
||||
|
@ -154,43 +166,16 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
|
|||
return nil
|
||||
}
|
||||
|
||||
// Serialize the current configuration of the object from the server.
|
||||
current, err := runtime.Encode(encoder, info.Object)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", info), info.Source, err)
|
||||
}
|
||||
|
||||
// Retrieve the original configuration of the object from the annotation.
|
||||
original, err := kubectl.GetOriginalConfiguration(info)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", info), info.Source, err)
|
||||
}
|
||||
|
||||
// Create the versioned struct from the original from the server for
|
||||
// strategic patch.
|
||||
// TODO: Move all structs in apply to use raw data. Can be done once
|
||||
// builder has a RawResult method which delivers raw data instead of
|
||||
// internal objects.
|
||||
versionedObject, _, err := decoder.Decode(current, nil, nil)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", info), info.Source, err)
|
||||
}
|
||||
|
||||
// Compute a three way strategic merge patch to send to server.
|
||||
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
|
||||
if err != nil {
|
||||
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfrom:\n%v\nfor:"
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current, info), info.Source, err)
|
||||
}
|
||||
|
||||
helper := resource.NewHelper(info.Client, info.Mapping)
|
||||
_, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch)
|
||||
patcher := NewPatcher(encoder, decoder, info.Mapping, helper)
|
||||
|
||||
patchBytes, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patch, info), info.Source, err)
|
||||
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
|
||||
}
|
||||
|
||||
if cmdutil.ShouldRecord(cmd, info) {
|
||||
patch, err = cmdutil.ChangeResourcePatch(info, f.Command())
|
||||
patch, err := cmdutil.ChangeResourcePatch(info, f.Command())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -215,3 +200,74 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
type patcher struct {
|
||||
encoder runtime.Encoder
|
||||
decoder runtime.Decoder
|
||||
|
||||
mapping *meta.RESTMapping
|
||||
helper *resource.Helper
|
||||
|
||||
backOff clockwork.Clock
|
||||
}
|
||||
|
||||
func NewPatcher(encoder runtime.Encoder, decoder runtime.Decoder, mapping *meta.RESTMapping, helper *resource.Helper) *patcher {
|
||||
return &patcher{
|
||||
encoder: encoder,
|
||||
decoder: decoder,
|
||||
mapping: mapping,
|
||||
helper: helper,
|
||||
backOff: clockwork.NewRealClock(),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
|
||||
// Serialize the current configuration of the object from the server.
|
||||
current, err := runtime.Encode(p.encoder, obj)
|
||||
if err != nil {
|
||||
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
|
||||
}
|
||||
|
||||
// Retrieve the original configuration of the object from the annotation.
|
||||
original, err := kubectl.GetOriginalConfiguration(p.mapping, obj)
|
||||
if err != nil {
|
||||
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
|
||||
}
|
||||
|
||||
// Create the versioned struct from the original from the server for
|
||||
// strategic patch.
|
||||
// TODO: Move all structs in apply to use raw data. Can be done once
|
||||
// builder has a RawResult method which delivers raw data instead of
|
||||
// internal objects.
|
||||
versionedObject, _, err := p.decoder.Decode(current, nil, nil)
|
||||
if err != nil {
|
||||
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", obj), source, err)
|
||||
}
|
||||
|
||||
// Compute a three way strategic merge patch to send to server.
|
||||
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
|
||||
if err != nil {
|
||||
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
|
||||
return nil, cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current), source, err)
|
||||
}
|
||||
|
||||
_, err = p.helper.Patch(namespace, name, api.StrategicMergePatchType, patch)
|
||||
return patch, err
|
||||
}
|
||||
|
||||
func (p *patcher) patch(current runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
|
||||
var getErr error
|
||||
patchBytes, err := p.patchSimple(current, modified, source, namespace, name)
|
||||
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
|
||||
if i > triesBeforeBackOff {
|
||||
p.backOff.Sleep(backOffPeriod)
|
||||
}
|
||||
current, getErr = p.helper.Get(namespace, name, false)
|
||||
if getErr != nil {
|
||||
return nil, getErr
|
||||
}
|
||||
patchBytes, err = p.patchSimple(current, modified, source, namespace, name)
|
||||
}
|
||||
|
||||
return patchBytes, err
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package cmd
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -29,7 +30,9 @@ import (
|
|||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/annotations"
|
||||
kubeerr "k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
|
@ -213,6 +216,61 @@ func TestApplyObject(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestApplyRetry(t *testing.T) {
|
||||
initTestErrorHandler(t)
|
||||
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
|
||||
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC
|
||||
|
||||
firstPatch := true
|
||||
retry := false
|
||||
getCount := 0
|
||||
f, tf, codec := NewAPIFactory()
|
||||
tf.Printer = &testPrinter{}
|
||||
tf.Client = &fake.RESTClient{
|
||||
Codec: codec,
|
||||
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == pathRC && m == "GET":
|
||||
getCount++
|
||||
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
|
||||
case p == pathRC && m == "PATCH":
|
||||
if firstPatch {
|
||||
firstPatch = false
|
||||
statusErr := kubeerr.NewConflict(unversioned.GroupResource{Group: "", Resource: "rc"}, "test-rc", fmt.Errorf("the object has been modified. Please apply at first."))
|
||||
bodyBytes, _ := json.Marshal(statusErr)
|
||||
bodyErr := ioutil.NopCloser(bytes.NewReader(bodyBytes))
|
||||
return &http.Response{StatusCode: http.StatusConflict, Header: defaultHeader(), Body: bodyErr}, nil
|
||||
}
|
||||
retry = true
|
||||
validatePatchApplication(t, req)
|
||||
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.Namespace = "test"
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
|
||||
cmd := NewCmdApply(f, buf)
|
||||
cmd.Flags().Set("filename", filenameRC)
|
||||
cmd.Flags().Set("output", "name")
|
||||
cmd.Run(cmd, []string{})
|
||||
|
||||
if !retry || getCount != 2 {
|
||||
t.Fatalf("apply didn't retry when get conflict error")
|
||||
}
|
||||
|
||||
// uses the name from the file, not the response
|
||||
expectRC := "replicationcontroller/" + nameRC + "\n"
|
||||
if buf.String() != expectRC {
|
||||
t.Fatalf("unexpected output: %s\nexpected: %s", buf.String(), expectRC)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyNonExistObject(t *testing.T) {
|
||||
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
|
||||
pathRC := "/namespaces/test/replicationcontrollers"
|
||||
|
|
Loading…
Reference in New Issue