mirror of https://github.com/k3s-io/k3s
Merge pull request #65782 from yastij/eventv2-eventf
Implementing logic for v1beta1.Event APIk3s-v1.15.3
commit
e67c266a72
|
@ -81,6 +81,7 @@
|
||||||
- "./vendor/k8s.io/client-go/tools/leaderelection/resourcelock"
|
- "./vendor/k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
- "./vendor/k8s.io/client-go/tools/portforward"
|
- "./vendor/k8s.io/client-go/tools/portforward"
|
||||||
- "./vendor/k8s.io/client-go/tools/record"
|
- "./vendor/k8s.io/client-go/tools/record"
|
||||||
|
- "./vendor/k8s.io/client-go/tools/events"
|
||||||
- "./vendor/k8s.io/client-go/tools/reference"
|
- "./vendor/k8s.io/client-go/tools/reference"
|
||||||
- "./vendor/k8s.io/client-go/tools/remotecommand"
|
- "./vendor/k8s.io/client-go/tools/remotecommand"
|
||||||
allowedImports:
|
allowedImports:
|
||||||
|
|
|
@ -69,6 +69,7 @@ filegroup(
|
||||||
"//staging/src/k8s.io/client-go/tools/auth:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/auth:all-srcs",
|
||||||
"//staging/src/k8s.io/client-go/tools/cache:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/cache:all-srcs",
|
||||||
"//staging/src/k8s.io/client-go/tools/clientcmd:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/clientcmd:all-srcs",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/events:all-srcs",
|
||||||
"//staging/src/k8s.io/client-go/tools/leaderelection:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/leaderelection:all-srcs",
|
||||||
"//staging/src/k8s.io/client-go/tools/metrics:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/metrics:all-srcs",
|
||||||
"//staging/src/k8s.io/client-go/tools/pager:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/pager:all-srcs",
|
||||||
|
|
|
@ -26,6 +26,7 @@ github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
|
||||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck=
|
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck=
|
||||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||||
|
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
|
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
|
||||||
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
|
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = [
|
||||||
|
"event_broadcaster.go",
|
||||||
|
"event_recorder.go",
|
||||||
|
"interfaces.go",
|
||||||
|
],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/events",
|
||||||
|
importpath = "k8s.io/client-go/tools/events",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/record/util:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||||
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["eventseries_test.go"],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
|
@ -0,0 +1,285 @@
|
||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
|
|
||||||
|
"k8s.io/api/events/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/json"
|
||||||
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/record/util"
|
||||||
|
"k8s.io/klog"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxTriesPerEvent = 12
|
||||||
|
finishTime = 6 * time.Minute
|
||||||
|
refreshTime = 30 * time.Minute
|
||||||
|
maxQueuedEvents = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultSleepDuration = 10 * time.Second
|
||||||
|
|
||||||
|
// TODO: validate impact of copying and investigate hashing
|
||||||
|
type eventKey struct {
|
||||||
|
action string
|
||||||
|
reason string
|
||||||
|
reportingController string
|
||||||
|
reportingInstance string
|
||||||
|
regarding corev1.ObjectReference
|
||||||
|
related corev1.ObjectReference
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventBroadcasterImpl struct {
|
||||||
|
*watch.Broadcaster
|
||||||
|
mu sync.RWMutex
|
||||||
|
eventCache map[eventKey]*v1beta1.Event
|
||||||
|
sleepDuration time.Duration
|
||||||
|
sink EventSink
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBroadcaster Creates a new event broadcaster.
|
||||||
|
func NewBroadcaster(sink EventSink) EventBroadcaster {
|
||||||
|
return newBroadcaster(sink, defaultSleepDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
|
||||||
|
func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster {
|
||||||
|
return &eventBroadcasterImpl{
|
||||||
|
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||||
|
eventCache: map[eventKey]*v1beta1.Event{},
|
||||||
|
sleepDuration: sleepDuration,
|
||||||
|
sink: sink,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: add test for refreshExistingEventSeries
|
||||||
|
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||||
|
// TODO: Investigate whether lock contention won't be a problem
|
||||||
|
e.mu.RLock()
|
||||||
|
defer e.mu.RUnlock()
|
||||||
|
for isomorphicKey, event := range e.eventCache {
|
||||||
|
if event.Series != nil {
|
||||||
|
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||||
|
e.eventCache[isomorphicKey] = recordedEvent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: add test for finishSeries
|
||||||
|
func (e *eventBroadcasterImpl) finishSeries() {
|
||||||
|
// TODO: Investigate whether lock contention won't be a problem
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
for isomorphicKey, event := range e.eventCache {
|
||||||
|
eventSerie := event.Series
|
||||||
|
if eventSerie != nil {
|
||||||
|
if eventSerie.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
|
||||||
|
if _, retry := recordEvent(e.sink, event); !retry {
|
||||||
|
delete(e.eventCache, isomorphicKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRecorder returns an EventRecorder that records events with the given event source.
|
||||||
|
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
reportingInstance := reportingController + "-" + hostname
|
||||||
|
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) {
|
||||||
|
// Make a copy before modification, because there could be multiple listeners.
|
||||||
|
eventCopy := event.DeepCopy()
|
||||||
|
go func() {
|
||||||
|
evToRecord := func() *v1beta1.Event {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
eventKey := getKey(eventCopy)
|
||||||
|
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
|
||||||
|
if isIsomorphic {
|
||||||
|
if isomorphicEvent.Series != nil {
|
||||||
|
isomorphicEvent.Series.Count++
|
||||||
|
isomorphicEvent.EventTime = metav1.MicroTime{Time: clock.Now()}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
isomorphicEvent.Series = &v1beta1.EventSeries{
|
||||||
|
Count: 1,
|
||||||
|
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
|
||||||
|
}
|
||||||
|
return isomorphicEvent
|
||||||
|
}
|
||||||
|
e.eventCache[eventKey] = eventCopy
|
||||||
|
return eventCopy
|
||||||
|
}()
|
||||||
|
if evToRecord != nil {
|
||||||
|
recordedEvent := e.attemptRecording(evToRecord)
|
||||||
|
if recordedEvent != nil {
|
||||||
|
recordedEventKey := getKey(recordedEvent)
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
e.eventCache[recordedEventKey] = recordedEvent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event {
|
||||||
|
tries := 0
|
||||||
|
for {
|
||||||
|
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||||
|
return recordedEvent
|
||||||
|
}
|
||||||
|
tries++
|
||||||
|
if tries >= maxTriesPerEvent {
|
||||||
|
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Randomize sleep so that various clients won't all be
|
||||||
|
// synced up if the master goes down.
|
||||||
|
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
||||||
|
var newEvent *v1beta1.Event
|
||||||
|
var err error
|
||||||
|
isEventSerie := event.Series != nil
|
||||||
|
if isEventSerie {
|
||||||
|
patch, err := createPatchBytesForSeries(event)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
newEvent, err = sink.Patch(event, patch)
|
||||||
|
}
|
||||||
|
// Update can fail because the event may have been removed and it no longer exists.
|
||||||
|
if !isEventSerie || (isEventSerie && util.IsKeyNotFoundError(err)) {
|
||||||
|
// Making sure that ResourceVersion is empty on creation
|
||||||
|
event.ResourceVersion = ""
|
||||||
|
newEvent, err = sink.Create(event)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return newEvent, false
|
||||||
|
}
|
||||||
|
// If we can't contact the server, then hold everything while we keep trying.
|
||||||
|
// Otherwise, something about the event is malformed and we should abandon it.
|
||||||
|
switch err.(type) {
|
||||||
|
case *restclient.RequestConstructionError:
|
||||||
|
// We will construct the request the same next time, so don't keep trying.
|
||||||
|
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
|
||||||
|
return nil, false
|
||||||
|
case *errors.StatusError:
|
||||||
|
if errors.IsAlreadyExists(err) {
|
||||||
|
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||||
|
} else {
|
||||||
|
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
case *errors.UnexpectedObjectError:
|
||||||
|
// We don't expect this; it implies the server's response didn't match a
|
||||||
|
// known pattern. Go ahead and retry.
|
||||||
|
default:
|
||||||
|
// This case includes actual http transport errors. Go ahead and retry.
|
||||||
|
}
|
||||||
|
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
|
||||||
|
return nil, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) {
|
||||||
|
oldEvent := event.DeepCopy()
|
||||||
|
oldEvent.Series = nil
|
||||||
|
oldData, err := json.Marshal(oldEvent)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newData, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func getKey(event *v1beta1.Event) eventKey {
|
||||||
|
key := eventKey{
|
||||||
|
action: event.Action,
|
||||||
|
reason: event.Reason,
|
||||||
|
reportingController: event.ReportingController,
|
||||||
|
reportingInstance: event.ReportingInstance,
|
||||||
|
regarding: event.Regarding,
|
||||||
|
}
|
||||||
|
if event.Related != nil {
|
||||||
|
key.related = *event.Related
|
||||||
|
}
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||||
|
// The return value is used to stop recording
|
||||||
|
func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() {
|
||||||
|
watcher := e.Watch()
|
||||||
|
go func() {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
for {
|
||||||
|
watchEvent, ok := <-watcher.ResultChan()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
eventHandler(watchEvent.Object)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return watcher.Stop
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
|
||||||
|
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
|
||||||
|
go wait.Until(func() {
|
||||||
|
e.refreshExistingEventSeries()
|
||||||
|
}, refreshTime, stopCh)
|
||||||
|
go wait.Until(func() {
|
||||||
|
e.finishSeries()
|
||||||
|
}, finishTime, stopCh)
|
||||||
|
eventHandler := func(obj runtime.Object) {
|
||||||
|
event, ok := obj.(*v1beta1.Event)
|
||||||
|
if !ok {
|
||||||
|
klog.Errorf("unexpected type, expected v1beta1.Event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.recordToSink(event, clock.RealClock{})
|
||||||
|
}
|
||||||
|
stopWatcher := e.startEventWatcher(eventHandler)
|
||||||
|
go func() {
|
||||||
|
<-stopCh
|
||||||
|
stopWatcher()
|
||||||
|
}()
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/client-go/tools/reference"
|
||||||
|
|
||||||
|
"k8s.io/api/events/v1beta1"
|
||||||
|
"k8s.io/client-go/tools/record/util"
|
||||||
|
"k8s.io/klog"
|
||||||
|
)
|
||||||
|
|
||||||
|
type recorderImpl struct {
|
||||||
|
scheme *runtime.Scheme
|
||||||
|
reportingController string
|
||||||
|
reportingInstance string
|
||||||
|
*watch.Broadcaster
|
||||||
|
clock clock.Clock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
|
||||||
|
timestamp := metav1.MicroTime{time.Now()}
|
||||||
|
message := fmt.Sprintf(note, args...)
|
||||||
|
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
refRelated, err := reference.GetReference(recorder.scheme, related)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'.", related, err)
|
||||||
|
}
|
||||||
|
if !util.ValidateEventType(eventtype) {
|
||||||
|
klog.Errorf("Unsupported event type: '%v'", eventtype)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
|
||||||
|
go func() {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
recorder.Action(watch.Added, event)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event {
|
||||||
|
t := metav1.Time{Time: recorder.clock.Now()}
|
||||||
|
namespace := refRegarding.Namespace
|
||||||
|
if namespace == "" {
|
||||||
|
namespace = metav1.NamespaceSystem
|
||||||
|
}
|
||||||
|
return &v1beta1.Event{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
|
||||||
|
Namespace: namespace,
|
||||||
|
},
|
||||||
|
EventTime: timestamp,
|
||||||
|
Series: nil,
|
||||||
|
ReportingController: reportingController,
|
||||||
|
ReportingInstance: reportingInstance,
|
||||||
|
Action: action,
|
||||||
|
Reason: reason,
|
||||||
|
Regarding: *refRegarding,
|
||||||
|
Related: refRelated,
|
||||||
|
Note: message,
|
||||||
|
Type: eventtype,
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,208 @@
|
||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/api/events/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
|
ref "k8s.io/client-go/tools/reference"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testEventSeriesSink struct {
|
||||||
|
OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||||
|
OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error)
|
||||||
|
OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create records the event for testing.
|
||||||
|
func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
if t.OnCreate != nil {
|
||||||
|
return t.OnCreate(e)
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update records the event for testing.
|
||||||
|
func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
if t.OnUpdate != nil {
|
||||||
|
return t.OnUpdate(e)
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Patch records the event for testing.
|
||||||
|
func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) {
|
||||||
|
if t.OnPatch != nil {
|
||||||
|
return t.OnPatch(e, p)
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventSeriesf(t *testing.T) {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
|
||||||
|
testPod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
SelfLink: "/api/version/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedEvent := &v1beta1.Event{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
EventTime: metav1.MicroTime{time.Now()},
|
||||||
|
ReportingController: "eventTest",
|
||||||
|
ReportingInstance: "eventTest-" + hostname,
|
||||||
|
Action: "started",
|
||||||
|
Reason: "test",
|
||||||
|
Regarding: *regarding,
|
||||||
|
Related: related,
|
||||||
|
Note: "some verbose message: 1",
|
||||||
|
Type: v1.EventTypeNormal,
|
||||||
|
}
|
||||||
|
|
||||||
|
isomorphicEvent := expectedEvent.DeepCopy()
|
||||||
|
|
||||||
|
nonIsomorphicEvent := expectedEvent.DeepCopy()
|
||||||
|
nonIsomorphicEvent.Action = "stopped"
|
||||||
|
|
||||||
|
expectedEvent.Series = &v1beta1.EventSeries{Count: 1}
|
||||||
|
table := []struct {
|
||||||
|
regarding k8sruntime.Object
|
||||||
|
related k8sruntime.Object
|
||||||
|
actual *v1beta1.Event
|
||||||
|
elements []interface{}
|
||||||
|
expect *v1beta1.Event
|
||||||
|
expectUpdate bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
regarding: regarding,
|
||||||
|
related: related,
|
||||||
|
actual: isomorphicEvent,
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: expectedEvent,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
regarding: regarding,
|
||||||
|
related: related,
|
||||||
|
actual: nonIsomorphicEvent,
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: nonIsomorphicEvent,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
|
||||||
|
createEvent := make(chan *v1beta1.Event)
|
||||||
|
updateEvent := make(chan *v1beta1.Event)
|
||||||
|
patchEvent := make(chan *v1beta1.Event)
|
||||||
|
|
||||||
|
testEvents := testEventSeriesSink{
|
||||||
|
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||||
|
// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
|
||||||
|
// we'll use it directly
|
||||||
|
patchEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
eventBroadcaster := newBroadcaster(&testEvents, 0)
|
||||||
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
||||||
|
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||||
|
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
||||||
|
// read from the chan as this was needed only to populate the cache
|
||||||
|
<-createEvent
|
||||||
|
for index, item := range table {
|
||||||
|
actual := item.actual
|
||||||
|
recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
|
||||||
|
// validate event
|
||||||
|
if item.expectUpdate {
|
||||||
|
actualEvent := <-patchEvent
|
||||||
|
t.Logf("%v - validating event affected by patch request", index)
|
||||||
|
validateEventSerie(strconv.Itoa(index), true, actualEvent, item.expect, t)
|
||||||
|
} else {
|
||||||
|
actualEvent := <-createEvent
|
||||||
|
t.Logf("%v - validating event affected by a create request", index)
|
||||||
|
validateEventSerie(strconv.Itoa(index), false, actualEvent, item.expect, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) {
|
||||||
|
recvEvent := *actualEvent
|
||||||
|
|
||||||
|
// Just check that the timestamp was set.
|
||||||
|
if recvEvent.EventTime.IsZero() {
|
||||||
|
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
if expectedUpdate {
|
||||||
|
if recvEvent.Series == nil {
|
||||||
|
t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if recvEvent.Series.Count != expectedEvent.Series.Count {
|
||||||
|
t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that name has the right prefix.
|
||||||
|
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
||||||
|
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if recvEvent.Series != nil {
|
||||||
|
t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/api/events/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||||
|
type EventRecorder interface {
|
||||||
|
// Eventf constructs an event from the given information and puts it in the queue for sending.
|
||||||
|
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
|
||||||
|
// pass a reference to the object directly.
|
||||||
|
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
|
||||||
|
// a creation or deletion of related object.
|
||||||
|
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
|
||||||
|
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
|
||||||
|
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
|
||||||
|
// to automate handling of events, so imagine people writing switch statements to handle them.
|
||||||
|
// You want to make that easy.
|
||||||
|
// 'note' is intended to be human readable.
|
||||||
|
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
|
||||||
|
type EventBroadcaster interface {
|
||||||
|
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
|
||||||
|
StartRecordingToSink(stopCh <-chan struct{})
|
||||||
|
|
||||||
|
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
|
||||||
|
// with the event source set to the given event source.
|
||||||
|
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventSink knows how to store events (client-go implements it.)
|
||||||
|
// EventSink must respect the namespace that will be embedded in 'event'.
|
||||||
|
// It is assumed that EventSink will return the same sorts of errors as
|
||||||
|
// client-go's REST client.
|
||||||
|
type EventSink interface {
|
||||||
|
Create(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||||
|
Update(event *v1beta1.Event) (*v1beta1.Event, error)
|
||||||
|
Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error)
|
||||||
|
}
|
|
@ -1,9 +1,33 @@
|
||||||
package(default_visibility = ["//visibility:public"])
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
load(
|
go_library(
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
name = "go_default_library",
|
||||||
"go_library",
|
srcs = [
|
||||||
"go_test",
|
"doc.go",
|
||||||
|
"event.go",
|
||||||
|
"events_cache.go",
|
||||||
|
"fake.go",
|
||||||
|
],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/record",
|
||||||
|
importpath = "k8s.io/client-go/tools/record",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/record/util:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||||
|
"//vendor/github.com/golang/groupcache/lru:go_default_library",
|
||||||
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
|
@ -27,35 +51,6 @@ go_test(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
go_library(
|
|
||||||
name = "go_default_library",
|
|
||||||
srcs = [
|
|
||||||
"doc.go",
|
|
||||||
"event.go",
|
|
||||||
"events_cache.go",
|
|
||||||
"fake.go",
|
|
||||||
],
|
|
||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/record",
|
|
||||||
importpath = "k8s.io/client-go/tools/record",
|
|
||||||
deps = [
|
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/tools/record/util:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
|
||||||
"//vendor/github.com/golang/groupcache/lru:go_default_library",
|
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
filegroup(
|
filegroup(
|
||||||
name = "package-srcs",
|
name = "package-srcs",
|
||||||
srcs = glob(["**"]),
|
srcs = glob(["**"]),
|
||||||
|
@ -70,4 +65,5 @@ filegroup(
|
||||||
"//staging/src/k8s.io/client-go/tools/record/util:all-srcs",
|
"//staging/src/k8s.io/client-go/tools/record/util:all-srcs",
|
||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue