Merge pull request #69310 from liggitt/remove-etcd2

Remove etcd2 storage backend
pull/58/head
k8s-ci-robot 2018-10-04 13:40:00 -07:00 committed by GitHub
commit 409871ecae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 50 additions and 2891 deletions

View File

@ -1478,10 +1478,6 @@
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/testing/testingcert",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -15,7 +15,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest/resttest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
],
)

View File

@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest/resttest"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
)
@ -136,7 +135,7 @@ func (t *Tester) TestWatch(valid runtime.Object, labelsPass, labelsFail []labels
fieldsPass,
fieldsFail,
// TODO: This should be filtered, the registry should not be aware of this level of detail
[]string{etcdstorage.EtcdCreate, etcdstorage.EtcdDelete},
[]string{"create", "delete"},
)
}
@ -185,9 +184,9 @@ func (t *Tester) emitObject(obj runtime.Object, action string) error {
var err error
switch action {
case etcdstorage.EtcdCreate:
case "create":
err = t.createObject(ctx, obj)
case etcdstorage.EtcdDelete:
case "delete":
var accessor metav1.Object
accessor, err = meta.Accessor(obj)
if err != nil {

View File

@ -60,7 +60,6 @@ type EtcdOptions struct {
var storageTypes = sets.NewString(
storagebackend.StorageTypeUnset,
storagebackend.StorageTypeETCD2,
storagebackend.StorageTypeETCD3,
)

View File

@ -24,8 +24,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
"k8s.io/apiserver/pkg/storage/storagebackend"
"github.com/golang/glog"
)
// StorageCodecConfig are the arguments passed to newStorageCodecFn
@ -48,11 +46,6 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
}
if opts.Config.Type == storagebackend.StorageTypeETCD2 && mediaType != "application/json" {
glog.Warningf(`storage type %q does not support media type %q, using "application/json"`, storagebackend.StorageTypeETCD2, mediaType)
mediaType = "application/json"
}
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", mediaType)
@ -60,11 +53,6 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
s := serializer.Serializer
// make sure the selected encoder supports string data
if !serializer.EncodesAsText && opts.Config.Type == storagebackend.StorageTypeETCD2 {
return nil, fmt.Errorf("storage type %q does not support binary media type %q", storagebackend.StorageTypeETCD2, mediaType)
}
// Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will
// be passed to the recognizer so that multiple decoders are available.
var encoder runtime.Encoder = s

View File

@ -8,33 +8,12 @@ load(
go_test(
name = "go_default_test",
srcs = [
"api_object_versioner_test.go",
"etcd_helper_test.go",
"etcd_watcher_test.go",
],
srcs = ["api_object_versioner_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/apitesting:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/tests:go_default_library",
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)
@ -43,26 +22,14 @@ go_library(
srcs = [
"api_object_versioner.go",
"doc.go",
"etcd_helper.go",
"etcd_watcher.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd",
importpath = "k8s.io/apiserver/pkg/storage/etcd",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/util:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)

View File

@ -1,637 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"errors"
"fmt"
"path"
"reflect"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
utilcache "k8s.io/apimachinery/pkg/util/cache"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
utiltrace "k8s.io/apiserver/pkg/util/trace"
)
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformStringFromStorage(string) (value string, stale bool, err error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformStringToStorage(string) (value string, err error)
}
type identityTransformer struct{}
func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
return s, false, nil
}
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
// IdentityTransformer performs no transformation on the provided values.
var IdentityTransformer ValueTransformer = identityTransformer{}
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, transformer ValueTransformer) storage.Interface {
return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec,
versioner: APIObjectVersioner{},
transformer: transformer,
pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: utilcache.NewCache(cacheSize),
}
}
// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
etcdMembersAPI etcd.MembersAPI
etcdKeysAPI etcd.KeysAPI
codec runtime.Codec
transformer ValueTransformer
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
// optional, has to be set to perform any atomic operations
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
// if true, perform quorum read
quorum bool
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
// TODO: Measure how much this cache helps after the conversion code is optimized.
cache utilcache.Cache
}
// Implements storage.Interface.
func (h *etcdHelper) Versioner() storage.Versioner {
return h.versioner
}
// Implements storage.Interface.
func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
trace := utiltrace.New("etcdHelper::Create " + getTypeName(obj))
defer trace.LogIfLong(250 * time.Millisecond)
if ctx == nil {
glog.Errorf("Context is nil")
}
key = path.Join(h.pathPrefix, key)
data, err := runtime.Encode(h.codec, obj)
trace.Step("Object encoded")
if err != nil {
return err
}
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
}
trace.Step("Version checked")
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist,
}
newBody, err := h.transformer.TransformStringToStorage(string(data))
if err != nil {
return storage.NewInternalError(err.Error())
}
response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
trace.Step("Object created")
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return toStorageErr(err, key, 0)
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
// Implements storage.Interface.
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = path.Join(h.pathPrefix, key)
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
if preconditions == nil {
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
// Check the preconditions match.
obj := reflect.New(v.Type()).Interface().(runtime.Object)
for {
_, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
if err != nil {
return toStorageErr(err, key, 0)
}
if err := preconditions.Check(key, obj); err != nil {
return toStorageErr(err, key, 0)
}
index := uint64(0)
if node != nil {
index = node.ModifiedIndex
} else if res != nil {
index = res.Index
}
opt := etcd.DeleteOptions{PrevIndex: index}
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdTestFailed(err) {
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
}
}
// Implements storage.Interface.
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := h.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := h.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = path.Join(h.pathPrefix, key)
_, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err
}
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
startTime := time.Now()
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", nil, nil, false, toStorageErr(err, key, 0)
}
body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, stale, toStorageErr(err, key, 0)
}
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
if response != nil {
if prevNode {
node = response.PrevNode
} else {
node = response.Node
}
}
if inErr != nil || node == nil || len(node.Value) == 0 {
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
return "", nil, false, err
}
v.Set(reflect.Zero(v.Type()))
return "", nil, false, nil
} else if inErr != nil {
return "", nil, false, inErr
}
return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
return body, nil, stale, storage.NewInternalError(err.Error())
}
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil {
return body, nil, stale, err
}
if out != objPtr {
return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
return body, node, stale, err
}
// Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := utiltrace.New("GetToList " + getTypeName(listObj))
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = path.Join(h.pathPrefix, key)
startTime := time.Now()
trace.Step("About to read etcd node")
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
trace.Step("Etcd node read")
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
if etcdutil.IsEtcdNotFound(err) {
if etcdErr, ok := err.(etcd.Error); ok {
return h.versioner.UpdateList(listObj, etcdErr.Index, "")
}
return fmt.Errorf("unexpected error from storage: %#v", err)
}
return toStorageErr(err, key, 0)
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil {
return err
}
return nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(400 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
// IMPORTANT: do not log each key as a discrete step in the trace log
// as it produces an immense amount of log spam when there is a large
// amount of content in the list.
if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
return err
}
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
// obj != nil iff it matches the pred function.
if obj != nil {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
} else {
body, _, err := h.transformer.TransformStringFromStorage(node.Value)
if err != nil {
// omit items from lists and watches that cannot be transformed, but log the error
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
continue
}
obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj)
}
}
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := utiltrace.New("List " + getTypeName(listObj))
defer trace.LogIfLong(400 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = path.Join(h.pathPrefix, key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(ctx, key)
trace.Step("Etcd node listed")
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if err := h.versioner.UpdateList(listObj, index, ""); err != nil {
return err
}
return nil
}
func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
opts := etcd.GetOptions{
Recursive: true,
Sort: true,
Quorum: h.quorum,
}
result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
if err != nil {
var index uint64
if etcdError, ok := err.(etcd.Error); ok {
index = etcdError.Index
}
nodes := make([]*etcd.Node, 0)
if etcdutil.IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, toStorageErr(err, key, 0)
}
}
return result.Node.Nodes, result.Index, nil
}
// Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion about current object.
if ctx == nil {
glog.Errorf("Context is nil")
}
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
key = path.Join(h.pathPrefix, key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil {
return toStorageErr(err, key, 0)
}
if err := preconditions.Check(key, obj); err != nil {
return toStorageErr(err, key, 0)
}
meta := storage.ResponseMeta{}
if node != nil {
meta.TTL = node.TTL
meta.ResourceVersion = node.ModifiedIndex
}
// Get the object to be written by calling tryUpdate.
ret, newTTL, err := tryUpdate(obj, meta)
if err != nil {
return toStorageErr(err, key, 0)
}
index := uint64(0)
ttl := uint64(0)
if node != nil {
index = node.ModifiedIndex
if node.TTL != 0 {
ttl = uint64(node.TTL)
}
if node.Expiration != nil && ttl == 0 {
ttl = 1
}
} else if res != nil {
index = res.Index
}
if newTTL != nil {
if ttl != 0 && *newTTL == 0 {
// TODO: remove this after we have verified this is no longer an issue
glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
}
ttl = *newTTL
}
// Since update object may have a resourceVersion set, we need to clear it here.
if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
newBodyData, err := runtime.Encode(h.codec, ret)
if err != nil {
return err
}
newBody := string(newBodyData)
data, err := h.transformer.TransformStringToStorage(newBody)
if err != nil {
return storage.NewInternalError(err.Error())
}
// First time this key has been used, try creating new value.
if index == 0 {
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist,
}
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdNodeExist(err) {
continue
}
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, 0)
}
// If we don't send an update, we simply return the currently existing
// version of the object. However, the value transformer may indicate that
// the on disk representation has changed and that we must commit an update.
if newBody == origBody && !stale {
_, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
return err
}
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
opts := etcd.SetOptions{
PrevIndex: index,
TTL: time.Duration(ttl) * time.Second,
}
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdTestFailed(err) {
// Try again.
continue
}
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, int64(index))
}
}
func (*etcdHelper) Count(pathPerfix string) (int64, error) {
return 0, fmt.Errorf("Count is unimplemented for etcd2!")
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
return nil, true
}
// We should not return the object itself to avoid polluting the cache if someone
// modifies returned values.
objCopy := obj.(runtime.Object).DeepCopyObject()
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy := obj.DeepCopyObject()
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
}
}
func toStorageErr(err error, key string, rv int64) error {
if err == nil {
return nil
}
switch {
case etcdutil.IsEtcdNotFound(err):
return storage.NewKeyNotFoundError(key, rv)
case etcdutil.IsEtcdNodeExist(err):
return storage.NewKeyExistsError(key, rv)
case etcdutil.IsEtcdTestFailed(err):
return storage.NewResourceVersionConflictsError(key, rv)
case etcdutil.IsEtcdUnreachable(err):
return storage.NewUnreachableError(key, rv)
default:
return err
}
}

