diff --git a/test/e2e/cluster-logging/BUILD b/test/e2e/cluster-logging/BUILD index 0cc0797172..d2d7bddce8 100644 --- a/test/e2e/cluster-logging/BUILD +++ b/test/e2e/cluster-logging/BUILD @@ -13,6 +13,7 @@ go_library( "es.go", "es_utils.go", "sd.go", + "sd_events.go", "sd_load.go", "sd_utils.go", "utils.go", diff --git a/test/e2e/cluster-logging/sd_events.go b/test/e2e/cluster-logging/sd_events.go new file mode 100644 index 0000000000..913ab18e12 --- /dev/null +++ b/test/e2e/cluster-logging/sd_events.go @@ -0,0 +1,93 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "time" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" +) + +const ( + // eventsIngestionTimeout is the amount of time to wait until some + // events are ingested. + eventsIngestionTimeout = 10 * time.Minute + + // eventPollingInterval is the delay between attempts to read events + // from the logs provider. + eventPollingInterval = 1 * time.Second + + // eventCreationInterval is the minimal delay between two events + // created for testing purposes. + eventCreationInterval = 10 * time.Second +) + +var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { + f := framework.NewDefaultFramework("gcl-logging-events") + + BeforeEach(func() { + framework.SkipUnlessProviderIs("gce", "gke") + }) + + It("should ingest events", func() { + gclLogsProvider, err := newGclLogsProvider(f) + framework.ExpectNoError(err, "Failed to create GCL logs provider") + + err = gclLogsProvider.Init() + defer gclLogsProvider.Cleanup() + framework.ExpectNoError(err, "Failed to init GCL logs provider") + + stopCh := make(chan struct{}) + successCh := make(chan struct{}) + go func() { + wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) { + events := gclLogsProvider.ReadEvents() + if len(events) > 0 { + framework.Logf("Some events are ingested, sample event: %v", events[0]) + close(successCh) + return true, nil + } + return false, nil + }) + close(stopCh) + }() + + By("Running pods to generate events while waiting for some of them to be ingested") + wait.PollUntil(eventCreationInterval, func() (bool, error) { + podName := "synthlogger" + createLoggingPod(f, podName, "", 1, 1*time.Second) + defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) + err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name) + if err != nil { + framework.Logf("Failed to wait pod %s to successfully complete due to %v", podName, err) + } + + return false, nil + }, stopCh) + + select { + case <-successCh: + break + default: + framework.Failf("No events are present in Stackdriver after %v", eventsIngestionTimeout) + } + }) +}) diff --git a/test/e2e/cluster-logging/sd_utils.go b/test/e2e/cluster-logging/sd_utils.go index e3c50ba8fb..d64d9a293c 100644 --- a/test/e2e/cluster-logging/sd_utils.go +++ b/test/e2e/cluster-logging/sd_utils.go @@ -40,8 +40,8 @@ const ( // The limit on the number of messages to pull from PubSub maxPullLogMessages = 100 * 1000 - // The limit on the number of messages in the cache for a pod - maxCachedMessagesPerPod = 10 * 1000 + // The limit on the number of messages in the single cache + maxCacheSize = 10 * 1000 // PubSub topic with log entries polling interval gclLoggingPollInterval = 100 * time.Millisecond @@ -55,6 +55,7 @@ type gclLogsProvider struct { Subscription *pubsub.Subscription LogSink *gcl.LogSink LogEntryCache map[string]chan logEntry + EventCache chan map[string]interface{} CacheMutex *sync.Mutex PollingStopChannel chan struct{} } @@ -77,6 +78,7 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) { PubsubService: pubsubService, Framework: f, LogEntryCache: map[string]chan logEntry{}, + EventCache: make(chan map[string]interface{}, maxCacheSize), CacheMutex: &sync.Mutex{}, PollingStopChannel: make(chan struct{}, 1), } @@ -137,7 +139,9 @@ func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subs func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) { projectDst := fmt.Sprintf("projects/%s", projectId) - filter := fmt.Sprintf("resource.labels.namespace_id=%s AND resource.labels.container_name=%s", nsName, loggingContainerName) + filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+ + "(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName) + framework.Logf("Using the following filter for entries: %s", filter) sink := &gcl.LogSink{ Name: sinkName, Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), @@ -196,9 +200,30 @@ func (gclLogsProvider *gclLogsProvider) pollLogs() { continue } - podName := gclLogEntry.Resource.Labels["pod_id"] - ch := gclLogsProvider.getCacheChannel(podName) - ch <- logEntry{Payload: gclLogEntry.TextPayload} + switch gclLogEntry.Resource.Type { + case "container": + podName := gclLogEntry.Resource.Labels["pod_id"] + ch := gclLogsProvider.getCacheChannel(podName) + ch <- logEntry{Payload: gclLogEntry.TextPayload} + break + case "gke_cluster": + jsonPayloadRaw, err := gclLogEntry.JsonPayload.MarshalJSON() + if err != nil { + framework.Logf("Failed to get jsonPayload from LogEntry %v", gclLogEntry) + break + } + var eventObject map[string]interface{} + err = json.Unmarshal(jsonPayloadRaw, &eventObject) + if err != nil { + framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:])) + break + } + gclLogsProvider.EventCache <- eventObject + break + default: + framework.Logf("Received LogEntry with unexpected resource type: %s", gclLogEntry.Resource.Type) + break + } } if len(ids) > 0 { @@ -258,6 +283,20 @@ func (logsProvider *gclLogsProvider) FluentdApplicationName() string { return "fluentd-gcp" } +func (gclLogsProvider *gclLogsProvider) ReadEvents() []map[string]interface{} { + var events []map[string]interface{} +polling_loop: + for { + select { + case event := <-gclLogsProvider.EventCache: + events = append(events, event) + default: + break polling_loop + } + } + return events +} + func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry { gclLogsProvider.CacheMutex.Lock() defer gclLogsProvider.CacheMutex.Unlock() @@ -266,7 +305,7 @@ func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan log return ch } - newCh := make(chan logEntry, maxCachedMessagesPerPod) + newCh := make(chan logEntry, maxCacheSize) gclLogsProvider.LogEntryCache[podName] = newCh return newCh }