Merge pull request #47793 from crassirostris/sd-logging-e2e-convert-to-soak

Automatic merge from submit-queue (batch tested with PRs 47784, 47793, 48334, 48435, 48354)

Convert Stackdriver Logging load e2e tests to soak tests

Instead of loading the logging mechanism for 10 minutes, load it for 21 hours to detect regressions that take some time to build up.
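For reviewers estimating the load, the constants introduced in sd_soak.go (shown in the diff below) work out roughly as follows:

    podRunDelay     = jobDuration / maxPodCount                         = 1h / 10        = 6 minutes between pod starts on each node
    podRunCount     = testDuration / podRunDelay - 1                    = 21h / 6min - 1 = 209 rounds, one pod per node per round
    linesPerPod     = linesPerPodPerSecond * 3600                       = 100 * 3600     = 360,000 lines over each pod's one-hour lifetime
    allowedRestarts = ceil(testDuration / 1h * maxAllowedRestartsPerHour)                = 21 tolerated fluentd restarts

In steady state that is about 10 concurrent pods and ~1,000 log lines per second per node, the same per-node rate the removed 10-minute load test generated, just sustained for 21 hours.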

This is made possible by switching to Pub/Sub. Only merge after the corresponding test suites have appropriate timeouts: https://github.com/kubernetes/test-infra/pull/3119
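On the Pub/Sub side, the fixed one-minute sleep before the sink was assumed operational is replaced by an explicit readiness probe. A condensed sketch of the pattern added in sd_utils.go, not the exact code (publish and pullAndAck are the helpers introduced later in this diff, and provider stands in for the sdLogsProvider receiver):

    // Publish a probe message to the sink's Pub/Sub topic and poll the
    // subscription until something arrives, instead of sleeping for a fixed
    // minute and hoping the sink has started routing entries.
    return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
        if err := publish(provider.PubsubService, provider.Topic, "probe"); err != nil {
            framework.Logf("Failed to push message to PubSub due to %v", err)
        }
        messages, err := pullAndAck(provider.PubsubService, provider.Subscription)
        if err != nil {
            return false, nil // transient pull failure, keep polling
        }
        return len(messages) > 0, nil
    })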

/cc @piosz @fgrzadkowski
Kubernetes Submit Queue 2017-07-03 10:41:49 -07:00 committed by GitHub
commit e06b53d8a6
7 changed files with 250 additions and 235 deletions

View File

@@ -14,7 +14,7 @@ go_library(
"es_utils.go", "es_utils.go",
"sd.go", "sd.go",
"sd_events.go", "sd_events.go",
"sd_load.go", "sd_soak.go",
"sd_utils.go", "sd_utils.go",
"utils.go", "utils.go",
], ],

View File

@@ -26,24 +26,24 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
) )
var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver", func() {
f := framework.NewDefaultFramework("gcl-logging") f := framework.NewDefaultFramework("sd-logging")
BeforeEach(func() { BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke") framework.SkipUnlessProviderIs("gce", "gke")
}) })
It("should check that logs from containers are ingested in GCL", func() { It("should ingest logs from applications", func() {
podName := "synthlogger" podName := "synthlogger"
gclLogsProvider, err := newGclLogsProvider(f) sdLogsProvider, err := newSdLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider") framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
err = gclLogsProvider.Init() err = sdLogsProvider.Init()
defer gclLogsProvider.Cleanup() defer sdLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider") framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName()) err = ensureSingleFluentdOnEachNode(f, sdLogsProvider.FluentdApplicationName())
framework.ExpectNoError(err, "Fluentd deployed incorrectly") framework.ExpectNoError(err, "Fluentd deployed incorrectly")
By("Running synthetic logger") By("Running synthetic logger")
@@ -54,7 +54,7 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
By("Waiting for logs to ingest") By("Waiting for logs to ingest")
config := &loggingTestConfig{ config := &loggingTestConfig{
LogsProvider: gclLogsProvider, LogsProvider: sdLogsProvider,
Pods: []*loggingPod{pod}, Pods: []*loggingPod{pod},
IngestionTimeout: 10 * time.Minute, IngestionTimeout: 10 * time.Minute,
MaxAllowedLostFraction: 0, MaxAllowedLostFraction: 0,

View File

@@ -40,26 +40,26 @@ const (
eventCreationInterval = 10 * time.Second eventCreationInterval = 10 * time.Second
) )
var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver", func() {
f := framework.NewDefaultFramework("gcl-logging-events") f := framework.NewDefaultFramework("sd-logging-events")
BeforeEach(func() { BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke") framework.SkipUnlessProviderIs("gce", "gke")
}) })
It("should ingest events", func() { It("should ingest events", func() {
gclLogsProvider, err := newGclLogsProvider(f) sdLogsProvider, err := newSdLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider") framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
err = gclLogsProvider.Init() err = sdLogsProvider.Init()
defer gclLogsProvider.Cleanup() defer sdLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider") framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
stopCh := make(chan struct{}) stopCh := make(chan struct{})
successCh := make(chan struct{}) successCh := make(chan struct{})
go func() { go func() {
wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) { wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) {
events := gclLogsProvider.ReadEvents() events := sdLogsProvider.ReadEvents()
if len(events) > 0 { if len(events) > 0 {
framework.Logf("Some events are ingested, sample event: %v", events[0]) framework.Logf("Some events are ingested, sample event: %v", events[0])
close(successCh) close(successCh)

View File

@@ -1,125 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
)
const (
loadTestMaxAllowedLostFraction = 0.01
loadTestMaxAllowedFluentdRestarts = 1
)
var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:StackdriverLogging]", func() {
f := framework.NewDefaultFramework("gcl-logging-load")
It("should create a constant load with long-living pods and ensure logs delivery", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.Init()
defer gclLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
nodeCount := len(nodes)
podCount := 30 * nodeCount
loggingDuration := 10 * time.Minute
linesPerSecond := 1000 * nodeCount
linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
ingestionTimeout := 20 * time.Minute
By("Running logs generator pods")
pods := []*loggingPod{}
for podIdx := 0; podIdx < podCount; podIdx++ {
node := nodes[podIdx%len(nodes)]
podName := fmt.Sprintf("logs-generator-%d-%d", linesPerPod, podIdx)
pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, loggingDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
Pods: pods,
IngestionTimeout: ingestionTimeout,
MaxAllowedLostFraction: loadTestMaxAllowedLostFraction,
MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
}
err = waitForFullLogsIngestion(f, config)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
framework.Logf("Successfully ingested all logs")
}
})
It("should create a constant load with short-living pods and ensure logs delivery", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.Init()
defer gclLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
maxPodCount := 10
jobDuration := 1 * time.Minute
linesPerPodPerSecond := 100
testDuration := 10 * time.Minute
ingestionTimeout := 20 * time.Minute
podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
By("Running short-living pods")
pods := []*loggingPod{}
for runIdx := 0; runIdx < podRunCount; runIdx++ {
for nodeIdx, node := range nodes {
podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}
time.Sleep(podRunDelay)
}
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
Pods: pods,
IngestionTimeout: ingestionTimeout,
MaxAllowedLostFraction: loadTestMaxAllowedLostFraction,
MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
}
err = waitForFullLogsIngestion(f, config)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
framework.Logf("Successfully ingested all logs")
}
})
})

View File

@@ -0,0 +1,90 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"math"
"time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
)
const (
// maxAllowedLostFraction is the fraction of lost logs considered acceptable.
maxAllowedLostFraction = 0.01
// maxAllowedRestartsPerHour is the number of fluentd container restarts
// considered acceptable. Once per hour is fine for now, as long as it
// doesn't loose too much logs.
maxAllowedRestartsPerHour = 1.0
)
var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
f := framework.NewDefaultFramework("sd-logging-load")
It("should ingest logs from applications running for a prolonged amount of time", func() {
sdLogsProvider, err := newSdLogsProvider(f)
framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
err = sdLogsProvider.Init()
defer sdLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
maxPodCount := 10
jobDuration := 1 * time.Hour
linesPerPodPerSecond := 100
testDuration := 21 * time.Hour
ingestionTimeout := testDuration + 30*time.Minute
allowedRestarts := int(math.Ceil(float64(testDuration) /
float64(time.Hour) * maxAllowedRestartsPerHour))
podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
By("Running short-living pods")
pods := []*loggingPod{}
for runIdx := 0; runIdx < podRunCount; runIdx++ {
for nodeIdx, node := range nodes {
podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}
time.Sleep(podRunDelay)
}
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: sdLogsProvider,
Pods: pods,
IngestionTimeout: ingestionTimeout,
MaxAllowedLostFraction: maxAllowedLostFraction,
MaxAllowedFluentdRestarts: allowedRestarts,
}
err = waitForFullLogsIngestion(f, config)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
framework.Logf("Successfully ingested all logs")
}
})
})

View File

@@ -28,14 +28,14 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
gcl "google.golang.org/api/logging/v2beta1" sd "google.golang.org/api/logging/v2beta1"
pubsub "google.golang.org/api/pubsub/v1" pubsub "google.golang.org/api/pubsub/v1"
) )
const ( const (
// The amount of time to wait before considering // The amount of time to wait for Stackdriver Logging
// Stackdriver Logging sink operational // sink to become operational
sinkInitialDelay = 1 * time.Minute sinkStartupTimeout = 10 * time.Minute
// The limit on the number of messages to pull from PubSub // The limit on the number of messages to pull from PubSub
maxPullLogMessages = 100 * 1000 maxPullLogMessages = 100 * 1000
@@ -44,26 +44,26 @@ const (
maxCacheSize = 10 * 1000 maxCacheSize = 10 * 1000
// PubSub topic with log entries polling interval // PubSub topic with log entries polling interval
gclLoggingPollInterval = 100 * time.Millisecond sdLoggingPollInterval = 100 * time.Millisecond
) )
type gclLogsProvider struct { type sdLogsProvider struct {
GclService *gcl.Service SdService *sd.Service
PubsubService *pubsub.Service PubsubService *pubsub.Service
Framework *framework.Framework Framework *framework.Framework
Topic *pubsub.Topic Topic *pubsub.Topic
Subscription *pubsub.Subscription Subscription *pubsub.Subscription
LogSink *gcl.LogSink LogSink *sd.LogSink
LogEntryCache map[string]chan logEntry LogEntryCache map[string]chan logEntry
EventCache chan map[string]interface{} EventCache chan map[string]interface{}
CacheMutex *sync.Mutex CacheMutex *sync.Mutex
PollingStopChannel chan struct{} PollingStopChannel chan struct{}
} }
func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) { func newSdLogsProvider(f *framework.Framework) (*sdLogsProvider, error) {
ctx := context.Background() ctx := context.Background()
hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope) hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
gclService, err := gcl.New(hc) sdService, err := sd.New(hc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -73,8 +73,8 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return nil, err return nil, err
} }
provider := &gclLogsProvider{ provider := &sdLogsProvider{
GclService: gclService, SdService: sdService,
PubsubService: pubsubService, PubsubService: pubsubService,
Framework: f, Framework: f,
LogEntryCache: map[string]chan logEntry{}, LogEntryCache: map[string]chan logEntry{},
@@ -85,131 +85,144 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return provider, nil return provider, nil
} }
func (gclLogsProvider *gclLogsProvider) Init() error { func (sdLogsProvider *sdLogsProvider) Init() error {
projectId := framework.TestContext.CloudConfig.ProjectID projectId := framework.TestContext.CloudConfig.ProjectID
nsName := gclLogsProvider.Framework.Namespace.Name nsName := sdLogsProvider.Framework.Namespace.Name
topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName) topic, err := sdLogsProvider.createPubSubTopic(projectId, nsName)
if err != nil { if err != nil {
return fmt.Errorf("failed to create PubSub topic: %v", err) return fmt.Errorf("failed to create PubSub topic: %v", err)
} }
gclLogsProvider.Topic = topic sdLogsProvider.Topic = topic
subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name) subs, err := sdLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to create PubSub subscription: %v", err) return fmt.Errorf("failed to create PubSub subscription: %v", err)
} }
gclLogsProvider.Subscription = subs sdLogsProvider.Subscription = subs
logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name) logSink, err := sdLogsProvider.createSink(projectId, nsName, nsName, topic.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err) return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
} }
gclLogsProvider.LogSink = logSink sdLogsProvider.LogSink = logSink
if err = gclLogsProvider.authorizeGclLogSink(); err != nil { if err = sdLogsProvider.authorizeSink(); err != nil {
return fmt.Errorf("failed to authorize log sink: %v", err) return fmt.Errorf("failed to authorize log sink: %v", err)
} }
framework.Logf("Waiting for log sink to become operational") if err = sdLogsProvider.waitSinkInit(); err != nil {
// TODO: Replace with something more intelligent return fmt.Errorf("failed to wait for sink to become operational: %v", err)
time.Sleep(sinkInitialDelay) }
go gclLogsProvider.pollLogs() go sdLogsProvider.pollLogs()
return nil return nil
} }
func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) { func (sdLogsProvider *sdLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName) topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
topic := &pubsub.Topic{ topic := &pubsub.Topic{
Name: topicFullName, Name: topicFullName,
} }
return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do() return sdLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
} }
func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) { func (sdLogsProvider *sdLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName) subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
subs := &pubsub.Subscription{ subs := &pubsub.Subscription{
Name: subsFullName, Name: subsFullName,
Topic: topicName, Topic: topicName,
} }
return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do() return sdLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
} }
func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) { func (sdLogsProvider *sdLogsProvider) createSink(projectId, nsName, sinkName, topicName string) (*sd.LogSink, error) {
projectDst := fmt.Sprintf("projects/%s", projectId) projectDst := fmt.Sprintf("projects/%s", projectId)
filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+ filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+
"(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName) "(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName)
framework.Logf("Using the following filter for entries: %s", filter) framework.Logf("Using the following filter for entries: %s", filter)
sink := &gcl.LogSink{ sink := &sd.LogSink{
Name: sinkName, Name: sinkName,
Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
Filter: filter, Filter: filter,
} }
return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do() return sdLogsProvider.SdService.Projects.Sinks.Create(projectDst, sink).Do()
} }
func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error { func (sdLogsProvider *sdLogsProvider) authorizeSink() error {
topicsService := gclLogsProvider.PubsubService.Projects.Topics topicsService := sdLogsProvider.PubsubService.Projects.Topics
policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do() policy, err := topicsService.GetIamPolicy(sdLogsProvider.Topic.Name).Do()
if err != nil { if err != nil {
return err return err
} }
binding := &pubsub.Binding{ binding := &pubsub.Binding{
Role: "roles/pubsub.publisher", Role: "roles/pubsub.publisher",
Members: []string{gclLogsProvider.LogSink.WriterIdentity}, Members: []string{sdLogsProvider.LogSink.WriterIdentity},
} }
policy.Bindings = append(policy.Bindings, binding) policy.Bindings = append(policy.Bindings, binding)
req := &pubsub.SetIamPolicyRequest{Policy: policy} req := &pubsub.SetIamPolicyRequest{Policy: policy}
if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil { if _, err = topicsService.SetIamPolicy(sdLogsProvider.Topic.Name, req).Do(); err != nil {
return err return err
} }
return nil return nil
} }
func (gclLogsProvider *gclLogsProvider) pollLogs() { func (sdLogsProvider *sdLogsProvider) waitSinkInit() error {
wait.PollUntil(gclLoggingPollInterval, func() (bool, error) { framework.Logf("Waiting for log sink to become operational")
subsName := gclLogsProvider.Subscription.Name return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions err := publish(sdLogsProvider.PubsubService, sdLogsProvider.Topic, "embrace eternity")
req := &pubsub.PullRequest{
ReturnImmediately: true,
MaxMessages: maxPullLogMessages,
}
resp, err := subsService.Pull(subsName, req).Do()
if err != nil { if err != nil {
framework.Logf("Failed to pull messaged from PubSub due to %v", err) framework.Logf("Failed to push message to PubSub due to %v", err)
}
messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil
}
if len(messages) > 0 {
framework.Logf("Sink %s is operational", sdLogsProvider.LogSink.Name)
return true, nil
}
return false, nil
})
}
func (sdLogsProvider *sdLogsProvider) pollLogs() {
wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil return false, nil
} }
ids := []string{} for _, msg := range messages {
for _, msg := range resp.ReceivedMessages {
ids = append(ids, msg.AckId)
logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil { if err != nil {
framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
continue continue
} }
var gclLogEntry gcl.LogEntry var sdLogEntry sd.LogEntry
if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil { if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
continue continue
} }
switch gclLogEntry.Resource.Type { switch sdLogEntry.Resource.Type {
case "container": case "container":
podName := gclLogEntry.Resource.Labels["pod_id"] podName := sdLogEntry.Resource.Labels["pod_id"]
ch := gclLogsProvider.getCacheChannel(podName) ch := sdLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: gclLogEntry.TextPayload} ch <- logEntry{Payload: sdLogEntry.TextPayload}
break break
case "gke_cluster": case "gke_cluster":
jsonPayloadRaw, err := gclLogEntry.JsonPayload.MarshalJSON() jsonPayloadRaw, err := sdLogEntry.JsonPayload.MarshalJSON()
if err != nil { if err != nil {
framework.Logf("Failed to get jsonPayload from LogEntry %v", gclLogEntry) framework.Logf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
break break
} }
var eventObject map[string]interface{} var eventObject map[string]interface{}
@@ -218,55 +231,48 @@ func (gclLogsProvider *gclLogsProvider) pollLogs() {
framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:])) framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:]))
break break
} }
gclLogsProvider.EventCache <- eventObject sdLogsProvider.EventCache <- eventObject
break break
default: default:
framework.Logf("Received LogEntry with unexpected resource type: %s", gclLogEntry.Resource.Type) framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
break break
} }
} }
if len(ids) > 0 {
ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil {
framework.Logf("Failed to ack: %v", err)
}
}
return false, nil return false, nil
}, gclLogsProvider.PollingStopChannel) }, sdLogsProvider.PollingStopChannel)
} }
func (gclLogsProvider *gclLogsProvider) Cleanup() { func (sdLogsProvider *sdLogsProvider) Cleanup() {
gclLogsProvider.PollingStopChannel <- struct{}{} sdLogsProvider.PollingStopChannel <- struct{}{}
if gclLogsProvider.LogSink != nil { if sdLogsProvider.LogSink != nil {
projectId := framework.TestContext.CloudConfig.ProjectID projectId := framework.TestContext.CloudConfig.ProjectID
sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name) sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, sdLogsProvider.LogSink.Name)
sinksService := gclLogsProvider.GclService.Projects.Sinks sinksService := sdLogsProvider.SdService.Projects.Sinks
if _, err := sinksService.Delete(sinkNameId).Do(); err != nil { if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
framework.Logf("Failed to delete LogSink: %v", err) framework.Logf("Failed to delete LogSink: %v", err)
} }
} }
if gclLogsProvider.Subscription != nil { if sdLogsProvider.Subscription != nil {
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions subsService := sdLogsProvider.PubsubService.Projects.Subscriptions
if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil { if _, err := subsService.Delete(sdLogsProvider.Subscription.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub subscription: %v", err) framework.Logf("Failed to delete PubSub subscription: %v", err)
} }
} }
if gclLogsProvider.Topic != nil { if sdLogsProvider.Topic != nil {
topicsService := gclLogsProvider.PubsubService.Projects.Topics topicsService := sdLogsProvider.PubsubService.Projects.Topics
if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil { if _, err := topicsService.Delete(sdLogsProvider.Topic.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub topic: %v", err) framework.Logf("Failed to delete PubSub topic: %v", err)
} }
} }
} }
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry { func (sdLogsProvider *sdLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
var entries []logEntry var entries []logEntry
ch := gclLogsProvider.getCacheChannel(pod.Name) ch := sdLogsProvider.getCacheChannel(pod.Name)
polling_loop: polling_loop:
for { for {
select { select {
@@ -279,16 +285,16 @@ polling_loop:
return entries return entries
} }
func (logsProvider *gclLogsProvider) FluentdApplicationName() string { func (logsProvider *sdLogsProvider) FluentdApplicationName() string {
return "fluentd-gcp" return "fluentd-gcp"
} }
func (gclLogsProvider *gclLogsProvider) ReadEvents() []map[string]interface{} { func (sdLogsProvider *sdLogsProvider) ReadEvents() []map[string]interface{} {
var events []map[string]interface{} var events []map[string]interface{}
polling_loop: polling_loop:
for { for {
select { select {
case event := <-gclLogsProvider.EventCache: case event := <-sdLogsProvider.EventCache:
events = append(events, event) events = append(events, event)
default: default:
break polling_loop break polling_loop
@@ -297,15 +303,54 @@ polling_loop:
return events return events
} }
func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry { func (sdLogsProvider *sdLogsProvider) getCacheChannel(podName string) chan logEntry {
gclLogsProvider.CacheMutex.Lock() sdLogsProvider.CacheMutex.Lock()
defer gclLogsProvider.CacheMutex.Unlock() defer sdLogsProvider.CacheMutex.Unlock()
if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok { if ch, ok := sdLogsProvider.LogEntryCache[podName]; ok {
return ch return ch
} }
newCh := make(chan logEntry, maxCacheSize) newCh := make(chan logEntry, maxCacheSize)
gclLogsProvider.LogEntryCache[podName] = newCh sdLogsProvider.LogEntryCache[podName] = newCh
return newCh return newCh
} }
func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
subsService := service.Projects.Subscriptions
req := &pubsub.PullRequest{
ReturnImmediately: true,
MaxMessages: maxPullLogMessages,
}
resp, err := subsService.Pull(subs.Name, req).Do()
if err != nil {
return nil, err
}
var ids []string
for _, msg := range resp.ReceivedMessages {
ids = append(ids, msg.AckId)
}
if len(ids) > 0 {
ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
framework.Logf("Failed to ack poll: %v", err)
}
}
return resp.ReceivedMessages, nil
}
func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
topicsService := service.Projects.Topics
req := &pubsub.PublishRequest{
Messages: []*pubsub.PubsubMessage{
{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
},
},
}
_, err := topicsService.Publish(topic.Name, req).Do()
return err
}

View File

@@ -213,6 +213,11 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
if totalMissing > 0 { if totalMissing > 0 {
framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines", framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
config.IngestionTimeout, totalMissing, lostFraction*100) config.IngestionTimeout, totalMissing, lostFraction*100)
for podIdx, missing := range missingByPod {
if missing != 0 {
framework.Logf("Still missing %d lines for pod %v", missing, config.Pods[podIdx])
}
}
} }
if lostFraction > config.MaxAllowedLostFraction { if lostFraction > config.MaxAllowedLostFraction {