View File

@ -1,747 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"fmt"
"path"
"reflect"
"strings"
"sync"
"testing"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/stretchr/testify/require"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
storagetests "k8s.io/apiserver/pkg/storage/tests"
)
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct {
prefix string
stale bool
err error
}
func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) {
if !strings.HasPrefix(s, p.prefix) {
return "", false, fmt.Errorf("value does not have expected prefix: %s", s)
}
return strings.TrimPrefix(s, p.prefix), p.stale, p.err
}
func (p prefixTransformer) TransformStringToStorage(s string) (string, error) {
if len(s) > 0 {
return p.prefix + s, p.err
}
return s, p.err
}
func defaultPrefix(s string) string {
return "test!" + s
}
func defaultPrefixValue(value []byte) string {
return defaultPrefix(string(value))
}
func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
scheme := runtime.NewScheme()
scheme.Log(t)
scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{})
require.NoError(t, example.AddToScheme(scheme))
require.NoError(t, examplev1.AddToScheme(scheme))
require.NoError(t, scheme.AddConversionFuncs(
func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error {
*out = *in
return nil
},
func(in, out *time.Time, s conversion.Scope) error {
*out = *in
return nil
},
))
codecs := serializer.NewCodecFactory(scheme)
return scheme, codecs
}
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, prefixTransformer{prefix: "test!"}).(*etcdHelper)
}
func createObj(t *testing.T, helper etcdHelper, name string, obj, out runtime.Object, ttl uint64) error {
err := helper.Create(context.TODO(), name, obj, out, ttl)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
return err
}
func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error {
for i := range list.Items {
returnedObj := &example.Pod{}
err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0)
if err != nil {
return err
}
list.Items[i] = *returnedObj
}
return nil
}
func TestList(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
createPodList(t, helper, &list)
var got example.PodList
// TODO: a sorted filter function could be applied such implied
// ordering on the returned list doesn't matter.
err := helper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
func TestTransformationFailure(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
pods := []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
}
createPodList(t, helper, &example.PodList{Items: pods[:1]})
// create a second resource with an invalid prefix
oldTransformer := helper.transformer
helper.transformer = prefixTransformer{prefix: "otherprefix!"}
createPodList(t, helper, &example.PodList{Items: pods[1:]})
helper.transformer = oldTransformer
// only the first item is returned, and no error
var got example.PodList
if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
}
// Get should fail
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// GuaranteedUpdate should return an error
if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
return input, nil, nil
}, &pods[1]); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// Delete succeeds but reports an error because we cannot access the body
if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
func TestListFiltered(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
createPodList(t, helper, &list)
// List only "bar" pod
p := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}
var got example.PodList
err := helper.List(context.TODO(), "/", "", p, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
// Check to make certain that the filter function only returns "bar"
if e, a := list.Items[0], got.Items[0]; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestListAcrossDirectories(t *testing.T) {
_, codecs := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
defer server.Terminate(t)
roothelper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
helper1 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir1")
helper2 := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()+"/dir2")
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
returnedObj := &example.Pod{}
// create the 1st 2 elements in one directory
createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0)
list.Items[0] = *returnedObj
createObj(t, helper1, list.Items[1].Name, &list.Items[1], returnedObj, 0)
list.Items[1] = *returnedObj
// create the last element in the other directory
createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0)
list.Items[2] = *returnedObj
var got example.PodList
err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
func TestGet(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
expect := example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: storagetests.DeepEqualSafePodSpec(),
}
var got example.Pod
if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect = got
if err := helper.Get(context.TODO(), key, "", &got, false); err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestGetToList(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
storedObj := &example.Pod{}
if err := helper.Create(context.TODO(), key, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, storedObj, 0); err != nil {
t.Errorf("Unexpected error %#v", err)
}
tests := []struct {
key string
pred storage.SelectionPredicate
expectedOut []*example.Pod
}{{ // test GetToList on existing key
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on non-existing key
key: "/non-existing",
pred: storage.Everything,
expectedOut: nil,
}, { // test GetToList with matching pod name
key: "/non-existing",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: nil,
}}
for i, tt := range tests {
out := &example.PodList{}
err := helper.GetToList(context.TODO(), tt.key, "", tt.pred, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
if len(out.ResourceVersion) == 0 {
t.Errorf("#%d: unset resourceVersion", i)
}
if len(out.Items) != len(tt.expectedOut) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
}
}
}
}
func TestGetNotFoundErr(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
boguskey := "/some/boguskey"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
var got example.Pod
err := helper.Get(context.TODO(), boguskey, "", &got, false)
if !storage.IsNotFound(err) {
t.Errorf("Unexpected response on key=%v, err=%v", boguskey, err)
}
}
func TestCreate(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
_, err = runtime.Encode(codec, obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
err = helper.Get(context.TODO(), "/some/key", "", returnedObj, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
_, err = runtime.Encode(codec, returnedObj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if obj.Name != returnedObj.Name {
t.Errorf("Wanted %v, got %v", obj.Name, returnedObj.Name)
}
}
func TestCreateNilOutParam(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}
func TestGuaranteedUpdate(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
// Update an existing node.
callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
if in.(*storagetesting.TestResource).Value != 1 {
t.Errorf("Callback input was not current set value")
}
return objUpdate, nil
}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
objCheck := &storagetesting.TestResource{}
err = helper.Get(context.TODO(), key, "", objCheck, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if objCheck.Value != 2 {
t.Errorf("Value should have been 2 but got %v", objCheck.Value)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
}
func TestGuaranteedUpdateNoChange(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
original := &storagetesting.TestResource{}
err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
// Update an existing node with the same data
callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1}
result := &storagetesting.TestResource{}
err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
}))
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
if result.ResourceVersion != original.ResourceVersion {
t.Fatalf("updated the object resource version")
}
// Update an existing node with the same data but return stale
helper.transformer = prefixTransformer{prefix: "test!", stale: true}
callbackCalled = false
result = &storagetesting.TestResource{}
objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
}))
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
if result.ResourceVersion == original.ResourceVersion {
t.Errorf("did not update the object resource version")
}
}
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
// Create a new node.
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
f := storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
})
ignoreNotFound := false
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f)
if err == nil {
t.Errorf("Expected error for key not found.")
}
ignoreNotFound = true
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f)
if err != nil {
t.Errorf("Unexpected error %v.", err)
}
}
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
const concurrency = 10
var wgDone sync.WaitGroup
var wgForceCollision sync.WaitGroup
wgDone.Add(concurrency)
wgForceCollision.Add(concurrency)
for i := 0; i < concurrency; i++ {
// Increment storagetesting.TestResource.Value by 1
go func() {
defer wgDone.Done()
firstCall := true
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
defer func() { firstCall = false }()
if firstCall {
// Force collision by joining all concurrent GuaranteedUpdate operations here.
wgForceCollision.Done()
wgForceCollision.Wait()
}
currValue := in.(*storagetesting.TestResource).Value
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: currValue + 1}
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}()
}
wgDone.Wait()
stored := &storagetesting.TestResource{}
err := helper.Get(context.TODO(), key, "", stored, false)
if err != nil {
t.Errorf("Unexpected error %#v", stored)
}
if stored.Value != concurrency {
t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
}
}
func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, codec, prefix)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
err = helper.GuaranteedUpdate(context.TODO(), "/some/key", podPtr, true, storage.NewUIDPreconditions("B"), storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if !storage.IsInvalidObj(err) {
t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err)
}
}
func TestDeleteUIDMismatch(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, codec, prefix)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("B"))
if !storage.IsInvalidObj(err) {
t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err)
}
}
type getFunc func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error)
type fakeDeleteKeysAPI struct {
etcd.KeysAPI
fakeGetFunc getFunc
getCount int
// The fakeGetFunc will be called fakeGetCap times before the KeysAPI's Get will be called.
fakeGetCap int
}
func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
f.getCount++
if f.getCount < f.fakeGetCap {
return f.fakeGetFunc(ctx, key, opts)
}
return f.KeysAPI.Get(ctx, key, opts)
}
// This is to emulate the case where another party updates the object when
// etcdHelper.Delete has verified the preconditions, but hasn't carried out the
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
// should retry until there is no conflict.
func TestDeleteWithRetry(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
// fakeGet returns a large ModifiedIndex to emulate the case that another
// party has updated the object.
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
data, _ := runtime.Encode(codec, obj)
return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil
}
expectedRetries := 3
helper := newEtcdHelper(server.Client, codec, prefix)
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
helper.etcdKeysAPI = fake
returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("A"))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if fake.getCount != expectedRetries {
t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount)
}
err = helper.Get(context.TODO(), "/some/key", "", obj, false)
if !storage.IsNotFound(err) {
t.Errorf("Expect an NotFound error, got %v", err)
}
}
func TestPrefix(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
testcases := map[string]string{
"custom/prefix": "/custom/prefix",
"/custom//prefix//": "/custom/prefix",
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
helper := newEtcdHelper(server.Client, codec, configuredPrefix)
if helper.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, helper.pathPrefix)
}
}
}

