diff --git a/hack/.linted_packages b/hack/.linted_packages index cecfd019d3..a1d4546aaa 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -289,6 +289,7 @@ pkg/util/replicaset pkg/util/restoptions pkg/util/runtime pkg/util/sets +pkg/util/tail pkg/util/validation pkg/util/validation/field pkg/util/version diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 3a3650dbbe..dab4f4ec21 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -626,3 +626,19 @@ func (s SortContainerStatusesByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], func (s SortContainerStatusesByCreationTime) Less(i, j int) bool { return s[i].CreatedAt.Before(s[j].CreatedAt) } + +const ( + // MaxPodTerminationMessageLogLength is the maximum bytes any one pod may have written + // as termination message output across all containers. Containers will be evenly truncated + // until output is below this limit. + MaxPodTerminationMessageLogLength = 1024 * 12 + // MaxContainerTerminationMessageLength is the upper bound any one container may write to + // its termination message path. Contents above this length will be truncated. + MaxContainerTerminationMessageLength = 1024 * 4 + // MaxContainerTerminationMessageLogLength is the maximum bytes any one container will + // have written to its termination message when the message is read from the logs. + MaxContainerTerminationMessageLogLength = 1024 * 2 + // MaxContainerTerminationMessageLogLines is the maximum number of previous lines of + // log output that the termination message can contain. + MaxContainerTerminationMessageLogLines = 80 +) diff --git a/pkg/kubelet/dockertools/BUILD b/pkg/kubelet/dockertools/BUILD index d574b11243..25bf8539af 100644 --- a/pkg/kubelet/dockertools/BUILD +++ b/pkg/kubelet/dockertools/BUILD @@ -52,8 +52,10 @@ go_library( "//pkg/util/procfs:go_default_library", "//pkg/util/selinux:go_default_library", "//pkg/util/strings:go_default_library", + "//pkg/util/tail:go_default_library", "//pkg/util/term:go_default_library", "//pkg/util/version:go_default_library", + "//vendor:github.com/armon/circbuf", "//vendor:github.com/docker/distribution/digest", "//vendor:github.com/docker/distribution/reference", "//vendor:github.com/docker/docker/pkg/jsonmessage", diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 653a6fed4c..d972dea199 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -33,6 +33,7 @@ import ( "sync" "time" + "github.com/armon/circbuf" dockertypes "github.com/docker/engine-api/types" dockercontainer "github.com/docker/engine-api/types/container" dockerstrslice "github.com/docker/engine-api/types/strslice" @@ -70,6 +71,7 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/selinux" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/tail" "k8s.io/kubernetes/pkg/util/term" utilversion "k8s.io/kubernetes/pkg/util/version" ) @@ -482,19 +484,12 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin startedAt = createdAt } - terminationMessagePath := containerInfo.TerminationMessagePath - if terminationMessagePath != "" { - for _, mount := range iResult.Mounts { - if mount.Destination == terminationMessagePath { - path := mount.Source - if data, err := ioutil.ReadFile(path); err != nil { - message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err) - } else { - message = string(data) - } - } - } + // retrieve the termination message from logs, file, or file with fallback to logs in case of failure + fallbackToLogs := containerInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (iResult.State.ExitCode != 0 || iResult.State.OOMKilled) + if msg := getTerminationMessage(dm.c, iResult, containerInfo.TerminationMessagePath, fallbackToLogs); len(msg) > 0 { + message = msg } + status.State = kubecontainer.ContainerStateExited status.Message = message status.Reason = reason @@ -508,6 +503,49 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin return &status, "", nil } +func getTerminationMessage(c DockerInterface, iResult *dockertypes.ContainerJSON, terminationMessagePath string, fallbackToLogs bool) string { + if len(terminationMessagePath) != 0 { + for _, mount := range iResult.Mounts { + if mount.Destination != terminationMessagePath { + continue + } + path := mount.Source + data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength) + if err != nil { + return fmt.Sprintf("Error on reading termination log %s: %v", path, err) + } + if !fallbackToLogs || len(data) != 0 { + return string(data) + } + } + } + if !fallbackToLogs { + return "" + } + + return readLastStringFromContainerLogs(c, iResult.Name) +} + +// readLastStringFromContainerLogs attempts to a certain amount from the end of the logs for containerName. +// It will attempt to avoid reading excessive logs from the server, which may result in underestimating the amount +// of logs to fetch (such that the length of the response message is < max). +func readLastStringFromContainerLogs(c DockerInterface, containerName string) string { + logOptions := dockertypes.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + } + buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) + streamOptions := StreamOptions{ + ErrorStream: buf, + OutputStream: buf, + } + logOptions.Tail = strconv.FormatInt(kubecontainer.MaxContainerTerminationMessageLogLines, 10) + if err := c.Logs(containerName, logOptions, streamOptions); err != nil { + return fmt.Sprintf("Error on reading termination message from logs: %v", err) + } + return buf.String() +} + // makeEnvList converts EnvVar list to a list of strings, in the form of // '=', which can be understood by docker. func makeEnvList(envs []kubecontainer.EnvVar) (result []string) { diff --git a/pkg/kubelet/dockertools/labels.go b/pkg/kubelet/dockertools/labels.go index 3e453cbfae..37edd50fde 100644 --- a/pkg/kubelet/dockertools/labels.go +++ b/pkg/kubelet/dockertools/labels.go @@ -39,11 +39,12 @@ const ( kubernetesPodDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod" kubernetesPodTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod" - kubernetesContainerHashLabel = "io.kubernetes.container.hash" - kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount" - kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" - kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" - kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4 + kubernetesContainerHashLabel = "io.kubernetes.container.hash" + kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount" + kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" + kubernetesContainerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy" + kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" + kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4 // TODO(random-liu): Keep this for old containers, remove this when we drop support for v1.1. kubernetesPodLabel = "io.kubernetes.pod.data" @@ -63,6 +64,7 @@ type labelledContainerInfo struct { Hash string RestartCount int TerminationMessagePath string + TerminationMessagePolicy v1.TerminationMessagePolicy PreStopHandler *v1.Handler Ports []v1.ContainerPort } @@ -83,6 +85,7 @@ func newLabels(container *v1.Container, pod *v1.Pod, restartCount int, enableCus labels[kubernetesContainerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16) labels[kubernetesContainerRestartCountLabel] = strconv.Itoa(restartCount) labels[kubernetesContainerTerminationMessagePathLabel] = container.TerminationMessagePath + labels[kubernetesContainerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) if container.Lifecycle != nil && container.Lifecycle.PreStop != nil { // Using json enconding so that the PreStop handler object is readable after writing as a label rawPreStop, err := json.Marshal(container.Lifecycle.PreStop) @@ -118,7 +121,8 @@ func getContainerInfoFromLabel(labels map[string]string) *labelledContainerInfo PodUID: kubetypes.UID(getStringValueFromLabel(labels, types.KubernetesPodUIDLabel)), Name: getStringValueFromLabel(labels, types.KubernetesContainerNameLabel), Hash: getStringValueFromLabel(labels, kubernetesContainerHashLabel), - TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel), + TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel), + TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePolicyLabel)), } if containerInfo.RestartCount, err = getIntValueFromLabel(labels, kubernetesContainerRestartCountLabel); err != nil { logError(containerInfo, kubernetesContainerRestartCountLabel, err) diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index 25619ef2b3..6861064cfe 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -47,7 +47,9 @@ go_library( "//pkg/securitycontext:go_default_library", "//pkg/util/parsers:go_default_library", "//pkg/util/selinux:go_default_library", + "//pkg/util/tail:go_default_library", "//pkg/util/version:go_default_library", + "//vendor:github.com/armon/circbuf", "//vendor:github.com/docker/docker/pkg/jsonlog", "//vendor:github.com/fsnotify/fsnotify", "//vendor:github.com/golang/glog", diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 529d0ba5e5..128d52292f 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -19,7 +19,6 @@ package kuberuntime import ( "fmt" "io" - "io/ioutil" "math/rand" "net/url" "os" @@ -28,7 +27,9 @@ import ( "sync" "time" + "github.com/armon/circbuf" "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/util/selinux" + "k8s.io/kubernetes/pkg/util/tail" ) // startContainer starts a container and returns a message indicates why it is failed on error. @@ -334,29 +336,36 @@ func makeUID() string { return fmt.Sprintf("%08x", rand.Uint32()) } -// getTerminationMessage gets termination message of the container. -func getTerminationMessage(status *runtimeapi.ContainerStatus, kubeStatus *kubecontainer.ContainerStatus, terminationMessagePath string) string { - message := "" - - if !kubeStatus.FinishedAt.IsZero() || kubeStatus.ExitCode != 0 { - if terminationMessagePath == "" { - return "" - } - +// getTerminationMessage looks on the filesystem for the provided termination message path, returning a limited +// amount of those bytes, or returns true if the logs should be checked. +func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessagePath string, fallbackToLogs bool) (string, bool) { + if len(terminationMessagePath) != 0 { for _, mount := range status.Mounts { - if mount.ContainerPath == terminationMessagePath { - path := mount.HostPath - if data, err := ioutil.ReadFile(path); err != nil { - message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err) - } else { - message = string(data) - } - break + if mount.ContainerPath != terminationMessagePath { + continue + } + path := mount.HostPath + data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength) + if err != nil { + return fmt.Sprintf("Error on reading termination log %s: %v", path, err), false + } + if !fallbackToLogs || len(data) != 0 { + return string(data), false } } } + return "", fallbackToLogs +} - return message +// readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented +// by path. It reads up to max log lines. +func readLastStringFromContainerLogs(path string) string { + value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) + buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) + if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { + return fmt.Sprintf("Error on reading termination message from logs: %v", err) + } + return buf.String() } // getPodContainerStatuses gets all containers' statuses for the pod. @@ -402,13 +411,19 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n cStatus.Message = status.Message cStatus.ExitCode = int(status.ExitCode) cStatus.FinishedAt = time.Unix(0, status.FinishedAt) + + fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (cStatus.ExitCode != 0 || cStatus.Reason == "OOMKilled") + tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs) + if checkLogs { + path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount) + tMessage = readLastStringFromContainerLogs(path) + } + // Use the termination message written by the application is not empty + if len(tMessage) != 0 { + cStatus.Message = tMessage + } } - tMessage := getTerminationMessage(status, cStatus, annotatedInfo.TerminationMessagePath) - // Use the termination message written by the application is not empty - if len(tMessage) != 0 { - cStatus.Message = tMessage - } statuses[i] = cStatus } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs.go b/pkg/kubelet/kuberuntime/kuberuntime_logs.go index 3cc0eb1764..21d81bc6a0 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs.go @@ -32,6 +32,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/util/tail" ) // Notice that the current kuberuntime logs implementation doesn't handle @@ -120,7 +121,7 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) opts := newLogOptions(apiOpts, time.Now()) // Search start point based on tail line. - start, err := tail(f, opts.tail) + start, err := tail.FindTailLineStartIndex(f, opts.tail) if err != nil { return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err) } @@ -347,40 +348,3 @@ func (w *logWriter) write(msg *logMessage) error { } return nil } - -// tail returns the start of last nth line. -// * If n < 0, return the beginning of the file. -// * If n >= 0, return the beginning of last nth line. -// Notice that if the last line is incomplete (no end-of-line), it will not be counted -// as one line. -func tail(f io.ReadSeeker, n int64) (int64, error) { - if n < 0 { - return 0, nil - } - size, err := f.Seek(0, os.SEEK_END) - if err != nil { - return 0, err - } - var left, cnt int64 - buf := make([]byte, blockSize) - for right := size; right > 0 && cnt <= n; right -= blockSize { - left = right - blockSize - if left < 0 { - left = 0 - buf = make([]byte, right) - } - if _, err := f.Seek(left, os.SEEK_SET); err != nil { - return 0, err - } - if _, err := f.Read(buf); err != nil { - return 0, err - } - cnt += int64(bytes.Count(buf, eol)) - } - for ; cnt > n; cnt-- { - idx := bytes.Index(buf, eol) + 1 - buf = buf[idx:] - left += int64(idx) - } - return left, nil -} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go index 9fb07d6aad..1602827a83 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go @@ -18,7 +18,6 @@ package kuberuntime import ( "bytes" - "strings" "testing" "time" @@ -242,28 +241,3 @@ func TestWriteLogsWithBytesLimit(t *testing.T) { assert.Equal(t, test.expectStderr, stderrBuf.String()) } } - -func TestTail(t *testing.T) { - line := strings.Repeat("a", blockSize) - testBytes := []byte(line + "\n" + - line + "\n" + - line + "\n" + - line + "\n" + - line[blockSize/2:]) // incomplete line - - for c, test := range []struct { - n int64 - start int64 - }{ - {n: -1, start: 0}, - {n: 0, start: int64(len(line)+1) * 4}, - {n: 1, start: int64(len(line)+1) * 3}, - {n: 9999, start: 0}, - } { - t.Logf("TestCase #%d: %+v", c, test) - r := bytes.NewReader(testBytes) - s, err := tail(r, test.n) - assert.NoError(t, err) - assert.Equal(t, s, test.start) - } -} diff --git a/pkg/kubelet/kuberuntime/labels.go b/pkg/kubelet/kuberuntime/labels.go index 462442b909..ccd0dc99b0 100644 --- a/pkg/kubelet/kuberuntime/labels.go +++ b/pkg/kubelet/kuberuntime/labels.go @@ -33,11 +33,12 @@ const ( podDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod" podTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod" - containerHashLabel = "io.kubernetes.container.hash" - containerRestartCountLabel = "io.kubernetes.container.restartCount" - containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" - containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" - containerPortsLabel = "io.kubernetes.container.ports" + containerHashLabel = "io.kubernetes.container.hash" + containerRestartCountLabel = "io.kubernetes.container.restartCount" + containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" + containerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy" + containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" + containerPortsLabel = "io.kubernetes.container.ports" // kubernetesManagedLabel is used to distinguish whether a container/sandbox is managed by kubelet or not kubernetesManagedLabel = "io.kubernetes.managed" @@ -69,6 +70,7 @@ type annotatedContainerInfo struct { PodDeletionGracePeriod *int64 PodTerminationGracePeriod *int64 TerminationMessagePath string + TerminationMessagePolicy v1.TerminationMessagePolicy PreStopHandler *v1.Handler ContainerPorts []v1.ContainerPort } @@ -113,6 +115,7 @@ func newContainerAnnotations(container *v1.Container, pod *v1.Pod, restartCount annotations[containerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16) annotations[containerRestartCountLabel] = strconv.Itoa(restartCount) annotations[containerTerminationMessagePathLabel] = container.TerminationMessagePath + annotations[containerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) if pod.DeletionGracePeriodSeconds != nil { annotations[podDeletionGracePeriodLabel] = strconv.FormatInt(*pod.DeletionGracePeriodSeconds, 10) @@ -192,7 +195,8 @@ func isManagedByKubelet(labels map[string]string) bool { func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedContainerInfo { var err error containerInfo := &annotatedContainerInfo{ - TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel), + TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel), + TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(annotations, containerTerminationMessagePolicyLabel)), } if containerInfo.Hash, err = getUint64ValueFromLabel(annotations, containerHashLabel); err != nil { diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index bcb8161375..47a5f3a632 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -513,6 +513,10 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { // kubelet temporarily. // TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent. func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { + bytesPerStatus := kubecontainer.MaxPodTerminationMessageLogLength + if containers := len(pod.Spec.Containers) + len(pod.Spec.InitContainers); containers > 0 { + bytesPerStatus = bytesPerStatus / containers + } normalizeTimeStamp := func(t *metav1.Time) { *t = t.Rfc3339Copy() } @@ -523,6 +527,9 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { if c.Terminated != nil { normalizeTimeStamp(&c.Terminated.StartedAt) normalizeTimeStamp(&c.Terminated.FinishedAt) + if len(c.Terminated.Message) > bytesPerStatus { + c.Terminated.Message = c.Terminated.Message[:bytesPerStatus] + } } } diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index c4b5a2b759..9540a3a9d8 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -20,21 +20,21 @@ import ( "fmt" "math/rand" "strconv" + "strings" "testing" "time" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - "k8s.io/kubernetes/pkg/client/testing/core" - "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + "k8s.io/kubernetes/pkg/client/testing/core" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -480,6 +480,40 @@ func TestStatusEquality(t *testing.T) { } } +func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) { + pod := v1.Pod{ + Spec: v1.PodSpec{}, + } + containerStatus := []v1.ContainerStatus{} + for i := 0; i < 48; i++ { + s := v1.ContainerStatus{ + Name: fmt.Sprintf("container%d", i), + LastTerminationState: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: strings.Repeat("abcdefgh", int(24+i%3)), + }, + }, + } + containerStatus = append(containerStatus, s) + } + podStatus := v1.PodStatus{ + InitContainerStatuses: containerStatus[:24], + ContainerStatuses: containerStatus[24:], + } + result := normalizeStatus(&pod, &podStatus) + count := 0 + for _, s := range result.InitContainerStatuses { + l := len(s.LastTerminationState.Terminated.Message) + if l < 192 || l > 256 { + t.Errorf("container message had length %d", l) + } + count += l + } + if count > kubecontainer.MaxPodTerminationMessageLogLength { + t.Errorf("message length not truncated") + } +} + func TestStaticPod(t *testing.T) { staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} diff --git a/pkg/util/tail/BUILD b/pkg/util/tail/BUILD index a7403fd473..5d7a1b54c4 100644 --- a/pkg/util/tail/BUILD +++ b/pkg/util/tail/BUILD @@ -2,7 +2,11 @@ package(default_visibility = ["//visibility:public"]) licenses(["notice"]) -load("@io_bazel_rules_go//go:def.bzl") +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) filegroup( name = "package-srcs", @@ -16,3 +20,16 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["tail_test.go"], + library = ":go_default_library", + tags = ["automanaged"], +) + +go_library( + name = "go_default_library", + srcs = ["tail.go"], + tags = ["automanaged"], +) diff --git a/pkg/util/tail/tail.go b/pkg/util/tail/tail.go new file mode 100644 index 0000000000..23ad4ae791 --- /dev/null +++ b/pkg/util/tail/tail.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tail + +import ( + "bytes" + "io" + "io/ioutil" + "os" +) + +const ( + // blockSize is the block size used in tail. + blockSize = 1024 +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} +) + +// ReadAtMost reads at most max bytes from the end of the file identified by path or +// returns an error. It returns true if the file was longer than max. It will +// allocate up to max bytes. +func ReadAtMost(path string, max int64) ([]byte, bool, error) { + f, err := os.Open(path) + if err != nil { + return nil, false, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, false, err + } + size := fi.Size() + if size == 0 { + return nil, false, nil + } + if size < max { + max = size + } + offset, err := f.Seek(-max, os.SEEK_END) + if err != nil { + return nil, false, err + } + data, err := ioutil.ReadAll(f) + return data, offset > 0, err +} + +// FindTailLineStartIndex returns the start of last nth line. +// * If n < 0, return the beginning of the file. +// * If n >= 0, return the beginning of last nth line. +// Notice that if the last line is incomplete (no end-of-line), it will not be counted +// as one line. +func FindTailLineStartIndex(f io.ReadSeeker, n int64) (int64, error) { + if n < 0 { + return 0, nil + } + size, err := f.Seek(0, os.SEEK_END) + if err != nil { + return 0, err + } + var left, cnt int64 + buf := make([]byte, blockSize) + for right := size; right > 0 && cnt <= n; right -= blockSize { + left = right - blockSize + if left < 0 { + left = 0 + buf = make([]byte, right) + } + if _, err := f.Seek(left, os.SEEK_SET); err != nil { + return 0, err + } + if _, err := f.Read(buf); err != nil { + return 0, err + } + cnt += int64(bytes.Count(buf, eol)) + } + for ; cnt > n; cnt-- { + idx := bytes.Index(buf, eol) + 1 + buf = buf[idx:] + left += int64(idx) + } + return left, nil +} diff --git a/pkg/util/tail/tail_test.go b/pkg/util/tail/tail_test.go new file mode 100644 index 0000000000..18d3f90c21 --- /dev/null +++ b/pkg/util/tail/tail_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tail + +import ( + "bytes" + "strings" + "testing" +) + +func TestTail(t *testing.T) { + line := strings.Repeat("a", blockSize) + testBytes := []byte(line + "\n" + + line + "\n" + + line + "\n" + + line + "\n" + + line[blockSize/2:]) // incomplete line + + for c, test := range []struct { + n int64 + start int64 + }{ + {n: -1, start: 0}, + {n: 0, start: int64(len(line)+1) * 4}, + {n: 1, start: int64(len(line)+1) * 3}, + {n: 9999, start: 0}, + } { + t.Logf("TestCase #%d: %+v", c, test) + r := bytes.NewReader(testBytes) + s, err := FindTailLineStartIndex(r, test.n) + if err != nil { + t.Error(err) + } + if s != test.start { + t.Errorf("%d != %d", s, test.start) + } + } +}