mirror of https://github.com/k3s-io/k3s
Merge pull request #36198 from Crassirostris/logging-load-test
Automatic merge from submit-queue

Add cluster logging load test for GCL

Changed the code around the logging tests a little to avoid duplication. Added the GCL Go client library, because the command-line tool cannot handle the required number of logs effectively.

Related to https://github.com/kubernetes/kubernetes/issues/34310

@piosz
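For context on that last point: shelling out to gcloud returns one large result set, while the Go client library reads entries page by page. Below is a minimal sketch (not code from this PR) of the paginated read the vendored google.golang.org/api/logging/v2beta1 package enables; the project ID and filter string are placeholder assumptions.

    package main

    import (
    	"fmt"
    	"log"

    	"golang.org/x/net/context"
    	"golang.org/x/oauth2/google"
    	gcl "google.golang.org/api/logging/v2beta1"
    )

    func main() {
    	ctx := context.Background()
    	hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope)
    	if err != nil {
    		log.Fatal(err)
    	}
    	service, err := gcl.New(hc)
    	if err != nil {
    		log.Fatal(err)
    	}

    	pageToken := "" // an empty token requests the first page
    	for {
    		resp, err := service.Entries.List(&gcl.ListLogEntriesRequest{
    			ProjectIds: []string{"my-project"},                     // placeholder project ID
    			Filter:     "resource.labels.pod_id:synthlogger",       // placeholder filter
    			PageSize:   1000,                                       // GCL caps page size at 1000
    			PageToken:  pageToken,
    		}).Do()
    		if err != nil {
    			log.Fatal(err)
    		}
    		for _, entry := range resp.Entries {
    			fmt.Println(entry.TextPayload)
    		}
    		if resp.NextPageToken == "" {
    			break // no more pages
    		}
    		pageToken = resp.NextPageToken
    	}
    }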
commit b7717e74e7

@@ -2555,6 +2555,10 @@
			"ImportPath": "google.golang.org/api/googleapi/internal/uritemplates",
			"Rev": "55146ba61254fdb1c26d65ff3c04bc1611ad73fb"
		},
		{
			"ImportPath": "google.golang.org/api/logging/v2beta1",
			"Rev": "55146ba61254fdb1c26d65ff3c04bc1611ad73fb"
		},
		{
			"ImportPath": "google.golang.org/appengine",
			"Rev": "4f7eeb5305a4ba1966344836ba4af9996b7b4e05"
@@ -75772,6 +75772,41 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

================================================================================


================================================================================
= vendor/google.golang.org/api/logging/v2beta1 licensed under: =

Copyright (c) 2011 Google Inc. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

   * Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
   * Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
   * Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

= vendor/google.golang.org/api/LICENSE a651bb3d8b1c412632e28823bb432b40 -
================================================================================


================================================================================
= vendor/google.golang.org/appengine licensed under: =
@@ -15,7 +15,10 @@ go_library(
        "apparmor.go",
        "cadvisor.go",
        "cluster_logging_es.go",
        "cluster_logging_es_utils.go",
        "cluster_logging_gcl.go",
        "cluster_logging_gcl_load.go",
        "cluster_logging_gcl_utils.go",
        "cluster_logging_utils.go",
        "cluster_size_autoscaling.go",
        "cluster_upgrade.go",

@@ -176,8 +179,10 @@ go_library(
        "//vendor:golang.org/x/crypto/ssh",
        "//vendor:golang.org/x/net/context",
        "//vendor:golang.org/x/net/websocket",
        "//vendor:golang.org/x/oauth2/google",
        "//vendor:google.golang.org/api/compute/v1",
        "//vendor:google.golang.org/api/googleapi",
        "//vendor:google.golang.org/api/logging/v2beta1",
        "//vendor:k8s.io/apimachinery/pkg/api/errors",
        "//vendor:k8s.io/apimachinery/pkg/api/resource",
        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",

@@ -187,7 +192,6 @@ go_library(
        "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
        "//vendor:k8s.io/apimachinery/pkg/types",
        "//vendor:k8s.io/apimachinery/pkg/util/intstr",
        "//vendor:k8s.io/apimachinery/pkg/util/json",
        "//vendor:k8s.io/apimachinery/pkg/util/net",
        "//vendor:k8s.io/apimachinery/pkg/util/runtime",
        "//vendor:k8s.io/apimachinery/pkg/util/sets",
@@ -17,24 +17,13 @@ limitations under the License.
package e2e

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

const (
	// graceTime is how long to keep retrying requesting elasticsearch for status information.
	graceTime = 5 * time.Minute
)

var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {

@@ -48,241 +37,25 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
	})

	It("should check that logs from containers are ingested into Elasticsearch", func() {
		err := checkElasticsearchReadiness(f)
		framework.ExpectNoError(err, "Elasticsearch failed to start")
		podName := "synthlogger"
		esLogsProvider, err := newEsLogsProvider(f)
		framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider")

		err = esLogsProvider.EnsureWorking()
		framework.ExpectNoError(err, "Elasticsearch is not working")

		By("Running synthetic logger")
		createSynthLogger(f, expectedLinesCount)
		defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{})
		err = framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name)
		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
		pod := createLoggingPod(f, podName, 100, 1*time.Second)
		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
		err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))

		By("Waiting for logs to ingest")
		totalMissing := expectedLinesCount
		for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
			totalMissing, err = getMissingLinesCountElasticsearch(f, expectedLinesCount)
			if err != nil {
				framework.Logf("Failed to get missing lines count due to %v", err)
				totalMissing = expectedLinesCount
			} else if totalMissing > 0 {
				framework.Logf("Still missing %d lines", totalMissing)
			}
		err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
		framework.ExpectNoError(err, "Failed to ingest logs")

			if totalMissing == 0 {
				break
			}
		if err != nil {
			reportLogsFromFluentdPod(f, pod)
		}

		if totalMissing > 0 {
			if err := reportLogsFromFluentdPod(f); err != nil {
				framework.Logf("Failed to report logs from fluentd pod due to %v", err)
			}
		}

		Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
	})
})

// Ensures that elasticsearch is running and ready to serve requests
func checkElasticsearchReadiness(f *framework.Framework) error {
	// Check for the existence of the Elasticsearch service.
	By("Checking the Elasticsearch service exists.")
	s := f.ClientSet.Core().Services(metav1.NamespaceSystem)
	// Make a few attempts to connect. This makes the test robust against
	// being run as the first e2e test just after the e2e cluster has been created.
	var err error
	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
		if _, err = s.Get("elasticsearch-logging", metav1.GetOptions{}); err == nil {
			break
		}
		framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
	}
	Expect(err).NotTo(HaveOccurred())

	// Wait for the Elasticsearch pods to enter the running state.
	By("Checking to make sure the Elasticsearch pods are running")
	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "elasticsearch-logging"}))
	options := metav1.ListOptions{LabelSelector: label.String()}
	pods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options)
	Expect(err).NotTo(HaveOccurred())
	for _, pod := range pods.Items {
		err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
		Expect(err).NotTo(HaveOccurred())
	}

	By("Checking to make sure we are talking to an Elasticsearch service.")
	// Perform a few checks to make sure this looks like an Elasticsearch cluster.
	var statusCode int
	err = nil
	var body []byte
	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
		if errProxy != nil {
			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
		defer cancel()

		// Query against the root URL for Elasticsearch.
		response := proxyRequest.Namespace(metav1.NamespaceSystem).
			Context(ctx).
			Name("elasticsearch-logging").
			Do()
		err = response.Error()
		response.StatusCode(&statusCode)

		if err != nil {
			if ctx.Err() != nil {
				framework.Failf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
				continue
			}
			framework.Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
			continue
		}
		if int(statusCode) != 200 {
			framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
			continue
		}
		break
	}
	Expect(err).NotTo(HaveOccurred())
	if int(statusCode) != 200 {
		framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
	}

	// Now assume we really are talking to an Elasticsearch instance.
	// Check the cluster health.
	By("Checking health of Elasticsearch service.")
	healthy := false
	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
		if errProxy != nil {
			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
		defer cancel()

		body, err = proxyRequest.Namespace(metav1.NamespaceSystem).
			Context(ctx).
			Name("elasticsearch-logging").
			Suffix("_cluster/health").
			Param("level", "indices").
			DoRaw()
		if err != nil {
			if ctx.Err() != nil {
				framework.Failf("Failed to get cluster health from elasticsearch: %v", err)
			}
			continue
		}
		health := make(map[string]interface{})
		err := json.Unmarshal(body, &health)
		if err != nil {
			framework.Logf("Bad json response from elasticsearch: %v", err)
			continue
		}
		statusIntf, ok := health["status"]
		if !ok {
			framework.Logf("No status field found in cluster health response: %v", health)
			continue
		}
		status := statusIntf.(string)
		if status != "green" && status != "yellow" {
			framework.Logf("Cluster health has bad status: %v", health)
			continue
		}
		if err == nil && ok {
			healthy = true
			break
		}
	}
	if !healthy {
		return fmt.Errorf("After %v elasticsearch cluster is not healthy", graceTime)
	}

	return nil
}

func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int) (int, error) {
	proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
	if errProxy != nil {
		return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
	}

	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
	defer cancel()

	// Ask Elasticsearch to return all the log lines that were tagged with the
	// pod name. Ask for ten times as many log lines because duplication is possible.
	body, err := proxyRequest.Namespace(metav1.NamespaceSystem).
		Context(ctx).
		Name("elasticsearch-logging").
		Suffix("_search").
		// TODO: Change filter to only match records from current test run
		// after fluent-plugin-kubernetes_metadata_filter is enabled
		// and optimize current query
		Param("q", fmt.Sprintf("tag:*%s*", synthLoggerPodName)).
		Param("size", strconv.Itoa(expectedCount*10)).
		DoRaw()
	if err != nil {
		if ctx.Err() != nil {
			framework.Failf("Failed to make proxy call to elasticsearch-logging: %v", err)
		}
		return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
	}

	var response map[string]interface{}
	err = json.Unmarshal(body, &response)
	if err != nil {
		return 0, fmt.Errorf("Failed to unmarshal response: %v", err)
	}

	hits, ok := response["hits"].(map[string]interface{})
	if !ok {
		return 0, fmt.Errorf("response[hits] not of the expected type: %T", response["hits"])
	}

	h, ok := hits["hits"].([]interface{})
	if !ok {
		return 0, fmt.Errorf("Hits not of the expected type: %T", hits["hits"])
	}

	// Initialize data-structure for observing counts.
	counts := make(map[int]int)

	// Iterate over the hits and populate the observed array.
	for _, e := range h {
		l, ok := e.(map[string]interface{})
		if !ok {
			framework.Logf("Element of hit not of expected type: %T", e)
			continue
		}
		source, ok := l["_source"].(map[string]interface{})
		if !ok {
			framework.Logf("_source not of the expected type: %T", l["_source"])
			continue
		}
		msg, ok := source["log"].(string)
		if !ok {
			framework.Logf("Log not of the expected type: %T", source["log"])
			continue
		}
		lineNumber, err := strconv.Atoi(strings.TrimSpace(msg))
		if err != nil {
			framework.Logf("Log line %s is not a number", msg)
			continue
		}
		if lineNumber < 0 || lineNumber >= expectedCount {
			framework.Logf("Number %d is not valid, expected number from range [0, %d)", lineNumber, expectedCount)
			continue
		}
		// Record the observation of a log line
		// Duplicates are possible and fine, fluentd has at-least-once delivery
		counts[lineNumber]++
	}

	return expectedCount - len(counts), nil
}
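The refactoring visible above replaces the inline Elasticsearch plumbing with a provider abstraction, so the ES and GCL tests can share the ingestion-checking code (the logsProvider interface introduced in cluster_logging_utils.go below). A stripped-down, self-contained sketch of that design; the stub type and the simplified ReadEntries signature (taking a pod name instead of a *loggingPod) are illustrative assumptions:

    package main

    import (
    	"fmt"
    	"time"
    )

    type logEntry struct {
    	Payload   string
    	Timestamp time.Time
    }

    // logsProvider is the seam both backends implement; it mirrors the
    // interface this PR adds, simplified for the sketch.
    type logsProvider interface {
    	EnsureWorking() error
    	ReadEntries(podName string) []*logEntry
    }

    // esProvider is a stub standing in for the real esLogsProvider;
    // gclLogsProvider would implement the same interface.
    type esProvider struct{}

    func (esProvider) EnsureWorking() error { return nil }
    func (esProvider) ReadEntries(podName string) []*logEntry {
    	return []*logEntry{{Payload: "0 logline", Timestamp: time.Now()}}
    }

    func main() {
    	var p logsProvider = esProvider{}
    	if err := p.EnsureWorking(); err != nil {
    		fmt.Println("backend not ready:", err)
    		return
    	}
    	fmt.Println(len(p.ReadEntries("synthlogger")), "entries read")
    }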
@@ -0,0 +1,240 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

const (
	// esRetryTimeout is how long to keep retrying requesting elasticsearch for status information.
	esRetryTimeout = 5 * time.Minute
	// esRetryDelay is how much time to wait between two attempts to send a request to elasticsearch
	esRetryDelay = 5 * time.Second
)

type esLogsProvider struct {
	Framework *framework.Framework
}

func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
	return &esLogsProvider{Framework: f}, nil
}

// Ensures that elasticsearch is running and ready to serve requests
func (logsProvider *esLogsProvider) EnsureWorking() error {
	f := logsProvider.Framework
	// Check for the existence of the Elasticsearch service.
	By("Checking the Elasticsearch service exists.")
	s := f.ClientSet.Core().Services(api.NamespaceSystem)
	// Make a few attempts to connect. This makes the test robust against
	// being run as the first e2e test just after the e2e cluster has been created.
	var err error
	for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
		if _, err = s.Get("elasticsearch-logging", meta_v1.GetOptions{}); err == nil {
			break
		}
		framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
	}
	Expect(err).NotTo(HaveOccurred())

	// Wait for the Elasticsearch pods to enter the running state.
	By("Checking to make sure the Elasticsearch pods are running")
	labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
	options := meta_v1.ListOptions{LabelSelector: labelSelector}
	pods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
	Expect(err).NotTo(HaveOccurred())
	for _, pod := range pods.Items {
		err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
		Expect(err).NotTo(HaveOccurred())
	}

	By("Checking to make sure we are talking to an Elasticsearch service.")
	// Perform a few checks to make sure this looks like an Elasticsearch cluster.
	var statusCode int
	err = nil
	var body []byte
	for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
		if errProxy != nil {
			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
			continue
		}
		// Query against the root URL for Elasticsearch.
		response := proxyRequest.Namespace(api.NamespaceSystem).
			Name("elasticsearch-logging").
			Do()
		err = response.Error()
		response.StatusCode(&statusCode)

		if err != nil {
			framework.Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
			continue
		}
		if int(statusCode) != 200 {
			framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
			continue
		}
		break
	}
	Expect(err).NotTo(HaveOccurred())
	if int(statusCode) != 200 {
		framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
	}

	// Now assume we really are talking to an Elasticsearch instance.
	// Check the cluster health.
	By("Checking health of Elasticsearch service.")
	healthy := false
	for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
		if errProxy != nil {
			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
			continue
		}
		body, err = proxyRequest.Namespace(api.NamespaceSystem).
			Name("elasticsearch-logging").
			Suffix("_cluster/health").
			Param("level", "indices").
			DoRaw()
		if err != nil {
			continue
		}
		health := make(map[string]interface{})
		err := json.Unmarshal(body, &health)
		if err != nil {
			framework.Logf("Bad json response from elasticsearch: %v", err)
			continue
		}
		statusIntf, ok := health["status"]
		if !ok {
			framework.Logf("No status field found in cluster health response: %v", health)
			continue
		}
		status := statusIntf.(string)
		if status != "green" && status != "yellow" {
			framework.Logf("Cluster health has bad status: %v", health)
			continue
		}
		if err == nil && ok {
			healthy = true
			break
		}
	}
	if !healthy {
		return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout)
	}

	return nil
}

func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
	f := logsProvider.Framework

	proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
	if errProxy != nil {
		framework.Logf("Failed to get services proxy request: %v", errProxy)
		return nil
	}

	// Ask Elasticsearch to return all the log lines that were tagged with the
	// pod name. Ask for ten times as many log lines because duplication is possible.
	body, err := proxyRequest.Namespace(api.NamespaceSystem).
		Name("elasticsearch-logging").
		Suffix("_search").
		// TODO: Change filter to only match records from current test run
		// after fluent-plugin-kubernetes_metadata_filter is enabled
		// and optimize current query
		Param("q", fmt.Sprintf("tag:*%s*", pod.Name)).
		// Ask for more in case we included some unrelated records in our query
		Param("size", strconv.Itoa(pod.ExpectedLinesNumber*10)).
		DoRaw()
	if err != nil {
		framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
		return nil
	}

	var response map[string]interface{}
	err = json.Unmarshal(body, &response)
	if err != nil {
		framework.Logf("Failed to unmarshal response: %v", err)
		return nil
	}

	hits, ok := response["hits"].(map[string]interface{})
	if !ok {
		framework.Logf("response[hits] not of the expected type: %T", response["hits"])
		return nil
	}

	h, ok := hits["hits"].([]interface{})
	if !ok {
		framework.Logf("Hits not of the expected type: %T", hits["hits"])
		return nil
	}

	entries := []*logEntry{}
	// Iterate over the hits and populate the observed array.
	for _, e := range h {
		l, ok := e.(map[string]interface{})
		if !ok {
			framework.Logf("Element of hit not of expected type: %T", e)
			continue
		}

		source, ok := l["_source"].(map[string]interface{})
		if !ok {
			framework.Logf("_source not of the expected type: %T", l["_source"])
			continue
		}

		msg, ok := source["log"].(string)
		if !ok {
			framework.Logf("Log not of the expected type: %T", source["log"])
			continue
		}

		timestampString, ok := source["@timestamp"].(string)
		if !ok {
			framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"])
			continue
		}
		timestamp, err := time.Parse(time.RFC3339, timestampString)
		if err != nil {
			framework.Logf("Timestamp was not in correct format: %s", timestampString)
			continue
		}

		entries = append(entries, &logEntry{
			Payload:   msg,
			Timestamp: timestamp,
		})
	}

	return entries
}
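ReadEntries above walks the Elasticsearch _search response with chained map[string]interface{} assertions. An alternative sketch (not code from this PR) that parses the same response shape with typed structs; the struct and field names are assumptions derived from the JSON keys the code reads:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // esHit mirrors one element of hits.hits; only the fields the test
    // cares about (_source.log and _source.@timestamp) are declared.
    type esHit struct {
    	Source struct {
    		Log       string `json:"log"`
    		Timestamp string `json:"@timestamp"`
    	} `json:"_source"`
    }

    type esSearchResponse struct {
    	Hits struct {
    		Hits []esHit `json:"hits"`
    	} `json:"hits"`
    }

    func main() {
    	body := []byte(`{"hits":{"hits":[{"_source":{"log":"0\n","@timestamp":"2016-11-10T10:00:00Z"}}]}}`)
    	var resp esSearchResponse
    	if err := json.Unmarshal(body, &resp); err != nil {
    		fmt.Println("bad json:", err)
    		return
    	}
    	for _, hit := range resp.Hits.Hits {
    		fmt.Printf("%q at %s\n", hit.Source.Log, hit.Source.Timestamp)
    	}
    }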
@@ -18,17 +18,12 @@ package e2e

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/json"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {

@@ -39,97 +34,26 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
	})

	It("should check that logs from containers are ingested in GCL", func() {
		podName := "synthlogger"

		gclLogsProvider, err := newGclLogsProvider(f)
		framework.ExpectNoError(err, "Failed to create GCL logs provider")

		err = gclLogsProvider.EnsureWorking()
		framework.ExpectNoError(err, "GCL is not working")

		By("Running synthetic logger")
		createSynthLogger(f, expectedLinesCount)
		defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{})
		err := framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name)
		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
		pod := createLoggingPod(f, podName, 100, 1*time.Second)
		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
		err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))

		By("Waiting for logs to ingest")
		totalMissing := expectedLinesCount
		for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
			var err error
			totalMissing, err = getMissingLinesCountGcl(f, synthLoggerPodName, expectedLinesCount)
			if err != nil {
				framework.Logf("Failed to get missing lines count due to %v", err)
				totalMissing = expectedLinesCount
			} else if totalMissing > 0 {
				framework.Logf("Still missing %d lines", totalMissing)
			}
		err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
		framework.ExpectNoError(err, "Failed to ingest logs")

			if totalMissing == 0 {
				break
			}
		if err != nil {
			reportLogsFromFluentdPod(f, pod)
		}

		if totalMissing > 0 {
			if err := reportLogsFromFluentdPod(f); err != nil {
				framework.Logf("Failed to report logs from fluentd pod due to %v", err)
			}
		}

		Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
	})
})

func getMissingLinesCountGcl(f *framework.Framework, podName string, expectedCount int) (int, error) {
	gclFilter := fmt.Sprintf("resource.labels.pod_id:%s AND resource.labels.namespace_id:%s", podName, f.Namespace.Name)
	entries, err := readFilteredEntriesFromGcl(gclFilter)
	if err != nil {
		return 0, err
	}

	occurrences := make(map[int]int)
	for _, entry := range entries {
		lineNumber, err := strconv.Atoi(strings.TrimSpace(entry))
		if err != nil {
			continue
		}
		if lineNumber < 0 || lineNumber >= expectedCount {
			framework.Logf("Unexpected line number: %d", lineNumber)
		} else {
			// Duplicates are possible and fine, fluentd has at-least-once delivery
			occurrences[lineNumber]++
		}
	}

	return expectedCount - len(occurrences), nil
}

type LogEntry struct {
	TextPayload string
}

// Since GCL API is not easily available from the outside of cluster
// we use gcloud command to perform search with filter
func readFilteredEntriesFromGcl(filter string) ([]string, error) {
	framework.Logf("Reading entries from GCL with filter '%v'", filter)
	argList := []string{"beta",
		"logging",
		"read",
		filter,
		"--format",
		"json",
		"--project",
		framework.TestContext.CloudConfig.ProjectID,
	}
	output, err := exec.Command("gcloud", argList...).CombinedOutput()
	if err != nil {
		return nil, err
	}

	var entries []*LogEntry
	if err = json.Unmarshal(output, &entries); err != nil {
		return nil, err
	}
	framework.Logf("Read %d entries from GCL", len(entries))

	var result []string
	for _, entry := range entries {
		if entry.TextPayload != "" {
			result = append(result, entry.TextPayload)
		}
	}

	return result, nil
}
@@ -0,0 +1,105 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"strconv"
	"time"

	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
)

const (
	// TODO(crassirostris): Once test is stable, decrease allowed losses
	loadTestMaxAllowedLostFraction = 0.1
)

var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]", func() {
	f := framework.NewDefaultFramework("gcl-logging-load")

	It("should create a constant load with long-living pods and ensure logs delivery", func() {
		gclLogsProvider, err := newGclLogsProvider(f)
		framework.ExpectNoError(err, "Failed to create GCL logs provider")

		podCount := 30
		loggingDuration := 10 * time.Minute
		linesPerSecond := 1000
		linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
		ingestionTimeout := 1 * time.Hour

		By("Running logs generator pods")
		pods := []*loggingPod{}
		for podIdx := 0; podIdx < podCount; podIdx++ {
			podName := f.Namespace.Name + "-logs-generator-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(podIdx)
			pods = append(pods, createLoggingPod(f, podName, linesPerPod, loggingDuration))

			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
		}

		By("Waiting for pods to succeed")
		time.Sleep(loggingDuration)

		By("Waiting for all log lines to be ingested")
		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
		if err != nil {
			framework.Failf("Failed to ingest logs: %v", err)
		} else {
			framework.Logf("Successfully ingested all logs")
		}
	})

	It("should create a constant load with short-living pods and ensure logs delivery", func() {
		gclLogsProvider, err := newGclLogsProvider(f)
		framework.ExpectNoError(err, "Failed to create GCL logs provider")

		maxPodCount := 10
		jobDuration := 1 * time.Minute
		linesPerPodPerSecond := 10
		testDuration := 1 * time.Hour
		ingestionTimeout := 1 * time.Hour

		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
		podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
		linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())

		By("Running short-living pods")
		pods := []*loggingPod{}
		for i := 0; i < podRunCount; i++ {
			podName := f.Namespace.Name + "-job-logs-generator-" +
				strconv.Itoa(maxPodCount) + "-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(i)
			pods = append(pods, createLoggingPod(f, podName, linesPerPod, jobDuration))

			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})

			time.Sleep(podRunDelay)
		}

		By("Waiting for the last pods to finish")
		time.Sleep(jobDuration)

		By("Waiting for all log lines to be ingested")
		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
		if err != nil {
			framework.Failf("Failed to ingest logs: %v", err)
		} else {
			framework.Logf("Successfully ingested all logs")
		}
	})
})
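To make the load parameters concrete, the arithmetic the two tests above perform works out as in this small sketch (values computed directly from the constants in the diff):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	// Long-living pods test: 1000 lines/s cluster-wide for 10 minutes,
    	// spread evenly over 30 pods.
    	podCount := 30
    	loggingDuration := 10 * time.Minute
    	linesPerSecond := 1000
    	linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
    	fmt.Println(linesPerPod) // 1000 * 600 / 30 = 20000 lines per pod

    	// Short-living pods test: a new one-minute pod is started every
    	// jobDuration/maxPodCount, i.e. every 6 seconds, for an hour.
    	maxPodCount := 10
    	jobDuration := 1 * time.Minute
    	linesPerPodPerSecond := 10
    	testDuration := 1 * time.Hour
    	podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
    	podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
    	fmt.Println(podRunDelay, podRunCount, linesPerPodPerSecond*int(jobDuration.Seconds()))
    	// 6s, 599 pods, 600 lines per pod
    }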
@@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/test/e2e/framework"

	"golang.org/x/net/context"
	"golang.org/x/oauth2/google"
	gcl "google.golang.org/api/logging/v2beta1"
)

const (
	// GCL doesn't support page size more than 1000
	gclPageSize = 1000

	// If we failed to get response from GCL, it can be a random 500 or
	// quota limit exceeded. So we retry for some time in case the problem will go away.
	// Quota is enforced every 100 seconds, so we have to wait for more than
	// that to reliably get the next portion.
	queryGclRetryDelay   = 10 * time.Second
	queryGclRetryTimeout = 200 * time.Second
)

type gclLogsProvider struct {
	GclService *gcl.Service
	Framework  *framework.Framework
}

func (gclLogsProvider *gclLogsProvider) EnsureWorking() error {
	// We assume that GCL is always working
	return nil
}

func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
	ctx := context.Background()
	hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope)
	if err != nil {
		return nil, err
	}
	gclService, err := gcl.New(hc)
	if err != nil {
		return nil, err
	}

	provider := &gclLogsProvider{
		GclService: gclService,
		Framework:  f,
	}
	return provider, nil
}

// Reads entries directly through the GCL API, paging through the results
// with a filter restricted to the given pod and namespace
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
	filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"",
		pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339))
	framework.Logf("Reading entries from GCL with filter '%v'", filter)

	response := getResponseSafe(gclLogsProvider.GclService, filter, "")

	var entries []*logEntry
	for response != nil && len(response.Entries) > 0 {
		framework.Logf("Received %d entries from GCL", len(response.Entries))

		for _, entry := range response.Entries {
			if entry.TextPayload == "" {
				continue
			}

			timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp)
			if parseErr != nil {
				continue
			}

			entries = append(entries, &logEntry{
				Timestamp: timestamp,
				Payload:   entry.TextPayload,
			})
		}

		nextToken := response.NextPageToken
		if nextToken == "" {
			break
		}

		response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken)
	}

	return entries
}

func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse {
	for start := time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) {
		response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{
			ProjectIds: []string{
				framework.TestContext.CloudConfig.ProjectID,
			},
			OrderBy:   "timestamp desc",
			Filter:    filter,
			PageSize:  int64(gclPageSize),
			PageToken: pageToken,
		}).Do()

		if err == nil {
			return response
		}

		framework.Logf("Failed to get response from GCL due to %v, retrying", err)
	}

	return nil
}
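getResponseSafe above retries a fallible call inside a retry budget and hands the caller nil when the budget runs out; the 200-second timeout deliberately spans more than one 100-second GCL quota-enforcement window. A minimal, self-contained sketch of that shape (function names and the string result type are illustrative assumptions, not from this PR):

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    // fetchWithRetries mirrors getResponseSafe: retry until the call
    // succeeds or the timeout elapses; return nil rather than an error
    // so the caller's pagination loop can simply stop.
    func fetchWithRetries(timeout, delay time.Duration, fetch func() (string, error)) *string {
    	for start := time.Now(); time.Since(start) < timeout; time.Sleep(delay) {
    		if result, err := fetch(); err == nil {
    			return &result
    		}
    	}
    	return nil
    }

    func main() {
    	attempts := 0
    	flaky := func() (string, error) {
    		attempts++
    		if attempts < 3 {
    			return "", errors.New("quota exceeded") // simulated transient failure
    		}
    		return "page of log entries", nil
    	}
    	if result := fetchWithRetries(2*time.Second, 100*time.Millisecond, flaky); result != nil {
    		fmt.Println(*result, "after", attempts, "attempts")
    	}
    }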
@@ -19,52 +19,202 @@ package e2e
import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/api"
	api_v1 "k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/test/e2e/framework"
)

const (
	// ingestionTimeout is how long to keep retrying to wait for all the
	// logs to be ingested.
	ingestionTimeout = 10 * time.Minute
	// ingestionRetryDelay is how long test should wait between
	// two attempts to check for ingestion
	ingestionRetryDelay = 25 * time.Second
	// Duration of delay between any two attempts to check if all logs are ingested
	ingestionRetryDelay = 10 * time.Second

	synthLoggerPodName = "synthlogger"
	// Amount of requested cores for logging container in millicores
	loggingContainerCpuRequest = 10

	// expectedLinesCount is the number of log lines emitted (and checked) for each synthetic logging pod.
	expectedLinesCount = 100
	// Amount of requested memory for logging container in bytes
	loggingContainerMemoryRequest = 10 * 1024 * 1024
)

func createSynthLogger(f *framework.Framework, linesCount int) {
	f.PodClient().Create(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      synthLoggerPodName,
			Namespace: f.Namespace.Name,
// Type to track the progress of logs generating pod
type loggingPod struct {
	// Name of the pod
	Name string
	// If we didn't read some log entries, their
	// timestamps should be no less than this timestamp.
	// Effectively, timestamp of the last ingested entry
	// for which there's no missing entry before it
	LastTimestamp time.Time
	// Cache of ingested and read entries
	Occurrences map[int]*logEntry
	// Number of lines expected to be ingested from this pod
	ExpectedLinesNumber int
}

type logEntry struct {
	Payload   string
	Timestamp time.Time
}

type logsProvider interface {
	EnsureWorking() error
	ReadEntries(*loggingPod) []*logEntry
}

func (entry *logEntry) getLogEntryNumber() (int, bool) {
	chunks := strings.Split(entry.Payload, " ")
	lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0]))
	return lineNumber, err == nil
}

func createLoggingPod(f *framework.Framework, podName string, totalLines int, loggingDuration time.Duration) *loggingPod {
	framework.Logf("Starting pod %s", podName)
	createLogsGeneratorPod(f, podName, totalLines, loggingDuration)

	return &loggingPod{
		Name: podName,
		// It's used to avoid querying logs from before the pod was started
		LastTimestamp:       time.Now(),
		Occurrences:         make(map[int]*logEntry),
		ExpectedLinesNumber: totalLines,
	}
}

func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount int, duration time.Duration) {
	f.PodClient().Create(&api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name: podName,
		},
		Spec: v1.PodSpec{
			RestartPolicy: v1.RestartPolicyOnFailure,
			Containers: []v1.Container{
		Spec: api_v1.PodSpec{
			RestartPolicy: api_v1.RestartPolicyNever,
			Containers: []api_v1.Container{
				{
					Name:  synthLoggerPodName,
					Image: "gcr.io/google_containers/busybox:1.24",
					// notice: the subshell syntax is escaped with `$$`
					Command: []string{"/bin/sh", "-c", fmt.Sprintf("i=0; while [ $i -lt %d ]; do echo $i; i=`expr $i + 1`; done", linesCount)},
					Name:  podName,
					Image: "gcr.io/google_containers/logs-generator:v0.1.0",
					Env: []api_v1.EnvVar{
						{
							Name:  "LOGS_GENERATOR_LINES_TOTAL",
							Value: strconv.Itoa(linesCount),
						},
						{
							Name:  "LOGS_GENERATOR_DURATION",
							Value: duration.String(),
						},
					},
					Resources: api_v1.ResourceRequirements{
						Requests: api_v1.ResourceList{
							api_v1.ResourceCPU: *resource.NewMilliQuantity(
								loggingContainerCpuRequest,
								resource.DecimalSI),
							api_v1.ResourceMemory: *resource.NewQuantity(
								loggingContainerMemoryRequest,
								resource.BinarySI),
						},
					},
				},
			},
		},
	})
}

func reportLogsFromFluentdPod(f *framework.Framework) error {
	synthLoggerPod, err := f.PodClient().Get(synthLoggerPodName, metav1.GetOptions{})
func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error {
	expectedLinesNumber := 0
	for _, pod := range pods {
		expectedLinesNumber += pod.ExpectedLinesNumber
	}

	totalMissing := expectedLinesNumber

	missingByPod := make([]int, len(pods))
	for podIdx, pod := range pods {
		missingByPod[podIdx] = pod.ExpectedLinesNumber
	}

	for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
		missing := 0
		for podIdx, pod := range pods {
			if missingByPod[podIdx] == 0 {
				continue
			}

			missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod)
			missing += missingByPod[podIdx]
		}

		totalMissing = missing
		if totalMissing > 0 {
			framework.Logf("Still missing %d lines in total", totalMissing)
		}
	}

	lostFraction := float64(totalMissing) / float64(expectedLinesNumber)

	if totalMissing > 0 {
		framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
			ingestionTimeout, totalMissing, lostFraction*100)
	}

	if lostFraction > maxAllowedLostFraction {
		return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
			lostFraction*100, maxAllowedLostFraction*100)
	}

	return nil
}

func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int {
	missingOnPod, err := getMissingLinesCount(logsProvider, pod)
	if err != nil {
		return fmt.Errorf("Failed to get synth logger pod due to %v", err)
		framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
		return pod.ExpectedLinesNumber
	} else if missingOnPod > 0 {
		framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod)
	} else {
		framework.Logf("All logs from pod %s are ingested", pod.Name)
	}
	return missingOnPod
}

func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
	entries := logsProvider.ReadEntries(pod)

	for _, entry := range entries {
		lineNumber, ok := entry.getLogEntryNumber()
		if !ok {
			continue
		}

		if lineNumber < 0 || lineNumber >= pod.ExpectedLinesNumber {
			framework.Logf("Unexpected line number: %d", lineNumber)
		} else {
			pod.Occurrences[lineNumber] = entry
		}
	}

	for i := 0; i < pod.ExpectedLinesNumber; i++ {
		entry, ok := pod.Occurrences[i]
		if !ok {
			break
		}

		if entry.Timestamp.After(pod.LastTimestamp) {
			pod.LastTimestamp = entry.Timestamp
		}
	}

	return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
}

func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error {
	synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get synth logger pod due to %v", err)
	}

	synthLoggerNodeName := synthLoggerPod.Spec.NodeName

@@ -73,20 +223,20 @@ func reportLogsFromFluentdPod(f *framework.Framework) error {
	}

	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
	options := metav1.ListOptions{LabelSelector: label.String()}
	fluentdPods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options)
	options := meta_v1.ListOptions{LabelSelector: label.String()}
	fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)

	for _, fluentdPod := range fluentdPods.Items {
		if fluentdPod.Spec.NodeName == synthLoggerNodeName {
			containerName := fluentdPod.Spec.Containers[0].Name
			logs, err := framework.GetPodLogs(f.ClientSet, metav1.NamespaceSystem, fluentdPod.Name, containerName)
			logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName)
			if err != nil {
				return fmt.Errorf("Failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
				return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
			}
			framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs)
			return nil
		}
	}

	return fmt.Errorf("Failed to find fluentd pod running on node %s", synthLoggerNodeName)
	return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName)
}
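The missing-line accounting above relies on the logs generator emitting lines whose first whitespace-separated token is the line number; counting the distinct numbers observed against the expected total gives the missing count, and duplicates are harmless because fluentd provides at-least-once delivery. A self-contained sketch of that bookkeeping (the function name countMissing is hypothetical):

    package main

    import (
    	"fmt"
    	"strconv"
    	"strings"
    )

    // countMissing mimics getMissingLinesCount: parse the leading token of
    // each payload as a line number and count how many expected numbers
    // were never seen.
    func countMissing(payloads []string, expected int) int {
    	occurrences := make(map[int]bool)
    	for _, payload := range payloads {
    		token := strings.Split(payload, " ")[0]
    		n, err := strconv.Atoi(strings.TrimSpace(token))
    		if err != nil || n < 0 || n >= expected {
    			continue // unparsable or out-of-range lines are ignored
    		}
    		occurrences[n] = true // duplicates collapse: at-least-once delivery
    	}
    	return expected - len(occurrences)
    }

    func main() {
    	lines := []string{"0 logline", "1 logline", "1 logline", "3 logline"}
    	fmt.Println(countMissing(lines, 4)) // 1 (line 2 was never ingested)
    }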
@@ -11,6 +11,8 @@ Cassandra should create and scale cassandra,fabioy,1,apps,
CassandraStatefulSet should create statefulset,wojtek-t,1,apps,
Cluster level logging using Elasticsearch should check that logs from containers are ingested into Elasticsearch,crassirostris,0,instrumentation,
Cluster level logging using GCL should check that logs from containers are ingested in GCL,crassirostris,0,instrumentation,
Cluster level logging using GCL should create a constant load with long-living pods and ensure logs delivery,crassirostris,0,instrumentation,
Cluster level logging using GCL should create a constant load with short-living pods and ensure logs delivery,crassirostris,0,instrumentation,
Cluster size autoscaling should add node to the particular mig,spxtr,1,autoscaling,
Cluster size autoscaling should correctly scale down after a node is not needed,pmorie,1,autoscaling,
Cluster size autoscaling should correctly scale down after a node is not needed when there is non autoscaled pool,krousey,1,autoscaling,
@@ -16906,3 +16906,15 @@ go_test(
    library = ":k8s.io/apimachinery/pkg/api/testing",
    tags = ["automanaged"],
)

go_library(
    name = "google.golang.org/api/logging/v2beta1",
    srcs = ["google.golang.org/api/logging/v2beta1/logging-gen.go"],
    tags = ["automanaged"],
    deps = [
        "//vendor:golang.org/x/net/context",
        "//vendor:golang.org/x/net/context/ctxhttp",
        "//vendor:google.golang.org/api/gensupport",
        "//vendor:google.golang.org/api/googleapi",
    ],
)
(Two file diffs suppressed because they are too large.)