View File

@ -1,496 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"fmt"
"net/http"
"reflect"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
)
// Etcd watch event actions
const (
EtcdCreate = "create"
EtcdGet = "get"
EtcdSet = "set"
EtcdCAS = "compareAndSwap"
EtcdDelete = "delete"
EtcdCAD = "compareAndDelete"
EtcdExpire = "expire"
)
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
// includeFunc returns true if the given key should be considered part of a watch
type includeFunc func(key string) bool
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
func exceptKey(except string) includeFunc {
return func(key string) bool {
return key != except
}
}
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM storage.HighWaterMark
outgoingHWM storage.HighWaterMark
encoding runtime.Codec
// Note that versioner is required for etcdWatcher to work correctly.
// There is no public constructor of it, so be careful when manipulating
// with it manually.
versioner storage.Versioner
transform TransformFunc
valueTransformer ValueTransformer
list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, should be true
include includeFunc
pred storage.SelectionPredicate
etcdIncoming chan *etcd.Response
etcdError chan error
ctx context.Context
cancel context.CancelFunc
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop(), and to make sure
// that the translate goroutine is not leaked.
wg sync.WaitGroup
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
cache etcdCache
}
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
valueTransformer: valueTransformer,
list: list,
quorum: quorum,
include: include,
pred: pred,
// Buffer this channel, so that the etcd client is not forced
// to context switch with every object it gets, and so that a
// long time spent decoding an object won't block the *next*
// object. Basically, we see a lot of "401 window exceeded"
// errors from etcd, and that's due to the client not streaming
// results but rather getting them one at a time. So we really
// want to never block the etcd client, if possible. The 100 is
// mostly arbitrary--we know it goes as high as 50, though.
// There's a V(2) log message that prints the length so we can
// monitor how much of this buffer is actually used.
etcdIncoming: make(chan *etcd.Response, 100),
etcdError: make(chan error, 1),
// Similarly to etcdIncomming, we don't want to force context
// switch on every new incoming object.
outgoing: make(chan watch.Event, 100),
userStop: make(chan struct{}),
stopped: false,
wg: sync.WaitGroup{},
cache: cache,
ctx: nil,
cancel: nil,
}
w.emit = func(e watch.Event) {
if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen)
}
// Give up on user stop, without this we leak a lot of goroutines in tests.
select {
case w.outgoing <- e:
case <-w.userStop:
}
}
// translate will call done. We need to Add() here because otherwise,
// if Stop() gets called before translate gets started, there'd be a
// problem.
w.wg.Add(1)
go w.translate()
return w
}
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(w.etcdError)
defer close(w.etcdIncoming)
// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
done := func() {}
// We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here.
// See #18928 for more details.
var watcher etcd.Watcher
returned := func() bool {
w.stopLock.Lock()
defer w.stopLock.Unlock()
if w.stopped {
// Watcher has already been stopped - don't event initiate it here.
return true
}
w.wg.Add(1)
done = w.wg.Done
// Perform initialization of watcher under lock - we want to avoid situation when
// Stop() is called in the meantime (which in tests can cause etcd termination and
// strange behavior here).
if resourceVersion == 0 {
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return true
}
resourceVersion = latest
}
opts := etcd.WatcherOptions{
Recursive: w.list,
AfterIndex: resourceVersion,
}
watcher = client.Watcher(key, &opts)
w.ctx, w.cancel = context.WithCancel(ctx)
return false
}()
defer done()
if returned {
return
}
for {
resp, err := watcher.Next(w.ctx)
if err != nil {
w.etcdError <- err
return
}
w.etcdIncoming <- resp
}
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
opts := etcd.GetOptions{
Recursive: recursive,
Sort: false,
Quorum: quorum,
}
resp, err := client.Get(ctx, key, &opts)
if err != nil {
if !etcdutil.IsEtcdNotFound(err) {
utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
return resourceVersion, toStorageErr(err, key, 0)
}
if etcdError, ok := err.(etcd.Error); ok {
resourceVersion = etcdError.Index
}
return resourceVersion, nil
}
resourceVersion = resp.Index
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Action = "get"
copied.Node = node
incoming <- &copied
}
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer w.wg.Done()
defer close(w.outgoing)
defer utilruntime.HandleCrash()
for {
select {
case err := <-w.etcdError:
if err != nil {
var status *metav1.Status
switch {
case etcdutil.IsEtcdWatchExpired(err):
status = &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Code: http.StatusGone, // Gone
Reason: metav1.StatusReasonExpired,
}
// TODO: need to generate errors using api/errors which has a circular dependency on this package
// no other way to inject errors
// case etcdutil.IsEtcdUnreachable(err):
// status = errors.NewServerTimeout(...)
default:
status = &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonInternalError,
}
}
w.emit(watch.Event{
Type: watch.Error,
Object: status,
})
}
return
case <-w.userStop:
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen)
}
w.sendResult(res)
}
// If !ok, don't return here-- must wait for etcdError channel
// to give an error or be closed.
}
}
}
// decodeObject extracts an object from the provided etcd node or returns an error.
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
return obj, nil
}
body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
if err != nil {
return nil, err
}
obj, err := runtime.Decode(w.encoding, []byte(body))
if err != nil {
return nil, err
}
// ensure resource version is set on the object we load from etcd
if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
return nil, err
}
}
if node.ModifiedIndex != 0 {
w.cache.addToCache(node.ModifiedIndex, obj)
}
return obj, nil
}
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil {
utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
obj, err := w.decodeObject(res.Node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if matched, err := w.pred.Matches(obj); err != nil || !matched {
return
}
action := watch.Added
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
func (w *etcdWatcher) sendModify(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
curObj, err := w.decodeObject(res.Node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := false
if matched, err := w.pred.Matches(curObj); err == nil && matched {
curObjPasses = true
}
oldObjPasses := false
var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object.
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
}
if matched, err := w.pred.Matches(oldObj); err == nil && matched {
oldObjPasses = true
}
}
}
// Some changes to an object may cause it to start or stop matching a pred.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
case curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Modified,
Object: curObj,
})
case curObjPasses && !oldObjPasses:
w.emit(watch.Event{
Type: watch.Added,
Object: curObj,
})
case !curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Deleted,
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the pred.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil {
utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
return
}
if w.include != nil && !w.include(res.PrevNode.Key) {
return
}
node := *res.PrevNode
if res.Node != nil {
// Note that this sends the *old* object with the etcd index for the time at
// which it gets deleted. This will allow users to restart the watch at the right
// index.
node.ModifiedIndex = res.Node.ModifiedIndex
}
obj, err := w.decodeObject(&node)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if matched, err := w.pred.Matches(obj); err != nil || !matched {
return
}
w.emit(watch.Event{
Type: watch.Deleted,
Object: obj,
})
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case EtcdCreate, EtcdGet:
// "Get" will only happen in watch 0 case, where we explicitly want ADDED event
// for initial state.
w.sendAdd(res)
case EtcdSet, EtcdCAS:
w.sendModify(res)
case EtcdDelete, EtcdExpire, EtcdCAD:
w.sendDelete(res)
default:
utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
}
}
// ResultChan implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
if !w.stopped {
w.stopped = true
close(w.userStop)
}
w.stopLock.Unlock()
// Wait until all calls to etcd are finished and no other
// will be issued.
w.wg.Wait()
}

