diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 2f70977e25..9acff35e7f 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -1478,10 +1478,6 @@ "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/testing/testingcert", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/apiserver/pkg/storage/etcd3", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/BUILD b/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/BUILD index 196a391b0a..bc4ad5fdf0 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/BUILD @@ -15,7 +15,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest/resttest:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/tester.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/tester.go index 1ee637f09d..5db5c49d3f 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/tester.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/testing/tester.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest/resttest" - etcdstorage "k8s.io/apiserver/pkg/storage/etcd" storagetesting "k8s.io/apiserver/pkg/storage/testing" ) @@ -136,7 +135,7 @@ func (t *Tester) TestWatch(valid runtime.Object, labelsPass, labelsFail []labels fieldsPass, fieldsFail, // TODO: This should be filtered, the registry should not be aware of this level of detail - []string{etcdstorage.EtcdCreate, etcdstorage.EtcdDelete}, + []string{"create", "delete"}, ) } @@ -185,9 +184,9 @@ func (t *Tester) emitObject(obj runtime.Object, action string) error { var err error switch action { - case etcdstorage.EtcdCreate: + case "create": err = t.createObject(ctx, obj) - case etcdstorage.EtcdDelete: + case "delete": var accessor metav1.Object accessor, err = meta.Accessor(obj) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index b6a32f6f16..b910cf5f64 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -60,7 +60,6 @@ type EtcdOptions struct { var storageTypes = sets.NewString( storagebackend.StorageTypeUnset, - storagebackend.StorageTypeETCD2, storagebackend.StorageTypeETCD3, ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go index bbdc4b9a05..113ac573cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go @@ -24,8 +24,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/recognizer" "k8s.io/apiserver/pkg/storage/storagebackend" - - "github.com/golang/glog" ) // StorageCodecConfig are the arguments passed to newStorageCodecFn @@ -48,11 +46,6 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) { return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType) } - if opts.Config.Type == storagebackend.StorageTypeETCD2 && mediaType != "application/json" { - glog.Warningf(`storage type %q does not support media type %q, using "application/json"`, storagebackend.StorageTypeETCD2, mediaType) - mediaType = "application/json" - } - serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType) if !ok { return nil, fmt.Errorf("unable to find serializer for %q", mediaType) @@ -60,11 +53,6 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) { s := serializer.Serializer - // make sure the selected encoder supports string data - if !serializer.EncodesAsText && opts.Config.Type == storagebackend.StorageTypeETCD2 { - return nil, fmt.Errorf("storage type %q does not support binary media type %q", storagebackend.StorageTypeETCD2, mediaType) - } - // Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will // be passed to the recognizer so that multiple decoders are available. var encoder runtime.Encoder = s diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd/BUILD index eea138675b..c6d0e60fd6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/BUILD @@ -8,33 +8,12 @@ load( go_test( name = "go_default_test", - srcs = [ - "api_object_versioner_test.go", - "etcd_helper_test.go", - "etcd_watcher_test.go", - ], + srcs = ["api_object_versioner_test.go"], embed = [":go_default_library"], deps = [ - "//staging/src/k8s.io/apimachinery/pkg/api/apitesting:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/tests:go_default_library", - "//vendor/github.com/coreos/etcd/client:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) @@ -43,26 +22,14 @@ go_library( srcs = [ "api_object_versioner.go", "doc.go", - "etcd_helper.go", - "etcd_watcher.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd", importpath = "k8s.io/apiserver/pkg/storage/etcd", deps = [ "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd/util:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/trace:go_default_library", - "//vendor/github.com/coreos/etcd/client:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go deleted file mode 100644 index c6bdd9c92c..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ /dev/null @@ -1,637 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package etcd - -import ( - "context" - "errors" - "fmt" - "path" - "reflect" - "time" - - etcd "github.com/coreos/etcd/client" - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/conversion" - "k8s.io/apimachinery/pkg/runtime" - utilcache "k8s.io/apimachinery/pkg/util/cache" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd/metrics" - etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" - utiltrace "k8s.io/apiserver/pkg/util/trace" -) - -// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods -// must be able to undo the transformation caused by the other. -type ValueTransformer interface { - // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. - // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object - // have not changed. - TransformStringFromStorage(string) (value string, stale bool, err error) - // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. - TransformStringToStorage(string) (value string, err error) -} - -type identityTransformer struct{} - -func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) { - return s, false, nil -} -func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } - -// IdentityTransformer performs no transformation on the provided values. -var IdentityTransformer ValueTransformer = identityTransformer{} - -// Creates a new storage interface from the client -// TODO: deprecate in favor of storage.Config abstraction over time -func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface { - return &etcdHelper{ - etcdMembersAPI: etcd.NewMembersAPI(client), - etcdKeysAPI: etcd.NewKeysAPI(client), - codec: codec, - versioner: APIObjectVersioner{}, - transformer: transformer, - pathPrefix: path.Join("/", prefix), - quorum: quorum, - cache: utilcache.NewCache(cacheSize), - } -} - -// etcdHelper is the reference implementation of storage.Interface. -type etcdHelper struct { - etcdMembersAPI etcd.MembersAPI - etcdKeysAPI etcd.KeysAPI - codec runtime.Codec - transformer ValueTransformer - // Note that versioner is required for etcdHelper to work correctly. - // The public constructors (NewStorage & NewEtcdStorage) are setting it - // correctly, so be careful when manipulating with it manually. - // optional, has to be set to perform any atomic operations - versioner storage.Versioner - // prefix for all etcd keys - pathPrefix string - // if true, perform quorum read - quorum bool - - // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent - // to resourceVersion. - // This depends on etcd's indexes being globally unique across all objects/types. This will - // have to revisited if we decide to do things like multiple etcd clusters, or etcd will - // support multi-object transaction that will result in many objects with the same index. - // Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant. - // TODO: Measure how much this cache helps after the conversion code is optimized. - cache utilcache.Cache -} - -// Implements storage.Interface. -func (h *etcdHelper) Versioner() storage.Versioner { - return h.versioner -} - -// Implements storage.Interface. -func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - trace := utiltrace.New("etcdHelper::Create " + getTypeName(obj)) - defer trace.LogIfLong(250 * time.Millisecond) - if ctx == nil { - glog.Errorf("Context is nil") - } - key = path.Join(h.pathPrefix, key) - data, err := runtime.Encode(h.codec, obj) - trace.Step("Object encoded") - if err != nil { - return err - } - if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { - return errors.New("resourceVersion may not be set on objects to be created") - } - if err := h.versioner.PrepareObjectForStorage(obj); err != nil { - return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err) - } - trace.Step("Version checked") - - startTime := time.Now() - opts := etcd.SetOptions{ - TTL: time.Duration(ttl) * time.Second, - PrevExist: etcd.PrevNoExist, - } - - newBody, err := h.transformer.TransformStringToStorage(string(data)) - if err != nil { - return storage.NewInternalError(err.Error()) - } - - response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts) - trace.Step("Object created") - metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) - if err != nil { - return toStorageErr(err, key, 0) - } - if out != nil { - if _, err := conversion.EnforcePtr(out); err != nil { - panic("unable to convert output object to pointer") - } - _, _, _, err = h.extractObj(response, err, out, false, false) - } - return err -} - -// Implements storage.Interface. -func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error { - if ctx == nil { - glog.Errorf("Context is nil") - } - key = path.Join(h.pathPrefix, key) - v, err := conversion.EnforcePtr(out) - if err != nil { - panic("unable to convert output object to pointer") - } - - if preconditions == nil { - startTime := time.Now() - response, err := h.etcdKeysAPI.Delete(ctx, key, nil) - metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) - if !etcdutil.IsEtcdNotFound(err) { - // if the object that existed prior to the delete is returned by etcd, update the out object. - if err != nil || response.PrevNode != nil { - _, _, _, err = h.extractObj(response, err, out, false, true) - } - } - return toStorageErr(err, key, 0) - } - - // Check the preconditions match. - obj := reflect.New(v.Type()).Interface().(runtime.Object) - for { - _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false) - if err != nil { - return toStorageErr(err, key, 0) - } - if err := preconditions.Check(key, obj); err != nil { - return toStorageErr(err, key, 0) - } - index := uint64(0) - if node != nil { - index = node.ModifiedIndex - } else if res != nil { - index = res.Index - } - opt := etcd.DeleteOptions{PrevIndex: index} - startTime := time.Now() - response, err := h.etcdKeysAPI.Delete(ctx, key, &opt) - metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) - if !etcdutil.IsEtcdTestFailed(err) { - if !etcdutil.IsEtcdNotFound(err) { - // if the object that existed prior to the delete is returned by etcd, update the out object. - if err != nil || response.PrevNode != nil { - _, _, _, err = h.extractObj(response, err, out, false, true) - } - } - return toStorageErr(err, key, 0) - } - - glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) - } -} - -// Implements storage.Interface. -func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - if ctx == nil { - glog.Errorf("Context is nil") - } - watchRV, err := h.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return nil, err - } - key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h) - go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) - return w, nil -} - -// Implements storage.Interface. -func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - if ctx == nil { - glog.Errorf("Context is nil") - } - watchRV, err := h.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return nil, err - } - key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h) - go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) - return w, nil -} - -// Implements storage.Interface. -func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { - if ctx == nil { - glog.Errorf("Context is nil") - } - key = path.Join(h.pathPrefix, key) - _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) - return err -} - -// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information -// about the response, like the current etcd index and the ttl. -func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) { - if ctx == nil { - glog.Errorf("Context is nil") - } - startTime := time.Now() - - opts := &etcd.GetOptions{ - Quorum: h.quorum, - } - - response, err := h.etcdKeysAPI.Get(ctx, key, opts) - metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) - if err != nil && !etcdutil.IsEtcdNotFound(err) { - return "", nil, nil, false, toStorageErr(err, key, 0) - } - body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) - return body, node, response, stale, toStorageErr(err, key, 0) -} - -func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) { - if response != nil { - if prevNode { - node = response.PrevNode - } else { - node = response.Node - } - } - if inErr != nil || node == nil || len(node.Value) == 0 { - if ignoreNotFound { - v, err := conversion.EnforcePtr(objPtr) - if err != nil { - return "", nil, false, err - } - v.Set(reflect.Zero(v.Type())) - return "", nil, false, nil - } else if inErr != nil { - return "", nil, false, inErr - } - return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response) - } - - body, stale, err = h.transformer.TransformStringFromStorage(node.Value) - if err != nil { - return body, nil, stale, storage.NewInternalError(err.Error()) - } - out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) - if err != nil { - return body, nil, stale, err - } - if out != objPtr { - return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) - } - // being unable to set the version does not prevent the object from being extracted - _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex) - return body, node, stale, err -} - -// Implements storage.Interface. -func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { - if ctx == nil { - glog.Errorf("Context is nil") - } - trace := utiltrace.New("GetToList " + getTypeName(listObj)) - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - key = path.Join(h.pathPrefix, key) - startTime := time.Now() - trace.Step("About to read etcd node") - - opts := &etcd.GetOptions{ - Quorum: h.quorum, - } - response, err := h.etcdKeysAPI.Get(ctx, key, opts) - trace.Step("Etcd node read") - metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) - if err != nil { - if etcdutil.IsEtcdNotFound(err) { - if etcdErr, ok := err.(etcd.Error); ok { - return h.versioner.UpdateList(listObj, etcdErr.Index, "") - } - return fmt.Errorf("unexpected error from storage: %#v", err) - } - return toStorageErr(err, key, 0) - } - - nodes := make([]*etcd.Node, 0) - nodes = append(nodes, response.Node) - - if err := h.decodeNodeList(nodes, pred, listPtr); err != nil { - return err - } - trace.Step("Object decoded") - if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil { - return err - } - return nil -} - -// decodeNodeList walks the tree of each node in the list and decodes into the specified object -func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error { - trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr)) - defer trace.LogIfLong(400 * time.Millisecond) - v, err := conversion.EnforcePtr(slicePtr) - if err != nil || v.Kind() != reflect.Slice { - // This should not happen at runtime. - panic("need ptr to slice") - } - for _, node := range nodes { - if node.Dir { - // IMPORTANT: do not log each key as a discrete step in the trace log - // as it produces an immense amount of log spam when there is a large - // amount of content in the list. - if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil { - return err - } - continue - } - if obj, found := h.getFromCache(node.ModifiedIndex, pred); found { - // obj != nil iff it matches the pred function. - if obj != nil { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) - } - } else { - body, _, err := h.transformer.TransformStringFromStorage(node.Value) - if err != nil { - // omit items from lists and watches that cannot be transformed, but log the error - utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err)) - continue - } - - obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) - if err != nil { - return err - } - // being unable to set the version does not prevent the object from being extracted - _ = h.versioner.UpdateObject(obj, node.ModifiedIndex) - if matched, err := pred.Matches(obj); err == nil && matched { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) - } - if node.ModifiedIndex != 0 { - h.addToCache(node.ModifiedIndex, obj) - } - } - } - trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes))) - return nil -} - -// Implements storage.Interface. -func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { - if ctx == nil { - glog.Errorf("Context is nil") - } - trace := utiltrace.New("List " + getTypeName(listObj)) - defer trace.LogIfLong(400 * time.Millisecond) - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - key = path.Join(h.pathPrefix, key) - startTime := time.Now() - trace.Step("About to list etcd node") - nodes, index, err := h.listEtcdNode(ctx, key) - trace.Step("Etcd node listed") - metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) - if err != nil { - return err - } - if err := h.decodeNodeList(nodes, pred, listPtr); err != nil { - return err - } - trace.Step("Node list decoded") - if err := h.versioner.UpdateList(listObj, index, ""); err != nil { - return err - } - return nil -} - -func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) { - if ctx == nil { - glog.Errorf("Context is nil") - } - opts := etcd.GetOptions{ - Recursive: true, - Sort: true, - Quorum: h.quorum, - } - result, err := h.etcdKeysAPI.Get(ctx, key, &opts) - if err != nil { - var index uint64 - if etcdError, ok := err.(etcd.Error); ok { - index = etcdError.Index - } - nodes := make([]*etcd.Node, 0) - if etcdutil.IsEtcdNotFound(err) { - return nodes, index, nil - } else { - return nodes, index, toStorageErr(err, key, 0) - } - } - return result.Node.Nodes, result.Index, nil -} - -// Implements storage.Interface. -func (h *etcdHelper) GuaranteedUpdate( - ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, - preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error { - // Ignore the suggestion about current object. - if ctx == nil { - glog.Errorf("Context is nil") - } - v, err := conversion.EnforcePtr(ptrToType) - if err != nil { - // Panic is appropriate, because this is a programming error. - panic("need ptr to type") - } - key = path.Join(h.pathPrefix, key) - for { - obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) - if err != nil { - return toStorageErr(err, key, 0) - } - if err := preconditions.Check(key, obj); err != nil { - return toStorageErr(err, key, 0) - } - meta := storage.ResponseMeta{} - if node != nil { - meta.TTL = node.TTL - meta.ResourceVersion = node.ModifiedIndex - } - // Get the object to be written by calling tryUpdate. - ret, newTTL, err := tryUpdate(obj, meta) - if err != nil { - return toStorageErr(err, key, 0) - } - - index := uint64(0) - ttl := uint64(0) - if node != nil { - index = node.ModifiedIndex - if node.TTL != 0 { - ttl = uint64(node.TTL) - } - if node.Expiration != nil && ttl == 0 { - ttl = 1 - } - } else if res != nil { - index = res.Index - } - - if newTTL != nil { - if ttl != 0 && *newTTL == 0 { - // TODO: remove this after we have verified this is no longer an issue - glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key) - } - ttl = *newTTL - } - - // Since update object may have a resourceVersion set, we need to clear it here. - if err := h.versioner.PrepareObjectForStorage(ret); err != nil { - return errors.New("resourceVersion cannot be set on objects store in etcd") - } - - newBodyData, err := runtime.Encode(h.codec, ret) - if err != nil { - return err - } - newBody := string(newBodyData) - data, err := h.transformer.TransformStringToStorage(newBody) - if err != nil { - return storage.NewInternalError(err.Error()) - } - - // First time this key has been used, try creating new value. - if index == 0 { - startTime := time.Now() - opts := etcd.SetOptions{ - TTL: time.Duration(ttl) * time.Second, - PrevExist: etcd.PrevNoExist, - } - response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) - metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) - if etcdutil.IsEtcdNodeExist(err) { - continue - } - _, _, _, err = h.extractObj(response, err, ptrToType, false, false) - return toStorageErr(err, key, 0) - } - - // If we don't send an update, we simply return the currently existing - // version of the object. However, the value transformer may indicate that - // the on disk representation has changed and that we must commit an update. - if newBody == origBody && !stale { - _, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) - return err - } - - startTime := time.Now() - // Swap origBody with data, if origBody is the latest etcd data. - opts := etcd.SetOptions{ - PrevIndex: index, - TTL: time.Duration(ttl) * time.Second, - } - response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) - metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) - if etcdutil.IsEtcdTestFailed(err) { - // Try again. - continue - } - _, _, _, err = h.extractObj(response, err, ptrToType, false, false) - return toStorageErr(err, key, int64(index)) - } -} - -func (*etcdHelper) Count(pathPerfix string) (int64, error) { - return 0, fmt.Errorf("Count is unimplemented for etcd2!") -} - -// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by -// their Node.ModifiedIndex, which is unique across all types. -// All implementations must be thread-safe. -type etcdCache interface { - getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) - addToCache(index uint64, obj runtime.Object) -} - -func getTypeName(obj interface{}) string { - return reflect.TypeOf(obj).String() -} - -func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) { - startTime := time.Now() - defer func() { - metrics.ObserveGetCache(startTime) - }() - obj, found := h.cache.Get(index) - if found { - if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched { - return nil, true - } - // We should not return the object itself to avoid polluting the cache if someone - // modifies returned values. - objCopy := obj.(runtime.Object).DeepCopyObject() - metrics.ObserveCacheHit() - return objCopy.(runtime.Object), true - } - metrics.ObserveCacheMiss() - return nil, false -} - -func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) { - startTime := time.Now() - defer func() { - metrics.ObserveAddCache(startTime) - }() - objCopy := obj.DeepCopyObject() - isOverwrite := h.cache.Add(index, objCopy) - if !isOverwrite { - metrics.ObserveNewEntry() - } -} - -func toStorageErr(err error, key string, rv int64) error { - if err == nil { - return nil - } - switch { - case etcdutil.IsEtcdNotFound(err): - return storage.NewKeyNotFoundError(key, rv) - case etcdutil.IsEtcdNodeExist(err): - return storage.NewKeyExistsError(key, rv) - case etcdutil.IsEtcdTestFailed(err): - return storage.NewResourceVersionConflictsError(key, rv) - case etcdutil.IsEtcdUnreachable(err): - return storage.NewUnreachableError(key, rv) - default: - return err - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go deleted file mode 100644 index a695fe253a..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go +++ /dev/null @@ -1,747 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package etcd - -import ( - "context" - "fmt" - "path" - "reflect" - "strings" - "sync" - "testing" - "time" - - etcd "github.com/coreos/etcd/client" - "github.com/stretchr/testify/require" - - apitesting "k8s.io/apimachinery/pkg/api/apitesting" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/conversion" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/apimachinery/pkg/util/diff" - "k8s.io/apiserver/pkg/apis/example" - examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd/etcdtest" - etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" - storagetesting "k8s.io/apiserver/pkg/storage/testing" - storagetests "k8s.io/apiserver/pkg/storage/tests" -) - -// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. -type prefixTransformer struct { - prefix string - stale bool - err error -} - -func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) { - if !strings.HasPrefix(s, p.prefix) { - return "", false, fmt.Errorf("value does not have expected prefix: %s", s) - } - return strings.TrimPrefix(s, p.prefix), p.stale, p.err -} -func (p prefixTransformer) TransformStringToStorage(s string) (string, error) { - if len(s) > 0 { - return p.prefix + s, p.err - } - return s, p.err -} - -func defaultPrefix(s string) string { - return "test!" + s -} - -func defaultPrefixValue(value []byte) string { - return defaultPrefix(string(value)) -} - -func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { - scheme := runtime.NewScheme() - scheme.Log(t) - scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{}) - require.NoError(t, example.AddToScheme(scheme)) - require.NoError(t, examplev1.AddToScheme(scheme)) - require.NoError(t, scheme.AddConversionFuncs( - func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error { - *out = *in - return nil - }, - func(in, out *time.Time, s conversion.Scope) error { - *out = *in - return nil - }, - )) - codecs := serializer.NewCodecFactory(scheme) - return scheme, codecs -} - -func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper { - return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, prefixTransformer{prefix: "test!"}).(*etcdHelper) -} - -func createObj(t *testing.T, helper etcdHelper, name string, obj, out runtime.Object, ttl uint64) error { - err := helper.Create(context.TODO(), name, obj, out, ttl) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - return err -} - -func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error { - for i := range list.Items { - returnedObj := &example.Pod{} - err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0) - if err != nil { - return err - } - list.Items[i] = *returnedObj - } - return nil -} - -func TestList(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - list := example.PodList{ - Items: []example.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - }, - } - - createPodList(t, helper, &list) - var got example.PodList - // TODO: a sorted filter function could be applied such implied - // ordering on the returned list doesn't matter. - err := helper.List(context.TODO(), "/", "", storage.Everything, &got) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - - if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } -} - -func TestTransformationFailure(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - pods := []example.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - } - createPodList(t, helper, &example.PodList{Items: pods[:1]}) - - // create a second resource with an invalid prefix - oldTransformer := helper.transformer - helper.transformer = prefixTransformer{prefix: "otherprefix!"} - createPodList(t, helper, &example.PodList{Items: pods[1:]}) - helper.transformer = oldTransformer - - // only the first item is returned, and no error - var got example.PodList - if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil { - t.Errorf("Unexpected error %v", err) - } - if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) { - t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) - } - - // Get should fail - if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) { - t.Errorf("Unexpected error: %v", err) - } - // GuaranteedUpdate should return an error - if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { - return input, nil, nil - }, &pods[1]); !storage.IsInternalError(err) { - t.Errorf("Unexpected error: %v", err) - } - - // Delete succeeds but reports an error because we cannot access the body - if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) { - t.Errorf("Unexpected error: %v", err) - } - - if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) { - t.Errorf("Unexpected error: %v", err) - } -} - -func TestListFiltered(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - list := example.PodList{ - Items: []example.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - }, - } - - createPodList(t, helper, &list) - // List only "bar" pod - p := storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, - } - var got example.PodList - err := helper.List(context.TODO(), "/", "", p, &got) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - // Check to make certain that the filter function only returns "bar" - if e, a := list.Items[0], got.Items[0]; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } -} - -// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query -func TestListAcrossDirectories(t *testing.T) { - _, codecs := testScheme(t) - server := etcdtesting.NewEtcdTestClientServer(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - defer server.Terminate(t) - - roothelper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - helper1 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir1") - helper2 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir2") - - list := example.PodList{ - Items: []example.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: storagetests.DeepEqualSafePodSpec(), - }, - }, - } - - returnedObj := &example.Pod{} - // create the 1st 2 elements in one directory - createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0) - list.Items[0] = *returnedObj - createObj(t, helper1, list.Items[1].Name, &list.Items[1], returnedObj, 0) - list.Items[1] = *returnedObj - // create the last element in the other directory - createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0) - list.Items[2] = *returnedObj - - var got example.PodList - err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } -} - -func TestGet(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - expect := example.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: storagetests.DeepEqualSafePodSpec(), - } - var got example.Pod - if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil { - t.Errorf("Unexpected error %#v", err) - } - expect = got - if err := helper.Get(context.TODO(), key, "", &got, false); err != nil { - t.Errorf("Unexpected error %#v", err) - } - if !reflect.DeepEqual(got, expect) { - t.Errorf("Wanted %#v, got %#v", expect, got) - } -} - -func TestGetToList(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - storedObj := &example.Pod{} - if err := helper.Create(context.TODO(), key, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, storedObj, 0); err != nil { - t.Errorf("Unexpected error %#v", err) - } - - tests := []struct { - key string - pred storage.SelectionPredicate - expectedOut []*example.Pod - }{{ // test GetToList on existing key - key: key, - pred: storage.Everything, - expectedOut: []*example.Pod{storedObj}, - }, { // test GetToList on non-existing key - key: "/non-existing", - pred: storage.Everything, - expectedOut: nil, - }, { // test GetToList with matching pod name - key: "/non-existing", - pred: storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, - }, - expectedOut: nil, - }} - - for i, tt := range tests { - out := &example.PodList{} - err := helper.GetToList(context.TODO(), tt.key, "", tt.pred, out) - if err != nil { - t.Fatalf("GetToList failed: %v", err) - } - if len(out.ResourceVersion) == 0 { - t.Errorf("#%d: unset resourceVersion", i) - } - if len(out.Items) != len(tt.expectedOut) { - t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) - continue - } - for j, wantPod := range tt.expectedOut { - getPod := &out.Items[j] - if !reflect.DeepEqual(wantPod, getPod) { - t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) - } - } - } -} - -func TestGetNotFoundErr(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - boguskey := "/some/boguskey" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - var got example.Pod - err := helper.Get(context.TODO(), boguskey, "", &got, false) - if !storage.IsNotFound(err) { - t.Errorf("Unexpected response on key=%v, err=%v", boguskey, err) - } -} - -func TestCreate(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - returnedObj := &example.Pod{} - err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - _, err = runtime.Encode(codec, obj) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - err = helper.Get(context.TODO(), "/some/key", "", returnedObj, false) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - _, err = runtime.Encode(codec, returnedObj) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - if obj.Name != returnedObj.Name { - t.Errorf("Wanted %v, got %v", obj.Name, returnedObj.Name) - } -} - -func TestCreateNilOutParam(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - err := helper.Create(context.TODO(), "/some/key", obj, nil, 5) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } -} - -func TestGuaranteedUpdate(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - return obj, nil - })) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - - // Update an existing node. - callbackCalled := false - objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - callbackCalled = true - - if in.(*storagetesting.TestResource).Value != 1 { - t.Errorf("Callback input was not current set value") - } - - return objUpdate, nil - })) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - objCheck := &storagetesting.TestResource{} - err = helper.Get(context.TODO(), key, "", objCheck, false) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - if objCheck.Value != 2 { - t.Errorf("Value should have been 2 but got %v", objCheck.Value) - } - - if !callbackCalled { - t.Errorf("tryUpdate callback should have been called.") - } -} - -func TestGuaranteedUpdateNoChange(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - original := &storagetesting.TestResource{} - err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - return obj, nil - })) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - - // Update an existing node with the same data - callbackCalled := false - objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1} - result := &storagetesting.TestResource{} - err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - callbackCalled = true - return objUpdate, nil - })) - if err != nil { - t.Fatalf("Unexpected error %#v", err) - } - if !callbackCalled { - t.Errorf("tryUpdate callback should have been called.") - } - if result.ResourceVersion != original.ResourceVersion { - t.Fatalf("updated the object resource version") - } - - // Update an existing node with the same data but return stale - helper.transformer = prefixTransformer{prefix: "test!", stale: true} - callbackCalled = false - result = &storagetesting.TestResource{} - objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - callbackCalled = true - return objUpdate, nil - })) - if err != nil { - t.Fatalf("Unexpected error %#v", err) - } - if !callbackCalled { - t.Errorf("tryUpdate callback should have been called.") - } - if result.ResourceVersion == original.ResourceVersion { - t.Errorf("did not update the object resource version") - } -} - -func TestGuaranteedUpdateKeyNotFound(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - // Create a new node. - obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - - f := storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - return obj, nil - }) - - ignoreNotFound := false - err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f) - if err == nil { - t.Errorf("Expected error for key not found.") - } - - ignoreNotFound = true - err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f) - if err != nil { - t.Errorf("Unexpected error %v.", err) - } -} - -func TestGuaranteedUpdate_CreateCollision(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - const concurrency = 10 - var wgDone sync.WaitGroup - var wgForceCollision sync.WaitGroup - wgDone.Add(concurrency) - wgForceCollision.Add(concurrency) - - for i := 0; i < concurrency; i++ { - // Increment storagetesting.TestResource.Value by 1 - go func() { - defer wgDone.Done() - - firstCall := true - err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - defer func() { firstCall = false }() - - if firstCall { - // Force collision by joining all concurrent GuaranteedUpdate operations here. - wgForceCollision.Done() - wgForceCollision.Wait() - } - - currValue := in.(*storagetesting.TestResource).Value - obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: currValue + 1} - return obj, nil - })) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - }() - } - wgDone.Wait() - - stored := &storagetesting.TestResource{} - err := helper.Get(context.TODO(), key, "", stored, false) - if err != nil { - t.Errorf("Unexpected error %#v", stored) - } - if stored.Value != concurrency { - t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) - } -} - -func TestGuaranteedUpdateUIDMismatch(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - prefix := path.Join("/", etcdtest.PathPrefix()) - helper := newEtcdHelper(server.Client, codec, prefix) - - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} - podPtr := &example.Pod{} - err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) - if err != nil { - t.Fatalf("Unexpected error %#v", err) - } - err = helper.GuaranteedUpdate(context.TODO(), "/some/key", podPtr, true, storage.NewUIDPreconditions("B"), storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { - return obj, nil - })) - if !storage.IsInvalidObj(err) { - t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err) - } -} - -func TestDeleteUIDMismatch(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - prefix := path.Join("/", etcdtest.PathPrefix()) - helper := newEtcdHelper(server.Client, codec, prefix) - - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} - podPtr := &example.Pod{} - err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) - if err != nil { - t.Fatalf("Unexpected error %#v", err) - } - err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("B")) - if !storage.IsInvalidObj(err) { - t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err) - } -} - -type getFunc func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) - -type fakeDeleteKeysAPI struct { - etcd.KeysAPI - fakeGetFunc getFunc - getCount int - // The fakeGetFunc will be called fakeGetCap times before the KeysAPI's Get will be called. - fakeGetCap int -} - -func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { - f.getCount++ - if f.getCount < f.fakeGetCap { - return f.fakeGetFunc(ctx, key, opts) - } - return f.KeysAPI.Get(ctx, key, opts) -} - -// This is to emulate the case where another party updates the object when -// etcdHelper.Delete has verified the preconditions, but hasn't carried out the -// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper -// should retry until there is no conflict. -func TestDeleteWithRetry(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - prefix := path.Join("/", etcdtest.PathPrefix()) - - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} - // fakeGet returns a large ModifiedIndex to emulate the case that another - // party has updated the object. - fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { - data, _ := runtime.Encode(codec, obj) - return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil - } - expectedRetries := 3 - helper := newEtcdHelper(server.Client, codec, prefix) - fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet} - helper.etcdKeysAPI = fake - - returnedObj := &example.Pod{} - err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - - err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("A")) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - if fake.getCount != expectedRetries { - t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount) - } - err = helper.Get(context.TODO(), "/some/key", "", obj, false) - if !storage.IsNotFound(err) { - t.Errorf("Expect an NotFound error, got %v", err) - } -} - -func TestPrefix(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - - testcases := map[string]string{ - "custom/prefix": "/custom/prefix", - "/custom//prefix//": "/custom/prefix", - "/registry": "/registry", - } - for configuredPrefix, effectivePrefix := range testcases { - helper := newEtcdHelper(server.Client, codec, configuredPrefix) - if helper.pathPrefix != effectivePrefix { - t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, helper.pathPrefix) - } - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go deleted file mode 100644 index d3a2e3c53a..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go +++ /dev/null @@ -1,496 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package etcd - -import ( - "context" - "fmt" - "net/http" - "reflect" - "sync" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/storage" - etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" - - etcd "github.com/coreos/etcd/client" - "github.com/golang/glog" -) - -// Etcd watch event actions -const ( - EtcdCreate = "create" - EtcdGet = "get" - EtcdSet = "set" - EtcdCAS = "compareAndSwap" - EtcdDelete = "delete" - EtcdCAD = "compareAndDelete" - EtcdExpire = "expire" -) - -// TransformFunc attempts to convert an object to another object for use with a watcher. -type TransformFunc func(runtime.Object) (runtime.Object, error) - -// includeFunc returns true if the given key should be considered part of a watch -type includeFunc func(key string) bool - -// exceptKey is an includeFunc that returns false when the provided key matches the watched key -func exceptKey(except string) includeFunc { - return func(key string) bool { - return key != except - } -} - -// etcdWatcher converts a native etcd watch to a watch.Interface. -type etcdWatcher struct { - // HighWaterMarks for performance debugging. - // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms - // See: https://golang.org/pkg/sync/atomic/ for more information - incomingHWM storage.HighWaterMark - outgoingHWM storage.HighWaterMark - - encoding runtime.Codec - // Note that versioner is required for etcdWatcher to work correctly. - // There is no public constructor of it, so be careful when manipulating - // with it manually. - versioner storage.Versioner - transform TransformFunc - valueTransformer ValueTransformer - - list bool // If we're doing a recursive watch, should be true. - quorum bool // If we enable quorum, should be true - include includeFunc - pred storage.SelectionPredicate - - etcdIncoming chan *etcd.Response - etcdError chan error - ctx context.Context - cancel context.CancelFunc - etcdCallEnded chan struct{} - - outgoing chan watch.Event - userStop chan struct{} - stopped bool - stopLock sync.Mutex - // wg is used to avoid calls to etcd after Stop(), and to make sure - // that the translate goroutine is not leaked. - wg sync.WaitGroup - - // Injectable for testing. Send the event down the outgoing channel. - emit func(watch.Event) - - cache etcdCache -} - -// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. -// The versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate, - encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, - valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher { - w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - valueTransformer: valueTransformer, - - list: list, - quorum: quorum, - include: include, - pred: pred, - // Buffer this channel, so that the etcd client is not forced - // to context switch with every object it gets, and so that a - // long time spent decoding an object won't block the *next* - // object. Basically, we see a lot of "401 window exceeded" - // errors from etcd, and that's due to the client not streaming - // results but rather getting them one at a time. So we really - // want to never block the etcd client, if possible. The 100 is - // mostly arbitrary--we know it goes as high as 50, though. - // There's a V(2) log message that prints the length so we can - // monitor how much of this buffer is actually used. - etcdIncoming: make(chan *etcd.Response, 100), - etcdError: make(chan error, 1), - // Similarly to etcdIncomming, we don't want to force context - // switch on every new incoming object. - outgoing: make(chan watch.Event, 100), - userStop: make(chan struct{}), - stopped: false, - wg: sync.WaitGroup{}, - cache: cache, - ctx: nil, - cancel: nil, - } - w.emit = func(e watch.Event) { - if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) { - // Monitor if this gets backed up, and how much. - glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen) - } - // Give up on user stop, without this we leak a lot of goroutines in tests. - select { - case w.outgoing <- e: - case <-w.userStop: - } - } - // translate will call done. We need to Add() here because otherwise, - // if Stop() gets called before translate gets started, there'd be a - // problem. - w.wg.Add(1) - go w.translate() - return w -} - -// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called -// as a goroutine. -func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) { - defer utilruntime.HandleCrash() - defer close(w.etcdError) - defer close(w.etcdIncoming) - - // All calls to etcd are coming from this function - once it is finished - // no other call to etcd should be generated by this watcher. - done := func() {} - - // We need to be prepared, that Stop() can be called at any time. - // It can potentially also be called, even before this function is called. - // If that is the case, we simply skip all the code here. - // See #18928 for more details. - var watcher etcd.Watcher - returned := func() bool { - w.stopLock.Lock() - defer w.stopLock.Unlock() - if w.stopped { - // Watcher has already been stopped - don't event initiate it here. - return true - } - w.wg.Add(1) - done = w.wg.Done - // Perform initialization of watcher under lock - we want to avoid situation when - // Stop() is called in the meantime (which in tests can cause etcd termination and - // strange behavior here). - if resourceVersion == 0 { - latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming) - if err != nil { - w.etcdError <- err - return true - } - resourceVersion = latest - } - - opts := etcd.WatcherOptions{ - Recursive: w.list, - AfterIndex: resourceVersion, - } - watcher = client.Watcher(key, &opts) - w.ctx, w.cancel = context.WithCancel(ctx) - return false - }() - defer done() - if returned { - return - } - - for { - resp, err := watcher.Next(w.ctx) - if err != nil { - w.etcdError <- err - return - } - w.etcdIncoming <- resp - } -} - -// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { - opts := etcd.GetOptions{ - Recursive: recursive, - Sort: false, - Quorum: quorum, - } - resp, err := client.Get(ctx, key, &opts) - if err != nil { - if !etcdutil.IsEtcdNotFound(err) { - utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)) - return resourceVersion, toStorageErr(err, key, 0) - } - if etcdError, ok := err.(etcd.Error); ok { - resourceVersion = etcdError.Index - } - return resourceVersion, nil - } - resourceVersion = resp.Index - convertRecursiveResponse(resp.Node, resp, incoming) - return -} - -// convertRecursiveResponse turns a recursive get response from etcd into individual response objects -// by copying the original response. This emulates the behavior of a recursive watch. -func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) { - if node.Dir { - for i := range node.Nodes { - convertRecursiveResponse(node.Nodes[i], response, incoming) - } - return - } - copied := *response - copied.Action = "get" - copied.Node = node - incoming <- &copied -} - -// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be -// called as a goroutine. -func (w *etcdWatcher) translate() { - defer w.wg.Done() - defer close(w.outgoing) - defer utilruntime.HandleCrash() - - for { - select { - case err := <-w.etcdError: - if err != nil { - var status *metav1.Status - switch { - case etcdutil.IsEtcdWatchExpired(err): - status = &metav1.Status{ - Status: metav1.StatusFailure, - Message: err.Error(), - Code: http.StatusGone, // Gone - Reason: metav1.StatusReasonExpired, - } - // TODO: need to generate errors using api/errors which has a circular dependency on this package - // no other way to inject errors - // case etcdutil.IsEtcdUnreachable(err): - // status = errors.NewServerTimeout(...) - default: - status = &metav1.Status{ - Status: metav1.StatusFailure, - Message: err.Error(), - Code: http.StatusInternalServerError, - Reason: metav1.StatusReasonInternalError, - } - } - w.emit(watch.Event{ - Type: watch.Error, - Object: status, - }) - } - return - case <-w.userStop: - return - case res, ok := <-w.etcdIncoming: - if ok { - if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) { - // Monitor if this gets backed up, and how much. - glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen) - } - w.sendResult(res) - } - // If !ok, don't return here-- must wait for etcdError channel - // to give an error or be closed. - } - } -} - -// decodeObject extracts an object from the provided etcd node or returns an error. -func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { - if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found { - return obj, nil - } - - body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value) - if err != nil { - return nil, err - } - - obj, err := runtime.Decode(w.encoding, []byte(body)) - if err != nil { - return nil, err - } - - // ensure resource version is set on the object we load from etcd - if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) - } - - // perform any necessary transformation - if w.transform != nil { - obj, err = w.transform(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err)) - return nil, err - } - } - - if node.ModifiedIndex != 0 { - w.cache.addToCache(node.ModifiedIndex, obj) - } - return obj, nil -} - -func (w *etcdWatcher) sendAdd(res *etcd.Response) { - if res.Node == nil { - utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res)) - return - } - if w.include != nil && !w.include(res.Node.Key) { - return - } - obj, err := w.decodeObject(res.Node) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) - // TODO: expose an error through watch.Interface? - // Ignore this value. If we stop the watch on a bad value, a client that uses - // the resourceVersion to resume will never be able to get past a bad value. - return - } - if matched, err := w.pred.Matches(obj); err != nil || !matched { - return - } - action := watch.Added - w.emit(watch.Event{ - Type: action, - Object: obj, - }) -} - -func (w *etcdWatcher) sendModify(res *etcd.Response) { - if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) - return - } - if w.include != nil && !w.include(res.Node.Key) { - return - } - curObj, err := w.decodeObject(res.Node) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) - // TODO: expose an error through watch.Interface? - // Ignore this value. If we stop the watch on a bad value, a client that uses - // the resourceVersion to resume will never be able to get past a bad value. - return - } - curObjPasses := false - if matched, err := w.pred.Matches(curObj); err == nil && matched { - curObjPasses = true - } - oldObjPasses := false - var oldObj runtime.Object - if res.PrevNode != nil && res.PrevNode.Value != "" { - // Ignore problems reading the old object. - if oldObj, err = w.decodeObject(res.PrevNode); err == nil { - if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err)) - } - if matched, err := w.pred.Matches(oldObj); err == nil && matched { - oldObjPasses = true - } - } - } - // Some changes to an object may cause it to start or stop matching a pred. - // We need to report those as adds/deletes. So we have to check both the previous - // and current value of the object. - switch { - case curObjPasses && oldObjPasses: - w.emit(watch.Event{ - Type: watch.Modified, - Object: curObj, - }) - case curObjPasses && !oldObjPasses: - w.emit(watch.Event{ - Type: watch.Added, - Object: curObj, - }) - case !curObjPasses && oldObjPasses: - w.emit(watch.Event{ - Type: watch.Deleted, - Object: oldObj, - }) - } - // Do nothing if neither new nor old object passed the pred. -} - -func (w *etcdWatcher) sendDelete(res *etcd.Response) { - if res.PrevNode == nil { - utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res)) - return - } - if w.include != nil && !w.include(res.PrevNode.Key) { - return - } - node := *res.PrevNode - if res.Node != nil { - // Note that this sends the *old* object with the etcd index for the time at - // which it gets deleted. This will allow users to restart the watch at the right - // index. - node.ModifiedIndex = res.Node.ModifiedIndex - } - obj, err := w.decodeObject(&node) - if err != nil { - utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node)) - // TODO: expose an error through watch.Interface? - // Ignore this value. If we stop the watch on a bad value, a client that uses - // the resourceVersion to resume will never be able to get past a bad value. - return - } - if matched, err := w.pred.Matches(obj); err != nil || !matched { - return - } - w.emit(watch.Event{ - Type: watch.Deleted, - Object: obj, - }) -} - -func (w *etcdWatcher) sendResult(res *etcd.Response) { - switch res.Action { - case EtcdCreate, EtcdGet: - // "Get" will only happen in watch 0 case, where we explicitly want ADDED event - // for initial state. - w.sendAdd(res) - case EtcdSet, EtcdCAS: - w.sendModify(res) - case EtcdDelete, EtcdExpire, EtcdCAD: - w.sendDelete(res) - default: - utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action)) - } -} - -// ResultChan implements watch.Interface. -func (w *etcdWatcher) ResultChan() <-chan watch.Event { - return w.outgoing -} - -// Stop implements watch.Interface. -func (w *etcdWatcher) Stop() { - w.stopLock.Lock() - if w.cancel != nil { - w.cancel() - w.cancel = nil - } - if !w.stopped { - w.stopped = true - close(w.userStop) - } - w.stopLock.Unlock() - - // Wait until all calls to etcd are finished and no other - // will be issued. - w.wg.Wait() -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go deleted file mode 100644 index 38ccb308ee..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go +++ /dev/null @@ -1,579 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package etcd - -import ( - "context" - rt "runtime" - "testing" - - apitesting "k8s.io/apimachinery/pkg/api/apitesting" - apiequality "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/apis/example" - examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd/etcdtest" - etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" - - etcd "github.com/coreos/etcd/client" -) - -var versioner = APIObjectVersioner{} - -// Implements etcdCache interface as empty methods (i.e. does not cache any objects) -type fakeEtcdCache struct{} - -func (f *fakeEtcdCache) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) { - return nil, false -} - -func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) { -} - -var _ etcdCache = &fakeEtcdCache{} - -func TestWatchInterpretations(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - // Declare some pods to make the test cases compact. - podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} - podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}} - - // All of these test cases will be run with the firstLetterIsB SelectionPredicate. - table := map[string]struct { - actions []string // Run this test item for every action here. - prevNodeValue string - nodeValue string - expectEmit bool - expectType watch.EventType - expectObject runtime.Object - }{ - "create": { - actions: []string{"create", "get"}, - nodeValue: runtime.EncodeOrDie(codec, podBar), - expectEmit: true, - expectType: watch.Added, - expectObject: podBar, - }, - "create but filter blocks": { - actions: []string{"create", "get"}, - nodeValue: runtime.EncodeOrDie(codec, podFoo), - expectEmit: false, - }, - "delete": { - actions: []string{"delete"}, - prevNodeValue: runtime.EncodeOrDie(codec, podBar), - expectEmit: true, - expectType: watch.Deleted, - expectObject: podBar, - }, - "delete but filter blocks": { - actions: []string{"delete"}, - nodeValue: runtime.EncodeOrDie(codec, podFoo), - expectEmit: false, - }, - "modify appears to create 1": { - actions: []string{"set", "compareAndSwap"}, - nodeValue: runtime.EncodeOrDie(codec, podBar), - expectEmit: true, - expectType: watch.Added, - expectObject: podBar, - }, - "modify appears to create 2": { - actions: []string{"set", "compareAndSwap"}, - prevNodeValue: runtime.EncodeOrDie(codec, podFoo), - nodeValue: runtime.EncodeOrDie(codec, podBar), - expectEmit: true, - expectType: watch.Added, - expectObject: podBar, - }, - "modify appears to delete": { - actions: []string{"set", "compareAndSwap"}, - prevNodeValue: runtime.EncodeOrDie(codec, podBar), - nodeValue: runtime.EncodeOrDie(codec, podFoo), - expectEmit: true, - expectType: watch.Deleted, - expectObject: podBar, // Should return last state that passed the filter! - }, - "modify modifies": { - actions: []string{"set", "compareAndSwap"}, - prevNodeValue: runtime.EncodeOrDie(codec, podBar), - nodeValue: runtime.EncodeOrDie(codec, podBaz), - expectEmit: true, - expectType: watch.Modified, - expectObject: podBaz, - }, - "modify ignores": { - actions: []string{"set", "compareAndSwap"}, - nodeValue: runtime.EncodeOrDie(codec, podFoo), - expectEmit: false, - }, - } - - // Should use fieldSelector here. - // But for the sake of tests (simplifying the codes), use labelSelector to support set-based requirements - selector, err := labels.Parse("metadata.name in (bar, baz)") - if err != nil { - t.Fatal(err) - } - firstLetterIsB := storage.SelectionPredicate{ - Label: selector, - Field: fields.Everything(), - IncludeUninitialized: true, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return labels.Set{"metadata.name": pod.Name}, nil, pod.Initializers != nil, nil - }, - } - for name, item := range table { - for _, action := range item.actions { - w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) - emitCalled := false - w.emit = func(event watch.Event) { - emitCalled = true - if !item.expectEmit { - return - } - if e, a := item.expectType, event.Type; e != a { - t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) - } - if e, a := item.expectObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { - t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) - } - } - - var n, pn *etcd.Node - if item.nodeValue != "" { - n = &etcd.Node{Value: defaultPrefix(item.nodeValue)} - } - if item.prevNodeValue != "" { - pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)} - } - - w.sendResult(&etcd.Response{ - Action: action, - Node: n, - PrevNode: pn, - }) - - if e, a := item.expectEmit, emitCalled; e != a { - t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) - } - w.Stop() - } - } -} - -func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - _, codecs := testScheme(t) - codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - - w.sendResult(&etcd.Response{ - Action: "update", - }) - w.Stop() -} - -func TestWatchInterpretation_ResponseNoNode(t *testing.T) { - _, codecs := testScheme(t) - codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - actions := []string{"create", "set", "compareAndSwap", "delete"} - for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - w.sendResult(&etcd.Response{ - Action: action, - }) - w.Stop() - } -} - -func TestWatchInterpretation_ResponseBadData(t *testing.T) { - _, codecs := testScheme(t) - codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - actions := []string{"create", "set", "compareAndSwap", "delete"} - for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - w.sendResult(&etcd.Response{ - Action: action, - Node: &etcd.Node{ - Value: defaultPrefix("foobar"), - }, - }) - w.sendResult(&etcd.Response{ - Action: action, - PrevNode: &etcd.Node{ - Value: defaultPrefix("foobar"), - }, - }) - w.Stop() - } -} - -func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - selector, _ := fields.ParseSelector("metadata.name!=bar") - pred := storage.SelectionPredicate{ - Label: labels.Everything(), - Field: selector, - IncludeUninitialized: true, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, - } - w := newEtcdWatcher(false, false, nil, pred, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) - - eventChan := make(chan watch.Event, 1) - w.emit = func(e watch.Event) { - eventChan <- e - } - - fooPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - barPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} - fooBytes, err := runtime.Encode(codec, fooPod) - if err != nil { - t.Fatalf("Encode failed: %v", err) - } - barBytes, err := runtime.Encode(codec, barPod) - if err != nil { - t.Fatalf("Encode failed: %v", err) - } - - tests := []struct { - response *etcd.Response - expRV string - }{{ // Delete event - response: &etcd.Response{ - Action: EtcdDelete, - Node: &etcd.Node{ - ModifiedIndex: 2, - }, - PrevNode: &etcd.Node{ - Value: defaultPrefixValue(fooBytes), - ModifiedIndex: 1, - }, - }, - expRV: "2", - }, { // Modify event with uninterested data - response: &etcd.Response{ - Action: EtcdSet, - Node: &etcd.Node{ - Value: defaultPrefixValue(barBytes), - ModifiedIndex: 2, - }, - PrevNode: &etcd.Node{ - Value: defaultPrefixValue(fooBytes), - ModifiedIndex: 1, - }, - }, - expRV: "2", - }} - - for i, tt := range tests { - w.sendResult(tt.response) - ev := <-eventChan - if ev.Type != watch.Deleted { - t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type) - return - } - rv := ev.Object.(*example.Pod).ResourceVersion - if rv != tt.expRV { - t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv) - } - } - w.Stop() -} - -func TestWatch(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - // watching is explicitly closed below. - - // Test normal case - pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - returnObj := &example.Pod{} - err = h.Create(context.TODO(), key, pod, returnObj, 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - event := <-watching.ResultChan() - if e, a := watch.Added, event.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - - watching.Stop() - - // There is a race in etcdWatcher so that after calling Stop() one of - // two things can happen: - // - ResultChan() may be closed (triggered by closing userStop channel) - // - an Error "context cancelled" may be emitted (triggered by cancelling request - // to etcd and putting that error to etcdError channel) - // We need to be prepared for both here. - event, open := <-watching.ResultChan() - if open && event.Type != watch.Error { - t.Errorf("Unexpected event from stopped watcher: %#v", event) - } -} - -func TestWatchEtcdState(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - key := "/somekey/foo" - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watching.Stop() - - pod := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - } - - err = h.Create(context.TODO(), key, pod, pod, 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - event := <-watching.ResultChan() - if event.Type != watch.Added { - t.Errorf("Unexpected event %#v", event) - } - - pod.ResourceVersion = "" - pod.Status = example.PodStatus{ - Phase: example.PodPhase("Running"), - } - - // CAS the previous value - updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - return pod.DeepCopyObject(), nil, nil - } - err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - event = <-watching.ResultChan() - if event.Type != watch.Modified { - t.Errorf("Unexpected event %#v", event) - } - - if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { - t.Errorf("Unexpected error: expected %#v, got %#v", e, a) - } -} - -func TestWatchFromZeroIndex(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - - key := "/somekey/foo" - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - // set before the watch and verify events - err := h.Create(context.TODO(), key, pod, pod, 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - pod.ResourceVersion = "" - - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // The create trigger ADDED event when watching from 0 - event := <-watching.ResultChan() - watching.Stop() - if event.Type != watch.Added { - t.Errorf("Unexpected event %#v", event) - } - - // check for concatenation on watch event with CAS - updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - pod := input.(*example.Pod) - pod.Name = "bar" - return pod, nil, nil - } - err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - watching, err = h.Watch(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watching.Stop() - - // because we watch from 0, first event that we receive will always be ADDED - event = <-watching.ResultChan() - if event.Type != watch.Added { - t.Errorf("Unexpected event %#v", event) - } - - pod.Name = "baz" - updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - pod := input.(*example.Pod) - pod.Name = "baz" - return pod, nil, nil - } - err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - event = <-watching.ResultChan() - if event.Type != watch.Modified { - t.Errorf("Unexpected event %#v", event) - } - - if e, a := pod, event.Object; a == nil || !apiequality.Semantic.DeepDerivative(e, a) { - t.Errorf("Unexpected error: expected %#v, got %#v", e, a) - } -} - -func TestWatchListFromZeroIndex(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - prefix := "/some/key" - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, prefix) - - watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watching.Stop() - - // creates foo which should trigger the WatchList for "/" - pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - err = h.Create(context.TODO(), pod.Name, pod, pod, 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - event, _ := <-watching.ResultChan() - if event.Type != watch.Added { - t.Errorf("Unexpected event %#v", event) - } - - if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { - t.Errorf("Unexpected error: expected %v, got %v", e, a) - } -} - -func TestWatchListIgnoresRootKey(t *testing.T) { - _, codecs := testScheme(t) - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - key := "/some/key" - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, key) - - watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watching.Stop() - - // creates key/foo which should trigger the WatchList for "key" - err = h.Create(context.TODO(), key, pod, pod, 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // force context switch to ensure watches would catch and notify. - rt.Gosched() - - select { - case event, _ := <-watching.ResultChan(): - t.Fatalf("Unexpected event: %#v", event) - default: - // fall through, expected behavior - } -} - -func TestWatchPurposefulShutdown(t *testing.T) { - _, codecs := testScheme(t) - codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - key := "/some/key" - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - - // Test purposeful shutdown - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - watching.Stop() - rt.Gosched() - - // There is a race in etcdWatcher so that after calling Stop() one of - // two things can happen: - // - ResultChan() may be closed (triggered by closing userStop channel) - // - an Error "context cancelled" may be emitted (triggered by cancelling request - // to etcd and putting that error to etcdError channel) - // We need to be prepared for both here. - event, open := <-watching.ResultChan() - if open && event.Type != watch.Error { - t.Errorf("Unexpected event from stopped watcher: %#v", event) - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 8d7ecf37c6..cff8672863 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -25,7 +25,6 @@ import ( const ( StorageTypeUnset = "" - StorageTypeETCD2 = "etcd2" StorageTypeETCD3 = "etcd3" DefaultCompactInterval = 5 * time.Minute diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD index 3e4ef1fbae..c9a902f007 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD @@ -28,21 +28,17 @@ go_test( go_library( name = "go_default_library", srcs = [ - "etcd2.go", "etcd3.go", "factory.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", deps = [ - "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", - "//vendor/github.com/coreos/etcd/client:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library", "//vendor/github.com/grpc-ecosystem/go-grpc-prometheus:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go deleted file mode 100644 index 292553a17e..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package factory - -import ( - "context" - "fmt" - "net" - "net/http" - "time" - - etcd2client "github.com/coreos/etcd/client" - "github.com/coreos/etcd/pkg/transport" - - utilnet "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd" - "k8s.io/apiserver/pkg/storage/storagebackend" -) - -func newETCD2HealthCheck(c storagebackend.Config) (func() error, error) { - tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) - if err != nil { - return nil, err - } - - client, err := newETCD2Client(tr, c.ServerList) - if err != nil { - return nil, err - } - - members := etcd2client.NewMembersAPI(client) - - return func() error { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - if _, err := members.List(ctx); err != nil { - return fmt.Errorf("error listing etcd members: %v", err) - } - return nil - }, nil -} - -func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { - tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) - if err != nil { - return nil, nil, err - } - client, err := newETCD2Client(tr, c.ServerList) - if err != nil { - return nil, nil, err - } - s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, etcd.IdentityTransformer) - return s, tr.CloseIdleConnections, nil -} - -func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) { - cli, err := etcd2client.New(etcd2client.Config{ - Endpoints: serverList, - Transport: tr, - }) - if err != nil { - return nil, err - } - - return cli, nil -} - -func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) { - info := transport.TLSInfo{ - CertFile: certFile, - KeyFile: keyFile, - CAFile: caFile, - } - cfg, err := info.ClientConfig() - if err != nil { - return nil, err - } - // Copied from etcd.DefaultTransport declaration. - // TODO: Determine if transport needs optimization - tr := utilnet.SetTransportDefaults(&http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - TLSHandshakeTimeout: 10 * time.Second, - MaxIdleConnsPerHost: 500, - TLSClientConfig: cfg, - }) - return tr, nil -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index bba1fa209a..e4cb566c87 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -29,8 +29,8 @@ type DestroyFunc func() // Create creates a storage backend based on given config. func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { switch c.Type { - case storagebackend.StorageTypeETCD2: - return newETCD2Storage(c) + case "etcd2": + return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: // TODO: We have the following features to implement: // - Support secure connection by using key, cert, and CA files. @@ -45,8 +45,8 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { // CreateHealthCheck creates a healthcheck function based on given config. func CreateHealthCheck(c storagebackend.Config) (func() error, error) { switch c.Type { - case storagebackend.StorageTypeETCD2: - return newETCD2HealthCheck(c) + case "etcd2": + return nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: return newETCD3HealthCheck(c) default: diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index 4c5901100b..0ea2b14861 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -38,10 +38,6 @@ "ImportPath": "github.com/coreos/etcd/auth/authpb", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/client", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, { "ImportPath": "github.com/coreos/etcd/clientv3", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" @@ -58,14 +54,6 @@ "ImportPath": "github.com/coreos/etcd/mvcc/mvccpb", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/pkg/pathutil", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, - { - "ImportPath": "github.com/coreos/etcd/pkg/srv", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, { "ImportPath": "github.com/coreos/etcd/pkg/tlsutil", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" @@ -78,14 +66,6 @@ "ImportPath": "github.com/coreos/etcd/pkg/types", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/version", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, - { - "ImportPath": "github.com/coreos/go-semver/semver", - "Rev": "568e959cd89871e61434c1143528d9162da89ef2" - }, { "ImportPath": "github.com/coreos/go-systemd/daemon", "Rev": "48702e0da86bd25e76cfef347e2adeb434a0d0a6" @@ -322,10 +302,6 @@ "ImportPath": "github.com/stretchr/testify/assert", "Rev": "c679ae2cc0cb27ec3293fea7e254e47386f05d69" }, - { - "ImportPath": "github.com/ugorji/go/codec", - "Rev": "ded73eae5db7e7a0ef6f55aace87a2873c5d2b74" - }, { "ImportPath": "golang.org/x/crypto/ssh/terminal", "Rev": "de0752318171da717af4ce24d0a2e8626afaeb11" @@ -1162,10 +1138,6 @@ "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/metrics", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/apiserver/pkg/storage/etcd3", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index 7003a50e3c..4daa77c9b6 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -38,10 +38,6 @@ "ImportPath": "github.com/coreos/etcd/auth/authpb", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/client", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, { "ImportPath": "github.com/coreos/etcd/clientv3", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" @@ -58,14 +54,6 @@ "ImportPath": "github.com/coreos/etcd/mvcc/mvccpb", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/pkg/pathutil", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, - { - "ImportPath": "github.com/coreos/etcd/pkg/srv", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, { "ImportPath": "github.com/coreos/etcd/pkg/tlsutil", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" @@ -78,14 +66,6 @@ "ImportPath": "github.com/coreos/etcd/pkg/types", "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" }, - { - "ImportPath": "github.com/coreos/etcd/version", - "Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1" - }, - { - "ImportPath": "github.com/coreos/go-semver/semver", - "Rev": "568e959cd89871e61434c1143528d9162da89ef2" - }, { "ImportPath": "github.com/coreos/go-systemd/daemon", "Rev": "48702e0da86bd25e76cfef347e2adeb434a0d0a6" @@ -302,10 +282,6 @@ "ImportPath": "github.com/spf13/pflag", "Rev": "583c0c0531f06d5278b7d917446061adc344b5cd" }, - { - "ImportPath": "github.com/ugorji/go/codec", - "Rev": "ded73eae5db7e7a0ef6f55aace87a2873c5d2b74" - }, { "ImportPath": "golang.org/x/crypto/ssh/terminal", "Rev": "de0752318171da717af4ce24d0a2e8626afaeb11" @@ -1126,10 +1102,6 @@ "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/metrics", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, - { - "ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util", - "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - }, { "ImportPath": "k8s.io/apiserver/pkg/storage/etcd3", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/test/e2e_node/e2e_node_suite_test.go b/test/e2e_node/e2e_node_suite_test.go index 91f3586ad4..3cd0c01a7c 100644 --- a/test/e2e_node/e2e_node_suite_test.go +++ b/test/e2e_node/e2e_node_suite_test.go @@ -86,7 +86,7 @@ const rootfs = "/rootfs" func TestE2eNode(t *testing.T) { if *runServicesMode { // If run-services-mode is specified, only run services in current process. - services.RunE2EServices() + services.RunE2EServices(t) return } if *runKubeletMode { diff --git a/test/e2e_node/services/BUILD b/test/e2e_node/services/BUILD index 56ca17f5fd..60d41f37d6 100644 --- a/test/e2e_node/services/BUILD +++ b/test/e2e_node/services/BUILD @@ -9,7 +9,6 @@ go_library( name = "go_default_library", srcs = [ "apiserver.go", - "etcd.go", "internal_services.go", "kubelet.go", "logs.go", @@ -29,6 +28,8 @@ go_library( "//pkg/kubelet/kubeletconfig/util/codec:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", @@ -39,10 +40,6 @@ go_library( "//test/e2e/framework:go_default_library", "//test/e2e_node/builder:go_default_library", "//test/e2e_node/remote:go_default_library", - "//vendor/github.com/coreos/etcd/etcdserver:go_default_library", - "//vendor/github.com/coreos/etcd/etcdserver/api/v2http:go_default_library", - "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library", - "//vendor/github.com/coreos/etcd/pkg/types:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/kardianos/osext:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", diff --git a/test/e2e_node/services/apiserver.go b/test/e2e_node/services/apiserver.go index 1d956fbecd..d66ff6e98c 100644 --- a/test/e2e_node/services/apiserver.go +++ b/test/e2e_node/services/apiserver.go @@ -20,6 +20,7 @@ import ( "fmt" "net" + "k8s.io/apiserver/pkg/storage/storagebackend" apiserver "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" ) @@ -31,21 +32,23 @@ const ( ) // APIServer is a server which manages apiserver. -type APIServer struct{} +type APIServer struct { + storageConfig storagebackend.Config + stopCh chan struct{} +} // NewAPIServer creates an apiserver. -func NewAPIServer() *APIServer { - return &APIServer{} +func NewAPIServer(storageConfig storagebackend.Config) *APIServer { + return &APIServer{ + storageConfig: storageConfig, + stopCh: make(chan struct{}), + } } // Start starts the apiserver, returns when apiserver is ready. func (a *APIServer) Start() error { o := options.NewServerRunOptions() - o.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()} - // TODO: Current setup of etcd in e2e-node tests doesn't support etcd v3 - // protocol. We should migrate it to use the same infrastructure as all - // other tests (pkg/storage/etcd/testing). - o.Etcd.StorageConfig.Type = "etcd2" + o.Etcd.StorageConfig = a.storageConfig _, ipnet, err := net.ParseCIDR(clusterIPRange) if err != nil { return err @@ -56,14 +59,12 @@ func (a *APIServer) Start() error { errCh := make(chan error) go func() { defer close(errCh) - stopCh := make(chan struct{}) - defer close(stopCh) completedOptions, err := apiserver.Complete(o) if err != nil { errCh <- fmt.Errorf("set apiserver default options error: %v", err) return } - err = apiserver.Run(completedOptions, stopCh) + err = apiserver.Run(completedOptions, a.stopCh) if err != nil { errCh <- fmt.Errorf("run apiserver error: %v", err) return @@ -80,6 +81,10 @@ func (a *APIServer) Start() error { // Stop stops the apiserver. Currently, there is no way to stop the apiserver. // The function is here only for completion. func (a *APIServer) Stop() error { + if a.stopCh != nil { + close(a.stopCh) + a.stopCh = nil + } return nil } diff --git a/test/e2e_node/services/etcd.go b/test/e2e_node/services/etcd.go deleted file mode 100644 index 8c40dc7797..0000000000 --- a/test/e2e_node/services/etcd.go +++ /dev/null @@ -1,161 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package services - -import ( - "crypto/tls" - "net" - "net/http" - "net/url" - "time" - - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/api/v2http" - "github.com/coreos/etcd/pkg/transport" - "github.com/coreos/etcd/pkg/types" - "github.com/golang/glog" -) - -// TODO: These tests should not be leveraging v2http -// TODO(random-liu): Add service interface to manage services with the same behaviour. - -// All following configurations are got from etcd source code. -// TODO(random-liu): Use embed.NewConfig after etcd3 is supported. -const ( - etcdName = "etcd" - clientURLStr = "http://localhost:4001" // clientURL has listener created and handles etcd API traffic - peerURLStr = "http://localhost:7001" // peerURL does't have listener created, it is used to pass Etcd validation - snapCount = etcdserver.DefaultSnapCount - maxSnapFiles = 5 - maxWALFiles = 5 - tickMs = 100 - electionTicks = 10 - etcdHealthCheckURL = clientURLStr + "/v2/keys/" // Trailing slash is required, -) - -// EtcdServer is a server which manages etcd. -type EtcdServer struct { - *etcdserver.EtcdServer - config *etcdserver.ServerConfig - clientListen net.Listener -} - -// NewEtcd creates a new default etcd server using 'dataDir' for persistence. -func NewEtcd(dataDir string) *EtcdServer { - clientURLs, err := types.NewURLs([]string{clientURLStr}) - if err != nil { - glog.Fatalf("Failed to parse client url %q: %v", clientURLStr, err) - } - peerURLs, err := types.NewURLs([]string{peerURLStr}) - if err != nil { - glog.Fatalf("Failed to parse peer url %q: %v", peerURLStr, err) - } - - config := &etcdserver.ServerConfig{ - Name: etcdName, - ClientURLs: clientURLs, - PeerURLs: peerURLs, - DataDir: dataDir, - InitialPeerURLsMap: map[string]types.URLs{etcdName: peerURLs}, - NewCluster: true, - SnapCount: snapCount, - MaxSnapFiles: maxSnapFiles, - MaxWALFiles: maxWALFiles, - TickMs: tickMs, - ElectionTicks: electionTicks, - AuthToken: "simple", - } - - return &EtcdServer{ - config: config, - } -} - -// Start starts the etcd server and listening for client connections -func (e *EtcdServer) Start() error { - var err error - e.EtcdServer, err = etcdserver.NewServer(e.config) - if err != nil { - return err - } - // create client listener, there should be only one url - e.clientListen, err = createListener(e.config.ClientURLs[0]) - if err != nil { - return err - } - - // start etcd - e.EtcdServer.Start() - - // setup client listener - ch := v2http.NewClientHandler(e.EtcdServer, e.config.ReqTimeout()) - errCh := make(chan error) - go func(l net.Listener) { - defer close(errCh) - srv := &http.Server{ - Handler: ch, - ReadTimeout: 5 * time.Minute, - } - // Serve always returns a non-nil error. - errCh <- srv.Serve(l) - }(e.clientListen) - - err = readinessCheck("etcd", []string{etcdHealthCheckURL}, errCh) - if err != nil { - return err - } - return nil -} - -// Stop closes all connections and stops the Etcd server -func (e *EtcdServer) Stop() error { - if e.EtcdServer != nil { - e.EtcdServer.Stop() - } - if e.clientListen != nil { - err := e.clientListen.Close() - if err != nil { - return err - } - } - return nil -} - -// Name returns the server's unique name -func (e *EtcdServer) Name() string { - return etcdName -} - -func createListener(url url.URL) (net.Listener, error) { - l, err := net.Listen("tcp", url.Host) - if err != nil { - return nil, err - } - l, err = transport.NewKeepAliveListener(l, url.Scheme, &tls.Config{}) - if err != nil { - return nil, err - } - return l, nil -} - -func getEtcdClientURL() string { - return clientURLStr -} - -func getEtcdHealthCheckURL() string { - return etcdHealthCheckURL -} diff --git a/test/e2e_node/services/internal_services.go b/test/e2e_node/services/internal_services.go index 499b624ae3..356c00ef14 100644 --- a/test/e2e_node/services/internal_services.go +++ b/test/e2e_node/services/internal_services.go @@ -17,9 +17,11 @@ limitations under the License. package services import ( - "io/ioutil" "os" + "testing" + etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" + "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/kubernetes/test/e2e/framework" "github.com/golang/glog" @@ -29,7 +31,8 @@ import ( type e2eServices struct { rmDirs []string // statically linked e2e services - etcdServer *EtcdServer + etcdServer *etcdtesting.EtcdTestServer + etcdStorage *storagebackend.Config apiServer *APIServer nsController *NamespaceController } @@ -40,9 +43,9 @@ func newE2EServices() *e2eServices { // run starts all e2e services and wait for the termination signal. Once receives the // termination signal, it will stop the e2e services gracefully. -func (es *e2eServices) run() error { - defer es.stop() - if err := es.start(); err != nil { +func (es *e2eServices) run(t *testing.T) error { + defer es.stop(t) + if err := es.start(t); err != nil { return err } // Wait until receiving a termination signal. @@ -51,13 +54,13 @@ func (es *e2eServices) run() error { } // start starts the tests embedded services or returns an error. -func (es *e2eServices) start() error { +func (es *e2eServices) start(t *testing.T) error { glog.Info("Starting e2e services...") - err := es.startEtcd() + err := es.startEtcd(t) if err != nil { return err } - err = es.startApiServer() + err = es.startApiServer(es.etcdStorage) if err != nil { return err } @@ -70,7 +73,7 @@ func (es *e2eServices) start() error { } // stop stops the embedded e2e services. -func (es *e2eServices) stop() { +func (es *e2eServices) stop(t *testing.T) { glog.Info("Stopping e2e services...") // TODO(random-liu): Use a loop to stop all services after introducing // service interface. @@ -90,9 +93,7 @@ func (es *e2eServices) stop() { glog.Info("Stopping etcd") if es.etcdServer != nil { - if err := es.etcdServer.Stop(); err != nil { - glog.Errorf("Failed to stop %q: %v", es.etcdServer.Name(), err) - } + es.etcdServer.Terminate(t) } for _, d := range es.rmDirs { @@ -107,23 +108,18 @@ func (es *e2eServices) stop() { } // startEtcd starts the embedded etcd instance or returns an error. -func (es *e2eServices) startEtcd() error { +func (es *e2eServices) startEtcd(t *testing.T) error { glog.Info("Starting etcd") - // Create data directory in current working space. - dataDir, err := ioutil.TempDir(".", "etcd") - if err != nil { - return err - } - // Mark the dataDir as directories to remove. - es.rmDirs = append(es.rmDirs, dataDir) - es.etcdServer = NewEtcd(dataDir) - return es.etcdServer.Start() + server, etcdStorage := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) + es.etcdServer = server + es.etcdStorage = etcdStorage + return nil } // startApiServer starts the embedded API server or returns an error. -func (es *e2eServices) startApiServer() error { +func (es *e2eServices) startApiServer(etcdStorage *storagebackend.Config) error { glog.Info("Starting API server") - es.apiServer = NewAPIServer() + es.apiServer = NewAPIServer(*etcdStorage) return es.apiServer.Start() } @@ -137,7 +133,6 @@ func (es *e2eServices) startNamespaceController() error { // getServicesHealthCheckURLs returns the health check urls for the internal services. func getServicesHealthCheckURLs() []string { return []string{ - getEtcdHealthCheckURL(), getAPIServerHealthCheckURL(), } } diff --git a/test/e2e_node/services/services.go b/test/e2e_node/services/services.go index 7a442d38e6..b73658ab30 100644 --- a/test/e2e_node/services/services.go +++ b/test/e2e_node/services/services.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "path" + "testing" "github.com/golang/glog" "github.com/kardianos/osext" @@ -105,12 +106,12 @@ func (e *E2EServices) Stop() { // RunE2EServices actually start the e2e services. This function is used to // start e2e services in current process. This is only used in run-services-mode. -func RunE2EServices() { +func RunE2EServices(t *testing.T) { // Populate global DefaultFeatureGate with value from TestContext.FeatureGates. // This way, statically-linked components see the same feature gate config as the test context. utilfeature.DefaultFeatureGate.SetFromMap(framework.TestContext.FeatureGates) e := newE2EServices() - if err := e.run(); err != nil { + if err := e.run(t); err != nil { glog.Fatalf("Failed to run e2e services: %v", err) } }