Limit the size of the termination log and allow log input

Enforce the following limits:

12kb for total message length in container status
4kb for the termination message path file
2kb or 80 lines (whichever is shorter) from the log on error

Fallback to log output if the user requests it.
pull/6/head
Clayton Coleman 2016-12-07 15:56:06 -05:00
parent eff134cd5f
commit 2bb2604f0b
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
15 changed files with 347 additions and 118 deletions

View File

@ -289,6 +289,7 @@ pkg/util/replicaset
pkg/util/restoptions
pkg/util/runtime
pkg/util/sets
pkg/util/tail
pkg/util/validation
pkg/util/validation/field
pkg/util/version

View File

@ -626,3 +626,19 @@ func (s SortContainerStatusesByCreationTime) Swap(i, j int) { s[i], s[j] = s[j],
func (s SortContainerStatusesByCreationTime) Less(i, j int) bool {
return s[i].CreatedAt.Before(s[j].CreatedAt)
}
const (
// MaxPodTerminationMessageLogLength is the maximum bytes any one pod may have written
// as termination message output across all containers. Containers will be evenly truncated
// until output is below this limit.
MaxPodTerminationMessageLogLength = 1024 * 12
// MaxContainerTerminationMessageLength is the upper bound any one container may write to
// its termination message path. Contents above this length will be truncated.
MaxContainerTerminationMessageLength = 1024 * 4
// MaxContainerTerminationMessageLogLength is the maximum bytes any one container will
// have written to its termination message when the message is read from the logs.
MaxContainerTerminationMessageLogLength = 1024 * 2
// MaxContainerTerminationMessageLogLines is the maximum number of previous lines of
// log output that the termination message can contain.
MaxContainerTerminationMessageLogLines = 80
)

View File

@ -52,8 +52,10 @@ go_library(
"//pkg/util/procfs:go_default_library",
"//pkg/util/selinux:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/util/tail:go_default_library",
"//pkg/util/term:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor:github.com/armon/circbuf",
"//vendor:github.com/docker/distribution/digest",
"//vendor:github.com/docker/distribution/reference",
"//vendor:github.com/docker/docker/pkg/jsonmessage",

View File

@ -33,6 +33,7 @@ import (
"sync"
"time"
"github.com/armon/circbuf"
dockertypes "github.com/docker/engine-api/types"
dockercontainer "github.com/docker/engine-api/types/container"
dockerstrslice "github.com/docker/engine-api/types/strslice"
@ -70,6 +71,7 @@ import (
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/pkg/util/selinux"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/tail"
"k8s.io/kubernetes/pkg/util/term"
utilversion "k8s.io/kubernetes/pkg/util/version"
)
@ -482,19 +484,12 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin
startedAt = createdAt
}
terminationMessagePath := containerInfo.TerminationMessagePath
if terminationMessagePath != "" {
for _, mount := range iResult.Mounts {
if mount.Destination == terminationMessagePath {
path := mount.Source
if data, err := ioutil.ReadFile(path); err != nil {
message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err)
} else {
message = string(data)
}
}
}
// retrieve the termination message from logs, file, or file with fallback to logs in case of failure
fallbackToLogs := containerInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (iResult.State.ExitCode != 0 || iResult.State.OOMKilled)
if msg := getTerminationMessage(dm.c, iResult, containerInfo.TerminationMessagePath, fallbackToLogs); len(msg) > 0 {
message = msg
}
status.State = kubecontainer.ContainerStateExited
status.Message = message
status.Reason = reason
@ -508,6 +503,49 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin
return &status, "", nil
}
func getTerminationMessage(c DockerInterface, iResult *dockertypes.ContainerJSON, terminationMessagePath string, fallbackToLogs bool) string {
if len(terminationMessagePath) != 0 {
for _, mount := range iResult.Mounts {
if mount.Destination != terminationMessagePath {
continue
}
path := mount.Source
data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength)
if err != nil {
return fmt.Sprintf("Error on reading termination log %s: %v", path, err)
}
if !fallbackToLogs || len(data) != 0 {
return string(data)
}
}
}
if !fallbackToLogs {
return ""
}
return readLastStringFromContainerLogs(c, iResult.Name)
}
// readLastStringFromContainerLogs attempts to a certain amount from the end of the logs for containerName.
// It will attempt to avoid reading excessive logs from the server, which may result in underestimating the amount
// of logs to fetch (such that the length of the response message is < max).
func readLastStringFromContainerLogs(c DockerInterface, containerName string) string {
logOptions := dockertypes.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
}
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
streamOptions := StreamOptions{
ErrorStream: buf,
OutputStream: buf,
}
logOptions.Tail = strconv.FormatInt(kubecontainer.MaxContainerTerminationMessageLogLines, 10)
if err := c.Logs(containerName, logOptions, streamOptions); err != nil {
return fmt.Sprintf("Error on reading termination message from logs: %v", err)
}
return buf.String()
}
// makeEnvList converts EnvVar list to a list of strings, in the form of
// '<key>=<value>', which can be understood by docker.
func makeEnvList(envs []kubecontainer.EnvVar) (result []string) {

View File

@ -39,11 +39,12 @@ const (
kubernetesPodDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod"
kubernetesPodTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
kubernetesContainerHashLabel = "io.kubernetes.container.hash"
kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount"
kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath"
kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler"
kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4
kubernetesContainerHashLabel = "io.kubernetes.container.hash"
kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount"
kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath"
kubernetesContainerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy"
kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler"
kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4
// TODO(random-liu): Keep this for old containers, remove this when we drop support for v1.1.
kubernetesPodLabel = "io.kubernetes.pod.data"
@ -63,6 +64,7 @@ type labelledContainerInfo struct {
Hash string
RestartCount int
TerminationMessagePath string
TerminationMessagePolicy v1.TerminationMessagePolicy
PreStopHandler *v1.Handler
Ports []v1.ContainerPort
}
@ -83,6 +85,7 @@ func newLabels(container *v1.Container, pod *v1.Pod, restartCount int, enableCus
labels[kubernetesContainerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16)
labels[kubernetesContainerRestartCountLabel] = strconv.Itoa(restartCount)
labels[kubernetesContainerTerminationMessagePathLabel] = container.TerminationMessagePath
labels[kubernetesContainerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy)
if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
// Using json enconding so that the PreStop handler object is readable after writing as a label
rawPreStop, err := json.Marshal(container.Lifecycle.PreStop)
@ -118,7 +121,8 @@ func getContainerInfoFromLabel(labels map[string]string) *labelledContainerInfo
PodUID: kubetypes.UID(getStringValueFromLabel(labels, types.KubernetesPodUIDLabel)),
Name: getStringValueFromLabel(labels, types.KubernetesContainerNameLabel),
Hash: getStringValueFromLabel(labels, kubernetesContainerHashLabel),
TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel),
TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel),
TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePolicyLabel)),
}
if containerInfo.RestartCount, err = getIntValueFromLabel(labels, kubernetesContainerRestartCountLabel); err != nil {
logError(containerInfo, kubernetesContainerRestartCountLabel, err)

View File

@ -47,7 +47,9 @@ go_library(
"//pkg/securitycontext:go_default_library",
"//pkg/util/parsers:go_default_library",
"//pkg/util/selinux:go_default_library",
"//pkg/util/tail:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor:github.com/armon/circbuf",
"//vendor:github.com/docker/docker/pkg/jsonlog",
"//vendor:github.com/fsnotify/fsnotify",
"//vendor:github.com/golang/glog",

View File

@ -19,7 +19,6 @@ package kuberuntime
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/url"
"os"
@ -28,7 +27,9 @@ import (
"sync"
"time"
"github.com/armon/circbuf"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/tail"
)
// startContainer starts a container and returns a message indicates why it is failed on error.
@ -334,29 +336,36 @@ func makeUID() string {
return fmt.Sprintf("%08x", rand.Uint32())
}
// getTerminationMessage gets termination message of the container.
func getTerminationMessage(status *runtimeapi.ContainerStatus, kubeStatus *kubecontainer.ContainerStatus, terminationMessagePath string) string {
message := ""
if !kubeStatus.FinishedAt.IsZero() || kubeStatus.ExitCode != 0 {
if terminationMessagePath == "" {
return ""
}
// getTerminationMessage looks on the filesystem for the provided termination message path, returning a limited
// amount of those bytes, or returns true if the logs should be checked.
func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessagePath string, fallbackToLogs bool) (string, bool) {
if len(terminationMessagePath) != 0 {
for _, mount := range status.Mounts {
if mount.ContainerPath == terminationMessagePath {
path := mount.HostPath
if data, err := ioutil.ReadFile(path); err != nil {
message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err)
} else {
message = string(data)
}
break
if mount.ContainerPath != terminationMessagePath {
continue
}
path := mount.HostPath
data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength)
if err != nil {
return fmt.Sprintf("Error on reading termination log %s: %v", path, err), false
}
if !fallbackToLogs || len(data) != 0 {
return string(data), false
}
}
}
return "", fallbackToLogs
}
return message
// readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented
// by path. It reads up to max log lines.
func readLastStringFromContainerLogs(path string) string {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
return fmt.Sprintf("Error on reading termination message from logs: %v", err)
}
return buf.String()
}
// getPodContainerStatuses gets all containers' statuses for the pod.
@ -402,13 +411,19 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n
cStatus.Message = status.Message
cStatus.ExitCode = int(status.ExitCode)
cStatus.FinishedAt = time.Unix(0, status.FinishedAt)
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (cStatus.ExitCode != 0 || cStatus.Reason == "OOMKilled")
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount)
tMessage = readLastStringFromContainerLogs(path)
}
// Use the termination message written by the application is not empty
if len(tMessage) != 0 {
cStatus.Message = tMessage
}
}
tMessage := getTerminationMessage(status, cStatus, annotatedInfo.TerminationMessagePath)
// Use the termination message written by the application is not empty
if len(tMessage) != 0 {
cStatus.Message = tMessage
}
statuses[i] = cStatus
}