View File

@ -1,579 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
rt "runtime"
"testing"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
etcd "github.com/coreos/etcd/client"
)
var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}
func (f *fakeEtcdCache) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
return nil, false
}
func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
}
var _ etcdCache = &fakeEtcdCache{}
func TestWatchInterpretations(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
// Declare some pods to make the test cases compact.
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
// All of these test cases will be run with the firstLetterIsB SelectionPredicate.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
nodeValue string
expectEmit bool
expectType watch.EventType
expectObject runtime.Object
}{
"create": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"delete": {
actions: []string{"delete"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podFoo),
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podBaz),
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
}
// Should use fieldSelector here.
// But for the sake of tests (simplifying the codes), use labelSelector to support set-based requirements
selector, err := labels.Parse("metadata.name in (bar, baz)")
if err != nil {
t.Fatal(err)
}
firstLetterIsB := storage.SelectionPredicate{
Label: selector,
Field: fields.Everything(),
IncludeUninitialized: true,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return labels.Set{"metadata.name": pod.Name}, nil, pod.Initializers != nil, nil
},
}
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
if !item.expectEmit {
return
}
if e, a := item.expectType, event.Type; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
if e, a := item.expectObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
}
var n, pn *etcd.Node
if item.nodeValue != "" {
n = &etcd.Node{Value: defaultPrefix(item.nodeValue)}
}
if item.prevNodeValue != "" {
pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)}
}
w.sendResult(&etcd.Response{
Action: action,
Node: n,
PrevNode: pn,
})
if e, a := item.expectEmit, emitCalled; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
w.Stop()
}
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
w.Stop()
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
})
w.Stop()
}
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
Node: &etcd.Node{
Value: defaultPrefix("foobar"),
},
})
w.sendResult(&etcd.Response{
Action: action,
PrevNode: &etcd.Node{
Value: defaultPrefix("foobar"),
},
})
w.Stop()
}
}
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
selector, _ := fields.ParseSelector("metadata.name!=bar")
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: selector,
IncludeUninitialized: true,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}
w := newEtcdWatcher(false, false, nil, pred, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1)
w.emit = func(e watch.Event) {
eventChan <- e
}
fooPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
barPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fooBytes, err := runtime.Encode(codec, fooPod)
if err != nil {
t.Fatalf("Encode failed: %v", err)
}
barBytes, err := runtime.Encode(codec, barPod)
if err != nil {
t.Fatalf("Encode failed: %v", err)
}
tests := []struct {
response *etcd.Response
expRV string
}{{ // Delete event
response: &etcd.Response{
Action: EtcdDelete,
Node: &etcd.Node{
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: defaultPrefixValue(fooBytes),
ModifiedIndex: 1,
},
},
expRV: "2",
}, { // Modify event with uninterested data
response: &etcd.Response{
Action: EtcdSet,
Node: &etcd.Node{
Value: defaultPrefixValue(barBytes),
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: defaultPrefixValue(fooBytes),
ModifiedIndex: 1,
},
},
expRV: "2",
}}
for i, tt := range tests {
w.sendResult(tt.response)
ev := <-eventChan
if ev.Type != watch.Deleted {
t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type)
return
}
rv := ev.Object.(*example.Pod).ResourceVersion
if rv != tt.expRV {
t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv)
}
}
w.Stop()
}
func TestWatch(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// watching is explicitly closed below.
// Test normal case
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
returnObj := &example.Pod{}
err = h.Create(context.TODO(), key, pod, returnObj, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-watching.ResultChan()
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
watching.Stop()
// There is a race in etcdWatcher so that after calling Stop() one of
// two things can happen:
// - ResultChan() may be closed (triggered by closing userStop channel)
// - an Error "context cancelled" may be emitted (triggered by cancelling request
// to etcd and putting that error to etcdError channel)
// We need to be prepared for both here.
event, open := <-watching.ResultChan()
if open && event.Type != watch.Error {
t.Errorf("Unexpected event from stopped watcher: %#v", event)
}
}
func TestWatchEtcdState(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
}
err = h.Create(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
pod.ResourceVersion = ""
pod.Status = example.PodStatus{
Phase: example.PodPhase("Running"),
}
// CAS the previous value
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return pod.DeepCopyObject(), nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
}
}
func TestWatchFromZeroIndex(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
// set before the watch and verify events
err := h.Create(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
pod.ResourceVersion = ""
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// The create trigger ADDED event when watching from 0
event := <-watching.ResultChan()
watching.Stop()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
// check for concatenation on watch event with CAS
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*example.Pod)
pod.Name = "bar"
return pod, nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
watching, err = h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// because we watch from 0, first event that we receive will always be ADDED
event = <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
pod.Name = "baz"
updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*example.Pod)
pod.Name = "baz"
return pod, nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; a == nil || !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
}
}
func TestWatchListFromZeroIndex(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
prefix := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, prefix)
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// creates foo which should trigger the WatchList for "/"
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event, _ := <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %v, got %v", e, a)
}
}
func TestWatchListIgnoresRootKey(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// creates key/foo which should trigger the WatchList for "key"
err = h.Create(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// force context switch to ensure watches would catch and notify.
rt.Gosched()
select {
case event, _ := <-watching.ResultChan():
t.Fatalf("Unexpected event: %#v", event)
default:
// fall through, expected behavior
}
}
func TestWatchPurposefulShutdown(t *testing.T) {
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
// Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
watching.Stop()
rt.Gosched()
// There is a race in etcdWatcher so that after calling Stop() one of
// two things can happen:
// - ResultChan() may be closed (triggered by closing userStop channel)
// - an Error "context cancelled" may be emitted (triggered by cancelling request
// to etcd and putting that error to etcdError channel)
// We need to be prepared for both here.
event, open := <-watching.ResultChan()
if open && event.Type != watch.Error {
t.Errorf("Unexpected event from stopped watcher: %#v", event)
}
}

View File

@ -25,7 +25,6 @@ import (
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
StorageTypeETCD3 = "etcd3"
DefaultCompactInterval = 5 * time.Minute

View File

@ -28,21 +28,17 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"etcd2.go",
"etcd3.go",
"factory.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory",
importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/github.com/grpc-ecosystem/go-grpc-prometheus:go_default_library",

View File

@ -1,106 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"context"
"fmt"
"net"
"net/http"
"time"
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
func newETCD2HealthCheck(c storagebackend.Config) (func() error, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}
members := etcd2client.NewMembersAPI(client)
return func() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := members.List(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, nil, err
}
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, etcd.IdentityTransformer)
return s, tr.CloseIdleConnections, nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
cli, err := etcd2client.New(etcd2client.Config{
Endpoints: serverList,
Transport: tr,
})
if err != nil {
return nil, err
}
return cli, nil
}
func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}

