Reduce the etcd surface area in the integration test to minimize the client dependency.

pull/6/head
Timothy St. Clair 2016-11-18 16:10:09 -06:00
parent 4a3c7aecdb
commit d15e20eed4
10 changed files with 23 additions and 324 deletions

View File

@ -22,9 +22,5 @@ go_library(
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/util/wait:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/context",
],
)

View File

@ -13,7 +13,6 @@ load(
go_library(
name = "go_default_library",
srcs = [
"etcd_utils.go",
"master_utils.go",
"perf_utils.go",
"serializer.go",
@ -63,10 +62,8 @@ go_library(
"//plugin/pkg/auth/authenticator/request/union:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/utils:go_default_library",
"//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/go-openapi/spec",
"//vendor:github.com/golang/glog",
"//vendor:github.com/pborman/uuid",
"//vendor:golang.org/x/net/context",
],
)

View File

@ -1,72 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"math/rand"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/util/env"
)
// If you need to start an etcd instance by hand, you also need to insert a key
// for this check to pass (*any* key will do, eg:
//curl -L http://127.0.0.1:2379/v2/keys/message -XPUT -d value="Hello world").
var testing_etcd = false
func GetEtcdURLFromEnv() string {
url := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
glog.V(4).Infof("Using KUBE_INTEGRATION_ETCD_URL=%q", url)
return url
}
func NewEtcdClient() etcd.Client {
// gaurded to avoid infinite recursion, check etcd.
if testing_etcd {
RequireEtcd()
}
cfg := etcd.Config{
Endpoints: []string{GetEtcdURLFromEnv()},
}
client, err := etcd.New(cfg)
if err != nil {
glog.Fatalf("unable to connect to etcd for testing: %v", err)
}
return client
}
func RequireEtcd() {
testing_etcd = true
defer func() {
testing_etcd = false
}()
if _, err := etcd.NewKeysAPI(NewEtcdClient()).Get(context.TODO(), "/", nil); err != nil {
glog.Fatalf("unable to connect to etcd for testing: %v", err)
}
}
func WithEtcdKey(f func(string)) {
prefix := fmt.Sprintf("/test-%d", rand.Int63())
defer etcd.NewKeysAPI(NewEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true})
f(prefix)
}

View File

@ -60,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
@ -295,6 +296,13 @@ func parseCIDROrDie(cidr string) *net.IPNet {
return parsed
}
// return the EtcdURL
func GetEtcdURLFromEnv() string {
url := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
glog.V(4).Infof("Using KUBE_INTEGRATION_ETCD_URL=%q", url)
return url
}
// Returns a basic master config.
func NewMasterConfig() *master.Config {
config := storagebackend.Config{

View File

@ -1,200 +0,0 @@
// +build integration,!no-etcd
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"strconv"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/integration/framework"
etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context"
)
func TestCreate(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false, etcdtest.DeserializationCacheSize)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if err := etcdStorage.Create(ctx, key, &testObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := keysAPI.Get(ctx, key, nil)
if err != nil || resp.Node == nil {
t.Fatalf("unexpected error: %v %v", err, resp)
}
decoded, err := runtime.Decode(testapi.Default.Codec(), []byte(resp.Node.Value))
if err != nil {
t.Fatalf("unexpected response: %#v", resp.Node)
}
result := *decoded.(*api.ServiceAccount)
if !api.Semantic.DeepEqual(testObject, result) {
t.Errorf("expected: %#v got: %#v", testObject, result)
}
})
}
func TestGet(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false, etcdtest.DeserializationCacheSize)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
coded, err := runtime.Encode(testapi.Default.Codec(), &testObject)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, err = keysAPI.Set(ctx, key, string(coded), nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
result := api.ServiceAccount{}
if err := etcdStorage.Get(ctx, key, &result, false); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Propagate ResourceVersion (it is set automatically).
testObject.ObjectMeta.ResourceVersion = result.ObjectMeta.ResourceVersion
if !api.Semantic.DeepEqual(testObject, result) {
t.Errorf("expected: %#v got: %#v", testObject, result)
}
})
}
func TestWriteTTL(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false, etcdtest.DeserializationCacheSize)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if err := etcdStorage.Create(ctx, key, &testObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
result := &api.ServiceAccount{}
err := etcdStorage.GuaranteedUpdate(ctx, key, result, false, nil, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "foo" {
t.Fatalf("unexpected existing object: %v", obj)
}
if res.TTL != 0 {
t.Fatalf("unexpected TTL: %#v", res)
}
ttl := uint64(10)
out := &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "out"}}
return out, &ttl, nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Name != "out" {
t.Errorf("unexpected response: %#v", result)
}
if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL != 10 {
t.Fatalf("unexpected get: %v %#v", err, res)
}
result = &api.ServiceAccount{}
err = etcdStorage.GuaranteedUpdate(ctx, key, result, false, nil, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "out" {
t.Fatalf("unexpected existing object: %v", obj)
}
if res.TTL <= 1 {
t.Fatalf("unexpected TTL: %#v", res)
}
out := &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "out2"}}
return out, nil, nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Name != "out2" {
t.Errorf("unexpected response: %#v", result)
}
if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL <= 1 {
t.Fatalf("unexpected get: %v %#v", err, res)
}
})
}
func TestWatch(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
key = etcdtest.AddPrefix(key)
resp, err := keysAPI.Set(ctx, key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedVersion := resp.Node.ModifiedIndex
// watch should load the object at the current index
w, err := etcdStorage.Watch(ctx, key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-w.ResultChan()
if event.Type != watch.Added || event.Object == nil {
t.Fatalf("expected first value to be set to ADDED, got %#v", event)
}
// version should match what we set
pod := event.Object.(*api.Pod)
if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) {
t.Errorf("expected version %d, got %#v", expectedVersion, pod)
}
// should be no events in the stream
select {
case event, ok := <-w.ResultChan():
if !ok {
t.Fatalf("channel closed unexpectedly")
}
t.Fatalf("unexpected object in channel: %#v", event)
default:
}
// should return the previously deleted item in the watch, but with the latest index
resp, err = keysAPI.Delete(ctx, key, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedVersion = resp.Node.ModifiedIndex
event = <-w.ResultChan()
if event.Type != watch.Deleted {
t.Errorf("expected deleted event %#v", event)
}
pod = event.Object.(*api.Pod)
if pod.ResourceVersion != strconv.FormatUint(expectedVersion, 10) {
t.Errorf("expected version %d, got %#v", expectedVersion, pod)
}
})
}

