Merge pull request #25968 from rrati/density-capacity-pods

Automatic merge from submit-queue

Added a test to density that will run maximum-capacity pods on nodes

Added a test to the Density suite that loads the kubelets with their maximum pod capacity.
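In outline, the new test sums each node's reported pod capacity and subtracts the pods the cluster is already running before saturating the rest. A condensed sketch of the new It block in the diff below (c, nodes, and masters come from the suite's setup):

totalPods := 0
for _, n := range nodes.Items {
    totalPods += int(n.Status.Capacity.Pods().Value())
}
// Pods already scheduled (e.g. system pods) occupy capacity too.
totalPods -= framework.WaitForStableCluster(c, masters)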
Kubernetes Submit Queue 2016-08-04 11:23:48 -07:00 committed by GitHub
commit 4080ad770a
3 changed files with 294 additions and 181 deletions


@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utiluuid "k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
@@ -50,6 +51,15 @@ const (
// Maximum container failures this test tolerates before failing.
var MaxContainerFailures = 0
type DensityTestConfig struct {
Configs []framework.RCConfig
Client *client.Client
Namespace string
PollInterval time.Duration
PodCount int
Timeout time.Duration
}
func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
var apiserverMem uint64
var controllerMem uint64
@@ -167,6 +177,155 @@ func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observed
}
}
// runDensityTest will perform a density test and return the time it took for
// all pods to start
func runDensityTest(dtc DensityTestConfig) time.Duration {
defer GinkgoRecover()
// Create a listener for events.
// eLock is a lock that protects the events
var eLock sync.Mutex
events := make([](*api.Event), 0)
_, controller := controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dtc.Client.Events(dtc.Namespace).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dtc.Client.Events(dtc.Namespace).Watch(options)
},
},
&api.Event{},
0,
controllerframework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eLock.Lock()
defer eLock.Unlock()
events = append(events, obj.(*api.Event))
},
},
)
stop := make(chan struct{})
go controller.Run(stop)
// Create a listener for api updates
// uLock is a lock that protects the updateCount
var uLock sync.Mutex
updateCount := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"}))
_, updateController := controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.LabelSelector = label
return dtc.Client.Pods(dtc.Namespace).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.LabelSelector = label
return dtc.Client.Pods(dtc.Namespace).Watch(options)
},
},
&api.Pod{},
0,
controllerframework.ResourceEventHandlerFuncs{
UpdateFunc: func(_, _ interface{}) {
uLock.Lock()
defer uLock.Unlock()
updateCount++
},
},
)
go updateController.Run(stop)
// Start all replication controllers.
startTime := time.Now()
wg := sync.WaitGroup{}
wg.Add(len(dtc.Configs))
for i := range dtc.Configs {
rcConfig := dtc.Configs[i]
go func() {
framework.ExpectNoError(framework.RunRC(rcConfig))
wg.Done()
}()
}
logStopCh := make(chan struct{})
go logPodStartupStatus(dtc.Client, dtc.PodCount, dtc.Namespace, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh)
wg.Wait()
startupTime := time.Now().Sub(startTime)
close(logStopCh)
framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))
By("Waiting for all events to be recorded")
last := -1
current := len(events)
lastCount := -1
currentCount := updateCount
for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < dtc.Timeout; time.Sleep(10 * time.Second) {
func() {
eLock.Lock()
defer eLock.Unlock()
last = current
current = len(events)
}()
func() {
uLock.Lock()
defer uLock.Unlock()
lastCount = currentCount
currentCount = updateCount
}()
}
close(stop)
if current != last {
framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
}
framework.Logf("Found %d events", current)
if currentCount != lastCount {
framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
}
framework.Logf("Found %d updates", currentCount)
// Tune the threshold for allowed failures.
badEvents := framework.BadEvents(events)
Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(dtc.PodCount)))))
// Print some data about Pod to Node allocation
By("Printing Pod to Node allocation data")
podList, err := dtc.Client.Pods(api.NamespaceAll).List(api.ListOptions{})
framework.ExpectNoError(err)
pausePodAllocation := make(map[string]int)
systemPodAllocation := make(map[string][]string)
for _, pod := range podList.Items {
if pod.Namespace == api.NamespaceSystem {
systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
} else {
pausePodAllocation[pod.Spec.NodeName]++
}
}
nodeNames := make([]string, 0)
for k := range pausePodAllocation {
nodeNames = append(nodeNames, k)
}
sort.Strings(nodeNames)
for _, node := range nodeNames {
framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
}
return startupTime
}
func cleanupDensityTest(dtc DensityTestConfig) {
defer GinkgoRecover()
By("Deleting ReplicationController")
// We explicitly delete all pods so that the API calls needed for deletion are accounted for in metrics.
for i := range dtc.Configs {
rcName := dtc.Configs[i].Name
rc, err := dtc.Client.ReplicationControllers(dtc.Namespace).Get(rcName)
if err == nil && rc.Spec.Replicas != 0 {
By("Cleaning up the replication controller")
err := framework.DeleteRC(dtc.Client, dtc.Namespace, rcName)
framework.ExpectNoError(err)
}
}
}
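With the two helpers factored out, each It block reduces to building a DensityTestConfig and handing it to them; a sketch assuming a configured *client.Client c, namespace ns, and RCConfigs as built in the blocks below:

dConfig := DensityTestConfig{
    Client:       c,
    Configs:      RCConfigs,
    PodCount:     totalPods,
    Namespace:    ns,
    PollInterval: 10 * time.Second,
    Timeout:      10 * time.Minute,
}
e2eStartupTime = runDensityTest(dConfig)
cleanupDensityTest(dConfig)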
// This test suite can take a long time to run, and can affect or be affected by other tests.
// So by default it is added to the ginkgo.skip list (see driver.go).
// To run this suite you must explicitly ask for it by setting the
@@ -185,6 +344,8 @@ var _ = framework.KubeDescribe("Density", func() {
var totalPods int
var nodeCpuCapacity int64
var nodeMemCapacity int64
var nodes *api.NodeList
var masters sets.String
// Gathers data prior to framework namespace teardown
AfterEach(func() {
@@ -225,8 +386,7 @@ var _ = framework.KubeDescribe("Density", func() {
// of nodes without Routes created. Since this would make a node
// unschedulable, we need to wait until all of them are schedulable.
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c))
nodes := framework.GetReadySchedulableNodesOrDie(c)
masters, nodes = framework.GetMasterAndWorkerNodesOrDie(c)
nodeCount = len(nodes.Items)
Expect(nodeCount).NotTo(BeZero())
if nodeCount == 30 {
@@ -291,16 +451,18 @@ var _ = framework.KubeDescribe("Density", func() {
}
itArg := testArg
It(name, func() {
podsPerNode := itArg.podsPerNode
totalPods = podsPerNode * nodeCount
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
framework.ExpectNoError(err)
defer fileHndl.Close()
podsPerNode := itArg.podsPerNode
totalPods = podsPerNode * nodeCount
timeout := 10 * time.Minute
// TODO: loop to podsPerNode instead of 1 when we're ready.
numberOfRCs := 1
RCConfigs := make([]framework.RCConfig, numberOfRCs)
for i := 0; i < numberOfRCs; i++ {
RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
RCConfigs[i] = framework.RCConfig{Client: c,
Image: framework.GetPauseImageName(f.Client),
Name: RCName,
@@ -316,135 +478,14 @@ var _ = framework.KubeDescribe("Density", func() {
}
}
// Create a listener for events.
// eLock is a lock that protects the events
var eLock sync.Mutex
events := make([](*api.Event), 0)
_, controller := controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Events(ns).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Events(ns).Watch(options)
},
},
&api.Event{},
0,
controllerframework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eLock.Lock()
defer eLock.Unlock()
events = append(events, obj.(*api.Event))
},
},
)
stop := make(chan struct{})
go controller.Run(stop)
// Create a listener for api updates
// uLock is a lock that protects the updateCount
var uLock sync.Mutex
updateCount := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"}))
_, updateController := controllerframework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.LabelSelector = label
return c.Pods(ns).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.LabelSelector = label
return c.Pods(ns).Watch(options)
},
},
&api.Pod{},
0,
controllerframework.ResourceEventHandlerFuncs{
UpdateFunc: func(_, _ interface{}) {
uLock.Lock()
defer uLock.Unlock()
updateCount++
},
},
)
go updateController.Run(stop)
// Start all replication controllers.
startTime := time.Now()
wg := sync.WaitGroup{}
wg.Add(len(RCConfigs))
for i := range RCConfigs {
rcConfig := RCConfigs[i]
go func() {
framework.ExpectNoError(framework.RunRC(rcConfig))
wg.Done()
}()
dConfig := DensityTestConfig{Client: c,
Configs: RCConfigs,
PodCount: totalPods,
Namespace: ns,
PollInterval: itArg.interval,
Timeout: timeout,
}
logStopCh := make(chan struct{})
go logPodStartupStatus(c, totalPods, ns, map[string]string{"type": "densityPod"}, itArg.interval, logStopCh)
wg.Wait()
e2eStartupTime = time.Now().Sub(startTime)
close(logStopCh)
framework.Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)
framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(totalPods)/float32(e2eStartupTime/time.Second))
By("Waiting for all events to be recorded")
last := -1
current := len(events)
lastCount := -1
currentCount := updateCount
timeout := 10 * time.Minute
for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < timeout; time.Sleep(10 * time.Second) {
func() {
eLock.Lock()
defer eLock.Unlock()
last = current
current = len(events)
}()
func() {
uLock.Lock()
defer uLock.Unlock()
lastCount = currentCount
currentCount = updateCount
}()
}
close(stop)
if current != last {
framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", timeout.Minutes())
}
framework.Logf("Found %d events", current)
if currentCount != lastCount {
framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", timeout.Minutes())
}
framework.Logf("Found %d updates", currentCount)
// Tune the threshold for allowed failures.
badEvents := framework.BadEvents(events)
Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(totalPods)))))
// Print some data about Pod to Node allocation
By("Printing Pod to Node allocation data")
podList, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
framework.ExpectNoError(err)
pausePodAllocation := make(map[string]int)
systemPodAllocation := make(map[string][]string)
for _, pod := range podList.Items {
if pod.Namespace == api.NamespaceSystem {
systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
} else {
pausePodAllocation[pod.Spec.NodeName]++
}
}
nodeNames := make([]string, 0)
for k := range pausePodAllocation {
nodeNames = append(nodeNames, k)
}
sort.Strings(nodeNames)
for _, node := range nodeNames {
framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
}
e2eStartupTime = runDensityTest(dConfig)
if itArg.runLatencyTest {
By("Scheduling additional Pods to measure startup latencies")
@@ -613,17 +654,7 @@ var _ = framework.KubeDescribe("Density", func() {
framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
}
By("Deleting ReplicationController")
// We explicitly delete all pods so that the API calls needed for deletion are accounted for in metrics.
for i := range RCConfigs {
rcName := RCConfigs[i].Name
rc, err := c.ReplicationControllers(ns).Get(rcName)
if err == nil && rc.Spec.Replicas != 0 {
By("Cleaning up the replication controller")
err := framework.DeleteRC(c, ns, rcName)
framework.ExpectNoError(err)
}
}
cleanupDensityTest(dConfig)
By("Removing additional replication controllers if any")
for i := 1; i <= nodeCount; i++ {
@@ -632,6 +663,48 @@ var _ = framework.KubeDescribe("Density", func() {
}
})
}
// Calculate the total number of pods from each node's max-pods capacity
It("[Feature:ManualPerformance] should allow running maximum capacity pods on nodes", func() {
totalPods = 0
for _, n := range nodes.Items {
totalPods += int(n.Status.Capacity.Pods().Value())
}
totalPods -= framework.WaitForStableCluster(c, masters)
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
framework.ExpectNoError(err)
defer fileHndl.Close()
rcCnt := 1
RCConfigs := make([]framework.RCConfig, rcCnt)
podsPerRC := int(totalPods / rcCnt)
for i := 0; i < rcCnt; i++ {
if i == rcCnt-1 {
podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt)))
}
RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
RCConfigs[i] = framework.RCConfig{Client: c,
Image: "gcr.io/google_containers/pause-amd64:3.0",
Name: RCName,
Namespace: ns,
Labels: map[string]string{"type": "densityPod"},
PollInterval: 10 * time.Second,
PodStatusFile: fileHndl,
Replicas: podsPerRC,
MaxContainerFailures: &MaxContainerFailures,
Silent: true,
}
}
dConfig := DensityTestConfig{Client: c,
Configs: RCConfigs,
PodCount: totalPods,
Namespace: ns,
PollInterval: 10 * time.Second,
Timeout: 10 * time.Minute,
}
e2eStartupTime = runDensityTest(dConfig)
cleanupDensityTest(dConfig)
})
})
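A note on the pods-per-RC split above: the last RC absorbs the remainder, which the diff computes with math.Mod on floats; the integer % operator would do the same. A worked sketch with an assumed rcCnt of 4 (the PR itself uses 1, making the remainder logic a no-op):

totalPods, rcCnt := 251, 4
podsPerRC := totalPods / rcCnt        // 62 pods in each of the first three RCs
lastRC := podsPerRC + totalPods%rcCnt // 62 + 3 = 65, so 3*62 + 65 = 251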
func createRunningPodFromRC(wg *sync.WaitGroup, c *client.Client, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) {


@@ -65,6 +65,7 @@ import (
"k8s.io/kubernetes/pkg/types"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/system"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
@@ -4665,3 +4666,73 @@ func retryCmd(command string, args ...string) (string, string, error) {
})
return stdout, stderr, err
}
// GetPodsScheduled returns the Pods that are currently scheduled and those that are not.
func GetPodsScheduled(masterNodes sets.String, pods *api.PodList) (scheduledPods, notScheduledPods []api.Pod) {
for _, pod := range pods.Items {
if !masterNodes.Has(pod.Spec.NodeName) {
if pod.Spec.NodeName != "" {
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
Expect(scheduledCondition != nil).To(Equal(true))
Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue))
scheduledPods = append(scheduledPods, pod)
} else {
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
Expect(scheduledCondition != nil).To(Equal(true))
Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse))
if scheduledCondition.Reason == "Unschedulable" {
notScheduledPods = append(notScheduledPods, pod)
}
}
}
}
return
}
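For reference, the PodScheduled condition that GetPodsScheduled inspects looks roughly like this on a pod the scheduler cannot place (field values illustrative):

cond := api.PodCondition{
    Type:   api.PodScheduled,
    Status: api.ConditionFalse,
    // Only pods whose condition carries this reason count as not scheduled.
    Reason: "Unschedulable",
}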
// WaitForStableCluster waits until all existing pods are scheduled and returns the number of scheduled pods.
func WaitForStableCluster(c *client.Client, masterNodes sets.String) int {
timeout := 10 * time.Minute
startTime := time.Now()
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
ExpectNoError(err)
// The API server also returns Pods that have succeeded; we need to filter them out.
currentPods := make([]api.Pod, 0, len(allPods.Items))
for _, pod := range allPods.Items {
if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed {
currentPods = append(currentPods, pod)
}
}
allPods.Items = currentPods
scheduledPods, currentlyNotScheduledPods := GetPodsScheduled(masterNodes, allPods)
for len(currentlyNotScheduledPods) != 0 {
time.Sleep(2 * time.Second)
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
ExpectNoError(err)
scheduledPods, currentlyNotScheduledPods = GetPodsScheduled(masterNodes, allPods)
if startTime.Add(timeout).Before(time.Now()) {
Failf("Timed out after %v waiting for stable cluster.", timeout)
break
}
}
return len(scheduledPods)
}
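The hand-rolled sleep loop above could also be phrased with the wait package this file already imports; Poll returns an error on timeout, so the Failf becomes an ExpectNoError. A sketch, not what the PR does:

err := wait.Poll(2*time.Second, 10*time.Minute, func() (bool, error) {
    allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
    if err != nil {
        return false, err
    }
    _, notScheduled := GetPodsScheduled(masterNodes, allPods)
    return len(notScheduled) == 0, nil
})
ExpectNoError(err)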
// GetMasterAndWorkerNodesOrDie returns a list of masters and a list of schedulable worker nodes.
func GetMasterAndWorkerNodesOrDie(c *client.Client) (sets.String, *api.NodeList) {
nodes := &api.NodeList{}
masters := sets.NewString()
all, _ := c.Nodes().List(api.ListOptions{})
for _, n := range all.Items {
if system.IsMasterNode(&n) {
masters.Insert(n.Name)
} else if isNodeSchedulable(&n) {
nodes.Items = append(nodes.Items, n)
}
}
return masters, nodes
}
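A caller splits the cluster once and reuses both halves, e.g. to size a test by schedulable capacity only (a sketch; c is a configured *client.Client):

masters, workers := GetMasterAndWorkerNodesOrDie(c)
capacity := int64(0)
for _, n := range workers.Items {
    capacity += n.Status.Capacity.Pods().Value()
}
Logf("%d masters, %d schedulable nodes, %d pod slots", masters.Len(), len(workers.Items), capacity)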


@@ -148,7 +148,7 @@ func getRequestedCPU(pod api.Pod) int64 {
func verifyResult(c *client.Client, podName string, expectedScheduled int, expectedNotScheduled int, ns string) {
allPods, err := c.Pods(ns).List(api.ListOptions{})
framework.ExpectNoError(err)
scheduledPods, notScheduledPods := getPodsScheduled(allPods)
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)
printed := false
printOnce := func(msg string) string {
@@ -174,37 +174,6 @@ func cleanupPods(c *client.Client, ns string) {
}
}
// Waits until all existing pods are scheduled and returns the number of scheduled pods.
func waitForStableCluster(c *client.Client) int {
timeout := 10 * time.Minute
startTime := time.Now()
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
framework.ExpectNoError(err)
// The API server also returns Pods that have succeeded; we need to filter them out.
currentPods := make([]api.Pod, 0, len(allPods.Items))
for _, pod := range allPods.Items {
if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed {
currentPods = append(currentPods, pod)
}
}
allPods.Items = currentPods
scheduledPods, currentlyNotScheduledPods := getPodsScheduled(allPods)
for len(currentlyNotScheduledPods) != 0 {
time.Sleep(2 * time.Second)
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
framework.ExpectNoError(err)
scheduledPods, currentlyNotScheduledPods = getPodsScheduled(allPods)
if startTime.Add(timeout).Before(time.Now()) {
framework.Failf("Timed out after %v waiting for stable cluster.", timeout)
break
}
}
return len(scheduledPods)
}
var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
var c *client.Client
var nodeList *api.NodeList
@@ -279,7 +248,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
totalPodCapacity += podCapacity.Value()
}
currentlyScheduledPods := waitForStableCluster(c)
currentlyScheduledPods := framework.WaitForStableCluster(c, masterNodes)
podsNeededForSaturation := int(totalPodCapacity) - currentlyScheduledPods
By(fmt.Sprintf("Starting additional %v Pods to fully saturate the cluster max pods and trying to start another one", podsNeededForSaturation))
@@ -349,7 +318,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
nodeMaxCapacity = capacity.MilliValue()
}
}
waitForStableCluster(c)
framework.WaitForStableCluster(c, masterNodes)
pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
framework.ExpectNoError(err)
@@ -444,7 +413,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to schedule Pod with nonempty NodeSelector.")
podName := "restricted-pod"
waitForStableCluster(c)
framework.WaitForStableCluster(c, masterNodes)
_, err := c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{
@@ -597,7 +566,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to schedule Pod with nonempty NodeSelector.")
podName := "restricted-pod"
waitForStableCluster(c)
framework.WaitForStableCluster(c, masterNodes)
_, err := c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{
@@ -853,7 +822,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to schedule Pod with nonempty Pod Affinity.")
podName := "without-label-" + string(uuid.NewUUID())
waitForStableCluster(c)
framework.WaitForStableCluster(c, masterNodes)
_, err := c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{