mirror of https://github.com/k3s-io/k3s
Merge pull request #4326 from yujuhong/phantom_pods
Allow AtomicUpdate() to surface the error when the key doesn't existpull/6/head
commit
7d43971614
|
@ -184,7 +184,7 @@ func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine stri
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) {
|
err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
|
||||||
pod, ok := obj.(*api.Pod)
|
pod, ok := obj.(*api.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||||
|
@ -211,7 +211,7 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro
|
||||||
}
|
}
|
||||||
// Doing the constraint check this way provides atomicity guarantees.
|
// Doing the constraint check this way provides atomicity guarantees.
|
||||||
contKey := makeBoundPodsKey(machine)
|
contKey := makeBoundPodsKey(machine)
|
||||||
err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
|
err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
boundPodList := in.(*api.BoundPods)
|
boundPodList := in.(*api.BoundPods)
|
||||||
boundPodList.Items = append(boundPodList.Items, *boundPod)
|
boundPodList.Items = append(boundPodList.Items, *boundPod)
|
||||||
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
|
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
|
||||||
|
@ -261,7 +261,7 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
containerKey := makeBoundPodsKey(podOut.Status.Host)
|
containerKey := makeBoundPodsKey(podOut.Status.Host)
|
||||||
return r.AtomicUpdate(containerKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
|
return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
boundPods := in.(*api.BoundPods)
|
boundPods := in.(*api.BoundPods)
|
||||||
for ix := range boundPods.Items {
|
for ix := range boundPods.Items {
|
||||||
if boundPods.Items[ix].Name == pod.Name {
|
if boundPods.Items[ix].Name == pod.Name {
|
||||||
|
@ -299,7 +299,7 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
|
||||||
}
|
}
|
||||||
// Next, remove the pod from the machine atomically.
|
// Next, remove the pod from the machine atomically.
|
||||||
contKey := makeBoundPodsKey(machine)
|
contKey := makeBoundPodsKey(machine)
|
||||||
return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
|
return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
pods := in.(*api.BoundPods)
|
pods := in.(*api.BoundPods)
|
||||||
newPods := make([]api.BoundPod, 0, len(pods.Items))
|
newPods := make([]api.BoundPod, 0, len(pods.Items))
|
||||||
found := false
|
found := false
|
||||||
|
@ -553,7 +553,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
||||||
err = r.AtomicUpdate(key, &api.Endpoints{},
|
err = r.AtomicUpdate(key, &api.Endpoints{}, true,
|
||||||
func(input runtime.Object) (runtime.Object, error) {
|
func(input runtime.Object) (runtime.Object, error) {
|
||||||
// TODO: racy - label query is returning different results for two simultaneous updaters
|
// TODO: racy - label query is returning different results for two simultaneous updaters
|
||||||
return endpoints, nil
|
return endpoints, nil
|
||||||
|
|
|
@ -310,7 +310,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
|
||||||
// Example:
|
// Example:
|
||||||
//
|
//
|
||||||
// h := &util.EtcdHelper{client, encoding, versioning}
|
// h := &util.EtcdHelper{client, encoding, versioning}
|
||||||
// err := h.AtomicUpdate("myKey", &MyType{}, func(input runtime.Object) (runtime.Object, error) {
|
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) {
|
||||||
// // Before this function is called, currentObj has been reset to etcd's current
|
// // Before this function is called, currentObj has been reset to etcd's current
|
||||||
// // contents for "myKey".
|
// // contents for "myKey".
|
||||||
//
|
//
|
||||||
|
@ -323,7 +323,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
|
||||||
// return cur, nil
|
// return cur, nil
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdate EtcdUpdateFunc) error {
|
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
||||||
v, err := conversion.EnforcePtr(ptrToType)
|
v, err := conversion.EnforcePtr(ptrToType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Panic is appropriate, because this is a programming error.
|
// Panic is appropriate, because this is a programming error.
|
||||||
|
@ -331,7 +331,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdat
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||||
origBody, index, err := h.bodyAndExtractObj(key, obj, true)
|
origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -456,7 +456,7 @@ func TestAtomicUpdate(t *testing.T) {
|
||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -475,7 +475,7 @@ func TestAtomicUpdate(t *testing.T) {
|
||||||
// Update an existing node.
|
// Update an existing node.
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
||||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
|
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
|
|
||||||
if in.(*TestResource).Value != 1 {
|
if in.(*TestResource).Value != 1 {
|
||||||
|
@ -510,7 +510,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -521,7 +521,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
fakeClient.Err = errors.New("should not be called")
|
fakeClient.Err = errors.New("should not be called")
|
||||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
|
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
return objUpdate, nil
|
return objUpdate, nil
|
||||||
})
|
})
|
||||||
|
@ -533,6 +533,32 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
||||||
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
|
fakeClient.TestIndex = true
|
||||||
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
|
// Create a new node.
|
||||||
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
|
|
||||||
|
f := func(in runtime.Object) (runtime.Object, error) {
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ignoreNotFound := false
|
||||||
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Expected error for key not found.")
|
||||||
|
}
|
||||||
|
|
||||||
|
ignoreNotFound = true
|
||||||
|
err = helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %v.", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
|
@ -552,7 +578,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||||
defer wgDone.Done()
|
defer wgDone.Done()
|
||||||
|
|
||||||
firstCall := true
|
firstCall := true
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||||
defer func() { firstCall = false }()
|
defer func() { firstCall = false }()
|
||||||
|
|
||||||
if firstCall {
|
if firstCall {
|
||||||
|
|
Loading…
Reference in New Issue