diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 341164e722..d5bdc5f3cb 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -25,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" @@ -283,6 +284,7 @@ func SimpleRunKubelet(client *client.Client, // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kcfg *KubeletConfig) { kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride) + kcfg.Recorder = record.FromSource(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) if kcfg.KubeClient != nil { kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname) } else { @@ -323,7 +325,7 @@ func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfi func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // source of all configuration - cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) + cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder) // define file config source if kc.ConfigFile != "" { @@ -378,6 +380,7 @@ type KubeletConfig struct { MasterServiceNamespace string VolumePlugins []volume.Plugin StreamingConnectionIdleTimeout time.Duration + Recorder record.EventRecorder } func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) { @@ -401,7 +404,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub net.IP(kc.ClusterDNS), kc.MasterServiceNamespace, kc.VolumePlugins, - kc.StreamingConnectionIdleTimeout) + kc.StreamingConnectionIdleTimeout, + kc.Recorder) if err != nil { return nil, err diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index cf6755caf7..313ca8b921 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -35,20 +35,22 @@ const maxTriesPerEvent = 12 var sleepDuration = 10 * time.Second -// EventRecorder knows how to store events (client.Client implements it.) -// EventRecorder must respect the namespace that will be embedded in 'event'. -// It is assumed that EventRecorder will return the same sorts of errors as +// EventSink knows how to store events (client.Client implements it.) +// EventSink must respect the namespace that will be embedded in 'event'. +// It is assumed that EventSink will return the same sorts of errors as // pkg/client's REST client. -type EventRecorder interface { +type EventSink interface { Create(event *api.Event) (*api.Event, error) Update(event *api.Event) (*api.Event, error) } -// StartRecording starts sending events to recorder. Call once while initializing +var emptySource = api.EventSource{} + +// StartRecording starts sending events to a sink. Call once while initializing // your binary. Subsequent calls will be ignored. The return value can be ignored // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval -func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { +func StartRecording(sink EventSink) watch.Interface { // The default math/rand package functions aren't thread safe, so create a // new Rand object for each StartRecording call. randGen := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -57,7 +59,6 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf // Events are safe to copy like this. eventCopy := *event event = &eventCopy - event.Source = source previousEvent := getEvent(event) updateExistingEvent := previousEvent.Count > 0 @@ -70,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf tries := 0 for { - if recordEvent(recorder, event, updateExistingEvent) { + if recordEvent(sink, event, updateExistingEvent) { break } tries++ @@ -89,17 +90,17 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf }) } -// recordEvent attempts to write event to recorder. It returns true if the event +// recordEvent attempts to write event to a sink. It returns true if the event // was successfully recorded or discarded, false if it should be retried. // If updateExistingEvent is false, it creates a new event, otherwise it updates // existing event. -func recordEvent(recorder EventRecorder, event *api.Event, updateExistingEvent bool) bool { +func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) bool { var newEvent *api.Event var err error if updateExistingEvent { - newEvent, err = recorder.Update(event) + newEvent, err = sink.Update(event) } else { - newEvent, err = recorder.Create(event) + newEvent, err = sink.Create(event) } if err == nil { addOrUpdateEvent(newEvent) @@ -165,24 +166,52 @@ const maxQueuedEvents = 1000 var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) -// Event constructs an event from the given information and puts it in the queue for sending. -// 'object' is the object this event is about. Event will make a reference-- or you may also -// pass a reference to the object directly. -// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will -// be used to automate handling of events, so imagine people writing switch statements to -// handle them. You want to make that easy. -// 'message' is intended to be human readable. -// -// The resulting event will be created in the same namespace as the reference object. -func Event(object runtime.Object, reason, message string) { +// EventRecorder knows how to record events for an EventSource. +type EventRecorder interface { + // Event constructs an event from the given information and puts it in the queue for sending. + // 'object' is the object this event is about. Event will make a reference-- or you may also + // pass a reference to the object directly. + // 'reason' is the reason this event is generated. 'reason' should be short and unique; it will + // be used to automate handling of events, so imagine people writing switch statements to + // handle them. You want to make that easy. + // 'message' is intended to be human readable. + // + // The resulting event will be created in the same namespace as the reference object. + Event(object runtime.Object, reason, message string) + + // Eventf is just like Event, but with Sprintf for the message field. + Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) +} + +// FromSource returns an EventRecorder that records events with the +// given event source. +func FromSource(source api.EventSource) EventRecorder { + return &recorderImpl{source} +} + +type recorderImpl struct { + source api.EventSource +} + +func (i *recorderImpl) Event(object runtime.Object, reason, message string) { ref, err := api.GetReference(object) if err != nil { glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message) - return } - t := util.Now() - e := &api.Event{ + e := makeEvent(ref, reason, message) + e.Source = i.source + + events.Action(watch.Added, e) +} + +func (i *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) { + i.Event(object, reason, fmt.Sprintf(messageFmt, args...)) +} + +func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event { + t := util.Now() + return &api.Event{ ObjectMeta: api.ObjectMeta{ Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), Namespace: ref.Namespace, @@ -194,11 +223,4 @@ func Event(object runtime.Object, reason, message string) { LastTimestamp: t, Count: 1, } - - events.Action(watch.Added, e) -} - -// Eventf is just like Event, but with Sprintf for the message field. -func Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) { - Event(object, reason, fmt.Sprintf(messageFmt, args...)) } diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 6337338e85..728c10301d 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -35,13 +35,13 @@ func init() { sleepDuration = 0 } -type testEventRecorder struct { +type testEventSink struct { OnCreate func(e *api.Event) (*api.Event, error) OnUpdate func(e *api.Event) (*api.Event, error) } // CreateEvent records the event for testing. -func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) { +func (t *testEventSink) Create(e *api.Event) (*api.Event, error) { if t.OnCreate != nil { return t.OnCreate(e) } @@ -49,7 +49,7 @@ func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) { } // UpdateEvent records the event for testing. -func (t *testEventRecorder) Update(e *api.Event) (*api.Event, error) { +func (t *testEventSink) Update(e *api.Event) (*api.Event, error) { if t.OnUpdate != nil { return t.OnUpdate(e) } @@ -273,7 +273,7 @@ func TestEventf(t *testing.T) { for _, item := range table { called := make(chan struct{}) - testEvents := testEventRecorder{ + testEvents := testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { returnEvent, _ := validateEvent(event, item.expect, t) if item.expectUpdate { @@ -291,7 +291,7 @@ func TestEventf(t *testing.T) { return returnEvent, nil }, } - recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"}) + recorder := StartRecording(&testEvents) logger := StartLogging(t.Logf) // Prove that it is useful logger2 := StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { @@ -300,7 +300,8 @@ func TestEventf(t *testing.T) { called <- struct{}{} }) - Eventf(item.obj, item.reason, item.messageFmt, item.elements...) + testSource := api.EventSource{Component: "eventTest"} + FromSource(testSource).Eventf(item.obj, item.reason, item.messageFmt, item.elements...) <-called <-called @@ -387,7 +388,7 @@ func TestWriteEventError(t *testing.T) { done := make(chan struct{}) defer StartRecording( - &testEventRecorder{ + &testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { if event.Message == "finished" { close(done) @@ -405,13 +406,13 @@ func TestWriteEventError(t *testing.T) { return event, nil }, }, - api.EventSource{Component: "eventTest"}, ).Stop() + testSource := api.EventSource{Component: "eventTest"} for caseName := range table { - Event(ref, "Reason", caseName) + FromSource(testSource).Event(ref, "Reason", caseName) } - Event(ref, "Reason", "finished") + FromSource(testSource).Event(ref, "Reason", "finished") <-done for caseName, item := range table { @@ -427,7 +428,7 @@ func TestLotsOfEvents(t *testing.T) { // Fail each event a few times to ensure there's some load on the tested code. var counts [1000]int - testEvents := testEventRecorder{ + testEvents := testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { num, err := strconv.Atoi(event.Message) if err != nil { @@ -442,7 +443,8 @@ func TestLotsOfEvents(t *testing.T) { return event, nil }, } - recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"}) + recorder := StartRecording(&testEvents) + testSource := api.EventSource{Component: "eventTest"} logger := StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} }) @@ -455,7 +457,7 @@ func TestLotsOfEvents(t *testing.T) { APIVersion: "v1beta1", } for i := 0; i < maxQueuedEvents; i++ { - go Event(ref, "Reason", strconv.Itoa(i)) + go FromSource(testSource).Event(ref, "Reason", strconv.Itoa(i)) } // Make sure no events were dropped by either of the listeners. for i := 0; i < maxQueuedEvents; i++ { diff --git a/pkg/client/record/fake.go b/pkg/client/record/fake.go new file mode 100644 index 0000000000..1ba0f5abda --- /dev/null +++ b/pkg/client/record/fake.go @@ -0,0 +1,28 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" +) + +// FakeRecorder is used as a fake during tests. +type FakeRecorder struct{} + +func (f *FakeRecorder) Event(object runtime.Object, reason, message string) {} + +func (f *FakeRecorder) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {} diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 49467a1076..2254bee198 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -59,9 +59,9 @@ type PodConfig struct { // NewPodConfig creates an object that can merge many configuration sources into a stream // of normalized updates to a pod configuration. -func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { +func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig { updates := make(chan kubelet.PodUpdate, 50) - storage := newPodStorage(updates, mode) + storage := newPodStorage(updates, mode, recorder) podConfig := &PodConfig{ pods: storage, mux: config.NewMux(storage), @@ -114,17 +114,21 @@ type podStorage struct { // contains the set of all sources that have sent at least one SET sourcesSeenLock sync.Mutex sourcesSeen util.StringSet + + // the EventRecorder to use + recorder record.EventRecorder } // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel // in the future, especially with multiple listeners. // TODO: allow initialization of the current state of the store with snapshotted version. -func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { +func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage { return &podStorage{ pods: make(map[string]map[string]*api.BoundPod), mode: mode, updates: updates, sourcesSeen: util.StringSet{}, + recorder: recorder, } } @@ -192,7 +196,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods) } - filtered := filterInvalidPods(update.Pods, source) + filtered := filterInvalidPods(update.Pods, source, s.recorder) for _, ref := range filtered { name := podUniqueName(ref) if existing, found := pods[name]; found { @@ -234,7 +238,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de oldPods := pods pods = make(map[string]*api.BoundPod) - filtered := filterInvalidPods(update.Pods, source) + filtered := filterInvalidPods(update.Pods, source, s.recorder) for _, ref := range filtered { name := podUniqueName(ref) if existing, found := oldPods[name]; found { @@ -284,7 +288,7 @@ func (s *podStorage) seenSources(sources ...string) bool { return s.sourcesSeen.HasAll(sources...) } -func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) { +func filterInvalidPods(pods []api.BoundPod, source string, recorder record.EventRecorder) (filtered []*api.BoundPod) { names := util.StringSet{} for i := range pods { pod := &pods[i] @@ -305,7 +309,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun name := bestPodIdentString(pod) err := utilerrors.NewAggregate(errlist) glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err) - record.Eventf(pod, "failedValidation", "Error validating pod %s from %s, ignoring: %v", name, source, err) + recorder.Eventf(pod, "failedValidation", "Error validating pod %s from %s, ignoring: %v", name, source, err) continue } filtered = append(filtered, pod) diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index a26cccfa25..3a438c35da 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) @@ -74,7 +75,7 @@ func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPo } func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { - config := NewPodConfig(mode) + config := NewPodConfig(mode, record.FromSource(api.EventSource{Component: "kubelet"})) channel := config.Channel(TestSource) ch := config.Updates() return channel, ch, config diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e458241bbf..739f86fe48 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -89,7 +89,8 @@ func NewMainKubelet( clusterDNS net.IP, masterServiceNamespace string, volumePlugins []volume.Plugin, - streamingConnectionIdleTimeout time.Duration) (*Kubelet, error) { + streamingConnectionIdleTimeout time.Duration, + recorder record.EventRecorder) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -135,6 +136,7 @@ func NewMainKubelet( prober: newProbeHolder(), readiness: newReadinessStates(), streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, + recorder: recorder, } dockerCache, err := dockertools.NewDockerCache(dockerClient) @@ -142,7 +144,7 @@ func NewMainKubelet( return nil, err } klet.dockerCache = dockerCache - klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod) + klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder) metrics.Register(dockerCache) @@ -230,6 +232,9 @@ type Kubelet struct { // how long to keep idle streaming command execution/port forwarding // connections open before terminating them streamingConnectionIdleTimeout time.Duration + + // the EventRecorder to use + recorder record.EventRecorder } // getRootDir returns the full path to the directory under which kubelet can @@ -654,15 +659,14 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod dockerContainer, err := kl.dockerClient.CreateContainer(opts) if err != nil { if ref != nil { - record.Eventf(ref, "failed", - "Failed to create docker container with error: %v", err) + kl.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) } return "", err } // Remember this reference so we can report events about this container if ref != nil { kl.setRef(dockertools.DockerID(dockerContainer.ID), ref) - record.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) + kl.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) } if len(container.TerminationMessagePath) != 0 { @@ -707,13 +711,13 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod err = kl.dockerClient.StartContainer(dockerContainer.ID, hc) if err != nil { if ref != nil { - record.Eventf(ref, "failed", + kl.recorder.Eventf(ref, "failed", "Failed to start with docker id %v with error: %v", dockerContainer.ID, err) } return "", err } if ref != nil { - record.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) + kl.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) } if container.Lifecycle != nil && container.Lifecycle.PostStart != nil { @@ -885,7 +889,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error { glog.Warningf("No ref for pod '%v' - '%v'", ID, name) } else { // TODO: pass reason down here, and state, or move this call up the stack. - record.Eventf(ref, "killing", "Killing %v - %v", ID, name) + kl.recorder.Eventf(ref, "killing", "Killing %v - %v", ID, name) } return err @@ -916,7 +920,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke ok, err := kl.dockerPuller.IsImagePresent(container.Image) if err != nil { if ref != nil { - record.Eventf(ref, "failed", "Failed to inspect image %q", container.Image) + kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image) } return "", err } @@ -926,7 +930,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke } } if ref != nil { - record.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image) + kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image) } id, err := kl.runContainer(pod, container, nil, "", "") if err != nil { @@ -956,12 +960,12 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error { if err := kl.dockerPuller.Pull(img); err != nil { if ref != nil { - record.Eventf(ref, "failed", "Failed to pull image %q", img) + kl.recorder.Eventf(ref, "failed", "Failed to pull image %q", img) } return err } if ref != nil { - record.Eventf(ref, "pulled", "Successfully pulled image %q", img) + kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", img) } return nil } @@ -1055,7 +1059,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke podVolumes, err := kl.mountExternalVolumes(pod) if err != nil { if ref != nil { - record.Eventf(ref, "failedMount", + kl.recorder.Eventf(ref, "failedMount", "Unable to mount volumes for pod %q: %v", podFullName, err) } glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err) @@ -1104,7 +1108,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke if !ok { glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name) } else { - record.Eventf(ref, "unhealthy", "Liveness Probe Failed %v - %v", containerID, container.Name) + kl.recorder.Eventf(ref, "unhealthy", "Liveness Probe Failed %v - %v", containerID, container.Name) } glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, live) } else { @@ -1163,7 +1167,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke present, err := kl.dockerPuller.IsImagePresent(container.Image) if err != nil { if ref != nil { - record.Eventf(ref, "failed", "Failed to inspect image %q", container.Image) + kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image) } glog.Errorf("Failed to inspect image %q: %v; skipping pod %q container %q", container.Image, err, podFullName, container.Name) continue @@ -1408,7 +1412,7 @@ func (s podsByCreationTime) Less(i, j int) bool { } // filterHostPortConflicts removes pods that conflict on Port.HostPort values -func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { +func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { filtered := []api.BoundPod{} ports := map[int]bool{} extract := func(p *api.ContainerPort) int { return p.HostPort } @@ -1420,7 +1424,7 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { pod := &pods[i] if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs) - record.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.") + kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.") // TODO: Set the pod status to fail. continue } @@ -1437,11 +1441,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) { case SET: glog.V(3).Infof("SET: Containers changed") kl.pods = u.Pods - kl.pods = filterHostPortConflicts(kl.pods) + kl.pods = kl.filterHostPortConflicts(kl.pods) case UPDATE: glog.V(3).Infof("Update: Containers changed") kl.pods = updateBoundPods(u.Pods, kl.pods) - kl.pods = filterHostPortConflicts(kl.pods) + kl.pods = kl.filterHostPortConflicts(kl.pods) default: panic("syncLoop does not support incremental changes") @@ -1508,7 +1512,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy } kl.pods = u.Pods - kl.pods = filterHostPortConflicts(kl.pods) + kl.pods = kl.filterHostPortConflicts(kl.pods) case UPDATE: glog.V(3).Infof("Update: Containers changed") @@ -1519,7 +1523,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy } kl.pods = updateBoundPods(u.Pods, kl.pods) - kl.pods = filterHostPortConflicts(kl.pods) + kl.pods = kl.filterHostPortConflicts(kl.pods) default: panic("syncLoop does not support incremental changes") } @@ -1810,7 +1814,7 @@ func (kl *Kubelet) BirthCry() { UID: types.UID(kl.hostname), Namespace: api.NamespaceDefault, } - record.Eventf(ref, "starting", "Starting kubelet.") + kl.recorder.Eventf(ref, "starting", "Starting kubelet.") } func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index d3858699e0..ead1358791 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -33,6 +33,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" @@ -54,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn RemovedImages: util.StringSet{}, } fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) + recorder := &record.FakeRecorder{} kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker @@ -74,11 +76,13 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn err := kubelet.syncPod(pod, containers) waitGroup.Done() return err - }) + }, + recorder) kubelet.sourceReady = func(source string) bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.readiness = newReadinessStates() + kubelet.recorder = recorder if err := kubelet.setupDataDirs(); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) } @@ -1206,6 +1210,8 @@ func TestMakePortsAndBindings(t *testing.T) { } func TestCheckHostPortConflicts(t *testing.T) { + kubelet, _, _ := newTestKubelet(t) + successCaseAll := []api.BoundPod{ {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}}, @@ -1215,7 +1221,7 @@ func TestCheckHostPortConflicts(t *testing.T) { Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}, } expected := append(successCaseAll, successCaseNew) - if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { + if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { t.Errorf("Expected %#v, Got %#v", expected, actual) } @@ -1227,7 +1233,7 @@ func TestCheckHostPortConflicts(t *testing.T) { failureCaseNew := api.BoundPod{ Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}, } - if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { + if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { t.Errorf("Expected %#v, Got %#v", expected, actual) } } @@ -3089,6 +3095,8 @@ func TestPortForward(t *testing.T) { // Tests that upon host port conflict, the newer pod is removed. func TestFilterHostPortConflicts(t *testing.T) { + kubelet, _, _ := newTestKubelet(t) + // Reuse the pod spec with the same port to create a conflict. spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}} var pods = []api.BoundPod{ @@ -3112,7 +3120,7 @@ func TestFilterHostPortConflicts(t *testing.T) { // Make sure the BoundPods are in the reverse order of creation time. pods[1].CreationTimestamp = util.NewTime(time.Now()) pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second)) - filteredPods := filterHostPortConflicts(pods) + filteredPods := kubelet.filterHostPortConflicts(pods) if len(filteredPods) != 1 { t.Fatalf("Expected one pod. Got pods %#v", filteredPods) } diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 0f812c5566..4a74998c6e 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -48,6 +48,9 @@ type podWorkers struct { // NOTE: This function has to be thread-safe - it can be called for // different pods at the same time. syncPodFn syncPodFnType + + // The EventRecorder to use + recorder record.EventRecorder } type workUpdate struct { @@ -58,12 +61,13 @@ type workUpdate struct { updateCompleteFn func() } -func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType) *podWorkers { +func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan workUpdate{}, isWorking: map[types.UID]bool{}, dockerCache: dockerCache, syncPodFn: syncPodFn, + recorder: recorder, } } @@ -83,7 +87,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { err = p.syncPodFn(newWork.pod, containers) if err != nil { glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) - record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) + p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) return } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 0179c31e0e..d897819ceb 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err if kl.dockerPuller == nil { kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) } - pods = filterHostPortConflicts(pods) + pods = kl.filterHostPortConflicts(pods) ch := make(chan RunPodResult) for i := range pods { diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 3fde0209a2..f64e6945bb 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" docker "github.com/fsouza/go-dockerclient" ) @@ -68,6 +69,7 @@ func (d *testDocker) InspectContainer(id string) (*docker.Container, error) { func TestRunOnce(t *testing.T) { kb := &Kubelet{ rootDirectory: "/tmp/kubelet", + recorder: &record.FakeRecorder{}, } if err := kb.setupDataDirs(); err != nil { t.Errorf("Failed to init data dirs: %v", err) diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 07bb42bbb8..a1890c8505 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -19,7 +19,6 @@ package kubelet import ( "strconv" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" @@ -74,9 +73,5 @@ func SetupLogging() { func SetupEventSending(client *client.Client, hostname string) { glog.Infof("Sending events to api server.") - record.StartRecording(client.Events(""), - api.EventSource{ - Component: "kubelet", - Host: hostname, - }) + record.StartRecording(client.Events("")) } diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index e255a256d9..6f770492ef 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -25,7 +25,6 @@ import ( "os" "strconv" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" @@ -76,7 +75,7 @@ func (s *SchedulerServer) Run(_ []string) error { glog.Fatalf("Invalid API configuration: %v", err) } - record.StartRecording(kubeClient.Events(""), api.EventSource{Component: "scheduler"}) + record.StartRecording(kubeClient.Events("")) go http.ListenAndServe(net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)), nil) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 494a9f5aee..2611eaa30d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -156,7 +157,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) return pod }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + Recorder: record.FromSource(api.EventSource{Component: "scheduler"}), }, nil } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 03a2cdacc0..bf6d3e8b7d 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -51,6 +51,9 @@ type Config struct { // Error is called if there is an error. It is passed the pod in // question, and the error Error func(*api.Pod, error) + + // Recorder is the EventRecorder to use + Recorder record.EventRecorder } // New returns a new scheduler. @@ -72,7 +75,7 @@ func (s *Scheduler) scheduleOne() { dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister) if err != nil { glog.V(1).Infof("Failed to schedule: %v", pod) - record.Eventf(pod, "failedScheduling", "Error scheduling: %v", err) + s.config.Recorder.Eventf(pod, "failedScheduling", "Error scheduling: %v", err) s.config.Error(pod, err) return } @@ -83,9 +86,9 @@ func (s *Scheduler) scheduleOne() { } if err := s.config.Binder.Bind(b); err != nil { glog.V(1).Infof("Failed to bind pod: %v", err) - record.Eventf(pod, "failedScheduling", "Binding rejected: %v", err) + s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err) s.config.Error(pod, err) return } - record.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest) + s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 6d76475bd6..d47f3a61a8 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -102,6 +102,7 @@ func TestScheduler(t *testing.T) { NextPod: func() *api.Pod { return item.sendPod }, + Recorder: record.FromSource(api.EventSource{Component: "scheduler"}), } s := New(c) called := make(chan struct{})