diff --git a/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml b/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml index 634015bf65..6784dd13b3 100644 --- a/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml +++ b/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml @@ -12,8 +12,6 @@ spec: resources: limits: cpu: 100m - args: - - -q volumeMounts: - name: varlog mountPath: /var/log diff --git a/test/e2e/es_cluster_logging.go b/test/e2e/es_cluster_logging.go index 0e40bdf4f8..088e654308 100644 --- a/test/e2e/es_cluster_logging.go +++ b/test/e2e/es_cluster_logging.go @@ -76,7 +76,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { const graceTime = 5 * time.Minute // ingestionTimeout is how long to keep retrying to wait for all the // logs to be ingested. - const ingestionTimeout = 3 * time.Minute + const ingestionTimeout = 10 * time.Minute // Check for the existence of the Elasticsearch service. By("Checking the Elasticsearch service exists.") @@ -219,9 +219,9 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { By("Checking to make sure the Fluentd pod are running on each healthy node") label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue})) options = api.ListOptions{LabelSelector: label} - pods, err = f.Client.Pods(api.NamespaceSystem).List(options) + fluentdPods, err := f.Client.Pods(api.NamespaceSystem).List(options) Expect(err).NotTo(HaveOccurred()) - for _, pod := range pods.Items { + for _, pod := range fluentdPods.Items { if nodeInNodeList(pod.Spec.NodeName, nodes) { err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem) Expect(err).NotTo(HaveOccurred()) @@ -231,7 +231,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { // Check if each healthy node has fluentd running on it for _, node := range nodes.Items { exists := false - for _, pod := range pods.Items { + for _, pod := range fluentdPods.Items { if pod.Spec.NodeName == node.Name { exists = true break @@ -305,7 +305,7 @@ func 
ClusterLevelLoggingWithElasticsearch(f *Framework) { totalMissing := 0 expected := nodeCount * countTo missingPerNode := []int{} - for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(10 * time.Second) { + for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(25 * time.Second) { // Debugging code to report the status of the elasticsearch logging endpoints. selector := labels.Set{k8sAppKey: esValue}.AsSelector() @@ -346,7 +346,8 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { } hits, ok := response["hits"].(map[string]interface{}) if !ok { - Failf("response[hits] not of the expected type: %T", response["hits"]) + Logf("response[hits] not of the expected type: %T", response["hits"]) + continue } totalF, ok := hits["total"].(float64) if !ok { @@ -371,33 +372,41 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { for _, e := range h { l, ok := e.(map[string]interface{}) if !ok { - Failf("element of hit not of expected type: %T", e) + Logf("element of hit not of expected type: %T", e) + continue } source, ok := l["_source"].(map[string]interface{}) if !ok { - Failf("_source not of the expected type: %T", l["_source"]) + Logf("_source not of the expected type: %T", l["_source"]) + continue } msg, ok := source["log"].(string) if !ok { - Failf("log not of the expected type: %T", source["log"]) + Logf("log not of the expected type: %T", source["log"]) + continue } words := strings.Split(msg, " ") - if len(words) < 4 { - Failf("Malformed log line: %s", msg) + if len(words) != 4 { + Logf("Malformed log line: %s", msg) + continue } n, err := strconv.ParseUint(words[0], 10, 0) if err != nil { - Failf("Expecting numer of node as first field of %s", msg) + Logf("Expecting number of node as first field of %s", msg) + continue } if n < 0 || int(n) >= nodeCount { - Failf("Node count index out of range: %d", nodeCount) + Logf("Node count index out of range: %d", nodeCount) + continue } index, err := 
strconv.ParseUint(words[2], 10, 0) if err != nil { - Failf("Expecting number as third field of %s", msg) + Logf("Expecting number as third field of %s", msg) + continue } if index < 0 || index >= countTo { - Failf("Index value out of range: %d", index) + Logf("Index value out of range: %d", index) + continue } // Record the observation of a log line from node n at the given index. observed[n][index]++ @@ -405,6 +414,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { // Make sure we correctly observed the expected log lines from each node. totalMissing = 0 missingPerNode = make([]int, nodeCount) + incorrectCount := false for n := range observed { for i, c := range observed[n] { if c == 0 { @@ -412,10 +422,15 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { missingPerNode[n]++ } if c < 0 || c > 1 { - Failf("Got incorrect count for node %d index %d: %d", n, i, c) + Logf("Got incorrect count for node %d index %d: %d", n, i, c) + incorrectCount = true } } } + if incorrectCount { + Logf("After %v es still returns duplicated log lines", time.Since(start)) + continue + } if totalMissing != 0 { Logf("After %v still missing %d log lines", time.Since(start), totalMissing) continue @@ -425,7 +440,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { for n := range missingPerNode { if missingPerNode[n] > 0 { - Logf("Node %d is missing %d logs", n, missingPerNode[n]) + Logf("Node %d %s is missing %d logs", n, nodes.Items[n].Name, missingPerNode[n]) opts := &api.PodLogOptions{} body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw() if err != nil { @@ -433,6 +448,18 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) { continue } Logf("Pod %s has the following logs: %s", podNames[n], body) + + for _, pod := range fluentdPods.Items { + if pod.Spec.NodeName == nodes.Items[n].Name { + body, err = f.Client.Pods(api.NamespaceSystem).GetLogs(pod.Name, opts).DoRaw() + if err != nil { + Logf("Cannot get logs from pod %v", pod.Name) + 
break + } + Logf("Fluentd Pod %s on node %s has the following logs: %s", pod.Name, nodes.Items[n].Name, body) + break + } + } } } Failf("Failed to find all %d log lines", expected)