mirror of https://github.com/k3s-io/k3s
Add kubelet watch manager integration test
parent
517922f31a
commit
9cd57a5a18
|
@ -52,6 +52,7 @@ filegroup(
|
|||
"//test/integration/framework:all-srcs",
|
||||
"//test/integration/garbagecollector:all-srcs",
|
||||
"//test/integration/ipamperf:all-srcs",
|
||||
"//test/integration/kubelet:all-srcs",
|
||||
"//test/integration/master:all-srcs",
|
||||
"//test/integration/metrics:all-srcs",
|
||||
"//test/integration/objectmeta:all-srcs",
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_test")
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"main_test.go",
|
||||
"watch_manager_test.go",
|
||||
],
|
||||
tags = ["integration"],
|
||||
deps = [
|
||||
"//cmd/kube-apiserver/app/testing:go_default_library",
|
||||
"//pkg/kubelet/util/manager:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
framework.EtcdMain(m.Run)
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
func TestWatchBasedManager(t *testing.T) {
|
||||
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
|
||||
defer server.TearDownFn()
|
||||
|
||||
server.ClientConfig.QPS = 10000
|
||||
client, err := kubernetes.NewForConfig(server.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
listObj := func(namespace string, options metav1.ListOptions) (runtime.Object, error) {
|
||||
return client.CoreV1().Secrets(namespace).List(options)
|
||||
}
|
||||
watchObj := func(namespace string, options metav1.ListOptions) (watch.Interface, error) {
|
||||
return client.CoreV1().Secrets(namespace).Watch(options)
|
||||
}
|
||||
newObj := func() runtime.Object { return &v1.Secret{} }
|
||||
store := manager.NewObjectCache(listObj, watchObj, newObj, schema.GroupResource{Group: "v1", Resource: "secrets"})
|
||||
|
||||
// create 1000 secrets in parallel
|
||||
t.Log(time.Now(), "creating 1000 secrets")
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 100; j++ {
|
||||
name := fmt.Sprintf("s%d", i*100+j)
|
||||
if _, err := client.CoreV1().Secrets("default").Create(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
fmt.Print(".")
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
t.Log(time.Now(), "finished creating 1000 secrets")
|
||||
|
||||
// fetch all secrets
|
||||
wg = sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 100; j++ {
|
||||
name := fmt.Sprintf("s%d", i*100+j)
|
||||
start := time.Now()
|
||||
store.AddReference("default", name)
|
||||
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
|
||||
obj, err := store.Get("default", name)
|
||||
if err != nil {
|
||||
t.Logf("failed on %s, retrying: %v", name, err)
|
||||
return false, nil
|
||||
}
|
||||
if obj.(*v1.Secret).Name != name {
|
||||
return false, fmt.Errorf("wrong object: %v", obj.(*v1.Secret).Name)
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed on %s: %v", name, err)
|
||||
}
|
||||
if d := time.Now().Sub(start); d > time.Second {
|
||||
t.Logf("%s took %v", name, d)
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue