From 9cd57a5a18936e5421cbf3b37cdfb41aeff1bd83 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Thu, 28 Feb 2019 22:10:33 -0500 Subject: [PATCH] Add kubelet watch manager integration test --- test/integration/BUILD | 1 + test/integration/kubelet/BUILD | 36 ++++++++ test/integration/kubelet/main_test.go | 27 ++++++ .../integration/kubelet/watch_manager_test.go | 90 +++++++++++++++++++ 4 files changed, 154 insertions(+) create mode 100644 test/integration/kubelet/BUILD create mode 100644 test/integration/kubelet/main_test.go create mode 100644 test/integration/kubelet/watch_manager_test.go diff --git a/test/integration/BUILD b/test/integration/BUILD index d024c667e5..acce949a91 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -52,6 +52,7 @@ filegroup( "//test/integration/framework:all-srcs", "//test/integration/garbagecollector:all-srcs", "//test/integration/ipamperf:all-srcs", + "//test/integration/kubelet:all-srcs", "//test/integration/master:all-srcs", "//test/integration/metrics:all-srcs", "//test/integration/objectmeta:all-srcs", diff --git a/test/integration/kubelet/BUILD b/test/integration/kubelet/BUILD new file mode 100644 index 0000000000..3348b035f3 --- /dev/null +++ b/test/integration/kubelet/BUILD @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "main_test.go", + "watch_manager_test.go", + ], + tags = ["integration"], + deps = [ + "//cmd/kube-apiserver/app/testing:go_default_library", + "//pkg/kubelet/util/manager:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//test/integration/framework:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/kubelet/main_test.go b/test/integration/kubelet/main_test.go new file mode 100644 index 0000000000..ecc4822778 --- /dev/null +++ b/test/integration/kubelet/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go new file mode 100644 index 0000000000..bdb3c71ea5 --- /dev/null +++ b/test/integration/kubelet/watch_manager_test.go @@ -0,0 +1,90 @@ +package kubelet + +import ( + "fmt" + "sync" + "testing" + "time" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/kubelet/util/manager" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestWatchBasedManager(t *testing.T) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() + + server.ClientConfig.QPS = 10000 + client, err := kubernetes.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + listObj := func(namespace string, options metav1.ListOptions) (runtime.Object, error) { + return client.CoreV1().Secrets(namespace).List(options) + } + watchObj := func(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Secrets(namespace).Watch(options) + } + newObj := func() runtime.Object { return &v1.Secret{} } + store := manager.NewObjectCache(listObj, watchObj, newObj, schema.GroupResource{Group: "v1", Resource: "secrets"}) + + // create 1000 secrets in parallel + t.Log(time.Now(), "creating 1000 secrets") + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < 100; j++ { + name := fmt.Sprintf("s%d", i*100+j) + if _, err := client.CoreV1().Secrets("default").Create(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}); err != nil { + t.Fatal(err) + } + } + fmt.Print(".") + }(i) + } + wg.Wait() + t.Log(time.Now(), "finished creating 1000 secrets") + + // fetch all secrets + wg = sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < 100; j++ { + name := fmt.Sprintf("s%d", i*100+j) + start := time.Now() + store.AddReference("default", name) + err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + obj, err := store.Get("default", name) + if err != nil { + t.Logf("failed on %s, retrying: %v", name, err) + return false, nil + } + if obj.(*v1.Secret).Name != name { + return false, fmt.Errorf("wrong object: %v", obj.(*v1.Secret).Name) + } + return true, nil + }) + if err != nil { + t.Fatalf("failed on %s: %v", name, err) + } + if d := time.Now().Sub(start); d > time.Second { + t.Logf("%s took %v", name, d) + } + } + }(i) + } + wg.Wait() +}