Merge pull request #57335 from crassirostris/fix-sd-logging-tests

Automatic merge from submit-queue (batch tested with PRs 56108, 56811, 57335, 57331, 54530). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix Stackdriver Logging e2e tests

This PR fixes the problems in the SD Logging tests:

- Because of Docker problem, trimming works unpredictably on some OSs, so instead of assuming exact trimming mechanism, just assume trimming
- Increases the throughput of reading logs through Pub/Sub by increasing the parallelism level

```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-12-18 17:45:38 -08:00 committed by GitHub
commit ed22c3b0e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 49 deletions

View File

@ -18,7 +18,6 @@ package stackdriver
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/wait"
@ -107,19 +106,14 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err)
})
})
})
ginkgo.It("should ingest logs [Feature:StackdriverLogging]", func() {
withLogProviderForScope(f, podsScope, func(p *sdLogProvider) {
ginkgo.By("Checking that too long lines are trimmed", func() {
originalLength := 100001
maxLength := 100000
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", originalLength),
fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", maxLength+1),
}
trimPrefix := "[Trimmed]"
pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f)
framework.ExpectNoError(err, "Failed to start a pod")
@ -133,11 +127,8 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
if log.JSONPayload != nil {
return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload)
}
if len(log.TextPayload) == originalLength {
return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload))
}
if !strings.HasPrefix(log.TextPayload, trimPrefix) {
return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload)
if len(log.TextPayload) > maxLength {
return false, fmt.Errorf("got too long entry of length %d", len(log.TextPayload))
}
return true, nil
}, utils.JustTimeout, pod.Name())

View File

@ -20,6 +20,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
@ -45,6 +46,9 @@ const (
// PubSub topic with log entries polling interval
sdLoggingPollInterval = 100 * time.Millisecond
// The parallelism level of polling logs process.
sdLoggingPollParallelism = 10
)
type logProviderScope int
@ -68,6 +72,7 @@ type sdLogProvider struct {
logSink *sd.LogSink
pollingStopChannel chan struct{}
pollingWG *sync.WaitGroup
queueCollection utils.LogsQueueCollection
@ -92,7 +97,8 @@ func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogPro
sdService: sdService,
pubsubService: pubsubService,
framework: f,
pollingStopChannel: make(chan struct{}, 1),
pollingStopChannel: make(chan struct{}),
pollingWG: &sync.WaitGroup{},
queueCollection: utils.NewLogsQueueCollection(maxQueueSize),
}
return provider, nil
@ -128,13 +134,14 @@ func (p *sdLogProvider) Init() error {
return fmt.Errorf("failed to wait for sink to become operational: %v", err)
}
go p.pollLogs()
p.startPollingLogs()
return nil
}
func (p *sdLogProvider) Cleanup() {
p.pollingStopChannel <- struct{}{}
close(p.pollingStopChannel)
p.pollingWG.Wait()
if p.logSink != nil {
projectID := framework.TestContext.CloudConfig.ProjectID
@ -257,44 +264,54 @@ func (p *sdLogProvider) waitSinkInit() error {
})
}
func (p *sdLogProvider) pollLogs() {
wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
messages, err := pullAndAck(p.pubsubService, p.subscription)
func (p *sdLogProvider) startPollingLogs() {
for i := 0; i < sdLoggingPollParallelism; i++ {
p.pollingWG.Add(1)
go func() {
defer p.pollingWG.Done()
wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
p.pollLogsOnce()
return false, nil
}, p.pollingStopChannel)
}()
}
}
func (p *sdLogProvider) pollLogsOnce() {
messages, err := pullAndAck(p.pubsubService, p.subscription)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return
}
for _, msg := range messages {
logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil
framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
continue
}
for _, msg := range messages {
logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil {
framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
continue
}
var sdLogEntry sd.LogEntry
if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
continue
}
name, ok := p.tryGetName(sdLogEntry)
if !ok {
framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
continue
}
logEntry, err := convertLogEntry(sdLogEntry)
if err != nil {
framework.Logf("Failed to parse Stackdriver LogEntry: %v", err)
continue
}
p.queueCollection.Push(name, logEntry)
var sdLogEntry sd.LogEntry
if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
continue
}
return false, nil
}, p.pollingStopChannel)
name, ok := p.tryGetName(sdLogEntry)
if !ok {
framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
continue
}
logEntry, err := convertLogEntry(sdLogEntry)
if err != nil {
framework.Logf("Failed to parse Stackdriver LogEntry: %v", err)
continue
}
p.queueCollection.Push(name, logEntry)
}
}
func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {