diff --git a/test/e2e/cluster-logging/es.go b/test/e2e/cluster-logging/es.go
index bf3488d4ab..21bb4d24bf 100644
--- a/test/e2e/cluster-logging/es.go
+++ b/test/e2e/cluster-logging/es.go
@@ -49,7 +49,7 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
 
 		By("Running synthetic logger")
-		pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute)
+		pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
 		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 		err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))
diff --git a/test/e2e/cluster-logging/sd.go b/test/e2e/cluster-logging/sd.go
index 65a8ac66c1..60fdf87173 100644
--- a/test/e2e/cluster-logging/sd.go
+++ b/test/e2e/cluster-logging/sd.go
@@ -47,7 +47,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
 
 		By("Running synthetic logger")
-		pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute)
+		pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
 		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 		err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))
diff --git a/test/e2e/cluster-logging/sd_events.go b/test/e2e/cluster-logging/sd_events.go
index 2c81b3887c..1440b99322 100644
--- a/test/e2e/cluster-logging/sd_events.go
+++ b/test/e2e/cluster-logging/sd_events.go
@@ -73,7 +73,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 		By("Running pods to generate events while waiting for some of them to be ingested")
 		wait.PollUntil(eventCreationInterval, func() (bool, error) {
 			podName := "synthlogger"
-			createLoggingPod(f, podName, "", 1, 1*time.Second)
+			startNewLoggingPod(f, podName, "", 1, 1*time.Second)
 			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 			err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
 			if err != nil {
diff --git a/test/e2e/cluster-logging/sd_soak.go b/test/e2e/cluster-logging/sd_soak.go
index 3d8f9485a7..c1d2f657b0 100644
--- a/test/e2e/cluster-logging/sd_soak.go
+++ b/test/e2e/cluster-logging/sd_soak.go
@@ -34,6 +34,9 @@ const (
 	// considered acceptable. Once per hour is fine for now, as long as it
 	// doesn't loose too much logs.
 	maxAllowedRestartsPerHour = 1.0
+	// lastPodIngestionSlack is the amount of time to wait for the last pod's
+	// logs to be ingested by the logging agent.
+	lastPodIngestionSlack = 5 * time.Minute
 )
 
 var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
@@ -57,21 +60,28 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
 			float64(time.Hour) * maxAllowedRestartsPerHour))
 
 		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
-		podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
+		podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
 		linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
 
-		By("Running short-living pods")
 		pods := []*loggingPod{}
 		for runIdx := 0; runIdx < podRunCount; runIdx++ {
 			for nodeIdx, node := range nodes {
 				podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
-				pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
-
-				defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
+				pods = append(pods, newLoggingPod(podName, node.Name, linesPerPod, jobDuration))
 			}
-			time.Sleep(podRunDelay)
 		}
 
+		By("Running short-lived pods")
+		go func() {
+			for _, pod := range pods {
+				pod.Start(f)
+				time.Sleep(podRunDelay)
+				defer f.PodClient().Delete(pod.Name, &meta_v1.DeleteOptions{})
+			}
+			// Waiting until the last pod has completed, plus ingestion slack
+			time.Sleep(jobDuration - podRunDelay + lastPodIngestionSlack)
+		}()
+
 		By("Waiting for all log lines to be ingested")
 		config := &loggingTestConfig{
 			LogsProvider: sdLogsProvider,
diff --git a/test/e2e/cluster-logging/utils.go b/test/e2e/cluster-logging/utils.go
index 01190acd2e..f87b2575f8 100644
--- a/test/e2e/cluster-logging/utils.go
+++ b/test/e2e/cluster-logging/utils.go
@@ -50,20 +50,20 @@ var (
 	logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")
 )
 
-// Type to track the progress of logs generating pod
-type loggingPod struct {
-	// Name of the pod
-	Name string
-	// Cache of ingested and read entries
-	Occurrences map[int]logEntry
-	// Number of lines expected to be ingested from this pod
-	ExpectedLinesNumber int
-}
-
 type logEntry struct {
 	Payload string
 }
 
+func (entry logEntry) getLogEntryNumber() (int, bool) {
+	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
+	if submatch == nil || len(submatch) < 2 {
+		return 0, false
+	}
+
+	lineNumber, err := strconv.Atoi(submatch[1])
+	return lineNumber, err == nil
+}
+
 type logsProvider interface {
 	Init() error
 	Cleanup()
@@ -79,31 +79,37 @@ type loggingTestConfig struct {
 	MaxAllowedFluentdRestarts int
 }
 
-func (entry logEntry) getLogEntryNumber() (int, bool) {
-	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
-	if submatch == nil || len(submatch) < 2 {
-		return 0, false
-	}
-
-	lineNumber, err := strconv.Atoi(submatch[1])
-	return lineNumber, err == nil
+// Type to track the progress of a logs-generating pod
+type loggingPod struct {
+	// Name is both the pod name and the container name.
+	Name string
+	// NodeName is the name of the node this pod will be
+	// assigned to. Can be empty.
+	NodeName string
+	// Occurrences is a cache of ingested and read entries.
+	Occurrences map[int]logEntry
+	// ExpectedLinesNumber is the number of lines that are
+	// expected to be ingested from this pod.
+	ExpectedLinesNumber int
+	// RunDuration is how long the pod will live.
+	RunDuration time.Duration
 }
 
-func createLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
-	framework.Logf("Starting pod %s", podName)
-	createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration)
-
+func newLoggingPod(podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
 	return &loggingPod{
 		Name:                podName,
+		NodeName:            nodeName,
 		Occurrences:         make(map[int]logEntry),
 		ExpectedLinesNumber: totalLines,
+		RunDuration:         loggingDuration,
 	}
 }
 
-func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName string, linesCount int, duration time.Duration) {
+func (p *loggingPod) Start(f *framework.Framework) {
+	framework.Logf("Starting pod %s", p.Name)
 	f.PodClient().Create(&api_v1.Pod{
 		ObjectMeta: meta_v1.ObjectMeta{
-			Name: podName,
+			Name: p.Name,
 		},
 		Spec: api_v1.PodSpec{
 			RestartPolicy: api_v1.RestartPolicyNever,
@@ -114,11 +120,11 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
 					Env: []api_v1.EnvVar{
 						{
 							Name:  "LOGS_GENERATOR_LINES_TOTAL",
-							Value: strconv.Itoa(linesCount),
+							Value: strconv.Itoa(p.ExpectedLinesNumber),
 						},
 						{
 							Name:  "LOGS_GENERATOR_DURATION",
-							Value: duration.String(),
+							Value: p.RunDuration.String(),
 						},
 					},
 					Resources: api_v1.ResourceRequirements{
@@ -133,11 +139,17 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
 					},
 				},
 			},
-			NodeName: nodeName,
+			NodeName: p.NodeName,
 		},
 	})
 }
 
+func startNewLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
+	pod := newLoggingPod(podName, nodeName, totalLines, loggingDuration)
+	pod.Start(f)
+	return pod
+}
+
 func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
 	podHasIngestedLogs := make([]bool, len(config.Pods))
 	podWithIngestedLogsCount := 0
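
Reviewer note: the diff splits the old createLoggingPod into a constructor (newLoggingPod), a starter method ((*loggingPod).Start), and a convenience wrapper (startNewLoggingPod) that keeps the old single-call behavior for es.go, sd.go, and sd_events.go. A minimal usage sketch of the two paths follows; it assumes the surrounding identifiers from the test files (f, node, linesPerPod, jobDuration, podRunDelay) are in scope, so it is illustrative rather than compilable on its own:

	// One-shot path (es.go, sd.go, sd_events.go): construct and start at once.
	pod := startNewLoggingPod(f, "synthlogger", "", 10*60, 10*time.Minute)
	defer f.PodClient().Delete(pod.Name, &meta_v1.DeleteOptions{})

	// Two-phase path (sd_soak.go): build the whole schedule first, then start
	// the pods from a goroutine so ingestion checking can begin immediately.
	pods := []*loggingPod{newLoggingPod("job-logs-generator-0", node.Name, linesPerPod, jobDuration)}
	go func() {
		for _, pod := range pods {
			pod.Start(f) // creates the pod via f.PodClient().Create
			time.Sleep(podRunDelay)
			defer f.PodClient().Delete(pod.Name, &meta_v1.DeleteOptions{})
		}
	}()

Note that the deferred Delete calls inside the goroutine run only when the anonymous function returns, not once per loop iteration, so cleanup happens in a single batch after the goroutine's final sleep.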
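The least obvious line in the soak change is the new podRunCount formula. The self-contained sketch below checks the arithmetic with illustrative values (the real maxPodCount, jobDuration, and testDuration constants live in sd_soak.go outside this diff): with the new formula, the last pod starts one full jobDuration before testDuration elapses, so it completes exactly at testDuration and lastPodIngestionSlack only has to cover log ingestion, whereas the old formula kept starting pods until the end of the window.

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Illustrative values only; the real constants are defined in
		// sd_soak.go outside this diff.
		maxPodCount := 10
		jobDuration := 1 * time.Hour
		testDuration := 8 * time.Hour

		// One pod per node is started every podRunDelay, as in the soak test.
		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))

		// Old formula: pods keep being scheduled for the whole test window,
		// so the last ones are still running when ingestion is checked.
		oldCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1

		// New formula: stop one jobDuration early, so the last pod completes
		// exactly at testDuration and only ingestion slack remains.
		newCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1

		lastStart := time.Duration(newCount-1) * podRunDelay
		fmt.Println(oldCount, newCount)                    // 79 71
		fmt.Println(lastStart+jobDuration == testDuration) // true
	}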