mirror of https://github.com/k3s-io/k3s
Merge pull request #45255 from crassirostris/sd-logging-e2e-performance-refactoring

Automatic merge from submit-queue

Implement Stackdriver Logging e2e tests using PubSub

This makes the tests faster and mitigates the Stackdriver Logging quota issue.

Fixes https://github.com/kubernetes/kubernetes/issues/47069
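In outline, the new approach stops polling the quota-limited Entries.List API and instead exports matching log entries to Cloud PubSub: create a topic and a pull subscription, create a Stackdriver Logging sink whose destination is the topic, grant the sink's writer identity roles/pubsub.publisher on the topic, then pull and acknowledge messages. A minimal sketch of that setup using the vendored clients this PR adds; hc, setUpLogExport, projectID, name, and filter are illustrative names, not names from the diff:

package sketch

import (
	"fmt"
	"net/http"

	gcl "google.golang.org/api/logging/v2beta1"
	pubsub "google.golang.org/api/pubsub/v1"
)

// setUpLogExport wires a Stackdriver Logging sink to a PubSub topic so that
// matching entries can be pulled from a subscription instead of being read
// through the quota-limited Entries.List API. hc must be an OAuth2-authorized
// client (e.g. built with golang.org/x/oauth2/google).
func setUpLogExport(hc *http.Client, projectID, name, filter string) error {
	gclService, err := gcl.New(hc)
	if err != nil {
		return err
	}
	pubsubService, err := pubsub.New(hc)
	if err != nil {
		return err
	}

	// Create the topic that will receive the exported log entries.
	topicName := fmt.Sprintf("projects/%s/topics/%s", projectID, name)
	topic, err := pubsubService.Projects.Topics.Create(topicName, &pubsub.Topic{Name: topicName}).Do()
	if err != nil {
		return err
	}

	// Create a pull subscription on that topic.
	subsName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, name)
	if _, err := pubsubService.Projects.Subscriptions.Create(subsName, &pubsub.Subscription{
		Name:  subsName,
		Topic: topic.Name,
	}).Do(); err != nil {
		return err
	}

	// The sink tells Stackdriver Logging to publish matching entries to the topic.
	sink, err := gclService.Projects.Sinks.Create(fmt.Sprintf("projects/%s", projectID), &gcl.LogSink{
		Name:        name,
		Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topic.Name),
		Filter:      filter,
	}).Do()
	if err != nil {
		return err
	}

	// The sink publishes under its own identity, which needs publish rights.
	policy, err := pubsubService.Projects.Topics.GetIamPolicy(topic.Name).Do()
	if err != nil {
		return err
	}
	policy.Bindings = append(policy.Bindings, &pubsub.Binding{
		Role:    "roles/pubsub.publisher",
		Members: []string{sink.WriterIdentity},
	})
	_, err = pubsubService.Projects.Topics.SetIamPolicy(topic.Name, &pubsub.SetIamPolicyRequest{Policy: policy}).Do()
	return err
}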
commit 012ffed6d3
@@ -2820,6 +2820,10 @@
 		{
 			"ImportPath": "google.golang.org/api/monitoring/v3",
 			"Rev": "e3824ed33c72bf7e81da0286772c34b987520914"
 		},
+		{
+			"ImportPath": "google.golang.org/api/pubsub/v1",
+			"Rev": "e3824ed33c72bf7e81da0286772c34b987520914"
+		},
 		{
 			"ImportPath": "google.golang.org/grpc",
 			"Comment": "v1.0.4",
@@ -84430,6 +84430,41 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ================================================================================


+================================================================================
+= vendor/google.golang.org/api/pubsub/v1 licensed under: =
+
+Copyright (c) 2011 Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+= vendor/google.golang.org/api/LICENSE a651bb3d8b1c412632e28823bb432b40 -
+================================================================================
+
+
 ================================================================================
 = vendor/google.golang.org/grpc licensed under: =

@@ -27,10 +27,12 @@ go_library(
 		"//vendor/golang.org/x/net/context:go_default_library",
 		"//vendor/golang.org/x/oauth2/google:go_default_library",
 		"//vendor/google.golang.org/api/logging/v2beta1:go_default_library",
+		"//vendor/google.golang.org/api/pubsub/v1:go_default_library",
 		"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 		"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 		"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
 		"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+		"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 		"//vendor/k8s.io/client-go/util/integer:go_default_library",
 	],
 )
@@ -39,10 +39,11 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
 	It("should check that logs from containers are ingested into Elasticsearch", func() {
 		podName := "synthlogger"
 		esLogsProvider, err := newEsLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create GCL logs provider")
+		framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider")

-		err = esLogsProvider.EnsureWorking()
-		framework.ExpectNoError(err, "Elasticsearch is not working")
+		err = esLogsProvider.Init()
+		defer esLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider")

 		err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName())
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
@@ -46,12 +46,8 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
 	return &esLogsProvider{Framework: f}, nil
 }

-func (logsProvider *esLogsProvider) FluentdApplicationName() string {
-	return "fluentd-es"
-}
-
 // Ensures that elasticsearch is running and ready to serve requests
-func (logsProvider *esLogsProvider) EnsureWorking() error {
+func (logsProvider *esLogsProvider) Init() error {
 	f := logsProvider.Framework
 	// Check for the existence of the Elasticsearch service.
 	By("Checking the Elasticsearch service exists.")
@@ -157,7 +153,11 @@ func (logsProvider *esLogsProvider) EnsureWorking() error {
 	return nil
 }

-func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
+func (logsProvider *esLogsProvider) Cleanup() {
+	// Nothing to do
+}
+
+func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 	f := logsProvider.Framework

 	proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
@@ -202,7 +202,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
 		return nil
 	}

-	entries := []*logEntry{}
+	entries := []logEntry{}
 	// Iterate over the hits and populate the observed array.
 	for _, e := range h {
 		l, ok := e.(map[string]interface{})
@@ -223,22 +223,12 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
 			continue
 		}

-		timestampString, ok := source["@timestamp"].(string)
-		if !ok {
-			framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"])
-			continue
-		}
-		timestamp, err := time.Parse(time.RFC3339, timestampString)
-		if err != nil {
-			framework.Logf("Timestamp was not in correct format: %s", timestampString)
-			continue
-		}
-
-		entries = append(entries, &logEntry{
-			Payload:   msg,
-			Timestamp: timestamp,
-		})
+		entries = append(entries, logEntry{Payload: msg})
 	}

 	return entries
 }
+
+func (logsProvider *esLogsProvider) FluentdApplicationName() string {
+	return "fluentd-es"
+}
@@ -39,8 +39,9 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
 		gclLogsProvider, err := newGclLogsProvider(f)
 		framework.ExpectNoError(err, "Failed to create GCL logs provider")

-		err = gclLogsProvider.EnsureWorking()
-		framework.ExpectNoError(err, "GCL is not working")
+		err = gclLogsProvider.Init()
+		defer gclLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init GCL logs provider")

 		err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName())
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
@@ -38,13 +38,17 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
 		gclLogsProvider, err := newGclLogsProvider(f)
 		framework.ExpectNoError(err, "Failed to create GCL logs provider")

+		err = gclLogsProvider.Init()
+		defer gclLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init GCL logs provider")
+
 		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
 		nodeCount := len(nodes)
 		podCount := 30 * nodeCount
 		loggingDuration := 10 * time.Minute
 		linesPerSecond := 1000 * nodeCount
 		linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
-		ingestionTimeout := 60 * time.Minute
+		ingestionTimeout := 20 * time.Minute

 		By("Running logs generator pods")
 		pods := []*loggingPod{}
@@ -56,9 +60,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
 			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
 		}

-		By("Waiting for pods to succeed")
-		time.Sleep(loggingDuration)
-
 		By("Waiting for all log lines to be ingested")
 		config := &loggingTestConfig{
 			LogsProvider: gclLogsProvider,
@@ -79,12 +80,16 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
 		gclLogsProvider, err := newGclLogsProvider(f)
 		framework.ExpectNoError(err, "Failed to create GCL logs provider")

+		err = gclLogsProvider.Init()
+		defer gclLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init GCL logs provider")
+
 		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
 		maxPodCount := 10
 		jobDuration := 1 * time.Minute
 		linesPerPodPerSecond := 100
 		testDuration := 10 * time.Minute
-		ingestionTimeout := 60 * time.Minute
+		ingestionTimeout := 20 * time.Minute

 		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
 		podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
@@ -102,9 +107,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
 			time.Sleep(podRunDelay)
 		}

-		By("Waiting for the last pods to finish")
-		time.Sleep(jobDuration)
-
 		By("Waiting for all log lines to be ingested")
 		config := &loggingTestConfig{
 			LogsProvider: gclLogsProvider,
@@ -17,36 +17,46 @@ limitations under the License.
 package e2e

 import (
+	"encoding/base64"
+	"encoding/json"
 	"fmt"
+	"sync"
 	"time"

+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/test/e2e/framework"

 	"golang.org/x/net/context"
 	"golang.org/x/oauth2/google"
 	gcl "google.golang.org/api/logging/v2beta1"
+	pubsub "google.golang.org/api/pubsub/v1"
 )

 const (
-	// GCL doesn't support page size more than 1000
-	gclPageSize = 1000
-
-	// If we failed to get response from GCL, it can be a random 500 or
-	// quota limit exceeded. So we retry for some time in case the problem will go away.
-	// Quota is enforced every 100 seconds, so we have to wait for more than
-	// that to reliably get the next portion.
-	queryGclRetryDelay   = 100 * time.Second
-	queryGclRetryTimeout = 250 * time.Second
+	// The amount of time to wait before considering
+	// Stackdriver Logging sink operational
+	sinkInitialDelay = 1 * time.Minute
+
+	// The limit on the number of messages to pull from PubSub
+	maxPullLogMessages = 100 * 1000
+
+	// The limit on the number of messages in the cache for a pod
+	maxCachedMessagesPerPod = 10 * 1000
+
+	// PubSub topic with log entries polling interval
+	gclLoggingPollInterval = 100 * time.Millisecond
 )

 type gclLogsProvider struct {
-	GclService *gcl.Service
-	Framework  *framework.Framework
-}
-
-func (gclLogsProvider *gclLogsProvider) EnsureWorking() error {
-	// We assume that GCL is always working
-	return nil
+	GclService         *gcl.Service
+	PubsubService      *pubsub.Service
+	Framework          *framework.Framework
+	Topic              *pubsub.Topic
+	Subscription       *pubsub.Subscription
+	LogSink            *gcl.LogSink
+	LogEntryCache      map[string]chan logEntry
+	CacheMutex         *sync.Mutex
+	PollingStopChannel chan struct{}
 }

 func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
@@ -57,75 +67,206 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
 		return nil, err
 	}

+	pubsubService, err := pubsub.New(hc)
+	if err != nil {
+		return nil, err
+	}
+
 	provider := &gclLogsProvider{
-		GclService: gclService,
-		Framework:  f,
+		GclService:         gclService,
+		PubsubService:      pubsubService,
+		Framework:          f,
+		LogEntryCache:      map[string]chan logEntry{},
+		CacheMutex:         &sync.Mutex{},
+		PollingStopChannel: make(chan struct{}, 1),
 	}
 	return provider, nil
 }

+func (gclLogsProvider *gclLogsProvider) Init() error {
+	projectId := framework.TestContext.CloudConfig.ProjectID
+	nsName := gclLogsProvider.Framework.Namespace.Name
+
+	topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName)
+	if err != nil {
+		return fmt.Errorf("failed to create PubSub topic: %v", err)
+	}
+	gclLogsProvider.Topic = topic
+
+	subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
+	if err != nil {
+		return fmt.Errorf("failed to create PubSub subscription: %v", err)
+	}
+	gclLogsProvider.Subscription = subs
+
+	logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name)
+	if err != nil {
+		return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
+	}
+	gclLogsProvider.LogSink = logSink
+
+	if err = gclLogsProvider.authorizeGclLogSink(); err != nil {
+		return fmt.Errorf("failed to authorize log sink: %v", err)
+	}
+
+	framework.Logf("Waiting for log sink to become operational")
+	// TODO: Replace with something more intelligent
+	time.Sleep(sinkInitialDelay)
+
+	go gclLogsProvider.pollLogs()
+
+	return nil
+}
+
+func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
+	topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
+	topic := &pubsub.Topic{
+		Name: topicFullName,
+	}
+	return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
+}
+
+func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
+	subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
+	subs := &pubsub.Subscription{
+		Name:  subsFullName,
+		Topic: topicName,
+	}
+	return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
+}
+
+func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) {
+	projectDst := fmt.Sprintf("projects/%s", projectId)
+	filter := fmt.Sprintf("resource.labels.namespace_id=%s AND resource.labels.container_name=%s", nsName, loggingContainerName)
+	sink := &gcl.LogSink{
+		Name:        sinkName,
+		Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
+		Filter:      filter,
+	}
+	return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do()
+}
+
+func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error {
+	topicsService := gclLogsProvider.PubsubService.Projects.Topics
+	policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do()
+	if err != nil {
+		return err
+	}
+
+	binding := &pubsub.Binding{
+		Role:    "roles/pubsub.publisher",
+		Members: []string{gclLogsProvider.LogSink.WriterIdentity},
+	}
+	policy.Bindings = append(policy.Bindings, binding)
+	req := &pubsub.SetIamPolicyRequest{Policy: policy}
+	if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (gclLogsProvider *gclLogsProvider) pollLogs() {
+	wait.PollUntil(gclLoggingPollInterval, func() (bool, error) {
+		subsName := gclLogsProvider.Subscription.Name
+		subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
+		req := &pubsub.PullRequest{
+			ReturnImmediately: true,
+			MaxMessages:       maxPullLogMessages,
+		}
+		resp, err := subsService.Pull(subsName, req).Do()
+		if err != nil {
+			framework.Logf("Failed to pull messages from PubSub due to %v", err)
+			return false, nil
+		}
+
+		ids := []string{}
+		for _, msg := range resp.ReceivedMessages {
+			ids = append(ids, msg.AckId)
+
+			logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
+			if err != nil {
+				framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
+				continue
+			}
+
+			var gclLogEntry gcl.LogEntry
+			if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil {
+				framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
+				continue
+			}
+
+			podName := gclLogEntry.Resource.Labels["pod_id"]
+			ch := gclLogsProvider.getCacheChannel(podName)
+			ch <- logEntry{Payload: gclLogEntry.TextPayload}
+		}
+
+		if len(ids) > 0 {
+			ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
+			if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil {
+				framework.Logf("Failed to ack: %v", err)
+			}
+		}
+
+		return false, nil
+	}, gclLogsProvider.PollingStopChannel)
+}
+
+func (gclLogsProvider *gclLogsProvider) Cleanup() {
+	gclLogsProvider.PollingStopChannel <- struct{}{}
+
+	if gclLogsProvider.LogSink != nil {
+		projectId := framework.TestContext.CloudConfig.ProjectID
+		sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name)
+		sinksService := gclLogsProvider.GclService.Projects.Sinks
+		if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
+			framework.Logf("Failed to delete LogSink: %v", err)
+		}
+	}
+
+	if gclLogsProvider.Subscription != nil {
+		subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
+		if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil {
+			framework.Logf("Failed to delete PubSub subscription: %v", err)
+		}
+	}
+
+	if gclLogsProvider.Topic != nil {
+		topicsService := gclLogsProvider.PubsubService.Projects.Topics
+		if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil {
+			framework.Logf("Failed to delete PubSub topic: %v", err)
+		}
+	}
+}
+
+func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
+	var entries []logEntry
+	ch := gclLogsProvider.getCacheChannel(pod.Name)
+polling_loop:
+	for {
+		select {
+		case entry := <-ch:
+			entries = append(entries, entry)
+		default:
+			break polling_loop
+		}
+	}
+	return entries
+}
+
 func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
 	return "fluentd-gcp"
 }

-// Since GCL API is not easily available from the outside of cluster
-// we use gcloud command to perform search with filter
-func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
-	filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"",
-		pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339))
-	framework.Logf("Reading entries from GCL with filter '%v'", filter)
-
-	response := getResponseSafe(gclLogsProvider.GclService, filter, "")
-
-	var entries []*logEntry
-	for response != nil && len(response.Entries) > 0 {
-		framework.Logf("Received %d entries from GCL", len(response.Entries))
-
-		for _, entry := range response.Entries {
-			if entry.TextPayload == "" {
-				continue
-			}
-
-			timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp)
-			if parseErr != nil {
-				continue
-			}
-
-			entries = append(entries, &logEntry{
-				Timestamp: timestamp,
-				Payload:   entry.TextPayload,
-			})
-		}
-
-		nextToken := response.NextPageToken
-		if nextToken == "" {
-			break
-		}
-
-		response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken)
-	}
-
-	return entries
-}
-
-func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse {
-	for start := time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) {
-		response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{
-			ProjectIds: []string{
-				framework.TestContext.CloudConfig.ProjectID,
-			},
-			OrderBy:   "timestamp desc",
-			Filter:    filter,
-			PageSize:  int64(gclPageSize),
-			PageToken: pageToken,
-		}).Do()
-
-		if err == nil {
-			return response
-		}
-
-		framework.Logf("Failed to get response from GCL due to %v, retrying", err)
-	}
-
-	return nil
-}
+func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry {
+	gclLogsProvider.CacheMutex.Lock()
+	defer gclLogsProvider.CacheMutex.Unlock()
+
+	if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok {
+		return ch
+	}
+
+	newCh := make(chan logEntry, maxCachedMessagesPerPod)
+	gclLogsProvider.LogEntryCache[podName] = newCh
+	return newCh
+}
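The PubSub-based ReadEntries added above drains a per-pod buffered channel without blocking, via a select with a default case and a labeled break. A standalone sketch of that idiom, using a plain string channel for illustration:

package main

import "fmt"

// drainNonBlocking reads everything currently buffered on ch and returns
// immediately once the channel is empty; it never blocks waiting for more.
func drainNonBlocking(ch chan string) []string {
	var entries []string
loop:
	for {
		select {
		case entry := <-ch:
			entries = append(entries, entry)
		default: // nothing buffered right now; stop draining
			break loop
		}
	}
	return entries
}

func main() {
	ch := make(chan string, 4)
	ch <- "line 1"
	ch <- "line 2"
	fmt.Println(drainNonBlocking(ch)) // [line 1 line 2]
}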
@@ -33,13 +33,16 @@ import (

 const (
 	// Duration of delay between any two attempts to check if all logs are ingested
-	ingestionRetryDelay = 100 * time.Second
+	ingestionRetryDelay = 30 * time.Second

 	// Amount of requested cores for logging container in millicores
 	loggingContainerCpuRequest = 10

 	// Amount of requested memory for logging container in bytes
 	loggingContainerMemoryRequest = 10 * 1024 * 1024
+
+	// Name of the container used for logging tests
+	loggingContainerName = "logging-container"
 )

 var (
@@ -51,26 +54,21 @@ var (
 type loggingPod struct {
 	// Name of the pod
 	Name string
-	// If we didn't read some log entries, their
-	// timestamps should be no less than this timestamp.
-	// Effectively, timestamp of the last ingested entry
-	// for which there's no missing entry before it
-	LastTimestamp time.Time
 	// Cache of ingested and read entries
-	Occurrences map[int]*logEntry
+	Occurrences map[int]logEntry
 	// Number of lines expected to be ingested from this pod
 	ExpectedLinesNumber int
 }

 type logEntry struct {
 	Payload string
-	Timestamp time.Time
 }

 type logsProvider interface {
+	Init() error
+	Cleanup()
+	ReadEntries(*loggingPod) []logEntry
 	FluentdApplicationName() string
-	EnsureWorking() error
-	ReadEntries(*loggingPod) []*logEntry
 }

 type loggingTestConfig struct {
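The reshaped logsProvider interface gives both providers a uniform lifecycle; schematically, each test drives it as below. This is a sketch of the calling pattern only: runTest is an illustrative name, and the real call sites are the It(...) blocks in the hunks above.

// Schematic of the provider lifecycle the tests above follow.
func runTest(f *framework.Framework, provider logsProvider) error {
	if err := provider.Init(); err != nil { // GCL: create sink/topic/subscription; ES: health checks
		return err
	}
	defer provider.Cleanup() // tear down cloud resources, stop polling

	pod := createLoggingPod(f, "synthlogger", "", 100, 1*time.Minute)
	for _, entry := range provider.ReadEntries(pod) {
		if lineNumber, ok := entry.getLogEntryNumber(); ok {
			pod.Occurrences[lineNumber] = entry
		}
	}
	return nil
}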
@@ -81,7 +79,7 @@ type loggingTestConfig struct {
 	MaxAllowedFluentdRestarts int
 }

-func (entry *logEntry) getLogEntryNumber() (int, bool) {
+func (entry logEntry) getLogEntryNumber() (int, bool) {
 	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
 	if submatch == nil || len(submatch) < 2 {
 		return 0, false
@@ -96,10 +94,8 @@ func createLoggingPod(f *framework.Framework, podName string, nodeName string, t
 	createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration)

 	return &loggingPod{
 		Name: podName,
-		// It's used to avoid querying logs from before the pod was started
-		LastTimestamp: time.Now(),
-		Occurrences: make(map[int]*logEntry),
+		Occurrences: make(map[int]logEntry),
 		ExpectedLinesNumber: totalLines,
 	}
 }
@@ -113,7 +109,7 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
 			RestartPolicy: api_v1.RestartPolicyNever,
 			Containers: []api_v1.Container{
 				{
-					Name: podName,
+					Name: loggingContainerName,
 					Image: "gcr.io/google_containers/logs-generator:v0.1.0",
 					Env: []api_v1.EnvVar{
 						{
@@ -146,7 +142,7 @@ func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
 	podHasIngestedLogs := make([]bool, len(config.Pods))
 	podWithIngestedLogsCount := 0

-	for start := time.Now(); podWithIngestedLogsCount < len(config.Pods) && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
+	for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
 		for podIdx, pod := range config.Pods {
 			if podHasIngestedLogs[podIdx] {
 				continue
@@ -167,6 +163,10 @@ func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
 			}
 		}

+		if podWithIngestedLogsCount == len(config.Pods) {
+			break
+		}
 	}

 	if podWithIngestedLogsCount < len(config.Pods) {
@@ -189,7 +189,7 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
 		missingByPod[podIdx] = pod.ExpectedLinesNumber
 	}

-	for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
+	for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
 		missing := 0
 		for podIdx, pod := range config.Pods {
 			if missingByPod[podIdx] == 0 {
@@ -203,6 +203,8 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
 		totalMissing = missing
 		if totalMissing > 0 {
 			framework.Logf("Still missing %d lines in total", totalMissing)
+		} else {
+			break
 		}
 	}

|
||||||
if err != nil {
|
if err != nil {
|
||||||
framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
|
framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
|
||||||
return pod.ExpectedLinesNumber
|
return pod.ExpectedLinesNumber
|
||||||
} else if missingOnPod > 0 {
|
|
||||||
framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod)
|
|
||||||
} else {
|
|
||||||
framework.Logf("All logs from pod %s are ingested", pod.Name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return missingOnPod
|
return missingOnPod
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
|
func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
|
||||||
entries := logsProvider.ReadEntries(pod)
|
entries := logsProvider.ReadEntries(pod)
|
||||||
|
|
||||||
framework.Logf("Got %d entries from provider", len(entries))
|
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
lineNumber, ok := entry.getLogEntryNumber()
|
lineNumber, ok := entry.getLogEntryNumber()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@@ -271,17 +268,6 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro
 		}
 	}

-	for i := 0; i < pod.ExpectedLinesNumber; i++ {
-		entry, ok := pod.Occurrences[i]
-		if !ok {
-			break
-		}
-
-		if entry.Timestamp.After(pod.LastTimestamp) {
-			pod.LastTimestamp = entry.Timestamp
-		}
-	}
-
 	return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
 }

@@ -362,6 +362,7 @@ filegroup(
 		"//vendor/google.golang.org/api/googleapi:all-srcs",
 		"//vendor/google.golang.org/api/logging/v2beta1:all-srcs",
 		"//vendor/google.golang.org/api/monitoring/v3:all-srcs",
+		"//vendor/google.golang.org/api/pubsub/v1:all-srcs",
 		"//vendor/google.golang.org/grpc:all-srcs",
 		"//vendor/gopkg.in/gcfg.v1:all-srcs",
 		"//vendor/gopkg.in/inf.v0:all-srcs",
@@ -0,0 +1,33 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["pubsub-gen.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//vendor/golang.org/x/net/context:go_default_library",
+        "//vendor/golang.org/x/net/context/ctxhttp:go_default_library",
+        "//vendor/google.golang.org/api/gensupport:go_default_library",
+        "//vendor/google.golang.org/api/googleapi:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
Two further file diffs suppressed because they are too large.