mirror of https://github.com/k3s-io/k3s
Merge pull request #5516 from smarterclayton/add_ttl_to_helper
Add TTL support to etcd_helper in preparation for graceful deletepull/6/head
commit
1291401c2e
|
@ -50,11 +50,6 @@ var Codec = v1beta1.Codec
|
|||
// accessor is the shared static metadata accessor for the API.
|
||||
var accessor = meta.NewAccessor()
|
||||
|
||||
// ResourceVersioner describes a default versioner that can handle all types
|
||||
// of versioning.
|
||||
// TODO: when versioning changes, make this part of each API definition.
|
||||
var ResourceVersioner = runtime.ResourceVersioner(accessor)
|
||||
|
||||
// SelfLinker can set or get the SelfLink field of all API types.
|
||||
// TODO: when versioning changes, make this part of each API definition.
|
||||
// TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses
|
||||
|
|
|
@ -27,7 +27,7 @@ import (
|
|||
|
||||
func TestResourceVersioner(t *testing.T) {
|
||||
pod := internal.Pod{ObjectMeta: internal.ObjectMeta{ResourceVersion: "10"}}
|
||||
version, err := ResourceVersioner.ResourceVersion(&pod)
|
||||
version, err := accessor.ResourceVersion(&pod)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ func TestResourceVersioner(t *testing.T) {
|
|||
}
|
||||
|
||||
podList := internal.PodList{ListMeta: internal.ListMeta{ResourceVersion: "10"}}
|
||||
version, err = ResourceVersioner.ResourceVersion(&podList)
|
||||
version, err = accessor.ResourceVersion(&podList)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
|
|
@ -35,16 +35,27 @@ func HasObjectMetaSystemFieldValues(meta *ObjectMeta) bool {
|
|||
len(meta.UID) != 0
|
||||
}
|
||||
|
||||
// GetObjectMetaPtr returns a pointer to a provided object's ObjectMeta.
|
||||
// ObjectMetaFor returns a pointer to a provided object's ObjectMeta.
|
||||
// TODO: allow runtime.Unknown to extract this object
|
||||
func ObjectMetaFor(obj runtime.Object) (*ObjectMeta, error) {
|
||||
v, err := conversion.EnforcePtr(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var objectMeta *ObjectMeta
|
||||
if err := runtime.FieldPtr(v, "ObjectMeta", &objectMeta); err != nil {
|
||||
var meta *ObjectMeta
|
||||
err = runtime.FieldPtr(v, "ObjectMeta", &meta)
|
||||
return meta, err
|
||||
}
|
||||
|
||||
// ListMetaFor returns a pointer to a provided object's ListMeta,
|
||||
// or an error if the object does not have that pointer.
|
||||
// TODO: allow runtime.Unknown to extract this object
|
||||
func ListMetaFor(obj runtime.Object) (*ListMeta, error) {
|
||||
v, err := conversion.EnforcePtr(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objectMeta, nil
|
||||
var meta *ListMeta
|
||||
err = runtime.FieldPtr(v, "ListMeta", &meta)
|
||||
return meta, err
|
||||
}
|
||||
|
|
|
@ -178,7 +178,7 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
|
|||
if err != nil {
|
||||
return helper, err
|
||||
}
|
||||
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil
|
||||
return tools.NewEtcdHelper(client, versionInterfaces.Codec), nil
|
||||
}
|
||||
|
||||
// setDefaults fills in any fields not set that are required to have valid data.
|
||||
|
|
|
@ -322,9 +322,9 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
|
|||
}
|
||||
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
||||
err = r.AtomicUpdate(key, &api.Endpoints{}, true,
|
||||
func(input runtime.Object) (runtime.Object, error) {
|
||||
func(input runtime.Object) (runtime.Object, uint64, error) {
|
||||
// TODO: racy - label query is returning different results for two simultaneous updaters
|
||||
return endpoints, nil
|
||||
return endpoints, 0, nil
|
||||
})
|
||||
return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name)
|
||||
}
|
||||
|
|
|
@ -36,12 +36,12 @@ import (
|
|||
)
|
||||
|
||||
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
|
||||
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil)
|
||||
registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil)
|
||||
return registry
|
||||
}
|
||||
|
||||
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
|
||||
helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(client, latest.Codec)
|
||||
podStorage, _, _ := podetcd.NewREST(helper)
|
||||
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
|
||||
return registry
|
||||
|
@ -195,6 +195,7 @@ func TestEtcdDeleteController(t *testing.T) {
|
|||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
err := registry.DeleteController(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -532,6 +533,11 @@ func TestEtcdDeleteService(t *testing.T) {
|
|||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
endpointsKey, _ := makeServiceEndpointsKey(ctx, "foo")
|
||||
fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP"}), 0)
|
||||
|
||||
err := registry.DeleteService(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -540,13 +546,11 @@ func TestEtcdDeleteService(t *testing.T) {
|
|||
if len(fakeClient.DeletedKeys) != 2 {
|
||||
t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
key, _ = makeServiceEndpointsKey(ctx, "foo")
|
||||
if fakeClient.DeletedKeys[1] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key)
|
||||
if fakeClient.DeletedKeys[1] != endpointsKey {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], endpointsKey)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -906,6 +910,9 @@ func TestEtcdDeleteMinion(t *testing.T) {
|
|||
ctx := api.NewContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key := "/registry/minions/foo"
|
||||
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
|
||||
err := registry.DeleteMinion(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -914,7 +921,6 @@ func TestEtcdDeleteMinion(t *testing.T) {
|
|||
if len(fakeClient.DeletedKeys) != 1 {
|
||||
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
key := "/registry/minions/foo"
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ var testTTL uint64 = 60
|
|||
func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h, testTTL)
|
||||
}
|
||||
|
||||
|
|
|
@ -251,35 +251,49 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
|
|||
// TODO: expose TTL
|
||||
creating := false
|
||||
out := e.NewFunc()
|
||||
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) {
|
||||
version, err := e.Helper.ResourceVersioner.ResourceVersion(existing)
|
||||
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
|
||||
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
if version == 0 {
|
||||
if !e.UpdateStrategy.AllowCreateOnUpdate() {
|
||||
return nil, kubeerr.NewNotFound(e.EndpointName, name)
|
||||
return nil, 0, kubeerr.NewNotFound(e.EndpointName, name)
|
||||
}
|
||||
creating = true
|
||||
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
return obj, nil
|
||||
ttl := uint64(0)
|
||||
if e.TTLFunc != nil {
|
||||
ttl, err = e.TTLFunc(obj, true)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
return obj, ttl, nil
|
||||
}
|
||||
|
||||
creating = false
|
||||
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
|
||||
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
if newVersion != version {
|
||||
// TODO: return the most recent version to a client?
|
||||
return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
|
||||
return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
|
||||
}
|
||||
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
return obj, nil
|
||||
ttl := uint64(0)
|
||||
if e.TTLFunc != nil {
|
||||
ttl, err = e.TTLFunc(obj, false)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
return obj, ttl, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -64,7 +64,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
|
|||
func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
|
||||
return f, &Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||
|
@ -624,16 +624,16 @@ func TestEtcdDelete(t *testing.T) {
|
|||
"notExisting": {
|
||||
existing: emptyNode,
|
||||
expect: emptyNode,
|
||||
errOK: func(err error) bool { return err == nil },
|
||||
errOK: func(err error) bool { return errors.IsNotFound(err) },
|
||||
},
|
||||
}
|
||||
|
||||
for name, item := range table {
|
||||
fakeClient, registry := NewTestGenericEtcdRegistry(t)
|
||||
fakeClient.Data[path] = item.existing
|
||||
_, err := registry.Delete(api.NewContext(), key)
|
||||
obj, err := registry.Delete(api.NewContext(), key)
|
||||
if !item.errOK(err) {
|
||||
t.Errorf("%v: unexpected error: %v", name, err)
|
||||
t.Errorf("%v: unexpected error: %v (%#v)", name, err, obj)
|
||||
}
|
||||
|
||||
if item.expect.E != nil {
|
||||
|
|
|
@ -36,7 +36,7 @@ import (
|
|||
func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h)
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ import (
|
|||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
|
|
@ -160,18 +160,18 @@ func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine s
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
|
||||
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
|
||||
}
|
||||
if pod.Spec.Host != oldMachine || pod.Status.Host != oldMachine {
|
||||
return nil, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host)
|
||||
return nil, 0, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host)
|
||||
}
|
||||
pod.Spec.Host = machine
|
||||
pod.Status.Host = machine
|
||||
finalPod = pod
|
||||
return pod, nil
|
||||
return pod, 0, nil
|
||||
})
|
||||
return finalPod, err
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) {
|
|||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ import (
|
|||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ import (
|
|||
func NewTestSecretEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h)
|
||||
}
|
||||
|
||||
|
|
|
@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package tools implements general tools which depend on the api package.
|
||||
// Package tools implements types which help work with etcd which depend on the api package.
|
||||
// TODO: move this package to an etcd specific utility package.
|
||||
package tools
|
||||
|
|
|
@ -24,7 +24,6 @@ import (
|
|||
"net/http"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
|
@ -33,82 +32,22 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
EtcdErrorCodeNotFound = 100
|
||||
EtcdErrorCodeTestFailed = 101
|
||||
EtcdErrorCodeNodeExist = 105
|
||||
EtcdErrorCodeValueRequired = 200
|
||||
)
|
||||
|
||||
var (
|
||||
EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound}
|
||||
EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed}
|
||||
EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist}
|
||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||
)
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
GetCluster() []string
|
||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
||||
type EtcdGetSet interface {
|
||||
GetCluster() []string
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
type EtcdResourceVersioner interface {
|
||||
SetResourceVersion(obj runtime.Object, version uint64) error
|
||||
ResourceVersion(obj runtime.Object) (uint64, error)
|
||||
}
|
||||
|
||||
// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner
|
||||
type RuntimeVersionAdapter struct {
|
||||
Versioner runtime.ResourceVersioner
|
||||
}
|
||||
|
||||
// SetResourceVersion implements EtcdResourceVersioner
|
||||
func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error {
|
||||
if version == 0 {
|
||||
return a.Versioner.SetResourceVersion(obj, "")
|
||||
}
|
||||
s := strconv.FormatUint(version, 10)
|
||||
return a.Versioner.SetResourceVersion(obj, s)
|
||||
}
|
||||
|
||||
// SetResourceVersion implements EtcdResourceVersioner
|
||||
func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
version, err := a.Versioner.ResourceVersion(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if version == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
|
||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||
type EtcdHelper struct {
|
||||
Client EtcdGetSet
|
||||
Codec runtime.Codec
|
||||
// optional, no atomic operations can be performed without this interface
|
||||
ResourceVersioner EtcdResourceVersioner
|
||||
Versioner EtcdVersioner
|
||||
}
|
||||
|
||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||
// Kubernetes API objects.
|
||||
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper {
|
||||
return EtcdHelper{
|
||||
Client: client,
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
}
|
||||
}
|
||||
|
||||
// IsEtcdNotFound returns true iff err is an etcd not found error.
|
||||
|
@ -163,19 +102,6 @@ func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
|
|||
return result.Node.Nodes, result.EtcdIndex, nil
|
||||
}
|
||||
|
||||
// ExtractList extracts a go object per etcd node into a slice with the resource version.
|
||||
// DEPRECATED: Use ExtractToList instead, it's more convenient.
|
||||
func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersion *uint64) error {
|
||||
nodes, index, err := h.listEtcdNode(key)
|
||||
if resourceVersion != nil {
|
||||
*resourceVersion = index
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return h.decodeNodeList(nodes, slicePtr)
|
||||
}
|
||||
|
||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
|
||||
v, err := conversion.EnforcePtr(slicePtr)
|
||||
|
@ -194,28 +120,31 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex)
|
||||
if h.Versioner != nil {
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtractToList is just like ExtractList, but it works on a ThingyList api object.
|
||||
// extracts a go object per etcd node into a slice with the resource version.
|
||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
||||
// definition) and extracts a go object per etcd node into a slice with the resource version.
|
||||
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
||||
var resourceVersion uint64
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.ExtractList(key, listPtr, &resourceVersion); err != nil {
|
||||
nodes, index, err := h.listEtcdNode(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
if err := h.ResourceVersioner.SetResourceVersion(listObj, resourceVersion); err != nil {
|
||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.Versioner != nil {
|
||||
if err := h.Versioner.UpdateList(listObj, index); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -263,8 +192,8 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||
}
|
||||
body = node.Value
|
||||
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex)
|
||||
if h.Versioner != nil {
|
||||
_ = h.Versioner.UpdateObject(objPtr, node)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
return body, node.ModifiedIndex, err
|
||||
|
@ -278,8 +207,8 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||
if h.Versioner != nil {
|
||||
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
}
|
||||
|
@ -328,8 +257,8 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||
}
|
||||
|
||||
create := true
|
||||
if h.ResourceVersioner != nil {
|
||||
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||
if h.Versioner != nil {
|
||||
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
create = false
|
||||
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
|
||||
if err != nil {
|
||||
|
@ -357,7 +286,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||
|
||||
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
|
||||
// See the comment for AtomicUpdate for more detail.
|
||||
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error)
|
||||
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error)
|
||||
|
||||
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
|
||||
// Note, tryUpdate may be called more than once.
|
||||
|
@ -365,7 +294,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
|
|||
// Example:
|
||||
//
|
||||
// h := &util.EtcdHelper{client, encoding, versioning}
|
||||
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) {
|
||||
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) {
|
||||
// // Before this function is called, currentObj has been reset to etcd's current
|
||||
// // contents for "myKey".
|
||||
//
|
||||
|
@ -374,8 +303,9 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
|
|||
// // Make a *modification*.
|
||||
// cur.Counter++
|
||||
//
|
||||
// // Return the modified object. Return an error to stop iterating.
|
||||
// return cur, nil
|
||||
// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set
|
||||
// // the TTL on the object.
|
||||
// return cur, 0, nil
|
||||
// })
|
||||
//
|
||||
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
||||
|
@ -391,7 +321,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
|
|||
return err
|
||||
}
|
||||
|
||||
ret, err := tryUpdate(obj)
|
||||
ret, ttl, err := tryUpdate(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -403,7 +333,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
|
|||
|
||||
// First time this key has been used, try creating new value.
|
||||
if index == 0 {
|
||||
response, err := h.Client.Create(key, string(data), 0)
|
||||
response, err := h.Client.Create(key, string(data), ttl)
|
||||
if IsEtcdNodeExist(err) {
|
||||
continue
|
||||
}
|
||||
|
@ -415,7 +345,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
|
|||
return nil
|
||||
}
|
||||
|
||||
response, err := h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
|
||||
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
||||
if IsEtcdTestFailed(err) {
|
||||
continue
|
||||
}
|
|
@ -26,7 +26,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
|
@ -44,7 +43,6 @@ func (*TestResource) IsAnAPIObject() {}
|
|||
|
||||
var scheme *runtime.Scheme
|
||||
var codec runtime.Codec
|
||||
var versioner = RuntimeVersionAdapter{meta.NewAccessor()}
|
||||
|
||||
func init() {
|
||||
scheme = runtime.NewScheme()
|
||||
|
@ -129,7 +127,7 @@ func TestExtractToList(t *testing.T) {
|
|||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
|
@ -212,7 +210,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
|||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
|
@ -282,7 +280,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
|
|||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
|
@ -302,7 +300,7 @@ func TestExtractObj(t *testing.T) {
|
|||
},
|
||||
}
|
||||
fakeClient.Set("/some/key", runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj("/some/key", &got, false)
|
||||
if err != nil {
|
||||
|
@ -335,7 +333,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
try := func(key string) {
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj(key, &got, false)
|
||||
|
@ -356,7 +354,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||
func TestCreateObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.CreateObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
|
@ -381,7 +379,7 @@ func TestCreateObj(t *testing.T) {
|
|||
func TestCreateObjNilOutParam(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.CreateObj("/some/key", obj, nil, 5)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
|
@ -391,7 +389,7 @@ func TestCreateObjNilOutParam(t *testing.T) {
|
|||
func TestSetObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
|
@ -418,7 +416,7 @@ func TestSetObjFailCAS(t *testing.T) {
|
|||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.CasErr = fakeClient.NewError(123)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.SetObj("/some/key", obj, nil, 5)
|
||||
if err == nil {
|
||||
t.Errorf("Expecting error.")
|
||||
|
@ -438,7 +436,7 @@ func TestSetObjWithVersion(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 7)
|
||||
if err != nil {
|
||||
|
@ -500,13 +498,13 @@ func TestSetObjNilOutParam(t *testing.T) {
|
|||
func TestAtomicUpdate(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||
return obj, nil
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
return obj, 0, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
|
@ -524,14 +522,14 @@ func TestAtomicUpdate(t *testing.T) {
|
|||
// Update an existing node.
|
||||
callbackCalled := false
|
||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
callbackCalled = true
|
||||
|
||||
if in.(*TestResource).Value != 1 {
|
||||
t.Errorf("Callback input was not current set value")
|
||||
}
|
||||
|
||||
return objUpdate, nil
|
||||
return objUpdate, 0, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
|
@ -554,13 +552,13 @@ func TestAtomicUpdate(t *testing.T) {
|
|||
func TestAtomicUpdateNoChange(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||
return obj, nil
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
return obj, 0, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
|
@ -569,10 +567,10 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
|||
// Update an existing node with the same data
|
||||
callbackCalled := false
|
||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
fakeClient.Err = errors.New("should not be called")
|
||||
callbackCalled = true
|
||||
return objUpdate, nil
|
||||
return objUpdate, 0, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %#v", err)
|
||||
|
@ -585,14 +583,14 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
|||
func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
|
||||
f := func(in runtime.Object) (runtime.Object, error) {
|
||||
return obj, nil
|
||||
f := func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
return obj, 0, nil
|
||||
}
|
||||
|
||||
ignoreNotFound := false
|
||||
|
@ -611,7 +609,7 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
|||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
|
||||
|
@ -627,7 +625,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
|||
defer wgDone.Done()
|
||||
|
||||
firstCall := true
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
|
||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
defer func() { firstCall = false }()
|
||||
|
||||
if firstCall {
|
||||
|
@ -638,7 +636,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
|||
|
||||
currValue := in.(*TestResource).Value
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1}
|
||||
return obj, nil
|
||||
return obj, 0, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
|
@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
|||
// watch.Interface. resourceVersion may be used to specify what version to begin
|
||||
// watching (e.g., for reconnecting without missing any updates).
|
||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, nil
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
|||
//
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func exceptKey(except string) includeFunc {
|
|||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
encoding runtime.Codec
|
||||
versioner EtcdResourceVersioner
|
||||
versioner EtcdVersioner
|
||||
transform TransformFunc
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
|
@ -137,7 +137,7 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||
|
||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
|
@ -240,16 +240,16 @@ func (w *etcdWatcher) translate() {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) {
|
||||
obj, err := w.encoding.Decode(data)
|
||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||
obj, err := w.encoding.Decode([]byte(node.Value))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ensure resource version is set on the object we load from etcd
|
||||
if w.versioner != nil {
|
||||
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
|
||||
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
|
||||
if err := w.versioner.UpdateObject(obj, node); err != nil {
|
||||
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -273,10 +273,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
|||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.Node.Value)
|
||||
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
|
||||
obj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
|
@ -303,10 +302,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
curData := []byte(res.Node.Value)
|
||||
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
|
||||
curObj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
|
@ -317,7 +315,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||
var oldObj runtime.Object
|
||||
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
||||
// Ignore problems reading the old object.
|
||||
if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil {
|
||||
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
|
||||
oldObjPasses = w.filter(oldObj)
|
||||
}
|
||||
}
|
||||
|
@ -352,17 +350,16 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
|||
if w.include != nil && !w.include(res.PrevNode.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.PrevNode.Value)
|
||||
index := res.PrevNode.ModifiedIndex
|
||||
node := *res.PrevNode
|
||||
if res.Node != nil {
|
||||
// Note that this sends the *old* object with the etcd index for the time at
|
||||
// which it gets deleted. This will allow users to restart the watch at the right
|
||||
// index.
|
||||
index = res.Node.ModifiedIndex
|
||||
node.ModifiedIndex = res.Node.ModifiedIndex
|
||||
}
|
||||
obj, err := w.decodeObject(data, index)
|
||||
obj, err := w.decodeObject(&node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
|
@ -29,6 +29,8 @@ import (
|
|||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
var versioner = APIObjectVersioner{}
|
||||
|
||||
func TestWatchInterpretations(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
// Declare some pods to make the test cases compact.
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// APIObjectVersioner implements versioning and extracting etcd node information
|
||||
// for objects that have an embedded ObjectMeta or ListMeta field.
|
||||
type APIObjectVersioner struct{}
|
||||
|
||||
// UpdateObject implements EtcdVersioner
|
||||
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error {
|
||||
objectMeta, err := api.ObjectMetaFor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
version := node.ModifiedIndex
|
||||
versionString := ""
|
||||
if version != 0 {
|
||||
versionString = strconv.FormatUint(version, 10)
|
||||
}
|
||||
objectMeta.ResourceVersion = versionString
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateList implements EtcdVersioner
|
||||
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
|
||||
listMeta, err := api.ListMetaFor(obj)
|
||||
if err != nil || listMeta == nil {
|
||||
return err
|
||||
}
|
||||
versionString := ""
|
||||
if resourceVersion != 0 {
|
||||
versionString = strconv.FormatUint(resourceVersion, 10)
|
||||
}
|
||||
listMeta.ResourceVersion = versionString
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectResourceVersion implements EtcdVersioner
|
||||
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
meta, err := api.ObjectMetaFor(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
version := meta.ResourceVersion
|
||||
if len(version) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
|
||||
// APIObjectVersioner implements EtcdVersioner
|
||||
var _ EtcdVersioner = APIObjectVersioner{}
|
|
@ -283,7 +283,17 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
|
|||
|
||||
f.Mutex.Lock()
|
||||
defer f.Mutex.Unlock()
|
||||
existing := f.Data[key]
|
||||
existing, ok := f.Data[key]
|
||||
if !ok {
|
||||
return &etcd.Response{}, &etcd.EtcdError{
|
||||
ErrorCode: EtcdErrorCodeNotFound,
|
||||
Index: f.ChangeIndex,
|
||||
}
|
||||
}
|
||||
if IsEtcdNotFound(existing.E) {
|
||||
f.DeletedKeys = append(f.DeletedKeys, key)
|
||||
return existing.R, existing.E
|
||||
}
|
||||
index := f.generateIndex()
|
||||
f.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
const (
|
||||
EtcdErrorCodeNotFound = 100
|
||||
EtcdErrorCodeTestFailed = 101
|
||||
EtcdErrorCodeNodeExist = 105
|
||||
EtcdErrorCodeValueRequired = 200
|
||||
)
|
||||
|
||||
var (
|
||||
EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound}
|
||||
EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed}
|
||||
EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist}
|
||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||
)
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
GetCluster() []string
|
||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
||||
type EtcdGetSet interface {
|
||||
GetCluster() []string
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
|
||||
// or list.
|
||||
type EtcdVersioner interface {
|
||||
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
UpdateObject(obj runtime.Object, node *etcd.Node) error
|
||||
// UpdateList sets the resource version into an API list object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
UpdateList(obj runtime.Object, resourceVersion uint64) error
|
||||
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
|
||||
// Should return an error if the specified object does not have a persistable version.
|
||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||
}
|
|
@ -93,7 +93,7 @@ func TestExtractObj(t *testing.T) {
|
|||
|
||||
func TestWatch(t *testing.T) {
|
||||
client := newEtcdClient()
|
||||
helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(client, latest.Codec)
|
||||
withEtcdKey(func(key string) {
|
||||
resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue