From 86d421ec5e9aac7037b69b50a1183bf1c5b5b2ed Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Fri, 3 Apr 2015 00:22:55 +0000 Subject: [PATCH] Updating cadvisor deps. This is necessary for handling sys oom events in kubelet. --- Godeps/Godeps.json | 64 +++++----- .../github.com/google/cadvisor/api/handler.go | 24 ++-- .../google/cadvisor/api/versions.go | 61 ++++----- .../google/cadvisor/api/versions_test.go | 39 +++--- .../google/cadvisor/events/handler.go | 119 ++++++++++-------- .../google/cadvisor/events/handler_test.go | 45 +++---- .../google/cadvisor/info/v1/container.go | 30 +++++ .../google/cadvisor/manager/manager.go | 37 +++--- .../google/cadvisor/manager/manager_mock.go | 6 +- .../oomparser/{oominfo => oomexample}/main.go | 5 +- .../cadvisor/utils/oomparser/oomparser.go | 75 ++++++----- .../utils/oomparser/oomparser_test.go | 55 ++------ .../google/cadvisor/version/version.go | 2 +- 13 files changed, 290 insertions(+), 272 deletions(-) rename Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/{oominfo => oomexample}/main.go (93%) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 77566be117..8bbf86a094 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -212,83 +212,83 @@ }, { "ImportPath": "github.com/google/cadvisor/api", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/container", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/events", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/fs", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/healthz", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/http", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/info/v1", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/info/v2", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/manager", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/metrics", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/pages", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/storage", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/summary", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/utils", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/validate", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/cadvisor/version", - "Comment": "0.10.1-106-gfd9f7e0", - "Rev": "fd9f7e0e820c7916e052e2180ee51e5ef8ad6614" + "Comment": "0.11.0", + "Rev": "318252a107983f9d9fc109cc97f8140c37ec8233" }, { "ImportPath": "github.com/google/gofuzz", diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/api/handler.go b/Godeps/_workspace/src/github.com/google/cadvisor/api/handler.go index 18c848aeb6..6015fd5624 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/api/handler.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/api/handler.go @@ -133,7 +133,7 @@ func writeResult(res interface{}, w http.ResponseWriter) error { } -func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error { +func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r *http.Request, m manager.Manager) error { cn, ok := w.(http.CloseNotifier) if !ok { return errors.New("could not access http.CloseNotifier") @@ -151,8 +151,10 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re for { select { case <-cn.CloseNotify(): + glog.V(3).Infof("Received CloseNotify event") + m.CloseEventChannel(eventChannel.GetWatchId()) return nil - case ev := <-results: + case ev := <-eventChannel.GetChannel(): glog.V(3).Infof("Received event from watch channel in api: %v", ev) err := enc.Encode(ev) if err != nil { @@ -178,19 +180,19 @@ func getContainerInfoRequest(body io.ReadCloser) (*info.ContainerInfoRequest, er // with any twice defined arguments being assigned the first value. // If the value type for the argument is wrong the field will be assumed to be // unassigned -// bools: historical, subcontainers, oom_events, creation_events, deletion_events +// bools: stream, subcontainers, oom_events, creation_events, deletion_events // ints: max_events, start_time (unix timestamp), end_time (unix timestamp) -// example r.URL: http://localhost:8080/api/v1.3/events?oom_events=true&historical=true&max_events=10 +// example r.URL: http://localhost:8080/api/v1.3/events?oom_events=true&stream=true func getEventRequest(r *http.Request) (*events.Request, bool, error) { query := events.NewRequest() - getHistoricalEvents := false + stream := false urlMap := r.URL.Query() - if val, ok := urlMap["historical"]; ok { + if val, ok := urlMap["stream"]; ok { newBool, err := strconv.ParseBool(val[0]) if err == nil { - getHistoricalEvents = newBool + stream = newBool } } if val, ok := urlMap["subcontainers"]; ok { @@ -202,19 +204,19 @@ func getEventRequest(r *http.Request) (*events.Request, bool, error) { if val, ok := urlMap["oom_events"]; ok { newBool, err := strconv.ParseBool(val[0]) if err == nil { - query.EventType[events.TypeOom] = newBool + query.EventType[info.EventOom] = newBool } } if val, ok := urlMap["creation_events"]; ok { newBool, err := strconv.ParseBool(val[0]) if err == nil { - query.EventType[events.TypeContainerCreation] = newBool + query.EventType[info.EventContainerCreation] = newBool } } if val, ok := urlMap["deletion_events"]; ok { newBool, err := strconv.ParseBool(val[0]) if err == nil { - query.EventType[events.TypeContainerDeletion] = newBool + query.EventType[info.EventContainerDeletion] = newBool } } if val, ok := urlMap["max_events"]; ok { @@ -239,7 +241,7 @@ func getEventRequest(r *http.Request) (*events.Request, bool, error) { glog.V(2).Infof( "%v was returned in api/handler.go:getEventRequest from the url rawQuery %v", query, r.URL.RawQuery) - return query, getHistoricalEvents, nil + return query, stream, nil } func getContainerName(request []string) string { diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/api/versions.go b/Godeps/_workspace/src/github.com/google/cadvisor/api/versions.go index 545aae7176..9f56ff6ccd 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/api/versions.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/api/versions.go @@ -20,7 +20,6 @@ import ( "strconv" "github.com/golang/glog" - "github.com/google/cadvisor/events" info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/manager" @@ -58,7 +57,7 @@ func getApiVersions() []ApiVersion { v1_1 := newVersion1_1(v1_0) v1_2 := newVersion1_2(v1_1) v1_3 := newVersion1_3(v1_2) - v2_0 := newVersion2_0(v1_3) + v2_0 := newVersion2_0() return []ApiVersion{v1_0, v1_1, v1_2, v1_3, v2_0} @@ -262,40 +261,40 @@ func (self *version1_3) SupportedRequestTypes() []string { func (self *version1_3) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { switch requestType { case eventsApi: - query, eventsFromAllTime, err := getEventRequest(r) - if err != nil { - return err - } - glog.V(2).Infof("Api - Events(%v)", query) - if eventsFromAllTime { - pastEvents, err := m.GetPastEvents(query) - if err != nil { - return err - } - return writeResult(pastEvents, w) - } - eventsChannel := make(chan *events.Event, 10) - err = m.WatchForEvents(query, eventsChannel) - if err != nil { - return err - } - return streamResults(eventsChannel, w, r) + return handleEventRequest(m, w, r) default: return self.baseVersion.HandleRequest(requestType, request, m, w, r) } } -// API v2.0 +func handleEventRequest(m manager.Manager, w http.ResponseWriter, r *http.Request) error { + query, stream, err := getEventRequest(r) + if err != nil { + return err + } + glog.V(2).Infof("Api - Events(%v)", query) + if !stream { + pastEvents, err := m.GetPastEvents(query) + if err != nil { + return err + } + return writeResult(pastEvents, w) + } + eventChannel, err := m.WatchForEvents(query) + if err != nil { + return err + } + return streamResults(eventChannel, w, r, m) -// v2.0 builds on v1.3 -type version2_0 struct { - baseVersion *version1_3 } -func newVersion2_0(v *version1_3) *version2_0 { - return &version2_0{ - baseVersion: v, - } +// API v2.0 + +type version2_0 struct { +} + +func newVersion2_0() *version2_0 { + return &version2_0{} } func (self *version2_0) Version() string { @@ -303,7 +302,7 @@ func (self *version2_0) Version() string { } func (self *version2_0) SupportedRequestTypes() []string { - return append(self.baseVersion.SupportedRequestTypes(), summaryApi) + return []string{versionApi, attributesApi, eventsApi, machineApi, summaryApi, statsApi, specApi, storageApi} } func (self *version2_0) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { @@ -388,8 +387,10 @@ func (self *version2_0) HandleRequest(requestType string, request []string, m ma } } return writeResult(fi, w) + case eventsApi: + return handleEventRequest(m, w, r) default: - return self.baseVersion.HandleRequest(requestType, request, m, w, r) + return fmt.Errorf("unknown request type %q", requestType) } } diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/api/versions_test.go b/Godeps/_workspace/src/github.com/google/cadvisor/api/versions_test.go index 26709827ac..0a107858a1 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/api/versions_test.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/api/versions_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/cadvisor/events" + info "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" ) @@ -33,20 +34,19 @@ func makeHTTPRequest(requestURL string, t *testing.T) *http.Request { } func TestGetEventRequestBasicRequest(t *testing.T) { - r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?oom_events=true&historical=true&max_events=10", t) - expectedQuery := &events.Request{ - EventType: map[events.EventType]bool{ - events.TypeOom: true, - }, - MaxEventsReturned: 10, + r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?oom_events=true&stream=false&max_events=20", t) + expectedQuery := events.NewRequest() + expectedQuery.EventType = map[info.EventType]bool{ + info.EventOom: true, } + expectedQuery.MaxEventsReturned = 20 - receivedQuery, getHistoricalEvents, err := getEventRequest(r) + receivedQuery, stream, err := getEventRequest(r) if !reflect.DeepEqual(expectedQuery, receivedQuery) { - t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) + t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery) } - assert.True(t, getHistoricalEvents) + assert.False(t, stream) assert.Nil(t, err) } @@ -54,28 +54,27 @@ func TestGetEventEmptyRequest(t *testing.T) { r := makeHTTPRequest("", t) expectedQuery := events.NewRequest() - receivedQuery, getHistoricalEvents, err := getEventRequest(r) + receivedQuery, stream, err := getEventRequest(r) if !reflect.DeepEqual(expectedQuery, receivedQuery) { - t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) + t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery) } - assert.False(t, getHistoricalEvents) + assert.False(t, stream) assert.Nil(t, err) } func TestGetEventRequestDoubleArgument(t *testing.T) { - r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?historical=true&oom_events=true&oom_events=false", t) - expectedQuery := &events.Request{ - EventType: map[events.EventType]bool{ - events.TypeOom: true, - }, + r := makeHTTPRequest("http://localhost:8080/api/v1.3/events?stream=true&oom_events=true&oom_events=false", t) + expectedQuery := events.NewRequest() + expectedQuery.EventType = map[info.EventType]bool{ + info.EventOom: true, } - receivedQuery, getHistoricalEvents, err := getEventRequest(r) + receivedQuery, stream, err := getEventRequest(r) if !reflect.DeepEqual(expectedQuery, receivedQuery) { - t.Errorf("expected %v but received %v", expectedQuery, receivedQuery) + t.Errorf("expected %#v but received %#v", expectedQuery, receivedQuery) } - assert.True(t, getHistoricalEvents) + assert.True(t, stream) assert.Nil(t, err) } diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/events/handler.go b/Godeps/_workspace/src/github.com/google/cadvisor/events/handler.go index eb586e5307..8357a02612 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/events/handler.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/events/handler.go @@ -20,6 +20,9 @@ import ( "strings" "sync" "time" + + "github.com/golang/glog" + info "github.com/google/cadvisor/info/v1" ) // EventManager is implemented by Events. It provides two ways to monitor @@ -27,13 +30,15 @@ import ( type EventManager interface { // Watch checks if events fed to it by the caller of AddEvent satisfy the // request and if so sends the event back to the caller on outChannel - WatchEvents(outChannel chan *Event, request *Request) error + WatchEvents(request *Request) (*EventChannel, error) // GetEvents() returns a slice of all events detected that have passed // the *Request object parameters to the caller GetEvents(request *Request) (EventSlice, error) // AddEvent allows the caller to add an event to an EventManager // object - AddEvent(e *Event) error + AddEvent(e *info.Event) error + // Removes a watch instance from the EventManager's watchers map + StopWatch(watch_id int) } // Events holds a slice of *Event objects with a potential field @@ -47,11 +52,14 @@ type events struct { // linked to different calls of WatchEvents. When new events are found that // satisfy the request of a given watch object in watchers, the event // is sent over the channel to that caller of WatchEvents - watchers []*watch + watchers map[int]*watch // lock that blocks eventlist from being accessed until a writer releases it eventsLock sync.RWMutex // lock that blocks watchers from being accessed until a writer releases it watcherLock sync.RWMutex + // receives notices when a watch event ends and needs to be removed from + // the watchers list + lastId int } // initialized by a call to WatchEvents(), a watch struct will then be added @@ -66,26 +74,14 @@ type watch struct { request *Request // a channel created by the caller through which events satisfying the // request are sent to the caller - channel chan *Event + eventChannel *EventChannel + // unique identifier of a watch that is used as a key in events' watchers + // map + id int } // typedef of a slice of Event pointers -type EventSlice []*Event - -// Event contains information general to events such as the time at which they -// occurred, their specific type, and the actual event. Event types are -// differentiated by the EventType field of Event. -type Event struct { - // the absolute container name for which the event occurred - ContainerName string - // the time at which the event occurred - Timestamp time.Time - // the type of event. EventType is an enumerated type - EventType EventType - // the original event object and all of its extraneous data, ex. an - // OomInstance - EventData EventDataInterface -} +type EventSlice []*info.Event // Request holds a set of parameters by which Event objects may be screened. // The caller may want events that occurred within a specific timeframe @@ -99,7 +95,7 @@ type Request struct { // must be left blank in calls to WatchEvents EndTime time.Time // EventType is a map that specifies the type(s) of events wanted - EventType map[EventType]bool + EventType map[info.EventType]bool // allows the caller to put a limit on how many // events they receive. If there are more events than MaxEventsReturned // then the most chronologically recent events in the time period @@ -112,45 +108,51 @@ type Request struct { IncludeSubcontainers bool } -// EventType is an enumerated type which lists the categories under which -// events may fall. The Event field EventType is populated by this enum. -type EventType int +type EventChannel struct { + watchId int + channel chan *info.Event +} -const ( - TypeOom EventType = iota - TypeContainerCreation - TypeContainerDeletion -) - -// a general interface which populates the Event field EventData. The actual -// object, such as an OomInstance, is set as an Event's EventData -type EventDataInterface interface { +func NewEventChannel(watchId int) *EventChannel { + return &EventChannel{ + watchId: watchId, + channel: make(chan *info.Event, 10), + } } // returns a pointer to an initialized Events object func NewEventManager() *events { return &events{ eventlist: make(EventSlice, 0), - watchers: []*watch{}, + watchers: make(map[int]*watch), } } // returns a pointer to an initialized Request object func NewRequest() *Request { return &Request{ - EventType: map[EventType]bool{}, + EventType: map[info.EventType]bool{}, IncludeSubcontainers: false, + MaxEventsReturned: 10, } } // returns a pointer to an initialized watch object -func newWatch(request *Request, outChannel chan *Event) *watch { +func newWatch(request *Request, eventChannel *EventChannel) *watch { return &watch{ - request: request, - channel: outChannel, + request: request, + eventChannel: eventChannel, } } +func (self *EventChannel) GetChannel() chan *info.Event { + return self.channel +} + +func (self *EventChannel) GetWatchId() int { + return self.watchId +} + // function necessary to implement the sort interface on the Events struct func (e EventSlice) Len() int { return len(e) @@ -180,7 +182,7 @@ func getMaxEventsReturned(request *Request, eSlice EventSlice) EventSlice { // container path is a prefix of the event container path. Otherwise, // it checks that the container paths of the event and request are // equivalent -func checkIfIsSubcontainer(request *Request, event *Event) bool { +func checkIfIsSubcontainer(request *Request, event *info.Event) bool { if request.IncludeSubcontainers == true { return strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/") } @@ -188,7 +190,7 @@ func checkIfIsSubcontainer(request *Request, event *Event) bool { } // determines if an event occurs within the time set in the request object and is the right type -func checkIfEventSatisfiesRequest(request *Request, event *Event) bool { +func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool { startTime := request.StartTime endTime := request.EndTime eventTime := event.Timestamp @@ -234,29 +236,30 @@ func (self *events) GetEvents(request *Request) (EventSlice, error) { // Request object it is fed to the channel. The StartTime and EndTime of the watch // request should be uninitialized because the purpose is to watch indefinitely // for events that will happen in the future -func (self *events) WatchEvents(outChannel chan *Event, request *Request) error { +func (self *events) WatchEvents(request *Request) (*EventChannel, error) { if !request.StartTime.IsZero() || !request.EndTime.IsZero() { - return errors.New( + return nil, errors.New( "for a call to watch, request.StartTime and request.EndTime must be uninitialized") } - newWatcher := newWatch(request, outChannel) self.watcherLock.Lock() defer self.watcherLock.Unlock() - self.watchers = append(self.watchers, newWatcher) - return nil + new_id := self.lastId + 1 + returnEventChannel := NewEventChannel(new_id) + newWatcher := newWatch(request, returnEventChannel) + self.watchers[new_id] = newWatcher + self.lastId = new_id + return returnEventChannel, nil } // helper function to update the event manager's eventlist -func (self *events) updateEventList(e *Event) { +func (self *events) updateEventList(e *info.Event) { self.eventsLock.Lock() defer self.eventsLock.Unlock() self.eventlist = append(self.eventlist, e) } -func (self *events) findValidWatchers(e *Event) []*watch { +func (self *events) findValidWatchers(e *info.Event) []*watch { watchesToSend := make([]*watch, 0) - self.watcherLock.RLock() - defer self.watcherLock.RUnlock() for _, watcher := range self.watchers { watchRequest := watcher.request if checkIfEventSatisfiesRequest(watchRequest, e) { @@ -269,11 +272,25 @@ func (self *events) findValidWatchers(e *Event) []*watch { // method of Events object that adds the argument Event object to the // eventlist. It also feeds the event to a set of watch channels // held by the manager if it satisfies the request keys of the channels -func (self *events) AddEvent(e *Event) error { +func (self *events) AddEvent(e *info.Event) error { self.updateEventList(e) + self.watcherLock.RLock() + defer self.watcherLock.RUnlock() watchesToSend := self.findValidWatchers(e) for _, watchObject := range watchesToSend { - watchObject.channel <- e + watchObject.eventChannel.GetChannel() <- e } return nil } + +// Removes a watch instance from the EventManager's watchers map +func (self *events) StopWatch(watchId int) { + self.watcherLock.Lock() + defer self.watcherLock.Unlock() + _, ok := self.watchers[watchId] + if !ok { + glog.Errorf("Could not find watcher instance %v", watchId) + } + close(self.watchers[watchId].eventChannel.GetChannel()) + delete(self.watchers, watchId) +} diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/events/handler_test.go b/Godeps/_workspace/src/github.com/google/cadvisor/events/handler_test.go index c1bf4caca5..d11b4b9b7c 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/events/handler_test.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/events/handler_test.go @@ -17,6 +17,9 @@ package events import ( "testing" "time" + + info "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" ) func createOldTime(t *testing.T) time.Time { @@ -31,16 +34,16 @@ func createOldTime(t *testing.T) time.Time { } // used to convert an OomInstance to an Event object -func makeEvent(inTime time.Time, containerName string) *Event { - return &Event{ +func makeEvent(inTime time.Time, containerName string) *info.Event { + return &info.Event{ ContainerName: containerName, Timestamp: inTime, - EventType: TypeOom, + EventType: info.EventOom, } } // returns EventManager and Request to use in tests -func initializeScenario(t *testing.T) (*events, *Request, *Event, *Event) { +func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Event) { fakeEvent := makeEvent(createOldTime(t), "/") fakeEvent2 := makeEvent(time.Now(), "/") @@ -54,7 +57,7 @@ func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived } } -func ensureProperEventReturned(t *testing.T, expectedEvent *Event, eventObjectFound *Event) { +func ensureProperEventReturned(t *testing.T, expectedEvent *info.Event, eventObjectFound *info.Event) { if eventObjectFound != expectedEvent { t.Errorf("Expected to find test object %v but found a different object: %v", expectedEvent, eventObjectFound) @@ -65,13 +68,13 @@ func TestCheckIfIsSubcontainer(t *testing.T) { myRequest := NewRequest() myRequest.ContainerName = "/root" - sameContainerEvent := &Event{ + sameContainerEvent := &info.Event{ ContainerName: "/root", } - subContainerEvent := &Event{ + subContainerEvent := &info.Event{ ContainerName: "/root/subdir", } - differentContainerEvent := &Event{ + differentContainerEvent := &info.Event{ ContainerName: "/root-completely-different-container", } @@ -102,9 +105,9 @@ func TestCheckIfIsSubcontainer(t *testing.T) { func TestWatchEventsDetectsNewEvents(t *testing.T) { myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) - myRequest.EventType[TypeOom] = true - outChannel := make(chan *Event, 10) - myEventHolder.WatchEvents(outChannel, myRequest) + myRequest.EventType[info.EventOom] = true + returnEventChannel, err := myEventHolder.WatchEvents(myRequest) + assert.Nil(t, err) myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent2) @@ -114,19 +117,17 @@ func TestWatchEventsDetectsNewEvents(t *testing.T) { time.Sleep(5 * time.Second) if time.Since(startTime) > (5 * time.Second) { t.Errorf("Took too long to receive all the events") - close(outChannel) } }() eventsFound := 0 go func() { - for event := range outChannel { + for event := range returnEventChannel.GetChannel() { eventsFound += 1 if eventsFound == 1 { ensureProperEventReturned(t, fakeEvent, event) } else if eventsFound == 2 { ensureProperEventReturned(t, fakeEvent2, event) - close(outChannel) break } } @@ -145,15 +146,13 @@ func TestAddEventAddsEventsToEventManager(t *testing.T) { func TestGetEventsForOneEvent(t *testing.T) { myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myRequest.MaxEventsReturned = 1 - myRequest.EventType[TypeOom] = true + myRequest.EventType[info.EventOom] = true myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 1, receivedEvents.Len()) ensureProperEventReturned(t, fakeEvent2, receivedEvents[0]) } @@ -162,15 +161,13 @@ func TestGetEventsForTimePeriod(t *testing.T) { myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myRequest.StartTime = createOldTime(t).Add(-1 * time.Second * 10) myRequest.EndTime = createOldTime(t).Add(time.Second * 10) - myRequest.EventType[TypeOom] = true + myRequest.EventType[info.EventOom] = true myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 1, receivedEvents.Len()) ensureProperEventReturned(t, fakeEvent, receivedEvents[0]) @@ -183,8 +180,6 @@ func TestGetEventsForNoTypeRequested(t *testing.T) { myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 0, receivedEvents.Len()) } diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/info/v1/container.go b/Godeps/_workspace/src/github.com/google/cadvisor/info/v1/container.go index a6e70fb10f..a06d388da3 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/info/v1/container.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/info/v1/container.go @@ -473,3 +473,33 @@ func calculateCpuUsage(prev, cur uint64) uint64 { } return cur - prev } + +// Event contains information general to events such as the time at which they +// occurred, their specific type, and the actual event. Event types are +// differentiated by the EventType field of Event. +type Event struct { + // the absolute container name for which the event occurred + ContainerName string + // the time at which the event occurred + Timestamp time.Time + // the type of event. EventType is an enumerated type + EventType EventType + // the original event object and all of its extraneous data, ex. an + // OomInstance + EventData EventDataInterface +} + +// EventType is an enumerated type which lists the categories under which +// events may fall. The Event field EventType is populated by this enum. +type EventType int + +const ( + EventOom EventType = iota + EventContainerCreation + EventContainerDeletion +) + +// a general interface which populates the Event field EventData. The actual +// object, such as an OomInstance, is set as an Event's EventData +type EventDataInterface interface { +} diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager.go b/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager.go index 74dab108de..30cd9ff46f 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager.go @@ -83,10 +83,12 @@ type Manager interface { GetFsInfo(label string) ([]v2.FsInfo, error) // Get events streamed through passedChannel that fit the request. - WatchForEvents(request *events.Request, passedChannel chan *events.Event) error + WatchForEvents(request *events.Request) (*events.EventChannel, error) // Get past events that have been detected and that fit the request. - GetPastEvents(request *events.Request) (events.EventSlice, error) + GetPastEvents(request *events.Request) ([]*info.Event, error) + + CloseEventChannel(watch_id int) } // New takes a memory storage and returns a new manager. @@ -669,11 +671,11 @@ func (m *manager) createContainer(containerName string) error { return err } - newEvent := &events.Event{ + newEvent := &info.Event{ ContainerName: contRef.Name, EventData: contSpecs, Timestamp: contSpecs.CreationTime, - EventType: events.TypeContainerCreation, + EventType: info.EventContainerCreation, } err = m.eventHandler.AddEvent(newEvent) if err != nil { @@ -721,10 +723,10 @@ func (m *manager) destroyContainer(containerName string) error { return err } - newEvent := &events.Event{ + newEvent := &info.Event{ ContainerName: contRef.Name, Timestamp: time.Now(), - EventType: events.TypeContainerDeletion, + EventType: info.EventContainerDeletion, } err = m.eventHandler.AddEvent(newEvent) if err != nil { @@ -868,22 +870,20 @@ func (self *manager) watchForNewOoms() error { if err != nil { return err } - err = oomLog.StreamOoms(outStream) - if err != nil { - return err - } + go oomLog.StreamOoms(outStream) + go func() { for oomInstance := range outStream { - newEvent := &events.Event{ + newEvent := &info.Event{ ContainerName: oomInstance.ContainerName, Timestamp: oomInstance.TimeOfDeath, - EventType: events.TypeOom, + EventType: info.EventOom, EventData: oomInstance, } glog.V(1).Infof("Created an oom event: %v", newEvent) err := self.eventHandler.AddEvent(newEvent) if err != nil { - glog.Errorf("Failed to add event %v, got error: %v", newEvent, err) + glog.Errorf("failed to add event %v, got error: %v", newEvent, err) } } }() @@ -891,11 +891,16 @@ func (self *manager) watchForNewOoms() error { } // can be called by the api which will take events returned on the channel -func (self *manager) WatchForEvents(request *events.Request, passedChannel chan *events.Event) error { - return self.eventHandler.WatchEvents(passedChannel, request) +func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) { + return self.eventHandler.WatchEvents(request) } // can be called by the api which will return all events satisfying the request -func (self *manager) GetPastEvents(request *events.Request) (events.EventSlice, error) { +func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) { return self.eventHandler.GetEvents(request) } + +// called by the api when a client is no longer listening to the channel +func (self *manager) CloseEventChannel(watch_id int) { + self.eventHandler.StopWatch(watch_id) +} diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager_mock.go b/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager_mock.go index d732da0946..a74fda5f49 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager_mock.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/manager/manager_mock.go @@ -70,14 +70,14 @@ func (c *ManagerMock) GetRequestedContainersInfo(containerName string, options v return args.Get(0).(map[string]*info.ContainerInfo), args.Error(1) } -func (c *ManagerMock) WatchForEvents(queryuest *events.Request, passedChannel chan *events.Event) error { +func (c *ManagerMock) WatchForEvents(queryuest *events.Request, passedChannel chan *info.Event) error { args := c.Called(queryuest, passedChannel) return args.Error(0) } -func (c *ManagerMock) GetPastEvents(queryuest *events.Request) (events.EventSlice, error) { +func (c *ManagerMock) GetPastEvents(queryuest *events.Request) ([]*info.Event, error) { args := c.Called(queryuest) - return args.Get(0).(events.EventSlice), args.Error(1) + return args.Get(0).([]*info.Event), args.Error(1) } func (c *ManagerMock) GetMachineInfo() (*info.MachineInfo, error) { diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oominfo/main.go b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomexample/main.go similarity index 93% rename from Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oominfo/main.go rename to Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomexample/main.go index 208d9d9067..8667d3285e 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oominfo/main.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomexample/main.go @@ -31,10 +31,7 @@ func main() { if err != nil { glog.Infof("Couldn't make a new oomparser. %v", err) } else { - err := oomLog.StreamOoms(outStream) - if err != nil { - glog.Errorf("%v", err) - } + go oomLog.StreamOoms(outStream) // demonstration of how to get oomLog's list of oomInstances or access // the user-declared oomInstance channel, here called outStream for oomInstance := range outStream { diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser.go b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser.go index 874c6a69c8..84936e08b8 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser.go @@ -16,9 +16,10 @@ package oomparser import ( "bufio" - "fmt" + "errors" "io" "os" + "os/exec" "path" "regexp" "strconv" @@ -37,7 +38,7 @@ var firstLineRegexp *regexp.Regexp = regexp.MustCompile( // struct to hold file from which we obtain OomInstances type OomParser struct { - systemFile string + ioreader *bufio.Reader } // struct that contains information related to an OOM kill instance @@ -123,19 +124,18 @@ func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) { } } -// Calls goroutine for analyzeLinesHelper, which feeds it complete lines. +// Calls goroutine for readLinesFromFile, which feeds it complete lines. // Lines are checked against a regexp to check for the pid, process name, etc. -// At the end of an oom message group, AnalyzeLines adds the new oomInstance to +// At the end of an oom message group, StreamOoms adds the new oomInstance to // oomLog -func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { +func (self *OomParser) StreamOoms(outStream chan *OomInstance) { lineChannel := make(chan string, 10) go func() { - readLinesFromFile(lineChannel, ioreader) + readLinesFromFile(lineChannel, self.ioreader) }() for line := range lineChannel { in_oom_kernel_log := checkIfStartOfOomMessages(line) - if in_oom_kernel_log { oomCurrentInstance := &OomInstance{ ContainerName: "/", @@ -153,12 +153,37 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI line = <-lineChannel } in_oom_kernel_log = false + glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance) outStream <- oomCurrentInstance } } glog.Infof("exiting analyzeLines") } +func callJournalctl() (io.ReadCloser, error) { + cmd := exec.Command("journalctl", "-f") + readcloser, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + if err := cmd.Start(); err != nil { + return nil, err + } + return readcloser, err +} + +func trySystemd() (*OomParser, error) { + readcloser, err := callJournalctl() + if err != nil { + return nil, err + } + glog.V(1).Infof("oomparser using systemd") + return &OomParser{ + ioreader: bufio.NewReader(readcloser), + }, nil + +} + // looks for system files that contain kernel messages and if one is found, sets // the systemFile attribute of the OomParser object func getSystemFile() (string, error) { @@ -169,37 +194,23 @@ func getSystemFile() (string, error) { } else if utils.FileExists(varLogSyslog) { return varLogSyslog, nil } - return "", fmt.Errorf("neither %s nor %s exists from which to read kernel errors", varLogMessages, varLogSyslog) -} - -// calls a go routine that populates self.OomInstances and fills the argument -// channel with OomInstance objects as they are read from the file. -// opens the OomParser's systemFile which was set in getSystemFile -// to look for OOM messages by calling AnalyzeLines. Takes in the argument -// outStream, which is passed in by the user and passed to AnalyzeLines. -// OomInstance objects are added to outStream when they are found by -// AnalyzeLines -func (self *OomParser) StreamOoms(outStream chan *OomInstance) error { - file, err := os.Open(self.systemFile) - if err != nil { - return err - } - ioreader := bufio.NewReader(file) - - // Process the events received from the kernel. - go func() { - self.analyzeLines(ioreader, outStream) - }() - return nil + return "", errors.New("neither " + varLogSyslog + " nor " + varLogMessages + " exists from which to read kernel errors") } // initializes an OomParser object and calls getSystemFile to set the systemFile // attribute. Returns and OomParser object and an error func New() (*OomParser, error) { - systemFileName, err := getSystemFile() + systemFile, err := getSystemFile() if err != nil { - return nil, err + glog.V(1).Infof("received error %v when calling getSystemFile", err) + return trySystemd() + } + file, err := os.Open(systemFile) + if err != nil { + glog.V(1).Infof("received error %v when opening file", err) + return trySystemd() } return &OomParser{ - systemFile: systemFileName}, nil + ioreader: bufio.NewReader(file), + }, nil } diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser_test.go b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser_test.go index 1df9d98fb4..4cb6a6d82e 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser_test.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/utils/oomparser/oomparser_test.go @@ -116,44 +116,6 @@ func TestCheckIfStartOfMessages(t *testing.T) { } } -func TestAnalyzeLinesContainerOom(t *testing.T) { - expectedContainerOomInstance := createExpectedContainerOomInstance(t) - helpTestAnalyzeLines(expectedContainerOomInstance, containerLogFile, t) -} - -func TestAnalyzeLinesSystemOom(t *testing.T) { - expectedSystemOomInstance := createExpectedSystemOomInstance(t) - helpTestAnalyzeLines(expectedSystemOomInstance, systemLogFile, t) -} - -func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *testing.T) { - outStream := make(chan *OomInstance) - oomLog := new(OomParser) - oomLog.systemFile = sysFile - file, err := os.Open(oomLog.systemFile) - if err != nil { - t.Errorf("couldn't open test log: %v", err) - } - ioreader := bufio.NewReader(file) - timeout := make(chan bool, 1) - go func() { - time.Sleep(1 * time.Second) - timeout <- true - }() - go oomLog.analyzeLines(ioreader, outStream) - select { - case oomInstance := <-outStream: - if *oomCheckInstance != *oomInstance { - t.Errorf("wrong instance returned. Expected %v and got %v", - oomCheckInstance, oomInstance) - t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName) - } - case <-timeout: - t.Error( - "timeout happened before oomInstance was found in test file") - } -} - func TestStreamOomsContainer(t *testing.T) { expectedContainerOomInstance := createExpectedContainerOomInstance(t) helpTestStreamOoms(expectedContainerOomInstance, containerLogFile, t) @@ -166,18 +128,14 @@ func TestStreamOomsSystem(t *testing.T) { func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testing.T) { outStream := make(chan *OomInstance) - oomLog := new(OomParser) - oomLog.systemFile = sysFile + oomLog := mockOomParser(sysFile, t) timeout := make(chan bool, 1) go func() { time.Sleep(1 * time.Second) timeout <- true }() - err := oomLog.StreamOoms(outStream) - if err != nil { - t.Errorf("had an error opening file: %v", err) - } + go oomLog.StreamOoms(outStream) select { case oomInstance := <-outStream: @@ -191,9 +149,12 @@ func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testin } } -func TestNew(t *testing.T) { - _, err := New() +func mockOomParser(sysFile string, t *testing.T) *OomParser { + file, err := os.Open(sysFile) if err != nil { - t.Errorf("function New() had error %v", err) + t.Errorf("had an error opening file: %v", err) + } + return &OomParser{ + ioreader: bufio.NewReader(file), } } diff --git a/Godeps/_workspace/src/github.com/google/cadvisor/version/version.go b/Godeps/_workspace/src/github.com/google/cadvisor/version/version.go index f77817c4f9..bed71eb132 100644 --- a/Godeps/_workspace/src/github.com/google/cadvisor/version/version.go +++ b/Godeps/_workspace/src/github.com/google/cadvisor/version/version.go @@ -15,4 +15,4 @@ package version // Version of cAdvisor. -const VERSION = "0.10.1" +const VERSION = "0.11.0"