View File

@ -32,6 +32,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/tail"
)
// Notice that the current kuberuntime logs implementation doesn't handle
@ -120,7 +121,7 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
opts := newLogOptions(apiOpts, time.Now())
// Search start point based on tail line.
start, err := tail(f, opts.tail)
start, err := tail.FindTailLineStartIndex(f, opts.tail)
if err != nil {
return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
}
@ -347,40 +348,3 @@ func (w *logWriter) write(msg *logMessage) error {
}
return nil
}
// tail returns the start of last nth line.
// * If n < 0, return the beginning of the file.
// * If n >= 0, return the beginning of last nth line.
// Notice that if the last line is incomplete (no end-of-line), it will not be counted
// as one line.
func tail(f io.ReadSeeker, n int64) (int64, error) {
if n < 0 {
return 0, nil
}
size, err := f.Seek(0, os.SEEK_END)
if err != nil {
return 0, err
}
var left, cnt int64
buf := make([]byte, blockSize)
for right := size; right > 0 && cnt <= n; right -= blockSize {
left = right - blockSize
if left < 0 {
left = 0
buf = make([]byte, right)
}
if _, err := f.Seek(left, os.SEEK_SET); err != nil {
return 0, err
}
if _, err := f.Read(buf); err != nil {
return 0, err
}
cnt += int64(bytes.Count(buf, eol))
}
for ; cnt > n; cnt-- {
idx := bytes.Index(buf, eol) + 1
buf = buf[idx:]
left += int64(idx)
}
return left, nil
}

