diff --git a/pkg/kubectl/cmd/apply/apply.go b/pkg/kubectl/cmd/apply/apply.go index 08919f23b2..a0fb2dd4ef 100644 --- a/pkg/kubectl/cmd/apply/apply.go +++ b/pkg/kubectl/cmd/apply/apply.go @@ -17,6 +17,7 @@ limitations under the License. package apply import ( + "encoding/json" "fmt" "io" "strings" @@ -436,6 +437,7 @@ func (o *ApplyOptions) Run() error { GracePeriod: o.DeleteOptions.GracePeriod, ServerDryRun: o.ServerDryRun, OpenapiSchema: openapiSchema, + Retries: maxPatchRetry, } patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut) @@ -699,6 +701,12 @@ type Patcher struct { GracePeriod int ServerDryRun bool + // If set, forces the patch against a specific resourceVersion + ResourceVersion *string + + // Number of retries to make if the patch fails with conflict + Retries int + OpenapiSchema openapi.Resources } @@ -741,6 +749,22 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error { return nil } +func addResourceVersion(patch []byte, rv string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patch, &patchMap) + if err != nil { + return nil, err + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, err + } + a.SetResourceVersion(rv) + + return json.Marshal(patchMap) +} + func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { // Serialize the current configuration of the object from the server. current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) @@ -812,6 +836,13 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names return patch, obj, nil } + if p.ResourceVersion != nil { + patch, err = addResourceVersion(patch, *p.ResourceVersion) + if err != nil { + return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err) + } + } + options := metav1.UpdateOptions{} if p.ServerDryRun { options.DryRun = []string{metav1.DryRunAll} @@ -824,7 +855,10 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { var getErr error patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) - for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { + if p.Retries == 0 { + p.Retries = maxPatchRetry + } + for i := 1; i <= p.Retries && errors.IsConflict(err); i++ { if i > triesBeforeBackOff { p.BackOff.Sleep(backOffPeriod) } diff --git a/pkg/kubectl/cmd/diff/BUILD b/pkg/kubectl/cmd/diff/BUILD index 4a99f16125..93e0fb64eb 100644 --- a/pkg/kubectl/cmd/diff/BUILD +++ b/pkg/kubectl/cmd/diff/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/kubectl/util/i18n:go_default_library", "//pkg/kubectl/util/templates:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", @@ -21,6 +22,7 @@ go_library( "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library", "//vendor/github.com/jonboulle/clockwork:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", + "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], diff --git a/pkg/kubectl/cmd/diff/diff.go b/pkg/kubectl/cmd/diff/diff.go index 9c52dbfb6b..89ee7f360e 100644 --- a/pkg/kubectl/cmd/diff/diff.go +++ b/pkg/kubectl/cmd/diff/diff.go @@ -26,11 +26,13 @@ import ( "github.com/jonboulle/clockwork" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/klog" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/apply" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -60,6 +62,9 @@ var ( cat service.yaml | kubectl diff -f -`)) ) +// Number of times we try to diff before giving-up +const maxRetries = 4 + type DiffOptions struct { FilenameOptions resource.FilenameOptions } @@ -228,6 +233,7 @@ type InfoObject struct { Info *resource.Info Encoder runtime.Encoder OpenAPI openapi.Resources + Force bool } var _ Object = &InfoObject{} @@ -251,6 +257,16 @@ func (obj InfoObject) Merged() (runtime.Object, error) { ) } + var resourceVersion *string + if !obj.Force { + accessor, err := meta.Accessor(obj.Info.Object) + if err != nil { + return nil, err + } + str := accessor.GetResourceVersion() + resourceVersion = &str + } + modified, err := kubectl.GetModifiedConfiguration(obj.LocalObj, false, unstructured.UnstructuredJSONScheme) if err != nil { return nil, err @@ -259,12 +275,13 @@ func (obj InfoObject) Merged() (runtime.Object, error) { // This is using the patcher from apply, to keep the same behavior. // We plan on replacing this with server-side apply when it becomes available. patcher := &apply.Patcher{ - Mapping: obj.Info.Mapping, - Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping), - Overwrite: true, - BackOff: clockwork.NewRealClock(), - ServerDryRun: true, - OpenapiSchema: obj.OpenAPI, + Mapping: obj.Info.Mapping, + Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping), + Overwrite: true, + BackOff: clockwork.NewRealClock(), + ServerDryRun: true, + OpenapiSchema: obj.OpenAPI, + ResourceVersion: resourceVersion, } _, result, err := patcher.Patch(obj.Info.Object, modified, obj.Info.Source, obj.Info.Namespace, obj.Info.Name, nil) @@ -319,6 +336,10 @@ func (d *Differ) TearDown() { d.To.Dir.Delete() // Ignore error } +func isConflict(err error) bool { + return err != nil && errors.IsConflict(err) +} + // RunDiff uses the factory to parse file arguments, find the version to // diff, and find each Info object for each files, and runs against the // differ. @@ -376,21 +397,36 @@ func RunDiff(f cmdutil.Factory, diff *DiffProgram, options *DiffOptions) error { } local := info.Object.DeepCopyObject() - if err := info.Get(); err != nil { - if !errors.IsNotFound(err) { - return err + for i := 1; i <= maxRetries; i++ { + if err = info.Get(); err != nil { + if !errors.IsNotFound(err) { + return err + } + info.Object = nil } - info.Object = nil - } - obj := InfoObject{ - LocalObj: local, - Info: info, - Encoder: scheme.DefaultJSONEncoder(), - OpenAPI: schema, - } + force := i == maxRetries + if force { + klog.Warningf( + "Object (%v: %v) keeps changing, diffing without lock", + info.Object.GetObjectKind().GroupVersionKind(), + info.Name, + ) + } + obj := InfoObject{ + LocalObj: local, + Info: info, + Encoder: scheme.DefaultJSONEncoder(), + OpenAPI: schema, + Force: force, + } - return differ.Diff(obj, printer) + err = differ.Diff(obj, printer) + if !isConflict(err) { + break + } + } + return err }) if err != nil { return err