Merge pull request #41822 from Crassirostris/cluster-logging-tests-control-ooms

Automatic merge from submit-queue (batch tested with PRs 41980, 42192, 42223, 41822, 42048)

Take into account number of restarts in cluster logging tests

Previously, cluster logging tests only measured the end-to-end number of log lines delivered to the backend.

Also, before https://github.com/kubernetes/kubernetes/pull/41795 was merged, fluentd always appeared to be working properly from the k8s perspective, even if it was crashlooping inside.

Now we can detect whether fluentd is truly working properly, experiencing no (or almost no) OOMs during its operation.
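
For reference, a minimal standalone sketch of the new restart check: list the fluentd pods and compare the highest container restart count against the allowed maximum. The function name checkFluentdRestarts is illustrative only and not part of this PR; the actual check lives inside waitForLogsIngestion (see the diff below).

package e2e

import (
	"fmt"

	api_v1 "k8s.io/kubernetes/pkg/api/v1"
)

// checkFluentdRestarts scans the fluentd pods' container statuses and fails
// if any pod was restarted more often than maxAllowedRestarts; OOM kills of
// the fluentd container show up as exactly such restarts.
func checkFluentdRestarts(fluentdPods *api_v1.PodList, maxAllowedRestarts int) error {
	maxRestartCount := 0
	for _, fluentdPod := range fluentdPods.Items {
		restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount)
		if restartCount > maxRestartCount {
			maxRestartCount = restartCount
		}
	}
	if maxRestartCount > maxAllowedRestarts {
		return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d",
			maxRestartCount, maxAllowedRestarts)
	}
	return nil
}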
Kubernetes Submit Queue 2017-03-02 00:59:21 -08:00 committed by GitHub
commit 979219f07b
7 changed files with 86 additions and 52 deletions

View File

@@ -208,6 +208,7 @@ go_library(
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/transport",
"//vendor:k8s.io/client-go/util/flowcontrol",
"//vendor:k8s.io/client-go/util/integer",
"//vendor:k8s.io/client-go/util/workqueue",
],
)

View File

@@ -51,11 +51,14 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
By("Waiting for logs to ingest")
err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
framework.ExpectNoError(err, "Failed to ingest logs")
if err != nil {
reportLogsFromFluentdPod(f, pod)
config := &loggingTestConfig{
LogsProvider: esLogsProvider,
Pods: []*loggingPod{pod},
IngestionTimeout: 10 * time.Minute,
MaxAllowedLostFraction: 0,
MaxAllowedFluentdRestarts: 0,
}
err = waitForLogsIngestion(f, config)
framework.ExpectNoError(err, "Failed to ingest logs")
})
})

View File

@@ -46,6 +46,10 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
return &esLogsProvider{Framework: f}, nil
}
func (logsProvider *esLogsProvider) FluentdApplicationName() string {
return "fluentd-es"
}
// Ensures that elasticsearch is running and ready to serve requests
func (logsProvider *esLogsProvider) EnsureWorking() error {
f := logsProvider.Framework

View File

@@ -50,11 +50,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Flaky]", func()
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
By("Waiting for logs to ingest")
err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
framework.ExpectNoError(err, "Failed to ingest logs")
if err != nil {
reportLogsFromFluentdPod(f, pod)
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
Pods: []*loggingPod{pod},
IngestionTimeout: 10 * time.Minute,
MaxAllowedLostFraction: 0,
MaxAllowedFluentdRestarts: 0,
}
err = waitForLogsIngestion(f, config)
framework.ExpectNoError(err, "Failed to ingest logs")
})
})

View File

@@ -28,7 +28,8 @@ import (
const (
// TODO(crassirostris): Once test is stable, decrease allowed loses
loadTestMaxAllowedLostFraction = 0.1
loadTestMaxAllowedLostFraction = 0.1
loadTestMaxAllowedFluentdRestarts = 1
)
// TODO(crassirostris): Remove Flaky once test is stable
@@ -58,7 +59,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
time.Sleep(loggingDuration)
By("Waiting for all log lines to be ingested")
err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
Pods: pods,
IngestionTimeout: ingestionTimeout,
MaxAllowedLostFraction: loadTestMaxAllowedLostFraction,
MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
}
err = waitForLogsIngestion(f, config)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
@@ -96,7 +104,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
time.Sleep(jobDuration)
By("Waiting for all log lines to be ingested")
err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
Pods: pods,
IngestionTimeout: ingestionTimeout,
MaxAllowedLostFraction: loadTestMaxAllowedLostFraction,
MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
}
err = waitForLogsIngestion(f, config)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {

View File

@@ -64,6 +64,10 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return provider, nil
}
func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
return "fluentd-gcp"
}
// Since GCL API is not easily available from the outside of cluster
// we use gcloud command to perform search with filter
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {

View File

@@ -17,7 +17,6 @@ limitations under the License.
package e2e
import (
"errors"
"fmt"
"strconv"
"strings"
@@ -26,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/integer"
"k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/test/e2e/framework"
@@ -63,10 +63,19 @@ type logEntry struct {
}
type logsProvider interface {
FluentdApplicationName() string
EnsureWorking() error
ReadEntries(*loggingPod) []*logEntry
}
type loggingTestConfig struct {
LogsProvider logsProvider
Pods []*loggingPod
IngestionTimeout time.Duration
MaxAllowedLostFraction float64
MaxAllowedFluentdRestarts int
}
func (entry *logEntry) getLogEntryNumber() (int, bool) {
chunks := strings.Split(entry.Payload, " ")
lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0]))
@@ -123,27 +132,27 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount i
})
}
func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error {
func waitForLogsIngestion(f *framework.Framework, config *loggingTestConfig) error {
expectedLinesNumber := 0
for _, pod := range pods {
for _, pod := range config.Pods {
expectedLinesNumber += pod.ExpectedLinesNumber
}
totalMissing := expectedLinesNumber
missingByPod := make([]int, len(pods))
for podIdx, pod := range pods {
missingByPod := make([]int, len(config.Pods))
for podIdx, pod := range config.Pods {
missingByPod[podIdx] = pod.ExpectedLinesNumber
}
for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
missing := 0
for podIdx, pod := range pods {
for podIdx, pod := range config.Pods {
if missingByPod[podIdx] == 0 {
continue
}
missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod)
missingByPod[podIdx] = pullMissingLogsCount(config.LogsProvider, pod)
missing += missingByPod[podIdx]
}
@@ -156,13 +165,32 @@ func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingesti
lostFraction := float64(totalMissing) / float64(expectedLinesNumber)
if totalMissing > 0 {
framework.Logf("After %v still missing %d lines, %.2f%% of total number oflines",
ingestionTimeout, totalMissing, lostFraction*100)
framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
config.IngestionTimeout, totalMissing, lostFraction*100)
}
if lostFraction > maxAllowedLostFraction {
if lostFraction > config.MaxAllowedLostFraction {
return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
lostFraction*100, maxAllowedLostFraction*100)
lostFraction*100, config.MaxAllowedLostFraction*100)
}
fluentdPods, err := getFluentdPods(f, config.LogsProvider.FluentdApplicationName())
if err != nil {
return fmt.Errorf("failed to get fluentd pods due to %v", err)
}
maxRestartCount := 0
for _, fluentdPod := range fluentdPods.Items {
restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount)
maxRestartCount = integer.IntMax(maxRestartCount, restartCount)
framework.Logf("Fluentd pod %s on node %s was restarted %d times",
fluentdPod.Name, fluentdPod.Spec.NodeName, restartCount)
}
if maxRestartCount > config.MaxAllowedFluentdRestarts {
return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d",
maxRestartCount, config.MaxAllowedFluentdRestarts)
}
return nil
@@ -211,32 +239,8 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro
return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
}
func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error {
synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get synth logger pod due to %v", err)
}
synthLoggerNodeName := synthLoggerPod.Spec.NodeName
if synthLoggerNodeName == "" {
return errors.New("Synthlogger pod is not assigned to the node")
}
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
func getFluentdPods(f *framework.Framework, fluentdApplicationName string) (*api_v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": fluentdApplicationName}))
options := meta_v1.ListOptions{LabelSelector: label.String()}
fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
for _, fluentdPod := range fluentdPods.Items {
if fluentdPod.Spec.NodeName == synthLoggerNodeName {
containerName := fluentdPod.Spec.Containers[0].Name
logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName)
if err != nil {
return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
}
framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs)
return nil
}
}
return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName)
return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
}