View File

@ -18,7 +18,6 @@ package kuberuntime
import (
"bytes"
"strings"
"testing"
"time"
@ -242,28 +241,3 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
assert.Equal(t, test.expectStderr, stderrBuf.String())
}
}
func TestTail(t *testing.T) {
line := strings.Repeat("a", blockSize)
testBytes := []byte(line + "\n" +
line + "\n" +
line + "\n" +
line + "\n" +
line[blockSize/2:]) // incomplete line
for c, test := range []struct {
n int64
start int64
}{
{n: -1, start: 0},
{n: 0, start: int64(len(line)+1) * 4},
{n: 1, start: int64(len(line)+1) * 3},
{n: 9999, start: 0},
} {
t.Logf("TestCase #%d: %+v", c, test)
r := bytes.NewReader(testBytes)
s, err := tail(r, test.n)
assert.NoError(t, err)
assert.Equal(t, s, test.start)
}
}

View File

@ -33,11 +33,12 @@ const (
podDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod"
podTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
containerHashLabel = "io.kubernetes.container.hash"
containerRestartCountLabel = "io.kubernetes.container.restartCount"
containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath"
containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler"
containerPortsLabel = "io.kubernetes.container.ports"
containerHashLabel = "io.kubernetes.container.hash"
containerRestartCountLabel = "io.kubernetes.container.restartCount"
containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath"
containerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy"
containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler"
containerPortsLabel = "io.kubernetes.container.ports"
// kubernetesManagedLabel is used to distinguish whether a container/sandbox is managed by kubelet or not
kubernetesManagedLabel = "io.kubernetes.managed"
@ -69,6 +70,7 @@ type annotatedContainerInfo struct {
PodDeletionGracePeriod *int64
PodTerminationGracePeriod *int64
TerminationMessagePath string
TerminationMessagePolicy v1.TerminationMessagePolicy
PreStopHandler *v1.Handler
ContainerPorts []v1.ContainerPort
}
@ -113,6 +115,7 @@ func newContainerAnnotations(container *v1.Container, pod *v1.Pod, restartCount
annotations[containerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16)
annotations[containerRestartCountLabel] = strconv.Itoa(restartCount)
annotations[containerTerminationMessagePathLabel] = container.TerminationMessagePath
annotations[containerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy)
if pod.DeletionGracePeriodSeconds != nil {
annotations[podDeletionGracePeriodLabel] = strconv.FormatInt(*pod.DeletionGracePeriodSeconds, 10)
@ -192,7 +195,8 @@ func isManagedByKubelet(labels map[string]string) bool {
func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedContainerInfo {
var err error
containerInfo := &annotatedContainerInfo{
TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel),
TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel),
TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(annotations, containerTerminationMessagePolicyLabel)),
}
if containerInfo.Hash, err = getUint64ValueFromLabel(annotations, containerHashLabel); err != nil {

View File

@ -513,6 +513,10 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
// kubelet temporarily.
// TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent.
func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
bytesPerStatus := kubecontainer.MaxPodTerminationMessageLogLength
if containers := len(pod.Spec.Containers) + len(pod.Spec.InitContainers); containers > 0 {
bytesPerStatus = bytesPerStatus / containers
}
normalizeTimeStamp := func(t *metav1.Time) {
*t = t.Rfc3339Copy()
}
@ -523,6 +527,9 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
if c.Terminated != nil {
normalizeTimeStamp(&c.Terminated.StartedAt)
normalizeTimeStamp(&c.Terminated.FinishedAt)
if len(c.Terminated.Message) > bytesPerStatus {
c.Terminated.Message = c.Terminated.Message[:bytesPerStatus]
}
}
}

View File

@ -20,21 +20,21 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
@ -480,6 +480,40 @@ func TestStatusEquality(t *testing.T) {
}
}
func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) {
pod := v1.Pod{
Spec: v1.PodSpec{},
}
containerStatus := []v1.ContainerStatus{}
for i := 0; i < 48; i++ {
s := v1.ContainerStatus{
Name: fmt.Sprintf("container%d", i),
LastTerminationState: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Message: strings.Repeat("abcdefgh", int(24+i%3)),
},
},
}
containerStatus = append(containerStatus, s)
}
podStatus := v1.PodStatus{
InitContainerStatuses: containerStatus[:24],
ContainerStatuses: containerStatus[24:],
}
result := normalizeStatus(&pod, &podStatus)
count := 0
for _, s := range result.InitContainerStatuses {
l := len(s.LastTerminationState.Terminated.Message)
if l < 192 || l > 256 {
t.Errorf("container message had length %d", l)
}
count += l
}
if count > kubecontainer.MaxPodTerminationMessageLogLength {
t.Errorf("message length not truncated")
}
}
func TestStaticPod(t *testing.T) {
staticPod := getTestPod()
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}

