Fix Stackdriver Logging e2e soak tests

pull/6/head
Mik Vyatskov 2017-07-06 19:55:14 +02:00
parent 7df2bce1ec
commit 01df7090cb
5 changed files with 58 additions and 36 deletions

View File

@ -49,7 +49,7 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
framework.ExpectNoError(err, "Fluentd deployed incorrectly") framework.ExpectNoError(err, "Fluentd deployed incorrectly")
By("Running synthetic logger") By("Running synthetic logger")
pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute) pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name) err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName)) framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))

View File

@ -47,7 +47,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
framework.ExpectNoError(err, "Fluentd deployed incorrectly") framework.ExpectNoError(err, "Fluentd deployed incorrectly")
By("Running synthetic logger") By("Running synthetic logger")
pod := createLoggingPod(f, podName, "", 10*60, 10*time.Minute) pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name) err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName)) framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))

View File

@ -73,7 +73,7 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
By("Running pods to generate events while waiting for some of them to be ingested") By("Running pods to generate events while waiting for some of them to be ingested")
wait.PollUntil(eventCreationInterval, func() (bool, error) { wait.PollUntil(eventCreationInterval, func() (bool, error) {
podName := "synthlogger" podName := "synthlogger"
createLoggingPod(f, podName, "", 1, 1*time.Second) startNewLoggingPod(f, podName, "", 1, 1*time.Second)
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name) err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
if err != nil { if err != nil {

View File

@ -34,6 +34,9 @@ const (
// considered acceptable. Once per hour is fine for now, as long as it // considered acceptable. Once per hour is fine for now, as long as it
// doesn't loose too much logs. // doesn't loose too much logs.
maxAllowedRestartsPerHour = 1.0 maxAllowedRestartsPerHour = 1.0
// lastPodIngestionSlack is the amount of time to wait for the last pod's
// logs to be ingested by the logging agent.
lastPodIngestionSlack = 5 * time.Minute
) )
var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() { var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
@ -57,21 +60,28 @@ var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver
float64(time.Hour) * maxAllowedRestartsPerHour)) float64(time.Hour) * maxAllowedRestartsPerHour))
podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1 podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds()) linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
By("Running short-living pods")
pods := []*loggingPod{} pods := []*loggingPod{}
for runIdx := 0; runIdx < podRunCount; runIdx++ { for runIdx := 0; runIdx < podRunCount; runIdx++ {
for nodeIdx, node := range nodes { for nodeIdx, node := range nodes {
podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx) podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration)) pods = append(pods, newLoggingPod(podName, node.Name, linesPerPod, jobDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
} }
time.Sleep(podRunDelay)
} }
By("Running short-living pods")
go func() {
for _, pod := range pods {
pod.Start(f)
time.Sleep(podRunDelay)
defer f.PodClient().Delete(pod.Name, &meta_v1.DeleteOptions{})
}
// Waiting until the last pod has completed
time.Sleep(jobDuration - podRunDelay + lastPodIngestionSlack)
}()
By("Waiting for all log lines to be ingested") By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{ config := &loggingTestConfig{
LogsProvider: sdLogsProvider, LogsProvider: sdLogsProvider,

View File

@ -50,20 +50,20 @@ var (
logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*") logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")
) )
// Type to track the progress of logs generating pod
type loggingPod struct {
// Name of the pod
Name string
// Cache of ingested and read entries
Occurrences map[int]logEntry
// Number of lines expected to be ingested from this pod
ExpectedLinesNumber int
}
type logEntry struct { type logEntry struct {
Payload string Payload string
} }
func (entry logEntry) getLogEntryNumber() (int, bool) {
submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
if submatch == nil || len(submatch) < 2 {
return 0, false
}
lineNumber, err := strconv.Atoi(submatch[1])
return lineNumber, err == nil
}
type logsProvider interface { type logsProvider interface {
Init() error Init() error
Cleanup() Cleanup()
@ -79,31 +79,37 @@ type loggingTestConfig struct {
MaxAllowedFluentdRestarts int MaxAllowedFluentdRestarts int
} }
func (entry logEntry) getLogEntryNumber() (int, bool) { // Type to track the progress of logs generating pod
submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload) type loggingPod struct {
if submatch == nil || len(submatch) < 2 { // Name equals to the pod name and the container name.
return 0, false Name string
} // NodeName is the name of the node this pod will be
// assigned to. Can be empty.
lineNumber, err := strconv.Atoi(submatch[1]) NodeName string
return lineNumber, err == nil // Occurrences is a cache of ingested and read entries.
Occurrences map[int]logEntry
// ExpectedLinesNumber is the number of lines that are
// expected to be ingested from this pod.
ExpectedLinesNumber int
// RunDuration is how long the pod will live.
RunDuration time.Duration
} }
func createLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod { func newLoggingPod(podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
framework.Logf("Starting pod %s", podName)
createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration)
return &loggingPod{ return &loggingPod{
Name: podName, Name: podName,
NodeName: nodeName,
Occurrences: make(map[int]logEntry), Occurrences: make(map[int]logEntry),
ExpectedLinesNumber: totalLines, ExpectedLinesNumber: totalLines,
RunDuration: loggingDuration,
} }
} }
func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName string, linesCount int, duration time.Duration) { func (p *loggingPod) Start(f *framework.Framework) {
framework.Logf("Starting pod %s", p.Name)
f.PodClient().Create(&api_v1.Pod{ f.PodClient().Create(&api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{ ObjectMeta: meta_v1.ObjectMeta{
Name: podName, Name: p.Name,
}, },
Spec: api_v1.PodSpec{ Spec: api_v1.PodSpec{
RestartPolicy: api_v1.RestartPolicyNever, RestartPolicy: api_v1.RestartPolicyNever,
@ -114,11 +120,11 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
Env: []api_v1.EnvVar{ Env: []api_v1.EnvVar{
{ {
Name: "LOGS_GENERATOR_LINES_TOTAL", Name: "LOGS_GENERATOR_LINES_TOTAL",
Value: strconv.Itoa(linesCount), Value: strconv.Itoa(p.ExpectedLinesNumber),
}, },
{ {
Name: "LOGS_GENERATOR_DURATION", Name: "LOGS_GENERATOR_DURATION",
Value: duration.String(), Value: p.RunDuration.String(),
}, },
}, },
Resources: api_v1.ResourceRequirements{ Resources: api_v1.ResourceRequirements{
@ -133,11 +139,17 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
}, },
}, },
}, },
NodeName: nodeName, NodeName: p.NodeName,
}, },
}) })
} }
func startNewLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
pod := newLoggingPod(podName, nodeName, totalLines, loggingDuration)
pod.Start(f)
return pod
}
func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error { func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
podHasIngestedLogs := make([]bool, len(config.Pods)) podHasIngestedLogs := make([]bool, len(config.Pods))
podWithIngestedLogsCount := 0 podWithIngestedLogsCount := 0