From 464a994a10b71c45583f3426fd970291f8a5b756 Mon Sep 17 00:00:00 2001 From: Yassine TIJANI Date: Tue, 3 Jul 2018 22:25:17 +0200 Subject: [PATCH] Implementing logic for v1beta1.Event API Signed-off-by: Yassine TIJANI --- staging/publishing/import-restrictions.yaml | 1 + staging/src/k8s.io/client-go/BUILD | 1 + staging/src/k8s.io/client-go/go.sum | 1 + .../src/k8s.io/client-go/tools/events/BUILD | 58 ++++ .../tools/events/event_broadcaster.go | 285 ++++++++++++++++++ .../client-go/tools/events/event_recorder.go | 89 ++++++ .../tools/events/eventseries_test.go | 208 +++++++++++++ .../client-go/tools/events/interfaces.go | 58 ++++ .../src/k8s.io/client-go/tools/record/BUILD | 64 ++-- 9 files changed, 731 insertions(+), 34 deletions(-) create mode 100644 staging/src/k8s.io/client-go/tools/events/BUILD create mode 100644 staging/src/k8s.io/client-go/tools/events/event_broadcaster.go create mode 100644 staging/src/k8s.io/client-go/tools/events/event_recorder.go create mode 100644 staging/src/k8s.io/client-go/tools/events/eventseries_test.go create mode 100644 staging/src/k8s.io/client-go/tools/events/interfaces.go diff --git a/staging/publishing/import-restrictions.yaml b/staging/publishing/import-restrictions.yaml index e92287a3bf..9a0c6d66b9 100644 --- a/staging/publishing/import-restrictions.yaml +++ b/staging/publishing/import-restrictions.yaml @@ -80,6 +80,7 @@ - "./vendor/k8s.io/client-go/tools/leaderelection/resourcelock" - "./vendor/k8s.io/client-go/tools/portforward" - "./vendor/k8s.io/client-go/tools/record" + - "./vendor/k8s.io/client-go/tools/events" - "./vendor/k8s.io/client-go/tools/reference" - "./vendor/k8s.io/client-go/tools/remotecommand" allowedImports: diff --git a/staging/src/k8s.io/client-go/BUILD b/staging/src/k8s.io/client-go/BUILD index afffdcbd02..1d2f15a266 100644 --- a/staging/src/k8s.io/client-go/BUILD +++ b/staging/src/k8s.io/client-go/BUILD @@ -69,6 +69,7 @@ filegroup( "//staging/src/k8s.io/client-go/tools/auth:all-srcs", "//staging/src/k8s.io/client-go/tools/cache:all-srcs", "//staging/src/k8s.io/client-go/tools/clientcmd:all-srcs", + "//staging/src/k8s.io/client-go/tools/events:all-srcs", "//staging/src/k8s.io/client-go/tools/leaderelection:all-srcs", "//staging/src/k8s.io/client-go/tools/metrics:all-srcs", "//staging/src/k8s.io/client-go/tools/pager:all-srcs", diff --git a/staging/src/k8s.io/client-go/go.sum b/staging/src/k8s.io/client-go/go.sum index 5c370e1d75..e050deb3f1 100644 --- a/staging/src/k8s.io/client-go/go.sum +++ b/staging/src/k8s.io/client-go/go.sum @@ -23,6 +23,7 @@ github.com/google/btree v0.0.0-20160524151835-7d79101e329e h1:JHB7F/4TJCrYBW8+GZ github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= diff --git a/staging/src/k8s.io/client-go/tools/events/BUILD b/staging/src/k8s.io/client-go/tools/events/BUILD new file mode 100644 index 0000000000..e290abd8a8 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/events/BUILD @@ -0,0 +1,58 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "event_broadcaster.go", + "event_recorder.go", + "interfaces.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/events", + importpath = "k8s.io/client-go/tools/events", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/record/util:go_default_library", + "//staging/src/k8s.io/client-go/tools/reference:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["eventseries_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//staging/src/k8s.io/client-go/tools/reference:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go new file mode 100644 index 0000000000..9d373dd84c --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go @@ -0,0 +1,285 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "os" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" + + "k8s.io/api/events/v1beta1" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record/util" + "k8s.io/klog" +) + +const ( + maxTriesPerEvent = 12 + finishTime = 6 * time.Minute + refreshTime = 30 * time.Minute + maxQueuedEvents = 1000 +) + +var defaultSleepDuration = 10 * time.Second + +// TODO: validate impact of copying and investigate hashing +type eventKey struct { + action string + reason string + reportingController string + reportingInstance string + regarding corev1.ObjectReference + related corev1.ObjectReference +} + +type eventBroadcasterImpl struct { + *watch.Broadcaster + mu sync.RWMutex + eventCache map[eventKey]*v1beta1.Event + sleepDuration time.Duration + sink EventSink +} + +// NewBroadcaster Creates a new event broadcaster. +func NewBroadcaster(sink EventSink) EventBroadcaster { + return newBroadcaster(sink, defaultSleepDuration) +} + +// NewBroadcasterForTest Creates a new event broadcaster for test purposes. +func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster { + return &eventBroadcasterImpl{ + Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + eventCache: map[eventKey]*v1beta1.Event{}, + sleepDuration: sleepDuration, + sink: sink, + } +} + +// TODO: add test for refreshExistingEventSeries +func (e *eventBroadcasterImpl) refreshExistingEventSeries() { + // TODO: Investigate whether lock contention won't be a problem + e.mu.RLock() + defer e.mu.RUnlock() + for isomorphicKey, event := range e.eventCache { + if event.Series != nil { + if recordedEvent, retry := recordEvent(e.sink, event); !retry { + e.eventCache[isomorphicKey] = recordedEvent + } + } + } +} + +// TODO: add test for finishSeries +func (e *eventBroadcasterImpl) finishSeries() { + // TODO: Investigate whether lock contention won't be a problem + e.mu.Lock() + defer e.mu.Unlock() + for isomorphicKey, event := range e.eventCache { + eventSerie := event.Series + if eventSerie != nil { + if eventSerie.LastObservedTime.Time.Add(finishTime).Before(time.Now()) { + if _, retry := recordEvent(e.sink, event); !retry { + delete(e.eventCache, isomorphicKey) + } + } + } + } +} + +// NewRecorder returns an EventRecorder that records events with the given event source. +func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder { + hostname, _ := os.Hostname() + reportingInstance := reportingController + "-" + hostname + return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}} +} + +func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) { + // Make a copy before modification, because there could be multiple listeners. + eventCopy := event.DeepCopy() + go func() { + evToRecord := func() *v1beta1.Event { + e.mu.Lock() + defer e.mu.Unlock() + eventKey := getKey(eventCopy) + isomorphicEvent, isIsomorphic := e.eventCache[eventKey] + if isIsomorphic { + if isomorphicEvent.Series != nil { + isomorphicEvent.Series.Count++ + isomorphicEvent.EventTime = metav1.MicroTime{Time: clock.Now()} + return nil + } + isomorphicEvent.Series = &v1beta1.EventSeries{ + Count: 1, + LastObservedTime: metav1.MicroTime{Time: clock.Now()}, + } + return isomorphicEvent + } + e.eventCache[eventKey] = eventCopy + return eventCopy + }() + if evToRecord != nil { + recordedEvent := e.attemptRecording(evToRecord) + if recordedEvent != nil { + recordedEventKey := getKey(recordedEvent) + e.mu.Lock() + defer e.mu.Unlock() + e.eventCache[recordedEventKey] = recordedEvent + } + } + }() +} + +func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event { + tries := 0 + for { + if recordedEvent, retry := recordEvent(e.sink, event); !retry { + return recordedEvent + } + tries++ + if tries >= maxTriesPerEvent { + klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + return nil + } + // Randomize sleep so that various clients won't all be + // synced up if the master goes down. + time.Sleep(wait.Jitter(e.sleepDuration, 0.25)) + } +} + +func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) { + var newEvent *v1beta1.Event + var err error + isEventSerie := event.Series != nil + if isEventSerie { + patch, err := createPatchBytesForSeries(event) + if err != nil { + klog.Errorf("Unable to calculate diff, no merge is possible: %v", err) + return nil, false + } + newEvent, err = sink.Patch(event, patch) + } + // Update can fail because the event may have been removed and it no longer exists. + if !isEventSerie || (isEventSerie && util.IsKeyNotFoundError(err)) { + // Making sure that ResourceVersion is empty on creation + event.ResourceVersion = "" + newEvent, err = sink.Create(event) + } + if err == nil { + return newEvent, false + } + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) + return nil, false + case *errors.StatusError: + if errors.IsAlreadyExists(err) { + klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } else { + klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) + } + return nil, false + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return nil, true +} + +func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) { + oldEvent := event.DeepCopy() + oldEvent.Series = nil + oldData, err := json.Marshal(oldEvent) + if err != nil { + return nil, err + } + newData, err := json.Marshal(event) + if err != nil { + return nil, err + } + return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{}) +} + +func getKey(event *v1beta1.Event) eventKey { + key := eventKey{ + action: event.Action, + reason: event.Reason, + reportingController: event.ReportingController, + reportingInstance: event.ReportingInstance, + regarding: event.Regarding, + } + if event.Related != nil { + key.related = *event.Related + } + return key +} + +// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. +// The return value is used to stop recording +func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() { + watcher := e.Watch() + go func() { + defer utilruntime.HandleCrash() + for { + watchEvent, ok := <-watcher.ResultChan() + if !ok { + return + } + eventHandler(watchEvent.Object) + } + }() + return watcher.Stop +} + +// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. +func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { + go wait.Until(func() { + e.refreshExistingEventSeries() + }, refreshTime, stopCh) + go wait.Until(func() { + e.finishSeries() + }, finishTime, stopCh) + eventHandler := func(obj runtime.Object) { + event, ok := obj.(*v1beta1.Event) + if !ok { + klog.Errorf("unexpected type, expected v1beta1.Event") + return + } + e.recordToSink(event, clock.RealClock{}) + } + stopWatcher := e.startEventWatcher(eventHandler) + go func() { + <-stopCh + stopWatcher() + }() +} diff --git a/staging/src/k8s.io/client-go/tools/events/event_recorder.go b/staging/src/k8s.io/client-go/tools/events/event_recorder.go new file mode 100644 index 0000000000..c2c18fb39e --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/events/event_recorder.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/reference" + + "k8s.io/api/events/v1beta1" + "k8s.io/client-go/tools/record/util" + "k8s.io/klog" +) + +type recorderImpl struct { + scheme *runtime.Scheme + reportingController string + reportingInstance string + *watch.Broadcaster + clock clock.Clock +} + +func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { + timestamp := metav1.MicroTime{time.Now()} + message := fmt.Sprintf(note, args...) + refRegarding, err := reference.GetReference(recorder.scheme, regarding) + if err != nil { + klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message) + return + } + refRelated, err := reference.GetReference(recorder.scheme, related) + if err != nil { + klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err) + } + if !util.ValidateEventType(eventtype) { + klog.Errorf("Unsupported event type: '%v'", eventtype) + return + } + event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action) + go func() { + defer utilruntime.HandleCrash() + recorder.Action(watch.Added, event) + }() +} + +func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event { + t := metav1.Time{Time: recorder.clock.Now()} + namespace := refRegarding.Namespace + if namespace == "" { + namespace = metav1.NamespaceSystem + } + return &v1beta1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()), + Namespace: namespace, + }, + EventTime: timestamp, + Series: nil, + ReportingController: reportingController, + ReportingInstance: reportingInstance, + Action: action, + Reason: reason, + Regarding: *refRegarding, + Related: refRelated, + Note: message, + Type: eventtype, + } +} diff --git a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go new file mode 100644 index 0000000000..f4418922ee --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go @@ -0,0 +1,208 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "strconv" + "testing" + "time" + + "os" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/api/events/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + ref "k8s.io/client-go/tools/reference" +) + +type testEventSeriesSink struct { + OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error) + OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error) + OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) +} + +// Create records the event for testing. +func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) { + if t.OnCreate != nil { + return t.OnCreate(e) + } + return e, nil +} + +// Update records the event for testing. +func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) { + if t.OnUpdate != nil { + return t.OnUpdate(e) + } + return e, nil +} + +// Patch records the event for testing. +func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) { + if t.OnPatch != nil { + return t.OnPatch(e, p) + } + return e, nil +} + +func TestEventSeriesf(t *testing.T) { + hostname, _ := os.Hostname() + + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/api/version/pods/foo", + Name: "foo", + Namespace: "baz", + UID: "bar", + }, + } + + regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]") + if err != nil { + t.Fatal(err) + } + + related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]") + if err != nil { + t.Fatal(err) + } + + expectedEvent := &v1beta1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "baz", + }, + EventTime: metav1.MicroTime{time.Now()}, + ReportingController: "eventTest", + ReportingInstance: "eventTest-" + hostname, + Action: "started", + Reason: "test", + Regarding: *regarding, + Related: related, + Note: "some verbose message: 1", + Type: v1.EventTypeNormal, + } + + isomorphicEvent := expectedEvent.DeepCopy() + + nonIsomorphicEvent := expectedEvent.DeepCopy() + nonIsomorphicEvent.Action = "stopped" + + expectedEvent.Series = &v1beta1.EventSeries{Count: 1} + table := []struct { + regarding k8sruntime.Object + related k8sruntime.Object + actual *v1beta1.Event + elements []interface{} + expect *v1beta1.Event + expectUpdate bool + }{ + { + regarding: regarding, + related: related, + actual: isomorphicEvent, + elements: []interface{}{1}, + expect: expectedEvent, + expectUpdate: true, + }, + { + regarding: regarding, + related: related, + actual: nonIsomorphicEvent, + elements: []interface{}{1}, + expect: nonIsomorphicEvent, + expectUpdate: false, + }, + } + + stopCh := make(chan struct{}) + + createEvent := make(chan *v1beta1.Event) + updateEvent := make(chan *v1beta1.Event) + patchEvent := make(chan *v1beta1.Event) + + testEvents := testEventSeriesSink{ + OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + createEvent <- event + return event, nil + }, + OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + updateEvent <- event + return event, nil + }, + OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + // event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here + // we'll use it directly + patchEvent <- event + return event, nil + }, + } + eventBroadcaster := newBroadcaster(&testEvents, 0) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") + eventBroadcaster.StartRecordingToSink(stopCh) + recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) + // read from the chan as this was needed only to populate the cache + <-createEvent + for index, item := range table { + actual := item.actual + recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements) + // validate event + if item.expectUpdate { + actualEvent := <-patchEvent + t.Logf("%v - validating event affected by patch request", index) + validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t) + } else { + actualEvent := <-createEvent + t.Logf("%v - validating event affected by a create request", index) + validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t) + } + } + close(stopCh) +} + +func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) { + recvEvent := *actualEvent + + // Just check that the timestamp was set. + if recvEvent.EventTime.IsZero() { + t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent) + } + + if expectedUpdate { + if recvEvent.Series == nil { + t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series) + + } else { + if recvEvent.Series.Count != expectedEvent.Series.Count { + t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series) + } + } + + // Check that name has the right prefix. + if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) { + t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en) + } + } else { + if recvEvent.Series != nil { + t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series) + } + } + +} diff --git a/staging/src/k8s.io/client-go/tools/events/interfaces.go b/staging/src/k8s.io/client-go/tools/events/interfaces.go new file mode 100644 index 0000000000..2c8032aa22 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/events/interfaces.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "k8s.io/api/events/v1beta1" + "k8s.io/apimachinery/pkg/runtime" +) + +// EventRecorder knows how to record events on behalf of an EventSource. +type EventRecorder interface { + // Eventf constructs an event from the given information and puts it in the queue for sending. + // 'regarding' is the object this event is about. Event will make a reference-- or you may also + // pass a reference to the object directly. + // 'related' is the secondary object for more complex actions. E.g. when regarding object triggers + // a creation or deletion of related object. + // 'type' of this event, and can be one of Normal, Warning. New types could be added in future + // 'reason' is the reason this event is generated. 'reason' should be short and unique; it + // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used + // to automate handling of events, so imagine people writing switch statements to handle them. + // You want to make that easy. + // 'note' is intended to be human readable. + Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) +} + +// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. +type EventBroadcaster interface { + // StartRecordingToSink starts sending events received from the specified eventBroadcaster. + StartRecordingToSink(stopCh <-chan struct{}) + + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. + NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder +} + +// EventSink knows how to store events (client-go implements it.) +// EventSink must respect the namespace that will be embedded in 'event'. +// It is assumed that EventSink will return the same sorts of errors as +// client-go's REST client. +type EventSink interface { + Create(event *v1beta1.Event) (*v1beta1.Event, error) + Update(event *v1beta1.Event) (*v1beta1.Event, error) + Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error) +} diff --git a/staging/src/k8s.io/client-go/tools/record/BUILD b/staging/src/k8s.io/client-go/tools/record/BUILD index 6ab6476351..3686363a83 100644 --- a/staging/src/k8s.io/client-go/tools/record/BUILD +++ b/staging/src/k8s.io/client-go/tools/record/BUILD @@ -1,9 +1,33 @@ -package(default_visibility = ["//visibility:public"]) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "event.go", + "events_cache.go", + "fake.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/record", + importpath = "k8s.io/client-go/tools/record", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/record/util:go_default_library", + "//staging/src/k8s.io/client-go/tools/reference:go_default_library", + "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", + "//vendor/github.com/golang/groupcache/lru:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], ) go_test( @@ -27,35 +51,6 @@ go_test( ], ) -go_library( - name = "go_default_library", - srcs = [ - "doc.go", - "event.go", - "events_cache.go", - "fake.go", - ], - importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/record", - importpath = "k8s.io/client-go/tools/record", - deps = [ - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", - "//staging/src/k8s.io/client-go/rest:go_default_library", - "//staging/src/k8s.io/client-go/tools/record/util:go_default_library", - "//staging/src/k8s.io/client-go/tools/reference:go_default_library", - "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", - "//vendor/github.com/golang/groupcache/lru:go_default_library", - "//vendor/k8s.io/klog:go_default_library", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -70,4 +65,5 @@ filegroup( "//staging/src/k8s.io/client-go/tools/record/util:all-srcs", ], tags = ["automanaged"], + visibility = ["//visibility:public"], )