View File

@ -29,8 +29,8 @@ type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
@ -45,8 +45,8 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
// CreateHealthCheck creates a healthcheck function based on given config.
func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2HealthCheck(c)
case "etcd2":
return nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3HealthCheck(c)
default:

View File

@ -38,10 +38,6 @@
"ImportPath": "github.com/coreos/etcd/auth/authpb",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/client",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/clientv3",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
@ -58,14 +54,6 @@
"ImportPath": "github.com/coreos/etcd/mvcc/mvccpb",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/pathutil",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/srv",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/tlsutil",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
@ -78,14 +66,6 @@
"ImportPath": "github.com/coreos/etcd/pkg/types",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/version",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/go-semver/semver",
"Rev": "568e959cd89871e61434c1143528d9162da89ef2"
},
{
"ImportPath": "github.com/coreos/go-systemd/daemon",
"Rev": "48702e0da86bd25e76cfef347e2adeb434a0d0a6"
@ -322,10 +302,6 @@
"ImportPath": "github.com/stretchr/testify/assert",
"Rev": "c679ae2cc0cb27ec3293fea7e254e47386f05d69"
},
{
"ImportPath": "github.com/ugorji/go/codec",
"Rev": "ded73eae5db7e7a0ef6f55aace87a2873c5d2b74"
},
{
"ImportPath": "golang.org/x/crypto/ssh/terminal",
"Rev": "de0752318171da717af4ce24d0a2e8626afaeb11"
@ -1162,10 +1138,6 @@
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/metrics",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -38,10 +38,6 @@
"ImportPath": "github.com/coreos/etcd/auth/authpb",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/client",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/clientv3",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
@ -58,14 +54,6 @@
"ImportPath": "github.com/coreos/etcd/mvcc/mvccpb",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/pathutil",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/srv",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/tlsutil",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
@ -78,14 +66,6 @@
"ImportPath": "github.com/coreos/etcd/pkg/types",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/etcd/version",
"Rev": "420a452267a7ce45b3fcbed04d54030d69964fc1"
},
{
"ImportPath": "github.com/coreos/go-semver/semver",
"Rev": "568e959cd89871e61434c1143528d9162da89ef2"
},
{
"ImportPath": "github.com/coreos/go-systemd/daemon",
"Rev": "48702e0da86bd25e76cfef347e2adeb434a0d0a6"
@ -302,10 +282,6 @@
"ImportPath": "github.com/spf13/pflag",
"Rev": "583c0c0531f06d5278b7d917446061adc344b5cd"
},
{
"ImportPath": "github.com/ugorji/go/codec",
"Rev": "ded73eae5db7e7a0ef6f55aace87a2873c5d2b74"
},
{
"ImportPath": "golang.org/x/crypto/ssh/terminal",
"Rev": "de0752318171da717af4ce24d0a2e8626afaeb11"
@ -1126,10 +1102,6 @@
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/metrics",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd/util",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/etcd3",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -86,7 +86,7 @@ const rootfs = "/rootfs"
func TestE2eNode(t *testing.T) {
if *runServicesMode {
// If run-services-mode is specified, only run services in current process.
services.RunE2EServices()
services.RunE2EServices(t)
return
}
if *runKubeletMode {

View File

@ -9,7 +9,6 @@ go_library(
name = "go_default_library",
srcs = [
"apiserver.go",
"etcd.go",
"internal_services.go",
"kubelet.go",
"logs.go",
@ -29,6 +28,8 @@ go_library(
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
@ -39,10 +40,6 @@ go_library(
"//test/e2e/framework:go_default_library",
"//test/e2e_node/builder:go_default_library",
"//test/e2e_node/remote:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/api/v2http:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/types:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/kardianos/osext:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"k8s.io/apiserver/pkg/storage/storagebackend"
apiserver "k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
@ -31,21 +32,23 @@ const (
)
// APIServer is a server which manages apiserver.
type APIServer struct{}
type APIServer struct {
storageConfig storagebackend.Config
stopCh chan struct{}
}
// NewAPIServer creates an apiserver.
func NewAPIServer() *APIServer {
return &APIServer{}
func NewAPIServer(storageConfig storagebackend.Config) *APIServer {
return &APIServer{
storageConfig: storageConfig,
stopCh: make(chan struct{}),
}
}
// Start starts the apiserver, returns when apiserver is ready.
func (a *APIServer) Start() error {
o := options.NewServerRunOptions()
o.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()}
// TODO: Current setup of etcd in e2e-node tests doesn't support etcd v3
// protocol. We should migrate it to use the same infrastructure as all
// other tests (pkg/storage/etcd/testing).
o.Etcd.StorageConfig.Type = "etcd2"
o.Etcd.StorageConfig = a.storageConfig
_, ipnet, err := net.ParseCIDR(clusterIPRange)
if err != nil {
return err
@ -56,14 +59,12 @@ func (a *APIServer) Start() error {
errCh := make(chan error)
go func() {
defer close(errCh)
stopCh := make(chan struct{})
defer close(stopCh)
completedOptions, err := apiserver.Complete(o)
if err != nil {
errCh <- fmt.Errorf("set apiserver default options error: %v", err)
return
}
err = apiserver.Run(completedOptions, stopCh)
err = apiserver.Run(completedOptions, a.stopCh)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %v", err)
return
@ -80,6 +81,10 @@ func (a *APIServer) Start() error {
// Stop stops the apiserver. Currently, there is no way to stop the apiserver.
// The function is here only for completion.
func (a *APIServer) Stop() error {
if a.stopCh != nil {
close(a.stopCh)
a.stopCh = nil
}
return nil
}

View File

@ -1,161 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"crypto/tls"
"net"
"net/http"
"net/url"
"time"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/golang/glog"
)
// TODO: These tests should not be leveraging v2http
// TODO(random-liu): Add service interface to manage services with the same behaviour.
// All following configurations are got from etcd source code.
// TODO(random-liu): Use embed.NewConfig after etcd3 is supported.
const (
etcdName = "etcd"
clientURLStr = "http://localhost:4001" // clientURL has listener created and handles etcd API traffic
peerURLStr = "http://localhost:7001" // peerURL does't have listener created, it is used to pass Etcd validation
snapCount = etcdserver.DefaultSnapCount
maxSnapFiles = 5
maxWALFiles = 5
tickMs = 100
electionTicks = 10
etcdHealthCheckURL = clientURLStr + "/v2/keys/" // Trailing slash is required,
)
// EtcdServer is a server which manages etcd.
type EtcdServer struct {
*etcdserver.EtcdServer
config *etcdserver.ServerConfig
clientListen net.Listener
}
// NewEtcd creates a new default etcd server using 'dataDir' for persistence.
func NewEtcd(dataDir string) *EtcdServer {
clientURLs, err := types.NewURLs([]string{clientURLStr})
if err != nil {
glog.Fatalf("Failed to parse client url %q: %v", clientURLStr, err)
}
peerURLs, err := types.NewURLs([]string{peerURLStr})
if err != nil {
glog.Fatalf("Failed to parse peer url %q: %v", peerURLStr, err)
}
config := &etcdserver.ServerConfig{
Name: etcdName,
ClientURLs: clientURLs,
PeerURLs: peerURLs,
DataDir: dataDir,
InitialPeerURLsMap: map[string]types.URLs{etcdName: peerURLs},
NewCluster: true,
SnapCount: snapCount,
MaxSnapFiles: maxSnapFiles,
MaxWALFiles: maxWALFiles,
TickMs: tickMs,
ElectionTicks: electionTicks,
AuthToken: "simple",
}
return &EtcdServer{
config: config,
}
}
// Start starts the etcd server and listening for client connections
func (e *EtcdServer) Start() error {
var err error
e.EtcdServer, err = etcdserver.NewServer(e.config)
if err != nil {
return err
}
// create client listener, there should be only one url
e.clientListen, err = createListener(e.config.ClientURLs[0])
if err != nil {
return err
}
// start etcd
e.EtcdServer.Start()
// setup client listener
ch := v2http.NewClientHandler(e.EtcdServer, e.config.ReqTimeout())
errCh := make(chan error)
go func(l net.Listener) {
defer close(errCh)
srv := &http.Server{
Handler: ch,
ReadTimeout: 5 * time.Minute,
}
// Serve always returns a non-nil error.
errCh <- srv.Serve(l)
}(e.clientListen)
err = readinessCheck("etcd", []string{etcdHealthCheckURL}, errCh)
if err != nil {
return err
}
return nil
}
// Stop closes all connections and stops the Etcd server
func (e *EtcdServer) Stop() error {
if e.EtcdServer != nil {
e.EtcdServer.Stop()
}
if e.clientListen != nil {
err := e.clientListen.Close()
if err != nil {
return err
}
}
return nil
}
// Name returns the server's unique name
func (e *EtcdServer) Name() string {
return etcdName
}
func createListener(url url.URL) (net.Listener, error) {
l, err := net.Listen("tcp", url.Host)
if err != nil {
return nil, err
}
l, err = transport.NewKeepAliveListener(l, url.Scheme, &tls.Config{})
if err != nil {
return nil, err
}
return l, nil
}
func getEtcdClientURL() string {
return clientURLStr
}
func getEtcdHealthCheckURL() string {
return etcdHealthCheckURL
}

View File

@ -17,9 +17,11 @@ limitations under the License.
package services
import (
"io/ioutil"
"os"
"testing"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/test/e2e/framework"
"github.com/golang/glog"
@ -29,7 +31,8 @@ import (
type e2eServices struct {
rmDirs []string
// statically linked e2e services
etcdServer *EtcdServer
etcdServer *etcdtesting.EtcdTestServer
etcdStorage *storagebackend.Config
apiServer *APIServer
nsController *NamespaceController
}
@ -40,9 +43,9 @@ func newE2EServices() *e2eServices {
// run starts all e2e services and wait for the termination signal. Once receives the
// termination signal, it will stop the e2e services gracefully.
func (es *e2eServices) run() error {
defer es.stop()
if err := es.start(); err != nil {
func (es *e2eServices) run(t *testing.T) error {
defer es.stop(t)
if err := es.start(t); err != nil {
return err
}
// Wait until receiving a termination signal.
@ -51,13 +54,13 @@ func (es *e2eServices) run() error {
}
// start starts the tests embedded services or returns an error.
func (es *e2eServices) start() error {
func (es *e2eServices) start(t *testing.T) error {
glog.Info("Starting e2e services...")
err := es.startEtcd()
err := es.startEtcd(t)
if err != nil {
return err
}
err = es.startApiServer()
err = es.startApiServer(es.etcdStorage)
if err != nil {
return err
}
@ -70,7 +73,7 @@ func (es *e2eServices) start() error {
}
// stop stops the embedded e2e services.
func (es *e2eServices) stop() {
func (es *e2eServices) stop(t *testing.T) {
glog.Info("Stopping e2e services...")
// TODO(random-liu): Use a loop to stop all services after introducing
// service interface.
@ -90,9 +93,7 @@ func (es *e2eServices) stop() {
glog.Info("Stopping etcd")
if es.etcdServer != nil {
if err := es.etcdServer.Stop(); err != nil {
glog.Errorf("Failed to stop %q: %v", es.etcdServer.Name(), err)
}
es.etcdServer.Terminate(t)
}
for _, d := range es.rmDirs {
@ -107,23 +108,18 @@ func (es *e2eServices) stop() {
}
// startEtcd starts the embedded etcd instance or returns an error.
func (es *e2eServices) startEtcd() error {
func (es *e2eServices) startEtcd(t *testing.T) error {
glog.Info("Starting etcd")
// Create data directory in current working space.
dataDir, err := ioutil.TempDir(".", "etcd")
if err != nil {
return err
}
// Mark the dataDir as directories to remove.
es.rmDirs = append(es.rmDirs, dataDir)
es.etcdServer = NewEtcd(dataDir)
return es.etcdServer.Start()
server, etcdStorage := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
es.etcdServer = server
es.etcdStorage = etcdStorage
return nil
}
// startApiServer starts the embedded API server or returns an error.
func (es *e2eServices) startApiServer() error {
func (es *e2eServices) startApiServer(etcdStorage *storagebackend.Config) error {
glog.Info("Starting API server")
es.apiServer = NewAPIServer()
es.apiServer = NewAPIServer(*etcdStorage)
return es.apiServer.Start()
}
@ -137,7 +133,6 @@ func (es *e2eServices) startNamespaceController() error {
// getServicesHealthCheckURLs returns the health check urls for the internal services.
func getServicesHealthCheckURLs() []string {
return []string{
getEtcdHealthCheckURL(),
getAPIServerHealthCheckURL(),
}
}

View File

@ -22,6 +22,7 @@ import (
"os"
"os/exec"
"path"
"testing"
"github.com/golang/glog"
"github.com/kardianos/osext"
@ -105,12 +106,12 @@ func (e *E2EServices) Stop() {
// RunE2EServices actually start the e2e services. This function is used to
// start e2e services in current process. This is only used in run-services-mode.
func RunE2EServices() {
func RunE2EServices(t *testing.T) {
// Populate global DefaultFeatureGate with value from TestContext.FeatureGates.
// This way, statically-linked components see the same feature gate config as the test context.
utilfeature.DefaultFeatureGate.SetFromMap(framework.TestContext.FeatureGates)
e := newE2EServices()
if err := e.run(); err != nil {
if err := e.run(t); err != nil {
glog.Fatalf("Failed to run e2e services: %v", err)
}
}