mirror of https://github.com/k3s-io/k3s
Merge pull request #22869 from jayunit100/iperf-e2e
Automatic merge from submit-queue [Networking] [E2E] n-to-1 iperf client->server throughput benchmarkingpull/6/head
commit
e6295f165a
|
@ -32,12 +32,13 @@ import (
|
|||
adapter_1_2 "k8s.io/kubernetes/pkg/client/unversioned/adapters/release_1_2"
|
||||
adapter_1_3 "k8s.io/kubernetes/pkg/client/unversioned/adapters/release_1_3"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/metrics"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -381,6 +382,79 @@ func (f *Framework) ReadFileViaContainer(podName, containerName string, path str
|
|||
return string(stdout), err
|
||||
}
|
||||
|
||||
// CreateServiceForSimpleAppWithPods is a convenience wrapper to create a service and its matching pods all at once.
|
||||
func (f *Framework) CreateServiceForSimpleAppWithPods(contPort int, svcPort int, appName string, podSpec func(n api.Node) api.PodSpec, count int, block bool) (error, *api.Service) {
|
||||
var err error = nil
|
||||
theService := f.CreateServiceForSimpleApp(contPort, svcPort, appName)
|
||||
f.CreatePodsPerNodeForSimpleApp(appName, podSpec, count)
|
||||
if block {
|
||||
err = WaitForPodsWithLabelRunning(f.Client, f.Namespace.Name, labels.SelectorFromSet(labels.Set(theService.Spec.Selector)))
|
||||
}
|
||||
return err, theService
|
||||
}
|
||||
|
||||
// CreateServiceForSimpleApp returns a service that selects/exposes pods (send -1 ports if no exposure needed) with an app label.
|
||||
func (f *Framework) CreateServiceForSimpleApp(contPort, svcPort int, appName string) *api.Service {
|
||||
if appName == "" {
|
||||
panic(fmt.Sprintf("no app name provided"))
|
||||
}
|
||||
|
||||
serviceSelector := map[string]string{
|
||||
"app": appName + "-pod",
|
||||
}
|
||||
|
||||
// For convenience, user sending ports are optional.
|
||||
portsFunc := func() []api.ServicePort {
|
||||
if contPort < 1 || svcPort < 1 {
|
||||
return nil
|
||||
} else {
|
||||
return []api.ServicePort{{
|
||||
Protocol: "TCP",
|
||||
Port: svcPort,
|
||||
TargetPort: intstr.FromInt(contPort),
|
||||
}}
|
||||
}
|
||||
}
|
||||
Logf("Creating a service-for-%v for selecting app=%v-pod", appName, appName)
|
||||
service, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "service-for-" + appName,
|
||||
Labels: map[string]string{
|
||||
"app": appName + "-service",
|
||||
},
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: portsFunc(),
|
||||
Selector: serviceSelector,
|
||||
},
|
||||
})
|
||||
ExpectNoError(err)
|
||||
return service
|
||||
}
|
||||
|
||||
// CreatePodsPerNodeForSimpleApp Creates pods w/ labels. Useful for tests which make a bunch of pods w/o any networking.
|
||||
func (f *Framework) CreatePodsPerNodeForSimpleApp(appName string, podSpec func(n api.Node) api.PodSpec, maxCount int) map[string]string {
|
||||
nodes := ListSchedulableNodesOrDie(f.Client)
|
||||
labels := map[string]string{
|
||||
"app": appName + "-pod",
|
||||
}
|
||||
for i, node := range nodes.Items {
|
||||
// one per node, but no more than maxCount.
|
||||
if i <= maxCount {
|
||||
Logf("%v/%v : Creating container with label app=%v-pod", i, maxCount, appName)
|
||||
_, err := f.Client.Pods(f.Namespace.Name).Create(&api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: fmt.Sprintf(appName+"-pod-%v", i),
|
||||
Labels: labels,
|
||||
},
|
||||
Spec: podSpec(node),
|
||||
})
|
||||
ExpectNoError(err)
|
||||
}
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func kubectlExecWithRetry(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) {
|
||||
for numRetries := 0; numRetries < maxKubectlExecRetries; numRetries++ {
|
||||
if numRetries > 0 {
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2e
|
||||
|
||||
// Tests network performance using iperf or other containers.
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
)
|
||||
|
||||
const (
|
||||
// empirically derived as a baseline for expectations from running this test using kube-up.sh.
|
||||
gceBandwidthBitsEstimate = int64(30000000000)
|
||||
// on 4 node clusters, we found this test passes very quickly, generally in less then 100 seconds.
|
||||
smallClusterTimeout = 200 * time.Second
|
||||
)
|
||||
|
||||
// Declared as Flakey since it has not been proven to run in parallel on small nodes or slow networks in CI
|
||||
// TODO jayunit100 : Retag this test according to semantics from #22401
|
||||
var _ = framework.KubeDescribe("Networking IPerf [Experimental] [Slow] [Feature:Networking-Performance]", func() {
|
||||
|
||||
f := framework.NewDefaultFramework("network-perf")
|
||||
|
||||
// A few simple bandwidth tests which are capped by nodes.
|
||||
// TODO replace the 1 with the scale option implementation
|
||||
runClientServerBandwidthMeasurement(f, 1, gceBandwidthBitsEstimate)
|
||||
})
|
||||
|
||||
func runClientServerBandwidthMeasurement(f *framework.Framework, numClient int, maxBandwidthBits int64) {
|
||||
// TODO: Make this a function parameter, once we distribute iperf endpoints, possibly via session affinity.
|
||||
numServer := 1
|
||||
|
||||
It(fmt.Sprintf("should transfer ~ 1GB onto the service endpoint %v servers (maximum of %v clients)", numServer, numClient), func() {
|
||||
nodes := framework.ListSchedulableNodesOrDie(f.Client)
|
||||
totalPods := len(nodes.Items)
|
||||
// for a single service, we expect to divide bandwidth between the network. Very crude estimate.
|
||||
expectedBandwidth := int(float64(maxBandwidthBits) / float64(totalPods))
|
||||
Expect(totalPods).NotTo(Equal(0))
|
||||
appName := "iperf-e2e"
|
||||
err, _ := f.CreateServiceForSimpleAppWithPods(
|
||||
8001,
|
||||
8002,
|
||||
appName,
|
||||
func(n api.Node) api.PodSpec {
|
||||
return api.PodSpec{
|
||||
Containers: []api.Container{{
|
||||
Name: "iperf-server",
|
||||
Image: "gcr.io/google_containers/iperf:e2e",
|
||||
Args: []string{
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"/usr/local/bin/iperf -s -p 8001 ",
|
||||
},
|
||||
Ports: []api.ContainerPort{{ContainerPort: 8001}},
|
||||
}},
|
||||
NodeName: n.Name,
|
||||
RestartPolicy: api.RestartPolicyOnFailure,
|
||||
}
|
||||
},
|
||||
// this will be used to generate the -service name which all iperf clients point at.
|
||||
numServer, // Generally should be 1 server unless we do affinity or use a version of iperf that supports LB
|
||||
true, // Make sure we wait, otherwise all the clients will die and need to restart.
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
framework.Failf("Fatal error waiting for iperf server endpoint : %v", err)
|
||||
}
|
||||
|
||||
iperfClientPodLabels := f.CreatePodsPerNodeForSimpleApp(
|
||||
"iperf-e2e-cli",
|
||||
func(n api.Node) api.PodSpec {
|
||||
return api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "iperf-client",
|
||||
Image: "gcr.io/google_containers/iperf:e2e",
|
||||
Args: []string{
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"/usr/local/bin/iperf -c service-for-" + appName + " -p 8002 --reportstyle C && sleep 5",
|
||||
},
|
||||
},
|
||||
},
|
||||
RestartPolicy: api.RestartPolicyOnFailure, // let them successfully die.
|
||||
}
|
||||
},
|
||||
numClient,
|
||||
)
|
||||
|
||||
framework.Logf("Reading all perf results to stdout.")
|
||||
framework.Logf("date,cli,cliPort,server,serverPort,id,interval,transferBits,bandwidthBits")
|
||||
|
||||
// Calculate expected number of clients based on total nodes.
|
||||
expectedCli := func() int {
|
||||
nodes, err := framework.GetReadyNodes(f)
|
||||
framework.ExpectNoError(err)
|
||||
return int(math.Min(float64(len(nodes.Items)), float64(numClient)))
|
||||
}()
|
||||
|
||||
// Extra 1/10 second per client.
|
||||
iperfTimeout := smallClusterTimeout + (time.Duration(expectedCli/10) * time.Second)
|
||||
iperfResults := &IPerfResults{}
|
||||
|
||||
iperfClusterVerification := f.NewClusterVerification(
|
||||
framework.PodStateVerification{
|
||||
Selectors: iperfClientPodLabels,
|
||||
ValidPhases: []api.PodPhase{api.PodSucceeded},
|
||||
},
|
||||
)
|
||||
|
||||
pods, err2 := iperfClusterVerification.WaitFor(expectedCli, iperfTimeout)
|
||||
if err2 != nil {
|
||||
framework.Failf("Error in wait...")
|
||||
} else if len(pods) < expectedCli {
|
||||
framework.Failf("IPerf restuls : Only got %v out of %v, after waiting %v", len(pods), expectedCli, iperfTimeout)
|
||||
} else {
|
||||
// For each builds up a collection of IPerfRecords
|
||||
iperfClusterVerification.ForEach(
|
||||
func(p api.Pod) {
|
||||
resultS, err := framework.LookForStringInLog(f.Namespace.Name, p.Name, "iperf-client", "0-", 1*time.Second)
|
||||
if err == nil {
|
||||
framework.Logf(resultS)
|
||||
iperfResults.Add(NewIPerf(resultS))
|
||||
} else {
|
||||
framework.Failf("Unexpected error, %v when running forEach on the pods.", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
fmt.Println("[begin] Node,Bandwith CSV")
|
||||
fmt.Println(iperfResults.ToTSV())
|
||||
fmt.Println("[end] Node,Bandwith CSV")
|
||||
|
||||
for ipClient, bandwidth := range iperfResults.BandwidthMap {
|
||||
framework.Logf("%v had bandwidth %v. Ratio to expected (%v) was %f", ipClient, bandwidth, expectedBandwidth, float64(bandwidth)/float64(expectedBandwidth))
|
||||
}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2e
|
||||
|
||||
// Tests network performance using iperf or other containers.
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
)
|
||||
|
||||
type IPerfResults struct {
|
||||
BandwidthMap map[string]int64
|
||||
}
|
||||
|
||||
// IPerfResult struct modelling an iperf record....
|
||||
// 20160314154239,172.17.0.3,34152,172.17.0.2,5001,3,0.0-10.0,33843707904,27074774092
|
||||
type IPerfResult struct {
|
||||
date string // field 1 in the csv
|
||||
cli string // field 2 in the csv
|
||||
cliPort int64 // ...
|
||||
server string
|
||||
servPort int64
|
||||
id string
|
||||
interval string
|
||||
transferBits int64
|
||||
bandwidthBits int64
|
||||
}
|
||||
|
||||
// Add adds a new result to the Results struct.
|
||||
func (i *IPerfResults) Add(ipr *IPerfResult) {
|
||||
if i.BandwidthMap == nil {
|
||||
i.BandwidthMap = map[string]int64{}
|
||||
}
|
||||
i.BandwidthMap[ipr.cli] = ipr.bandwidthBits
|
||||
}
|
||||
|
||||
// ToTSV exports an easily readable tab delimited format of all IPerfResults.
|
||||
func (i *IPerfResults) ToTSV() string {
|
||||
if len(i.BandwidthMap) < 1 {
|
||||
framework.Logf("Warning: no data in bandwidth map")
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
for node, bandwidth := range i.BandwidthMap {
|
||||
asJson, _ := json.Marshal(node)
|
||||
buffer.WriteString("\t " + string(asJson) + "\t " + fmt.Sprintf("%E", float64(bandwidth)))
|
||||
}
|
||||
return buffer.String()
|
||||
}
|
||||
|
||||
// NewIPerf parses an IPerf CSV output line into an IPerfResult.
|
||||
func NewIPerf(csvLine string) *IPerfResult {
|
||||
slice := StrSlice(strings.Split(csvLine, ","))
|
||||
if len(slice) != 9 {
|
||||
framework.Failf("Incorrect fields in the output: %v (%v out of 9)", slice, len(slice))
|
||||
}
|
||||
i := IPerfResult{}
|
||||
i.date = slice.get(0)
|
||||
i.cli = slice.get(1)
|
||||
i.cliPort = intOrFail("client port", slice.get(2))
|
||||
i.server = slice.get(3)
|
||||
i.servPort = intOrFail("server port", slice.get(4))
|
||||
i.id = slice.get(5)
|
||||
i.interval = slice.get(6)
|
||||
i.transferBits = intOrFail("transfer port", slice.get(7))
|
||||
i.bandwidthBits = intOrFail("bandwidth port", slice.get(8))
|
||||
return &i
|
||||
}
|
||||
|
||||
type StrSlice []string
|
||||
|
||||
func (s StrSlice) get(i int) string {
|
||||
if i >= 0 && i < len(s) {
|
||||
return s[i]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// intOrFail is a convenience function for parsing integers.
|
||||
func intOrFail(debugName string, rawValue string) int64 {
|
||||
value, err := strconv.ParseInt(rawValue, 10, 64)
|
||||
if err != nil {
|
||||
framework.Failf("Failed parsing value %v from the string '%v' as an integer", debugName, rawValue)
|
||||
}
|
||||
return value
|
||||
}
|
Loading…
Reference in New Issue