View File

@ -2,7 +2,11 @@ package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load("@io_bazel_rules_go//go:def.bzl")
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
filegroup(
name = "package-srcs",
@ -16,3 +20,16 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["tail_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
)
go_library(
name = "go_default_library",
srcs = ["tail.go"],
tags = ["automanaged"],
)

99
pkg/util/tail/tail.go Normal file
View File

@ -0,0 +1,99 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tail
import (
"bytes"
"io"
"io/ioutil"
"os"
)
const (
// blockSize is the block size used in tail.
blockSize = 1024
)
var (
// eol is the end-of-line sign in the log.
eol = []byte{'\n'}
)
// ReadAtMost reads at most max bytes from the end of the file identified by path or
// returns an error. It returns true if the file was longer than max. It will
// allocate up to max bytes.
func ReadAtMost(path string, max int64) ([]byte, bool, error) {
f, err := os.Open(path)
if err != nil {
return nil, false, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, false, err
}
size := fi.Size()
if size == 0 {
return nil, false, nil
}
if size < max {
max = size
}
offset, err := f.Seek(-max, os.SEEK_END)
if err != nil {
return nil, false, err
}
data, err := ioutil.ReadAll(f)
return data, offset > 0, err
}
// FindTailLineStartIndex returns the start of last nth line.
// * If n < 0, return the beginning of the file.
// * If n >= 0, return the beginning of last nth line.
// Notice that if the last line is incomplete (no end-of-line), it will not be counted
// as one line.
func FindTailLineStartIndex(f io.ReadSeeker, n int64) (int64, error) {
if n < 0 {
return 0, nil
}
size, err := f.Seek(0, os.SEEK_END)
if err != nil {
return 0, err
}
var left, cnt int64
buf := make([]byte, blockSize)
for right := size; right > 0 && cnt <= n; right -= blockSize {
left = right - blockSize
if left < 0 {
left = 0
buf = make([]byte, right)
}
if _, err := f.Seek(left, os.SEEK_SET); err != nil {
return 0, err
}
if _, err := f.Read(buf); err != nil {
return 0, err
}
cnt += int64(bytes.Count(buf, eol))
}
for ; cnt > n; cnt-- {
idx := bytes.Index(buf, eol) + 1
buf = buf[idx:]
left += int64(idx)
}
return left, nil
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tail
import (
"bytes"
"strings"
"testing"
)
func TestTail(t *testing.T) {
line := strings.Repeat("a", blockSize)
testBytes := []byte(line + "\n" +
line + "\n" +
line + "\n" +
line + "\n" +
line[blockSize/2:]) // incomplete line
for c, test := range []struct {
n int64
start int64
}{
{n: -1, start: 0},
{n: 0, start: int64(len(line)+1) * 4},
{n: 1, start: int64(len(line)+1) * 3},
{n: 9999, start: 0},
} {
t.Logf("TestCase #%d: %+v", c, test)
r := bytes.NewReader(testBytes)
s, err := FindTailLineStartIndex(r, test.n)
if err != nil {
t.Error(err)
}
if s != test.start {
t.Errorf("%d != %d", s, test.start)
}
}
}