Merge pull request #10074 from nikhiljindal/retryUpdate

Allow update without resource version
pull/6/head
Maxwell Forbes 2015-06-24 10:55:41 -07:00
commit 11f9fd1dcd
20 changed files with 135 additions and 36 deletions

View File

@ -87,3 +87,7 @@ func (svcStrategy) AllowCreateOnUpdate() bool {
func (svcStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateServiceUpdate(old.(*api.Service), obj.(*api.Service))
}
func (svcStrategy) AllowUnconditionalUpdate() bool {
return true
}

View File

@ -40,6 +40,8 @@ type RESTUpdateStrategy interface {
// ValidateUpdate is invoked after default fields in the object have been filled in before
// the object is persisted.
ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList
// AllowUnconditionalUpdate returns true if the object can be updated unconditionally (irrespective of the latest resource version), when there is no resource version specified in the object.
AllowUnconditionalUpdate() bool
}
// BeforeUpdate ensures that common operations for all resources are performed on update. It only returns

View File

@ -97,6 +97,10 @@ func (rcStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field
return append(validationErrorList, updateErrorList...)
}
func (rcStrategy) AllowUnconditionalUpdate() bool {
return true
}
// ControllerToSelectableFields returns a label set that represents the object.
func ControllerToSelectableFields(controller *api.ReplicationController) fields.Set {
return fields.Set{

View File

@ -73,6 +73,10 @@ func (endpointsStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object
return append(errorList, validation.ValidateEndpointsUpdate(old.(*api.Endpoints), obj.(*api.Endpoints))...)
}
func (endpointsStrategy) AllowUnconditionalUpdate() bool {
return true
}
// MatchEndpoints returns a generic matcher for a given label and field selector.
func MatchEndpoints(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{label, field, EndpointsAttributes}

View File

@ -271,6 +271,14 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
if err != nil {
return nil, false, err
}
// If AllowUnconditionalUpdate() is true and the object specified by the user does not have a resource version,
// then we populate it with the latest version.
// Else, we check that the version specified by the user matches the version of latest etcd object.
resourceVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, false, err
}
doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()
// TODO: expose TTL
creating := false
out := e.NewFunc()
@ -295,14 +303,22 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
}
creating = false
if doUnconditionalUpdate {
// Update the object's resource version to match the latest etcd object's resource version.
err = e.Helper.Versioner.UpdateObject(obj, res.Expiration, res.ResourceVersion)
if err != nil {
return nil, nil, err
}
} else {
// Check if the object's resource version matches the latest resource version.
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, nil, err
}
if newVersion != version {
// TODO: return the most recent version to a client?
return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the object has been modified; please apply your changes to the latest version and try again"))
}
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}

View File

@ -39,10 +39,12 @@ type testRESTStrategy struct {
api.NameGenerator
namespaceScoped bool
allowCreateOnUpdate bool
allowUnconditionalUpdate bool
}
func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped }
func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate }
func (t *testRESTStrategy) AllowUnconditionalUpdate() bool { return t.allowUnconditionalUpdate }
func (t *testRESTStrategy) PrepareForCreate(obj runtime.Object) {}
func (t *testRESTStrategy) PrepareForUpdate(obj, old runtime.Object) {}
@ -68,7 +70,7 @@ func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.NewEtcdHelper(f, testapi.Codec(), etcdtest.PathPrefix())
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
podPrefix := "/pods"
return f, &Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
@ -390,7 +392,10 @@ func TestEtcdUpdate(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Spec: api.PodSpec{NodeName: "machine2"},
}
podAWithResourceVersion := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "3"},
Spec: api.PodSpec{NodeName: "machine"},
}
nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -424,6 +429,16 @@ func TestEtcdUpdate(t *testing.T) {
E: nil,
}
nodeWithPodAWithResourceVersion := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podAWithResourceVersion),
ModifiedIndex: 3,
CreatedIndex: 1,
},
},
E: nil,
}
emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
@ -434,6 +449,7 @@ func TestEtcdUpdate(t *testing.T) {
expect tools.EtcdResponseWithError
toUpdate runtime.Object
allowCreate bool
allowUnconditionalUpdate bool
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
@ -462,11 +478,19 @@ func TestEtcdUpdate(t *testing.T) {
toUpdate: podB,
errOK: func(err error) bool { return errors.IsConflict(err) },
},
"unconditionalUpdate": {
existing: nodeWithPodAWithResourceVersion,
allowUnconditionalUpdate: true,
toUpdate: podA,
objOK: func(obj runtime.Object) bool { return true },
errOK: func(err error) bool { return err == nil },
},
}
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate
registry.UpdateStrategy.(*testRESTStrategy).allowUnconditionalUpdate = item.allowUnconditionalUpdate
path := etcdtest.AddPrefix("pods/foo")
fakeClient.Data[path] = item.existing
obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate)

View File

@ -82,6 +82,10 @@ func (nodeStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fie
return append(errorList, validation.ValidateNodeUpdate(old.(*api.Node), obj.(*api.Node))...)
}
func (nodeStrategy) AllowUnconditionalUpdate() bool {
return true
}
type nodeStatusStrategy struct {
nodeStrategy
}

View File

@ -93,6 +93,10 @@ func (namespaceStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object
return append(errorList, validation.ValidateNamespaceUpdate(obj.(*api.Namespace), old.(*api.Namespace))...)
}
func (namespaceStrategy) AllowUnconditionalUpdate() bool {
return true
}
type namespaceStatusStrategy struct {
namespaceStrategy
}

