Merge pull request #6718 from vishh/sys_oom1

Adding system oom events from kubelet
pull/6/head
Victor Marmol 2015-04-29 14:29:38 -07:00
commit 209b4fcbef
11 changed files with 174 additions and 15 deletions

View File

@ -61,6 +61,9 @@ type EventRecorder interface {
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(object runtime.Object, timestamp util.Time, reason, messageFmt string, args ...interface{})
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
@ -234,7 +237,7 @@ type recorderImpl struct {
*watch.Broadcaster
}
func (recorder *recorderImpl) Event(object runtime.Object, reason, message string) {
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp util.Time, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message)
@ -247,10 +250,18 @@ func (recorder *recorderImpl) Event(object runtime.Object, reason, message strin
recorder.Action(watch.Added, event)
}
func (recorder *recorderImpl) Event(object runtime.Object, reason, message string) {
recorder.generateEvent(object, util.Now(), reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp util.Time, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, timestamp, reason, fmt.Sprintf(messageFmt, args...))
}
func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
t := util.Now()
namespace := ref.Namespace

View File

@ -18,6 +18,7 @@ package record
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// FakeRecorder is used as a fake during tests.
@ -26,3 +27,6 @@ type FakeRecorder struct{}
func (f *FakeRecorder) Event(object runtime.Object, reason, message string) {}
func (f *FakeRecorder) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {}
func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp util.Time, reason, messageFmt string, args ...interface{}) {
}

View File

@ -52,6 +52,6 @@ func (c *Fake) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
return cadvisorApiV2.FsInfo{}, nil
}
func (c *Fake) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
return []*cadvisorApi.Event{}, nil
func (c *Fake) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return new(events.EventChannel), nil
}

View File

@ -147,6 +147,6 @@ func (cc *cadvisorClient) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
return res[0], nil
}
func (cc *cadvisorClient) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
return cc.GetPastEvents(request)
func (cc *cadvisorClient) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return cc.WatchForEvents(request)
}

View File

@ -62,7 +62,7 @@ func (c *Mock) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error) {
return args.Get(0).(cadvisorApiV2.FsInfo), args.Error(1)
}
func (c *Mock) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
func (c *Mock) WatchEvents(request *events.Request) (*events.EventChannel, error) {
args := c.Called()
return args.Get(0).([]*cadvisorApi.Event), args.Error(1)
return args.Get(0).(*events.EventChannel), args.Error(1)
}

View File

@ -61,6 +61,6 @@ func (cu *cadvisorUnsupported) DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error
return cadvisorApiV2.FsInfo{}, unsupportedErr
}
func (cu *cadvisorUnsupported) GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error) {
return []*cadvisorApi.Event{}, unsupportedErr
func (cu *cadvisorUnsupported) WatchEvents(request *events.Request) (*events.EventChannel, error) {
return nil, unsupportedErr
}

View File

@ -34,6 +34,6 @@ type Interface interface {
// Returns usage information about the filesystem holding Docker images.
DockerImagesFsInfo() (cadvisorApiV2.FsInfo, error)
// Get past events that have been detected and that fit the request.
GetPastEvents(request *events.Request) ([]*cadvisorApi.Event, error)
// Get events streamed through passedChannel that fit the request.
WatchEvents(request *events.Request) (*events.EventChannel, error)
}

View File

@ -38,7 +38,7 @@ import (
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/golang/groupcache/lru"
)

View File

@ -55,7 +55,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
@ -207,6 +207,8 @@ func NewMainKubelet(
volumeManager := newVolumeManager()
oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)
klet := &Kubelet{
hostname: hostname,
dockerClient: dockerClient,
@ -234,6 +236,7 @@ func NewMainKubelet(
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
oomWatcher: oomWatcher,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@ -393,12 +396,12 @@ type Kubelet struct {
// status. Kubelet may fail to update node status reliablly if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
os kubecontainer.OSInterface
os kubecontainer.OSInterface
oomWatcher OOMWatcher
}
// getRootDir returns the full path to the directory under which kubelet can
@ -588,10 +591,20 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
}
go kl.syncNodeStatus()
// Run the system oom watcher forever.
go util.Until(kl.runOOMWatcher, time.Second, util.NeverStop)
kl.statusManager.Start()
kl.syncLoop(updates, kl)
}
// Watches for system OOMs.
func (kl *Kubelet) runOOMWatcher() {
glog.V(5).Infof("Starting to record system OOMs")
if err := kl.oomWatcher.RecordSysOOMs(kl.nodeRef); err != nil {
glog.Errorf("failed to record system OOMs - %v", err)
}
}
// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
@ -1788,6 +1801,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"github.com/google/cadvisor/events"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
type OOMWatcher interface {
RecordSysOOMs(ref *api.ObjectReference) error
}
type realOOMWatcher struct {
cadvisor cadvisor.Interface
recorder record.EventRecorder
}
func NewOOMWatcher(cadvisor cadvisor.Interface, recorder record.EventRecorder) OOMWatcher {
return &realOOMWatcher{
cadvisor: cadvisor,
recorder: recorder,
}
}
const systemOOMEvent = "SystemOOM"
// Watches cadvisor for system oom's and records an event for every system oom encountered.
func (ow *realOOMWatcher) RecordSysOOMs(ref *api.ObjectReference) error {
request := events.Request{
EventType: map[cadvisorApi.EventType]bool{
cadvisorApi.EventOom: true,
},
ContainerName: "/",
IncludeSubcontainers: false,
}
eventChannel, err := ow.cadvisor.WatchEvents(&request)
if err != nil {
return err
}
for event := range eventChannel.GetChannel() {
glog.V(2).Infof("got sys oom event from cadvisor: %v", event)
ow.recorder.PastEventf(ref, util.Time{event.Timestamp}, systemOOMEvent, "System OOM encountered")
}
return fmt.Errorf("failed to watch cadvisor for sys oom events")
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeEvent struct {
object runtime.Object
timestamp util.Time
reason string
message string
}
type fakeRecorder struct {
events []fakeEvent
}
func (f fakeRecorder) Event(object runtime.Object, reason, message string) {
f.events = append(f.events, fakeEvent{object, util.Now(), reason, message})
}
func (f fakeRecorder) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
f.events = append(f.events, fakeEvent{object, util.Now(), reason, fmt.Sprintf(messageFmt, args...)})
}
func (f fakeRecorder) PastEventf(object runtime.Object, timestamp util.Time, reason, messageFmt string, args ...interface{}) {
f.events = append(f.events, fakeEvent{object, timestamp, reason, fmt.Sprintf(messageFmt, args...)})
}
func TestBasic(t *testing.T) {
fakeRecorder := fakeRecorder{}
mockCadvisor := &cadvisor.Fake{}
node := &api.ObjectReference{}
oomWatcher := NewOOMWatcher(mockCadvisor, fakeRecorder)
go func() {
oomWatcher.RecordSysOOMs(node)
}()
// TODO: Improve this test once cadvisor exports events.EventChannel as an interface
// and thereby allow using a mock version of cadvisor.
}