From 8e376dc84377dff29738fa0ab4bd8a1445f5ee20 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 20 Oct 2016 15:35:34 -0700 Subject: [PATCH 1/3] Add kuberuntime container logs support. --- pkg/kubelet/kubelet_pods.go | 4 + pkg/kubelet/kuberuntime/helpers.go | 5 + .../kuberuntime/kuberuntime_container.go | 27 +- pkg/kubelet/kuberuntime/kuberuntime_logs.go | 386 ++++++++++++++++++ pkg/kubelet/server/server.go | 3 + 5 files changed, 410 insertions(+), 15 deletions(-) create mode 100644 pkg/kubelet/kuberuntime/kuberuntime_logs.go diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index d0c7d6068f..02f3b2105a 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -791,6 +791,10 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, lo podStatus = pod.Status } + // TODO: Consolidate the logic here with kuberuntime.GetContainerLogs, here we convert container name to containerID, + // but inside kuberuntime we convert container id back to container name and restart count. + // TODO: After separate container log lifecycle management, we should get log based on the existing log files + // instead of container status. containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous) if err != nil { return err diff --git a/pkg/kubelet/kuberuntime/helpers.go b/pkg/kubelet/kuberuntime/helpers.go index 34cb93f45c..9057a02886 100644 --- a/pkg/kubelet/kuberuntime/helpers.go +++ b/pkg/kubelet/kuberuntime/helpers.go @@ -207,6 +207,11 @@ func buildContainerLogsPath(containerName string, restartCount int) string { return fmt.Sprintf("%s_%d.log", containerName, restartCount) } +// buildFullContainerLogsPath builds absolute log path for container. +func buildFullContainerLogsPath(podUID types.UID, containerName string, restartCount int) string { + return filepath.Join(buildPodLogsDirectory(podUID), buildContainerLogsPath(containerName, restartCount)) +} + // buildPodLogsDirectory builds absolute log directory path for a pod sandbox. func buildPodLogsDirectory(podUID types.UID) string { return filepath.Join(podLogsRootDirectory, string(podUID)) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 83727d5688..3db1b3d64f 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -675,14 +675,14 @@ func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID // GetContainerLogs returns logs of a specific container. func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) { - // Get logs directly from docker for in-process docker integration for - // now to unblock other tests. - // TODO: remove this hack after setting down on how to implement log - // retrieval/management. - if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.GetContainerLogs(pod, containerID, logOptions, stdout, stderr) + status, err := m.runtimeService.ContainerStatus(containerID.ID) + if err != nil { + return fmt.Errorf("failed to get container status %q: %v", containerID, err) } - return fmt.Errorf("not implemented") + labeledInfo := getContainerInfoFromLabels(status.Labels) + annotatedInfo := getContainerInfoFromAnnotations(status.Annotations) + path := buildFullContainerLogsPath(pod.UID, labeledInfo.ContainerName, annotatedInfo.RestartCount) + return ReadLogs(path, logOptions, stdout, stderr) } // Runs the command in the container of the specified pod using nsenter. @@ -708,6 +708,7 @@ func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.Co func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error { glog.V(4).Infof("Removing container %q", containerID) // Remove the container log. + // TODO: Separate log and container lifecycle management. if err := m.removeContainerLog(containerID); err != nil { return err } @@ -720,16 +721,13 @@ func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error // Remove the container log. status, err := m.runtimeService.ContainerStatus(containerID) if err != nil { - glog.Errorf("ContainerStatus for %q error: %v", containerID, err) - return err + return fmt.Errorf("failed to get container status %q: %v", containerID, err) } labeledInfo := getContainerInfoFromLabels(status.Labels) annotatedInfo := getContainerInfoFromAnnotations(status.Annotations) - path := filepath.Join(buildPodLogsDirectory(labeledInfo.PodUID), - buildContainerLogsPath(labeledInfo.ContainerName, annotatedInfo.RestartCount)) + path := buildFullContainerLogsPath(labeledInfo.PodUID, labeledInfo.ContainerName, annotatedInfo.RestartCount) if err := m.osInterface.Remove(path); err != nil && !os.IsNotExist(err) { - glog.Errorf("Failed to remove container %q log %q: %v", containerID, path, err) - return err + return fmt.Errorf("failed to remove container %q log %q: %v", containerID, path, err) } // Remove the legacy container log symlink. @@ -737,9 +735,8 @@ func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName, labeledInfo.PodNamespace) if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) { - glog.Errorf("Failed to remove container %q log legacy symbolic link %q: %v", + return fmt.Errorf("failed to remove container %q log legacy symbolic link %q: %v", containerID, legacySymlink, err) - return err } return nil } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs.go b/pkg/kubelet/kuberuntime/kuberuntime_logs.go new file mode 100644 index 0000000000..5c425015b3 --- /dev/null +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs.go @@ -0,0 +1,386 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kuberuntime + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "math" + "os" + "time" + + "github.com/docker/docker/pkg/jsonlog" + "github.com/fsnotify/fsnotify" + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" +) + +// Notice that the current kuberuntime logs implementation doesn't handle +// log rotation. +// * It will not retrieve logs in rotated log file. +// * If log rotation happens when following the log: +// * If the rotation is using create mode, we'll still follow the old file. +// * If the rotation is using copytruncate, we'll be reading at the original position and get nothing. +// TODO(random-liu): Support log rotation. + +// streamType is the type of the stream. +type streamType string + +const ( + stderrType streamType = "stderr" + stdoutType streamType = "stdout" + + // timeFormat is the time format used in the log. + timeFormat = time.RFC3339Nano + // blockSize is the block size used in tail. + blockSize = 1024 +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} + // delimiter is the delimiter for timestamp and streamtype in log line. + delimiter = []byte{' '} +) + +// logMessage is the internal log type. +type logMessage struct { + timestamp time.Time + stream streamType + log []byte +} + +// reset resets the log to nil. +func (l *logMessage) reset() { + l.timestamp = time.Time{} + l.stream = "" + l.log = nil +} + +// logOptions is the internal type of all log options. +type logOptions struct { + tail int64 + bytes int64 + since time.Time + follow bool + timestamp bool +} + +// newLogOptions convert the api.PodLogOptions to internal logOptions. +func newLogOptions(apiOpts *api.PodLogOptions, now time.Time) *logOptions { + opts := &logOptions{ + tail: -1, // -1 by default which means read all logs. + bytes: -1, // -1 by default which means read all logs. + follow: apiOpts.Follow, + timestamp: apiOpts.Timestamps, + } + if apiOpts.TailLines != nil { + opts.tail = *apiOpts.TailLines + } + if apiOpts.LimitBytes != nil { + opts.bytes = *apiOpts.LimitBytes + } + if apiOpts.SinceSeconds != nil { + opts.since = now.Add(-time.Duration(*apiOpts.SinceSeconds) * time.Second) + } + if apiOpts.SinceTime != nil && apiOpts.SinceTime.After(opts.since) { + opts.since = apiOpts.SinceTime.Time + } + return opts +} + +// ReadLogs read the container log and redirect into stdout and stderr. +func ReadLogs(path string, apiOpts *api.PodLogOptions, stdout, stderr io.Writer) error { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", path, err) + } + defer f.Close() + + // Convert api.PodLogOptions into internal log options. + opts := newLogOptions(apiOpts, time.Now()) + + // Search start point based on tail line. + start, err := tail(f, opts.tail) + if err != nil { + return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err) + } + if _, err := f.Seek(start, os.SEEK_SET); err != nil { + return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err) + } + + // Start parsing the logs. + r := bufio.NewReader(f) + // Do not create watcher here because it is not needed if `Follow` is false. + var watcher *fsnotify.Watcher + var parse parseFunc + writer := newLogWriter(stdout, stderr, opts) + msg := &logMessage{} + for { + l, err := r.ReadBytes(eol[0]) + if err != nil { + if err != io.EOF { // This is an real error + return fmt.Errorf("failed to read log file %q: %v", path, err) + } + if !opts.follow { + // Return directly when reading to the end if not follow. + if len(l) > 0 { + glog.Warningf("Incomplete line in log file %q: %q", path, l) + } + glog.V(2).Infof("Finish parsing log file %q", path) + return nil + } + // Reset seek so that if this is an incomplete line, + // it will be read again. + if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil { + return fmt.Errorf("failed to reset seek in log file %q: %v", path, err) + } + if watcher == nil { + // Intialize the watcher if it has not been initialized yet. + if watcher, err = fsnotify.NewWatcher(); err != nil { + return fmt.Errorf("failed to create fsnotify watcher: %v", err) + } + defer watcher.Close() + if err := watcher.Add(f.Name()); err != nil { + return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + } + } + // Wait until the next log change. + if err := waitLogs(watcher); err != nil { + return fmt.Errorf("failed to wait logs for log file %q: %v", path, err) + } + continue + } + if parse == nil { + // Intialize the log parsing function. + parse, err = getParseFunc(l) + if err != nil { + return fmt.Errorf("failed to get parse function: %v", err) + } + } + // Parse the log line. + msg.reset() + if err := parse(l, msg); err != nil { + glog.Errorf("Failed with err %v when parsing log for log file %q: %q", err, path, l) + continue + } + // Write the log line into the stream. + if err := writer.write(msg); err != nil { + if err == errMaximumWrite { + glog.V(2).Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes) + return nil + } + glog.Errorf("Failed with err %v when writing log for log file %q: %+v", err, path, msg) + } + } +} + +// parseFunc is a function parsing one log line to the internal log type. +// Notice that the caller must make sure logMessage is not nil. +type parseFunc func([]byte, *logMessage) error + +var parseFuncs []parseFunc = []parseFunc{ + parseCRILog, // CRI log format parse function + parseDockerJSONLog, // Docker JSON log format parse function +} + +// parseCRILog parses logs in CRI log format. CRI Log format example: +// 2016-10-06T00:17:09.669794202Z stdout log content 1 +// 2016-10-06T00:17:09.669794203Z stderr log content 2 +func parseCRILog(log []byte, msg *logMessage) error { + var err error + // Parse timestamp + idx := bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("timestamp is not found") + } + msg.timestamp, err = time.Parse(timeFormat, string(log[:idx])) + if err != nil { + return fmt.Errorf("unexpected timestamp format %q: %v", timeFormat, err) + } + + // Parse stream type + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("stream type is not found") + } + msg.stream = streamType(log[:idx]) + if msg.stream != stdoutType && msg.stream != stderrType { + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + + // Get log content + msg.log = log[idx+1:] + + return nil +} + +// dockerJSONLog is the JSON log buffer used in parseDockerJSONLog. +var dockerJSONLog = &jsonlog.JSONLog{} + +// parseDockerJSONLog parses logs in Docker JSON log format. Docker JSON log format +// example: +// {"log":"content 1","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"} +// {"log":"content 2","stream":"stderr","time":"2016-10-20T18:39:20.57606444Z"} +func parseDockerJSONLog(log []byte, msg *logMessage) error { + dockerJSONLog.Reset() + l := dockerJSONLog + // TODO: JSON decoding is fairly expensive, we should evaluate this. + if err := json.Unmarshal(log, l); err != nil { + return fmt.Errorf("failed with %v to unmarshal log %q", err, l) + } + msg.timestamp = l.Created + msg.stream = streamType(l.Stream) + msg.log = []byte(l.Log) + return nil +} + +// getParseFunc returns proper parse function based on the sample log line passed in. +func getParseFunc(log []byte) (parseFunc, error) { + for _, p := range parseFuncs { + if err := p(log, &logMessage{}); err == nil { + return p, nil + } + } + return nil, fmt.Errorf("unsupported log format: %q", log) +} + +// waitLogs wait for the next log write. +func waitLogs(w *fsnotify.Watcher) error { + errRetry := 5 + for { + select { + case e := <-w.Events: + switch e.Op { + case fsnotify.Write: + return nil + default: + glog.Errorf("Unexpected fsnotify event: %v, retrying...", e) + } + case err := <-w.Errors: + glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) + if errRetry == 0 { + return err + } + errRetry-- + } + } +} + +// logWriter controls the writing into the stream based on the log options. +type logWriter struct { + stdout io.Writer + stderr io.Writer + opts *logOptions + remain int64 +} + +// errMaximumWrite is returned when all bytes have been written. +var errMaximumWrite = errors.New("maximum write") + +func newLogWriter(stdout io.Writer, stderr io.Writer, opts *logOptions) *logWriter { + w := &logWriter{ + stdout: stdout, + stderr: stderr, + opts: opts, + remain: math.MaxInt64, // initialize it as infinity + } + if opts.bytes >= 0 { + w.remain = opts.bytes + } + return w +} + +// writeLogs writes logs into stdout, stderr. +func (w *logWriter) write(msg *logMessage) error { + if msg.timestamp.Before(w.opts.since) { + // Skip the line because it's older than since + return nil + } + line := msg.log + if w.opts.timestamp { + prefix := append([]byte(msg.timestamp.Format(timeFormat)), delimiter[0]) + line = append(prefix, line...) + } + // If the line is longer than the remaining bytes, cut it. + if int64(len(line)) > w.remain { + line = line[:w.remain] + } + // Get the proper stream to write to. + var stream io.Writer + switch msg.stream { + case stdoutType: + stream = w.stdout + case stderrType: + stream = w.stderr + default: + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + n, err := stream.Write(line) + w.remain -= int64(n) + if err != nil { + return err + } + // If there are no more bytes left, return errMaximumWrite + if w.remain <= 0 { + return errMaximumWrite + } + return nil +} + +// tail returns the start of last nth line. +// * If n < 0, return the beginning of the file. +// * If n >= 0, return the beginning of last nth line. +// Notice that if the last line is incomplete (no end-of-line), it will not be counted +// as one line. +func tail(f io.ReadSeeker, n int64) (int64, error) { + if n < 0 { + return 0, nil + } + size, err := f.Seek(0, os.SEEK_END) + if err != nil { + return 0, err + } + var left, cnt int64 + buf := make([]byte, blockSize) + for right := size; right > 0 && cnt <= n; right -= blockSize { + left = right - blockSize + if left < 0 { + left = 0 + buf = make([]byte, right) + } + if _, err := f.Seek(left, os.SEEK_SET); err != nil { + return 0, err + } + if _, err := f.Read(buf); err != nil { + return 0, err + } + cnt += int64(bytes.Count(buf, eol)) + } + for ; cnt > n; cnt-- { + idx := bytes.Index(buf, eol) + 1 + buf = buf[idx:] + left += int64(idx) + } + return left, nil +} diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index c33be0e48e..ec609c0d05 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -493,6 +493,9 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re return } fw := flushwriter.Wrap(response.ResponseWriter) + // Byte limit logic is already implemented in kuberuntime. However, we still need this for + // old runtime integration. + // TODO(random-liu): Remove this once we switch to CRI integration. if logOptions.LimitBytes != nil { fw = limitwriter.New(fw, *logOptions.LimitBytes) } From 35195ef50e8554f181d54163a43a59a6f87c0efe Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Fri, 21 Oct 2016 15:57:10 -0700 Subject: [PATCH 2/3] Add unit test for kuberuntime container logs. --- .../kuberuntime/kuberuntime_logs_test.go | 269 ++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 pkg/kubelet/kuberuntime/kuberuntime_logs_test.go diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go new file mode 100644 index 0000000000..18538e25eb --- /dev/null +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kuberuntime + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +func TestLogOptions(t *testing.T) { + var ( + line = int64(8) + bytes = int64(64) + timestamp = unversioned.Now() + sinceseconds = int64(10) + ) + for c, test := range []struct { + apiOpts *api.PodLogOptions + expect *logOptions + }{ + { // empty options + apiOpts: &api.PodLogOptions{}, + expect: &logOptions{tail: -1, bytes: -1}, + }, + { // test tail lines + apiOpts: &api.PodLogOptions{TailLines: &line}, + expect: &logOptions{tail: line, bytes: -1}, + }, + { // test limit bytes + apiOpts: &api.PodLogOptions{LimitBytes: &bytes}, + expect: &logOptions{tail: -1, bytes: bytes}, + }, + { // test since timestamp + apiOpts: &api.PodLogOptions{SinceTime: ×tamp}, + expect: &logOptions{tail: -1, bytes: -1, since: timestamp.Time}, + }, + { // test since seconds + apiOpts: &api.PodLogOptions{SinceSeconds: &sinceseconds}, + expect: &logOptions{tail: -1, bytes: -1, since: timestamp.Add(-10 * time.Second)}, + }, + } { + t.Logf("TestCase #%d: %+v", c, test) + opts := newLogOptions(test.apiOpts, timestamp.Time) + assert.Equal(t, test.expect, opts) + } +} + +func TestParseLog(t *testing.T) { + timestamp, err := time.Parse(timeFormat, "2016-10-20T18:39:20.57606443Z") + assert.NoError(t, err) + msg := &logMessage{} + for c, test := range []struct { + line string + msg *logMessage + err bool + }{ + { // Docker log format stdout + line: `{"log":"docker stdout test log","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}` + "\n", + msg: &logMessage{ + timestamp: timestamp, + stream: stdoutType, + log: []byte("docker stdout test log"), + }, + }, + { // Docker log format stderr + line: `{"log":"docker stderr test log","stream":"stderr","time":"2016-10-20T18:39:20.57606443Z"}` + "\n", + msg: &logMessage{ + timestamp: timestamp, + stream: stderrType, + log: []byte("docker stderr test log"), + }, + }, + { // CRI log format stdout + line: "2016-10-20T18:39:20.57606443Z stdout cri stdout test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: stdoutType, + log: []byte("cri stdout test log\n"), + }, + }, + { // CRI log format stderr + line: "2016-10-20T18:39:20.57606443Z stderr cri stderr test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: stderrType, + log: []byte("cri stderr test log\n"), + }, + }, + { // Unsupported Log format + line: "unsupported log format test log\n", + msg: &logMessage{}, + err: true, + }, + } { + t.Logf("TestCase #%d: %+v", c, test) + parse, err := getParseFunc([]byte(test.line)) + if test.err { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + err = parse([]byte(test.line), msg) + assert.NoError(t, err) + assert.Equal(t, test.msg, msg) + } +} + +func TestWriteLogs(t *testing.T) { + timestamp := time.Unix(1234, 4321) + log := "abcdefg\n" + + for c, test := range []struct { + stream streamType + since time.Time + timestamp bool + expectStdout string + expectStderr string + }{ + { // stderr log + stream: stderrType, + expectStderr: log, + }, + { // stdout log + stream: stdoutType, + expectStdout: log, + }, + { // since is after timestamp + stream: stdoutType, + since: timestamp.Add(1 * time.Second), + }, + { // timestamp enabled + stream: stderrType, + timestamp: true, + expectStderr: timestamp.Format(timeFormat) + " " + log, + }, + } { + t.Logf("TestCase #%d: %+v", c, test) + msg := &logMessage{ + timestamp: timestamp, + stream: test.stream, + log: []byte(log), + } + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + w := newLogWriter(stdoutBuf, stderrBuf, &logOptions{since: test.since, timestamp: test.timestamp, bytes: -1}) + err := w.write(msg) + assert.NoError(t, err) + assert.Equal(t, test.expectStdout, stdoutBuf.String()) + assert.Equal(t, test.expectStderr, stderrBuf.String()) + } +} + +func TestWriteLogsWithBytesLimit(t *testing.T) { + timestamp := time.Unix(1234, 4321) + timestampStr := timestamp.Format(timeFormat) + log := "abcdefg\n" + + for c, test := range []struct { + stdoutLines int + stderrLines int + bytes int + timestamp bool + expectStdout string + expectStderr string + }{ + { // limit bytes less than one line + stdoutLines: 3, + bytes: 3, + expectStdout: "abc", + }, + { // limit bytes accross lines + stdoutLines: 3, + bytes: len(log) + 3, + expectStdout: "abcdefg\nabc", + }, + { // limit bytes more than all lines + stdoutLines: 3, + bytes: 3 * len(log), + expectStdout: "abcdefg\nabcdefg\nabcdefg\n", + }, + { // limit bytes for stderr + stderrLines: 3, + bytes: len(log) + 3, + expectStderr: "abcdefg\nabc", + }, + { // limit bytes for both stdout and stderr, stdout first. + stdoutLines: 1, + stderrLines: 2, + bytes: len(log) + 3, + expectStdout: "abcdefg\n", + expectStderr: "abc", + }, + { // limit bytes with timestamp + stdoutLines: 3, + timestamp: true, + bytes: len(timestampStr) + 1 + len(log) + 2, + expectStdout: timestampStr + " " + log + timestampStr[:2], + }, + } { + t.Logf("TestCase #%d: %+v", c, test) + msg := &logMessage{ + timestamp: timestamp, + log: []byte(log), + } + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + w := newLogWriter(stdoutBuf, stderrBuf, &logOptions{timestamp: test.timestamp, bytes: int64(test.bytes)}) + for i := 0; i < test.stdoutLines; i++ { + msg.stream = stdoutType + if err := w.write(msg); err != nil { + assert.EqualError(t, err, errMaximumWrite.Error()) + } + } + for i := 0; i < test.stderrLines; i++ { + msg.stream = stderrType + if err := w.write(msg); err != nil { + assert.EqualError(t, err, errMaximumWrite.Error()) + } + } + assert.Equal(t, test.expectStdout, stdoutBuf.String()) + assert.Equal(t, test.expectStderr, stderrBuf.String()) + } +} + +func TestTail(t *testing.T) { + line := strings.Repeat("a", blockSize) + testBytes := []byte(line + "\n" + + line + "\n" + + line + "\n" + + line + "\n" + + line[blockSize/2:]) // incomplete line + + for c, test := range []struct { + n int64 + start int64 + }{ + {n: -1, start: 0}, + {n: 0, start: int64(len(line)+1) * 4}, + {n: 1, start: int64(len(line)+1) * 3}, + {n: 9999, start: 0}, + } { + t.Logf("TestCase #%d: %+v", c, test) + r := bytes.NewReader(testBytes) + s, err := tail(r, test.n) + assert.NoError(t, err) + assert.Equal(t, s, test.start) + } +} From e403ccef75509b3116f50ec4dceb2f9c103c4179 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Wed, 26 Oct 2016 17:37:37 -0700 Subject: [PATCH 3/3] Generate bazel. --- pkg/kubelet/kuberuntime/BUILD | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index 8b87f73325..98b62fb4b6 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -20,6 +20,7 @@ go_library( "kuberuntime_container.go", "kuberuntime_gc.go", "kuberuntime_image.go", + "kuberuntime_logs.go", "kuberuntime_manager.go", "kuberuntime_sandbox.go", "labels.go", @@ -54,6 +55,8 @@ go_library( "//pkg/util/sets:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/coreos/go-semver/semver", + "//vendor:github.com/docker/docker/pkg/jsonlog", + "//vendor:github.com/fsnotify/fsnotify", "//vendor:github.com/golang/glog", "//vendor:github.com/google/cadvisor/info/v1", ], @@ -66,6 +69,7 @@ go_test( "kuberuntime_container_test.go", "kuberuntime_gc_test.go", "kuberuntime_image_test.go", + "kuberuntime_logs_test.go", "kuberuntime_manager_test.go", "kuberuntime_sandbox_test.go", "labels_test.go", @@ -74,6 +78,7 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/unversioned:go_default_library", "//pkg/apis/componentconfig:go_default_library", "//pkg/kubelet/api/testing:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",