View File

@ -19,6 +19,8 @@ package objectmeta
import (
"testing"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api/testapi"
@ -32,6 +34,18 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
// TODO: Eliminate this v2 client dependency.
func newEtcdClient() etcd.Client {
cfg := etcd.Config{
Endpoints: []string{framework.GetEtcdURLFromEnv()},
}
client, err := etcd.New(cfg)
if err != nil {
glog.Fatalf("unable to connect to etcd for testing: %v", err)
}
return client
}
func TestIgnoreClusterName(t *testing.T) {
config := framework.NewMasterConfig()
prefix := config.StorageFactory.(*genericapiserver.DefaultStorageFactory).StorageConfig.Prefix
@ -39,7 +53,7 @@ func TestIgnoreClusterName(t *testing.T) {
defer s.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
etcdClient := framework.NewEtcdClient()
etcdClient := newEtcdClient()
etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(),
prefix+"/namespaces/", false, etcdtest.DeserializationCacheSize)
ctx := context.TODO()

View File

@ -41,16 +41,11 @@ import (
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
"github.com/golang/glog"
)
func init() {
integration.RequireEtcd()
}
// Several tests in this file are configurable by environment variables:
// KUBE_INTEGRATION_PV_OBJECTS - nr. of PVs/PVCs to be created
// (100 by default)

View File

@ -41,14 +41,9 @@ import (
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
)
func init() {
integration.RequireEtcd()
}
// 1.2 code gets:
// quota_test.go:95: Took 4.218619579s to scale up without quota
// quota_test.go:199: unexpected error: timed out waiting for the condition, ended with 342 pods (1 minute)

View File

@ -50,7 +50,6 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/request/union"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
)
@ -62,10 +61,6 @@ const (
readWriteServiceAccountName = "rw"
)
func init() {
integration.RequireEtcd()
}
func TestServiceAccountAutoCreate(t *testing.T) {
c, _, stopFunc := startServiceAccountTestServer(t)
defer stopFunc()

View File

@ -17,44 +17,15 @@ limitations under the License.
package integration
import (
"fmt"
"math/rand"
"testing"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api/errors"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/integration/framework"
)
func newEtcdClient() etcd.Client {
cfg := etcd.Config{
Endpoints: []string{framework.GetEtcdURLFromEnv()},
}
client, err := etcd.New(cfg)
if err != nil {
glog.Fatalf("unable to connect to etcd for testing: %v", err)
}
return client
}
func RequireEtcd() {
if _, err := etcd.NewKeysAPI(newEtcdClient()).Get(context.TODO(), "/", nil); err != nil {
glog.Fatalf("unable to connect to etcd for integration testing: %v", err)
}
}
func withEtcdKey(f func(string)) {
prefix := fmt.Sprintf("/test-%d", rand.Int63())
defer etcd.NewKeysAPI(newEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true})
f(prefix)
}
func DeletePodOrErrorf(t *testing.T, c clientset.Interface, ns, name string) {
if err := c.Core().Pods(ns).Delete(name, nil); err != nil {
t.Errorf("unable to delete pod %v: %v", name, err)