Allow multiple sources to be used with the record package

pull/6/head
Paul Morie 2015-03-03 01:06:20 -05:00
parent 9fcb48cab6
commit 02b18edac6
16 changed files with 175 additions and 96 deletions


@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
@ -283,6 +284,7 @@ func SimpleRunKubelet(client *client.Client,
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
kcfg.Recorder = record.FromSource(api.EventSource{Component: "kubelet", Host: kcfg.Hostname})
if kcfg.KubeClient != nil {
kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname)
} else {
@ -323,7 +325,7 @@ func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfi
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder)
// define file config source
if kc.ConfigFile != "" {
@ -378,6 +380,7 @@ type KubeletConfig struct {
MasterServiceNamespace string
VolumePlugins []volume.Plugin
StreamingConnectionIdleTimeout time.Duration
Recorder record.EventRecorder
}
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
@ -401,7 +404,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout)
kc.StreamingConnectionIdleTimeout,
kc.Recorder)
if err != nil {
return nil, err


@ -35,20 +35,22 @@ const maxTriesPerEvent = 12
var sleepDuration = 10 * time.Second
// EventRecorder knows how to store events (client.Client implements it.)
// EventRecorder must respect the namespace that will be embedded in 'event'.
// It is assumed that EventRecorder will return the same sorts of errors as
// EventSink knows how to store events (client.Client implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
// pkg/client's REST client.
type EventRecorder interface {
type EventSink interface {
Create(event *api.Event) (*api.Event, error)
Update(event *api.Event) (*api.Event, error)
}
// StartRecording starts sending events to recorder. Call once while initializing
var emptySource = api.EventSource{}
// StartRecording starts sending events to a sink. Call once while initializing
// your binary. Subsequent calls will be ignored. The return value can be ignored
// or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
func StartRecording(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -57,7 +59,6 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
event.Source = source
previousEvent := getEvent(event)
updateExistingEvent := previousEvent.Count > 0
@ -70,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
tries := 0
for {
if recordEvent(recorder, event, updateExistingEvent) {
if recordEvent(sink, event, updateExistingEvent) {
break
}
tries++
@ -89,17 +90,17 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
})
}
// recordEvent attempts to write event to recorder. It returns true if the event
// recordEvent attempts to write event to a sink. It returns true if the event
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(recorder EventRecorder, event *api.Event, updateExistingEvent bool) bool {
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
newEvent, err = recorder.Update(event)
newEvent, err = sink.Update(event)
} else {
newEvent, err = recorder.Create(event)
newEvent, err = sink.Create(event)
}
if err == nil {
addOrUpdateEvent(newEvent)
@ -165,24 +166,52 @@ const maxQueuedEvents = 1000
var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
func Event(object runtime.Object, reason, message string) {
// EventRecorder knows how to record events for an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
}
// FromSource returns an EventRecorder that records events with the
// given event source.
func FromSource(source api.EventSource) EventRecorder {
return &recorderImpl{source}
}
type recorderImpl struct {
source api.EventSource
}
func (i *recorderImpl) Event(object runtime.Object, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message)
return
}
t := util.Now()
e := &api.Event{
e := makeEvent(ref, reason, message)
e.Source = i.source
events.Action(watch.Added, e)
}
func (i *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
i.Event(object, reason, fmt.Sprintf(messageFmt, args...))
}
func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
t := util.Now()
return &api.Event{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: ref.Namespace,
@ -194,11 +223,4 @@ func Event(object runtime.Object, reason, message string) {
LastTimestamp: t,
Count: 1,
}
events.Action(watch.Added, e)
}
// Eventf is just like Event, but with Sprintf for the message field.
func Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
Event(object, reason, fmt.Sprintf(messageFmt, args...))
}
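
The record changes above split event delivery from event attribution: a binary calls StartRecording once with an EventSink (client.Events("") satisfies it), while each component stamps its own events through an EventRecorder built by FromSource. A minimal sketch of the resulting call pattern, assuming a configured *client.Client is already in hand; the "example" component name, "node-1" host, and the pod variable are illustrative only:

package main

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
)

func wireEvents(kubeClient *client.Client, pod *api.Pod) {
	// Wire delivery once per binary; events queued by any recorder flow to this sink.
	record.StartRecording(kubeClient.Events(""))

	// Each component records through its own recorder; the Source is now stamped
	// per recorder instead of once per binary.
	recorder := record.FromSource(api.EventSource{Component: "example", Host: "node-1"})
	recorder.Eventf(pod, "started", "Started pod %s", pod.Name)
}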


@ -35,13 +35,13 @@ func init() {
sleepDuration = 0
}
type testEventRecorder struct {
type testEventSink struct {
OnCreate func(e *api.Event) (*api.Event, error)
OnUpdate func(e *api.Event) (*api.Event, error)
}
// CreateEvent records the event for testing.
func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) {
func (t *testEventSink) Create(e *api.Event) (*api.Event, error) {
if t.OnCreate != nil {
return t.OnCreate(e)
}
@ -49,7 +49,7 @@ func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) {
}
// UpdateEvent records the event for testing.
func (t *testEventRecorder) Update(e *api.Event) (*api.Event, error) {
func (t *testEventSink) Update(e *api.Event) (*api.Event, error) {
if t.OnUpdate != nil {
return t.OnUpdate(e)
}
@ -273,7 +273,7 @@ func TestEventf(t *testing.T) {
for _, item := range table {
called := make(chan struct{})
testEvents := testEventRecorder{
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
returnEvent, _ := validateEvent(event, item.expect, t)
if item.expectUpdate {
@ -291,7 +291,7 @@ func TestEventf(t *testing.T) {
return returnEvent, nil
},
}
recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
recorder := StartRecording(&testEvents)
logger := StartLogging(t.Logf) // Prove that it is useful
logger2 := StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
@ -300,7 +300,8 @@ func TestEventf(t *testing.T) {
called <- struct{}{}
})
Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
testSource := api.EventSource{Component: "eventTest"}
FromSource(testSource).Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
<-called
<-called
@ -387,7 +388,7 @@ func TestWriteEventError(t *testing.T) {
done := make(chan struct{})
defer StartRecording(
&testEventRecorder{
&testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
if event.Message == "finished" {
close(done)
@ -405,13 +406,13 @@ func TestWriteEventError(t *testing.T) {
return event, nil
},
},
api.EventSource{Component: "eventTest"},
).Stop()
testSource := api.EventSource{Component: "eventTest"}
for caseName := range table {
Event(ref, "Reason", caseName)
FromSource(testSource).Event(ref, "Reason", caseName)
}
Event(ref, "Reason", "finished")
FromSource(testSource).Event(ref, "Reason", "finished")
<-done
for caseName, item := range table {
@ -427,7 +428,7 @@ func TestLotsOfEvents(t *testing.T) {
// Fail each event a few times to ensure there's some load on the tested code.
var counts [1000]int
testEvents := testEventRecorder{
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
num, err := strconv.Atoi(event.Message)
if err != nil {
@ -442,7 +443,8 @@ func TestLotsOfEvents(t *testing.T) {
return event, nil
},
}
recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
recorder := StartRecording(&testEvents)
testSource := api.EventSource{Component: "eventTest"}
logger := StartLogging(func(formatter string, args ...interface{}) {
loggerCalled <- struct{}{}
})
@ -455,7 +457,7 @@ func TestLotsOfEvents(t *testing.T) {
APIVersion: "v1beta1",
}
for i := 0; i < maxQueuedEvents; i++ {
go Event(ref, "Reason", strconv.Itoa(i))
go FromSource(testSource).Event(ref, "Reason", strconv.Itoa(i))
}
// Make sure no events were dropped by either of the listeners.
for i := 0; i < maxQueuedEvents; i++ {

pkg/client/record/fake.go (new file, +28 lines)

@ -0,0 +1,28 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// FakeRecorder is used as a fake during tests.
type FakeRecorder struct{}
func (f *FakeRecorder) Event(object runtime.Object, reason, message string) {}
func (f *FakeRecorder) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {}
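
The new fake gives tests a no-op implementation to inject wherever a component now demands an EventRecorder (kubelet_test.go and runonce_test.go below do exactly this). A small sketch, written under the assumption that it lives in an external test package, showing the compile-time check tests can rely on:

package record_test

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
)

// Compile-time proof that the fake satisfies the interface components depend on.
// In component tests it is passed where a real recorder would go, e.g. as the
// recorder argument to the kubelet's newPodWorkers in this change.
var _ record.EventRecorder = &record.FakeRecorder{}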


@ -59,9 +59,9 @@ type PodConfig struct {
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubelet.PodUpdate, 50)
storage := newPodStorage(updates, mode)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
@ -114,17 +114,21 @@ type podStorage struct {
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.Mutex
sourcesSeen util.StringSet
// the EventRecorder to use
recorder record.EventRecorder
}
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
sourcesSeen: util.StringSet{},
recorder: recorder,
}
}
@ -192,7 +196,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
}
filtered := filterInvalidPods(update.Pods, source)
filtered := filterInvalidPods(update.Pods, source, s.recorder)
for _, ref := range filtered {
name := podUniqueName(ref)
if existing, found := pods[name]; found {
@ -234,7 +238,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
oldPods := pods
pods = make(map[string]*api.BoundPod)
filtered := filterInvalidPods(update.Pods, source)
filtered := filterInvalidPods(update.Pods, source, s.recorder)
for _, ref := range filtered {
name := podUniqueName(ref)
if existing, found := oldPods[name]; found {
@ -284,7 +288,7 @@ func (s *podStorage) seenSources(sources ...string) bool {
return s.sourcesSeen.HasAll(sources...)
}
func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
func filterInvalidPods(pods []api.BoundPod, source string, recorder record.EventRecorder) (filtered []*api.BoundPod) {
names := util.StringSet{}
for i := range pods {
pod := &pods[i]
@ -305,7 +309,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun
name := bestPodIdentString(pod)
err := utilerrors.NewAggregate(errlist)
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err)
record.Eventf(pod, "failedValidation", "Error validating pod %s from %s, ignoring: %v", name, source, err)
recorder.Eventf(pod, "failedValidation", "Error validating pod %s from %s, ignoring: %v", name, source, err)
continue
}
filtered = append(filtered, pod)


@ -21,6 +21,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
@ -74,7 +75,7 @@ func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPo
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) {
config := NewPodConfig(mode)
config := NewPodConfig(mode, record.FromSource(api.EventSource{Component: "kubelet"}))
channel := config.Channel(TestSource)
ch := config.Updates()
return channel, ch, config


@ -89,7 +89,8 @@ func NewMainKubelet(
clusterDNS net.IP,
masterServiceNamespace string,
volumePlugins []volume.Plugin,
streamingConnectionIdleTimeout time.Duration) (*Kubelet, error) {
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -135,6 +136,7 @@ func NewMainKubelet(
prober: newProbeHolder(),
readiness: newReadinessStates(),
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
}
dockerCache, err := dockertools.NewDockerCache(dockerClient)
@ -142,7 +144,7 @@ func NewMainKubelet(
return nil, err
}
klet.dockerCache = dockerCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod)
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder)
metrics.Register(dockerCache)
@ -230,6 +232,9 @@ type Kubelet struct {
// how long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
// the EventRecorder to use
recorder record.EventRecorder
}
// getRootDir returns the full path to the directory under which kubelet can
@ -654,15 +659,14 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
dockerContainer, err := kl.dockerClient.CreateContainer(opts)
if err != nil {
if ref != nil {
record.Eventf(ref, "failed",
"Failed to create docker container with error: %v", err)
kl.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err)
}
return "", err
}
// Remember this reference so we can report events about this container
if ref != nil {
kl.setRef(dockertools.DockerID(dockerContainer.ID), ref)
record.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
kl.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
}
if len(container.TerminationMessagePath) != 0 {
@ -707,13 +711,13 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
err = kl.dockerClient.StartContainer(dockerContainer.ID, hc)
if err != nil {
if ref != nil {
record.Eventf(ref, "failed",
kl.recorder.Eventf(ref, "failed",
"Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
}
return "", err
}
if ref != nil {
record.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
kl.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
}
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
@ -885,7 +889,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error {
glog.Warningf("No ref for pod '%v' - '%v'", ID, name)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
record.Eventf(ref, "killing", "Killing %v - %v", ID, name)
kl.recorder.Eventf(ref, "killing", "Killing %v - %v", ID, name)
}
return err
@ -916,7 +920,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
if ref != nil {
record.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
}
return "", err
}
@ -926,7 +930,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
}
}
if ref != nil {
record.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
}
id, err := kl.runContainer(pod, container, nil, "", "")
if err != nil {
@ -956,12 +960,12 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
if err := kl.dockerPuller.Pull(img); err != nil {
if ref != nil {
record.Eventf(ref, "failed", "Failed to pull image %q", img)
kl.recorder.Eventf(ref, "failed", "Failed to pull image %q", img)
}
return err
}
if ref != nil {
record.Eventf(ref, "pulled", "Successfully pulled image %q", img)
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", img)
}
return nil
}
@ -1055,7 +1059,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
record.Eventf(ref, "failedMount",
kl.recorder.Eventf(ref, "failedMount",
"Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
@ -1104,7 +1108,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name)
} else {
record.Eventf(ref, "unhealthy", "Liveness Probe Failed %v - %v", containerID, container.Name)
kl.recorder.Eventf(ref, "unhealthy", "Liveness Probe Failed %v - %v", containerID, container.Name)
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, live)
} else {
@ -1163,7 +1167,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
present, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
if ref != nil {
record.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
}
glog.Errorf("Failed to inspect image %q: %v; skipping pod %q container %q", container.Image, err, podFullName, container.Name)
continue
@ -1408,7 +1412,7 @@ func (s podsByCreationTime) Less(i, j int) bool {
}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.ContainerPort) int { return p.HostPort }
@ -1420,7 +1424,7 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
record.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
// TODO: Set the pod status to fail.
continue
}
@ -1437,11 +1441,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
@ -1508,7 +1512,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
}
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
@ -1519,7 +1523,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
}
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@ -1810,7 +1814,7 @@ func (kl *Kubelet) BirthCry() {
UID: types.UID(kl.hostname),
Namespace: api.NamespaceDefault,
}
record.Eventf(ref, "starting", "Starting kubelet.")
kl.recorder.Eventf(ref, "starting", "Starting kubelet.")
}
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {


@ -33,6 +33,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
@ -54,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
RemovedImages: util.StringSet{},
}
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker)
recorder := &record.FakeRecorder{}
kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
@ -74,11 +76,13 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
err := kubelet.syncPod(pod, containers)
waitGroup.Done()
return err
})
},
recorder)
kubelet.sourceReady = func(source string) bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
kubelet.recorder = recorder
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@ -1206,6 +1210,8 @@ func TestMakePortsAndBindings(t *testing.T) {
}
func TestCheckHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
@ -1215,7 +1221,7 @@ func TestCheckHostPortConflicts(t *testing.T) {
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
@ -1227,7 +1233,7 @@ func TestCheckHostPortConflicts(t *testing.T) {
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}
@ -3089,6 +3095,8 @@ func TestPortForward(t *testing.T) {
// Tests that upon host port conflict, the newer pod is removed.
func TestFilterHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
// Reuse the pod spec with the same port to create a conflict.
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
var pods = []api.BoundPod{
@ -3112,7 +3120,7 @@ func TestFilterHostPortConflicts(t *testing.T) {
// Make sure the BoundPods are in the reverse order of creation time.
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
filteredPods := filterHostPortConflicts(pods)
filteredPods := kubelet.filterHostPortConflicts(pods)
if len(filteredPods) != 1 {
t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
}


@ -48,6 +48,9 @@ type podWorkers struct {
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFn syncPodFnType
// The EventRecorder to use
recorder record.EventRecorder
}
type workUpdate struct {
@ -58,12 +61,13 @@ type workUpdate struct {
updateCompleteFn func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType) *podWorkers {
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
}
}
@ -83,7 +87,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
err = p.syncPodFn(newWork.pod, containers)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return
}


@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
pods = filterHostPortConflicts(pods)
pods = kl.filterHostPortConflicts(pods)
ch := make(chan RunPodResult)
for i := range pods {


@ -22,6 +22,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
docker "github.com/fsouza/go-dockerclient"
)
@ -68,6 +69,7 @@ func (d *testDocker) InspectContainer(id string) (*docker.Container, error) {
func TestRunOnce(t *testing.T) {
kb := &Kubelet{
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
}
if err := kb.setupDataDirs(); err != nil {
t.Errorf("Failed to init data dirs: %v", err)


@ -19,7 +19,6 @@ package kubelet
import (
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@ -74,9 +73,5 @@ func SetupLogging() {
func SetupEventSending(client *client.Client, hostname string) {
glog.Infof("Sending events to api server.")
record.StartRecording(client.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
record.StartRecording(client.Events(""))
}


@ -25,7 +25,6 @@ import (
"os"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
@ -76,7 +75,7 @@ func (s *SchedulerServer) Run(_ []string) error {
glog.Fatalf("Invalid API configuration: %v", err)
}
record.StartRecording(kubeClient.Events(""), api.EventSource{Component: "scheduler"})
record.StartRecording(kubeClient.Events(""))
go http.ListenAndServe(net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)), nil)


@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -156,7 +157,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
return pod
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
Recorder: record.FromSource(api.EventSource{Component: "scheduler"}),
}, nil
}


@ -51,6 +51,9 @@ type Config struct {
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*api.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
}
// New returns a new scheduler.
@ -72,7 +75,7 @@ func (s *Scheduler) scheduleOne() {
dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister)
if err != nil {
glog.V(1).Infof("Failed to schedule: %v", pod)
record.Eventf(pod, "failedScheduling", "Error scheduling: %v", err)
s.config.Recorder.Eventf(pod, "failedScheduling", "Error scheduling: %v", err)
s.config.Error(pod, err)
return
}
@ -83,9 +86,9 @@ func (s *Scheduler) scheduleOne() {
}
if err := s.config.Binder.Bind(b); err != nil {
glog.V(1).Infof("Failed to bind pod: %v", err)
record.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
record.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
}
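
Since scheduleOne now reaches for s.config.Recorder on every failure and success path, anything that builds a Config by hand has to populate the new field; a nil recorder would panic on the first event. A hypothetical in-package sketch, with nextPod and errorFn standing in for the real funcs the factory supplies:

// newConfigSketch is illustrative only; Algorithm, Binder, and MinionLister are
// omitted here and would be filled in exactly as before this change.
func newConfigSketch(nextPod func() *api.Pod, errorFn func(*api.Pod, error)) *Config {
	return &Config{
		NextPod: nextPod,
		Error:   errorFn,
		// New: every event the scheduler emits is stamped with this source.
		Recorder: record.FromSource(api.EventSource{Component: "scheduler"}),
	}
}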


@ -102,6 +102,7 @@ func TestScheduler(t *testing.T) {
NextPod: func() *api.Pod {
return item.sendPod
},
Recorder: record.FromSource(api.EventSource{Component: "scheduler"}),
}
s := New(c)
called := make(chan struct{})