View File

@ -69,6 +69,10 @@ func (persistentvolumeStrategy) ValidateUpdate(ctx api.Context, obj, old runtime
return append(errorList, validation.ValidatePersistentVolumeUpdate(obj.(*api.PersistentVolume), old.(*api.PersistentVolume))...)
}
func (persistentvolumeStrategy) AllowUnconditionalUpdate() bool {
return true
}
type persistentvolumeStatusStrategy struct {
persistentvolumeStrategy
}

View File

@ -69,6 +69,10 @@ func (persistentvolumeclaimStrategy) ValidateUpdate(ctx api.Context, obj, old ru
return append(errorList, validation.ValidatePersistentVolumeClaimUpdate(obj.(*api.PersistentVolumeClaim), old.(*api.PersistentVolumeClaim))...)
}
func (persistentvolumeclaimStrategy) AllowUnconditionalUpdate() bool {
return true
}
type persistentvolumeclaimStatusStrategy struct {
persistentvolumeclaimStrategy
}

View File

@ -81,6 +81,10 @@ func (podStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fiel
return append(errorList, validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))...)
}
func (podStrategy) AllowUnconditionalUpdate() bool {
return true
}
// CheckGracefulDelete allows a pod to be gracefully deleted.
func (podStrategy) CheckGracefulDelete(obj runtime.Object, options *api.DeleteOptions) bool {
return false

View File

@ -69,6 +69,10 @@ func (podTemplateStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje
return validation.ValidatePodTemplateUpdate(obj.(*api.PodTemplate), old.(*api.PodTemplate))
}
func (podTemplateStrategy) AllowUnconditionalUpdate() bool {
return true
}
// MatchPodTemplate returns a generic matcher for a given label and field selector.
func MatchPodTemplate(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@ -73,6 +73,10 @@ func (resourcequotaStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Ob
return append(errorList, validation.ValidateResourceQuotaUpdate(obj.(*api.ResourceQuota), old.(*api.ResourceQuota))...)
}
func (resourcequotaStrategy) AllowUnconditionalUpdate() bool {
return true
}
type resourcequotaStatusStrategy struct {
resourcequotaStrategy
}

View File

@ -65,6 +65,10 @@ func (strategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielder
return validation.ValidateSecretUpdate(old.(*api.Secret), obj.(*api.Secret))
}
func (strategy) AllowUnconditionalUpdate() bool {
return true
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@ -68,6 +68,10 @@ func (strategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielder
return validation.ValidateServiceAccountUpdate(old.(*api.ServiceAccount), obj.(*api.ServiceAccount))
}
func (strategy) AllowUnconditionalUpdate() bool {
return true
}
// Matcher returns a generic matcher for a given label and field selector.
func Matcher(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@ -207,7 +207,7 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
}
if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
@ -377,7 +377,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
body = node.Value
err = h.Codec.DecodeInto([]byte(body), objPtr)
if h.Versioner != nil {
_ = h.Versioner.UpdateObject(objPtr, node)
_ = h.Versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, node, err
@ -492,6 +492,11 @@ type ResponseMeta struct {
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
@ -545,8 +550,12 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
meta := ResponseMeta{}
if node != nil {
meta.TTL = node.TTL
if node.Expiration != nil {
meta.Expiration = node.Expiration
}
meta.ResourceVersion = node.ModifiedIndex
}
// Get the object to be written by calling tryUpdate.
ret, newTTL, err := tryUpdate(obj, meta)
if err != nil {
return err
@ -589,9 +598,11 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
}
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if IsEtcdTestFailed(err) {
// Try again.
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)

View File

@ -270,7 +270,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node); err != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)
}
}

View File

@ -18,12 +18,11 @@ package tools
import (
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
// APIObjectVersioner implements versioning and extracting etcd node information
@ -31,18 +30,17 @@ import (
type APIObjectVersioner struct{}
// UpdateObject implements EtcdVersioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error {
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error {
objectMeta, err := api.ObjectMetaFor(obj)
if err != nil {
return err
}
if ttl := node.Expiration; ttl != nil {
objectMeta.DeletionTimestamp = &util.Time{*node.Expiration}
if expiration != nil {
objectMeta.DeletionTimestamp = &util.Time{*expiration}
}
version := node.ModifiedIndex
versionString := ""
if version != 0 {
versionString = strconv.FormatUint(version, 10)
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
objectMeta.ResourceVersion = versionString
return nil

View File

@ -22,7 +22,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
func TestObjectVersioner(t *testing.T) {
@ -34,7 +33,7 @@ func TestObjectVersioner(t *testing.T) {
t.Errorf("unexpected version: %d %v", ver, err)
}
obj := &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5}); err != nil {
if err := v.UpdateObject(obj, nil, 5); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil {
@ -42,7 +41,7 @@ func TestObjectVersioner(t *testing.T) {
}
now := util.Time{time.Now()}
obj = &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5, Expiration: &now.Time}); err != nil {
if err := v.UpdateObject(obj, &now.Time, 5); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.ResourceVersion != "5" || *obj.DeletionTimestamp != now {

View File

@ -19,6 +19,7 @@ package tools
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/coreos/go-etcd/etcd"
"time"
)
const (
@ -66,7 +67,7 @@ type EtcdVersioner interface {
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from etcd.
UpdateObject(obj runtime.Object, node *etcd.Node) error
UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from etcd.