Add a test that emulates large N node behavior against master

Run:

    etcd &
    kube-apiserver --etcd-servers=... ...
    UPDATE_NODE_APISERVER=... go test ./test/integration/master -test.run=TestUpdateNodeObjects -test.v -tags integration

Simulates the core update loops from nodes to the API server, allowing
baseline profiling of the steady state of large clusters. May require
tweaking the http.Transport used by the client to support >N idle
connections to the master.
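
A minimal sketch of that transport tweak, assuming the tuned http.Transport
is supplied through restclient.Config's Transport field; the
MaxIdleConnsPerHost value of 500 is illustrative and should be sized to the
simulated node count:

    package main

    import (
        "net/http"
        "os"

        clienttypedv1 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
        "k8s.io/kubernetes/pkg/client/restclient"
    )

    func main() {
        // Same client construction as in the test, but with a transport that keeps
        // far more idle connections per host than net/http's default of 2, so
        // hundreds of node workers can reuse their connections to the master.
        c := clienttypedv1.NewForConfigOrDie(&restclient.Config{
            Host: os.Getenv("UPDATE_NODE_APISERVER"),
            QPS:  10000,
            // Assumption: the tuned transport is wired in via the config's
            // Transport field; 500 is an illustrative value.
            Transport: &http.Transport{
                MaxIdleConnsPerHost: 500,
            },
        })
        _ = c
    }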
Clayton Coleman 2016-10-31 10:24:24 -04:00
parent a52ad987d2
commit 498727a520
1 changed file with 157 additions and 0 deletions

@@ -25,7 +25,9 @@ import (
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strings"
	"sync"
	"testing"
	"time"
@@ -34,9 +36,11 @@ import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/testapi"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/apimachinery/registered"
	"k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	clienttypedv1 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/test/integration"
@@ -488,3 +492,156 @@ func TestServiceAlloc(t *testing.T) {
		t.Fatalf("got unexpected error: %v", err)
	}
}

// TestUpdateNodeObjects represents a simple version of the behavior of node checkins at steady
// state. This test allows for easy profiling of a realistic master scenario for baseline CPU
// in very large clusters. It is disabled by default - start a kube-apiserver and pass
// UPDATE_NODE_APISERVER as the host value.
func TestUpdateNodeObjects(t *testing.T) {
	server := os.Getenv("UPDATE_NODE_APISERVER")
	if len(server) == 0 {
		t.Skip("UPDATE_NODE_APISERVER is not set")
	}
	c := clienttypedv1.NewForConfigOrDie(&restclient.Config{
		QPS:  10000,
		Host: server,
		ContentConfig: restclient.ContentConfig{
			AcceptContentTypes: "application/vnd.kubernetes.protobuf",
			ContentType:        "application/vnd.kubernetes.protobuf",
		},
	})
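
	// Scale of the simulation: node workers, list clients, watch clients,
	// and status-update iterations per node worker.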
	nodes := 400
	listers := 5
	watchers := 50
	iterations := 10000

	for i := 0; i < nodes*6; i++ {
		c.Nodes().Delete(fmt.Sprintf("node-%d", i), nil)
		_, err := c.Nodes().Create(&v1.Node{
			ObjectMeta: v1.ObjectMeta{
				Name: fmt.Sprintf("node-%d", i),
			},
		})
		if err != nil {
			t.Fatal(err)
		}
	}
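
	// Background listers: periodically list every node, staggered so the
	// full-list load is spread out rather than arriving in synchronized bursts.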
	for k := 0; k < listers; k++ {
		go func(lister int) {
			for i := 0; i < iterations; i++ {
				_, err := c.Nodes().List(v1.ListOptions{})
				if err != nil {
					fmt.Printf("[list:%d] error after %d: %v\n", lister, i, err)
					break
				}
				time.Sleep(time.Duration(lister)*10*time.Millisecond + 1500*time.Millisecond)
			}
		}(k)
	}
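
	// Background watchers: stream node events for the lifetime of the test,
	// logging progress every 100 events.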
	for k := 0; k < watchers; k++ {
		go func(lister int) {
			w, err := c.Nodes().Watch(v1.ListOptions{})
			if err != nil {
				fmt.Printf("[watch:%d] error: %v\n", lister, err)
				return
			}
			i := 0
			for r := range w.ResultChan() {
				i++
				if _, ok := r.Object.(*v1.Node); !ok {
					fmt.Printf("[watch:%d] unexpected object after %d: %#v\n", lister, i, r)
				}
				if i%100 == 0 {
					fmt.Printf("[watch:%d] iteration %d ...\n", lister, i)
				}
			}
			fmt.Printf("[watch:%d] done\n", lister)
		}(k)
	}
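
	// One worker per simulated node: read the node back and alternate its
	// status conditions, mimicking kubelet status check-ins at steady state.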
	var wg sync.WaitGroup
	wg.Add(nodes)

	for j := 0; j < nodes; j++ {
		go func(node int) {
			var lastCount int
			for i := 0; i < iterations; i++ {
				if i%100 == 0 {
					fmt.Printf("[%d] iteration %d ...\n", node, i)
				}
				if i%20 == 0 {
					_, err := c.Nodes().List(v1.ListOptions{})
					if err != nil {
						fmt.Printf("[%d] error after %d: %v\n", node, i, err)
						break
					}
				}
				r, err := c.Nodes().List(v1.ListOptions{
					FieldSelector:   fmt.Sprintf("metadata.name=node-%d", node),
					ResourceVersion: "0",
				})
				if err != nil {
					fmt.Printf("[%d] error after %d: %v\n", node, i, err)
					break
				}
				if len(r.Items) != 1 {
					fmt.Printf("[%d] error after %d: unexpected list count\n", node, i)
					break
				}
				n, err := c.Nodes().Get(fmt.Sprintf("node-%d", node))
				if err != nil {
					fmt.Printf("[%d] error after %d: %v\n", node, i, err)
					break
				}
				if len(n.Status.Conditions) != lastCount {
					fmt.Printf("[%d] worker set %d, read %d conditions\n", node, lastCount, len(n.Status.Conditions))
					break
				}
				previousCount := lastCount
				switch {
				case i%4 == 0:
					lastCount = 1
					n.Status.Conditions = []v1.NodeCondition{
						{
							Type:   v1.NodeReady,
							Status: v1.ConditionTrue,
							Reason: "foo",
						},
					}
				case i%4 == 1:
					lastCount = 2
					n.Status.Conditions = []v1.NodeCondition{
						{
							Type:   v1.NodeReady,
							Status: v1.ConditionFalse,
							Reason: "foo",
						},
						{
							Type:   v1.NodeDiskPressure,
							Status: v1.ConditionTrue,
							Reason: "bar",
						},
					}
				case i%4 == 2:
					lastCount = 0
					n.Status.Conditions = nil
				}
				if _, err := c.Nodes().UpdateStatus(n); err != nil {
					if !errors.IsConflict(err) {
						fmt.Printf("[%d] error after %d: %v\n", node, i, err)
						break
					}
					lastCount = previousCount
				}
			}
			wg.Done()
			fmt.Printf("[%d] done\n", node)
		}(j)
	}
	wg.Wait()
}