Merge pull request #71156 from apelisse/race-condition-diff

Optimistic-locking on diff
pull/58/head
k8s-ci-robot 2018-11-21 03:35:04 -08:00 committed by GitHub
commit 28182b6606
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 19 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package apply package apply
import ( import (
"encoding/json"
"fmt" "fmt"
"io" "io"
"strings" "strings"
@ -436,6 +437,7 @@ func (o *ApplyOptions) Run() error {
GracePeriod: o.DeleteOptions.GracePeriod, GracePeriod: o.DeleteOptions.GracePeriod,
ServerDryRun: o.ServerDryRun, ServerDryRun: o.ServerDryRun,
OpenapiSchema: openapiSchema, OpenapiSchema: openapiSchema,
Retries: maxPatchRetry,
} }
patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut) patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
@ -699,6 +701,12 @@ type Patcher struct {
GracePeriod int GracePeriod int
ServerDryRun bool ServerDryRun bool
// If set, forces the patch against a specific resourceVersion
ResourceVersion *string
// Number of retries to make if the patch fails with conflict
Retries int
OpenapiSchema openapi.Resources OpenapiSchema openapi.Resources
} }
@ -741,6 +749,22 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error {
return nil return nil
} }
func addResourceVersion(patch []byte, rv string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patch, &patchMap)
if err != nil {
return nil, err
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, err
}
a.SetResourceVersion(rv)
return json.Marshal(patchMap)
}
func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
// Serialize the current configuration of the object from the server. // Serialize the current configuration of the object from the server.
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
@ -812,6 +836,13 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
return patch, obj, nil return patch, obj, nil
} }
if p.ResourceVersion != nil {
patch, err = addResourceVersion(patch, *p.ResourceVersion)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err)
}
}
options := metav1.UpdateOptions{} options := metav1.UpdateOptions{}
if p.ServerDryRun { if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll} options.DryRun = []string{metav1.DryRunAll}
@ -824,7 +855,10 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) { func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
var getErr error var getErr error
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut) patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { if p.Retries == 0 {
p.Retries = maxPatchRetry
}
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
if i > triesBeforeBackOff { if i > triesBeforeBackOff {
p.BackOff.Sleep(backOffPeriod) p.BackOff.Sleep(backOffPeriod)
} }

View File

@ -14,6 +14,7 @@ go_library(
"//pkg/kubectl/util/i18n:go_default_library", "//pkg/kubectl/util/i18n:go_default_library",
"//pkg/kubectl/util/templates:go_default_library", "//pkg/kubectl/util/templates:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@ -21,6 +22,7 @@ go_library(
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library",
"//vendor/github.com/jonboulle/clockwork:go_default_library", "//vendor/github.com/jonboulle/clockwork:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library",
], ],

View File

@ -26,11 +26,13 @@ import (
"github.com/jonboulle/clockwork" "github.com/jonboulle/clockwork"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericclioptions/resource" "k8s.io/cli-runtime/pkg/genericclioptions/resource"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/apply" "k8s.io/kubernetes/pkg/kubectl/cmd/apply"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@ -60,6 +62,9 @@ var (
cat service.yaml | kubectl diff -f -`)) cat service.yaml | kubectl diff -f -`))
) )
// Number of times we try to diff before giving-up
const maxRetries = 4
type DiffOptions struct { type DiffOptions struct {
FilenameOptions resource.FilenameOptions FilenameOptions resource.FilenameOptions
} }
@ -228,6 +233,7 @@ type InfoObject struct {
Info *resource.Info Info *resource.Info
Encoder runtime.Encoder Encoder runtime.Encoder
OpenAPI openapi.Resources OpenAPI openapi.Resources
Force bool
} }
var _ Object = &InfoObject{} var _ Object = &InfoObject{}
@ -251,6 +257,16 @@ func (obj InfoObject) Merged() (runtime.Object, error) {
) )
} }
var resourceVersion *string
if !obj.Force {
accessor, err := meta.Accessor(obj.Info.Object)
if err != nil {
return nil, err
}
str := accessor.GetResourceVersion()
resourceVersion = &str
}
modified, err := kubectl.GetModifiedConfiguration(obj.LocalObj, false, unstructured.UnstructuredJSONScheme) modified, err := kubectl.GetModifiedConfiguration(obj.LocalObj, false, unstructured.UnstructuredJSONScheme)
if err != nil { if err != nil {
return nil, err return nil, err
@ -259,12 +275,13 @@ func (obj InfoObject) Merged() (runtime.Object, error) {
// This is using the patcher from apply, to keep the same behavior. // This is using the patcher from apply, to keep the same behavior.
// We plan on replacing this with server-side apply when it becomes available. // We plan on replacing this with server-side apply when it becomes available.
patcher := &apply.Patcher{ patcher := &apply.Patcher{
Mapping: obj.Info.Mapping, Mapping: obj.Info.Mapping,
Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping), Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping),
Overwrite: true, Overwrite: true,
BackOff: clockwork.NewRealClock(), BackOff: clockwork.NewRealClock(),
ServerDryRun: true, ServerDryRun: true,
OpenapiSchema: obj.OpenAPI, OpenapiSchema: obj.OpenAPI,
ResourceVersion: resourceVersion,
} }
_, result, err := patcher.Patch(obj.Info.Object, modified, obj.Info.Source, obj.Info.Namespace, obj.Info.Name, nil) _, result, err := patcher.Patch(obj.Info.Object, modified, obj.Info.Source, obj.Info.Namespace, obj.Info.Name, nil)
@ -319,6 +336,10 @@ func (d *Differ) TearDown() {
d.To.Dir.Delete() // Ignore error d.To.Dir.Delete() // Ignore error
} }
func isConflict(err error) bool {
return err != nil && errors.IsConflict(err)
}
// RunDiff uses the factory to parse file arguments, find the version to // RunDiff uses the factory to parse file arguments, find the version to
// diff, and find each Info object for each files, and runs against the // diff, and find each Info object for each files, and runs against the
// differ. // differ.
@ -376,21 +397,36 @@ func RunDiff(f cmdutil.Factory, diff *DiffProgram, options *DiffOptions) error {
} }
local := info.Object.DeepCopyObject() local := info.Object.DeepCopyObject()
if err := info.Get(); err != nil { for i := 1; i <= maxRetries; i++ {
if !errors.IsNotFound(err) { if err = info.Get(); err != nil {
return err if !errors.IsNotFound(err) {
return err
}
info.Object = nil
} }
info.Object = nil
}
obj := InfoObject{ force := i == maxRetries
LocalObj: local, if force {
Info: info, klog.Warningf(
Encoder: scheme.DefaultJSONEncoder(), "Object (%v: %v) keeps changing, diffing without lock",
OpenAPI: schema, info.Object.GetObjectKind().GroupVersionKind(),
} info.Name,
)
}
obj := InfoObject{
LocalObj: local,
Info: info,
Encoder: scheme.DefaultJSONEncoder(),
OpenAPI: schema,
Force: force,
}
return differ.Diff(obj, printer) err = differ.Diff(obj, printer)
if !isConflict(err) {
break
}
}
return err
}) })
if err != nil { if err != nil {
return err return err