k3s-v1.15.3
Robert Krawitz 2018-12-03 18:27:53 -05:00 committed by Robert Krawitz
parent e47fc04adc
commit 022f7c2cd7
9 changed files with 195 additions and 72 deletions

View File

@ -47,6 +47,7 @@ go_library(
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/logreduction:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/securitycontext:go_default_library",
"//pkg/util/parsers:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util/logreduction"
)
const (
@ -86,6 +87,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
keyring: keyring,
seccompProfileRoot: fakeSeccompProfileRoot,
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}
typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)

View File

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"os"
"sync"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
@ -48,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/logreduction"
)
const (
@ -128,11 +128,7 @@ type kubeGenericRuntimeManager struct {
runtimeClassManager *runtimeclass.Manager
// Cache last per-container error message to reduce log spam
lastError map[string]string
// Time last per-container error message was printed
errorPrinted map[string]time.Time
errorMapLock sync.Mutex
logReduction *logreduction.LogReduction
}
// KubeGenericRuntime is a interface contains interfaces for container runtime and command.
@ -187,8 +183,7 @@ func NewKubeGenericRuntimeManager(
internalLifecycle: internalLifecycle,
legacyLogProvider: legacyLogProvider,
runtimeClassManager: runtimeClassManager,
lastError: make(map[string]string),
errorPrinted: make(map[string]time.Time),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
@ -850,17 +845,6 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPo
return
}
func (m *kubeGenericRuntimeManager) cleanupErrorTimeouts() {
m.errorMapLock.Lock()
defer m.errorMapLock.Unlock()
for name, timeout := range m.errorPrinted {
if time.Since(timeout) >= identicalErrorDelay {
delete(m.errorPrinted, name)
delete(m.lastError, name)
}
}
}
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visible in Runtime.
func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
@ -909,19 +893,13 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
// Get statuses of all containers visible in the pod.
containerStatuses, err := m.getPodContainerStatuses(uid, name, namespace)
m.errorMapLock.Lock()
defer m.errorMapLock.Unlock()
if err != nil {
lastMsg, ok := m.lastError[podFullName]
if !ok || err.Error() != lastMsg || time.Since(m.errorPrinted[podFullName]) >= identicalErrorDelay {
if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
klog.Errorf("getPodContainerStatuses for pod %q failed: %v", podFullName, err)
m.errorPrinted[podFullName] = time.Now()
m.lastError[podFullName] = err.Error()
}
return nil, err
}
delete(m.errorPrinted, podFullName)
delete(m.lastError, podFullName)
m.logReduction.ClearID(podFullName)
return &kubecontainer.PodStatus{
ID: uid,

View File

@ -17,6 +17,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/remote",
deps = [
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/logreduction:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",

View File

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"
"google.golang.org/grpc"
@ -30,6 +29,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/logreduction"
utilexec "k8s.io/utils/exec"
)
@ -38,10 +38,7 @@ type RemoteRuntimeService struct {
timeout time.Duration
runtimeClient runtimeapi.RuntimeServiceClient
// Cache last per-container error message to reduce log spam
lastError map[string]string
// Time last per-container error message was printed
errorPrinted map[string]time.Time
errorMapLock sync.Mutex
logReduction *logreduction.LogReduction
}
const (
@ -68,8 +65,7 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (
return &RemoteRuntimeService{
timeout: connectionTimeout,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
lastError: make(map[string]string),
errorPrinted: make(map[string]time.Time),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}, nil
}
@ -238,10 +234,7 @@ func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64)
ctx, cancel := getContextWithTimeout(t)
defer cancel()
r.errorMapLock.Lock()
delete(r.lastError, containerID)
delete(r.errorPrinted, containerID)
r.errorMapLock.Unlock()
r.logReduction.ClearID(containerID)
_, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
ContainerId: containerID,
Timeout: timeout,
@ -260,10 +253,7 @@ func (r *RemoteRuntimeService) RemoveContainer(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
r.errorMapLock.Lock()
delete(r.lastError, containerID)
delete(r.errorPrinted, containerID)
r.errorMapLock.Unlock()
r.logReduction.ClearID(containerID)
_, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
ContainerId: containerID,
})
@ -291,18 +281,6 @@ func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter
return resp.Containers, nil
}
// Clean up any expired last-error timers
func (r *RemoteRuntimeService) cleanupErrorTimeouts() {
r.errorMapLock.Lock()
defer r.errorMapLock.Unlock()
for ID, timeout := range r.errorPrinted {
if time.Since(timeout) >= identicalErrorDelay {
delete(r.lastError, ID)
delete(r.errorPrinted, ID)
}
}
}
// ContainerStatus returns the container status.
func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
@ -311,21 +289,14 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
ContainerId: containerID,
})
r.cleanupErrorTimeouts()
r.errorMapLock.Lock()
defer r.errorMapLock.Unlock()
if err != nil {
// Don't spam the log with endless messages about the same failure.
lastMsg, ok := r.lastError[containerID]
if !ok || err.Error() != lastMsg || time.Since(r.errorPrinted[containerID]) >= identicalErrorDelay {
if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
r.errorPrinted[containerID] = time.Now()
r.lastError[containerID] = err.Error()
}
return nil, err
}
delete(r.lastError, containerID)
delete(r.errorPrinted, containerID)
r.logReduction.ClearID(containerID)
if resp.Status != nil {
if err := verifyContainerStatus(resp.Status); err != nil {
@ -500,20 +471,13 @@ func (r *RemoteRuntimeService) ContainerStats(containerID string) (*runtimeapi.C
resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
ContainerId: containerID,
})
r.cleanupErrorTimeouts()
r.errorMapLock.Lock()
defer r.errorMapLock.Unlock()
if err != nil {
lastMsg, ok := r.lastError[containerID]
if !ok || err.Error() != lastMsg || time.Since(r.errorPrinted[containerID]) >= identicalErrorDelay {
if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
r.errorPrinted[containerID] = time.Now()
r.lastError[containerID] = err.Error()
}
return nil, err
}
delete(r.lastError, containerID)
delete(r.errorPrinted, containerID)
r.logReduction.ClearID(containerID)
return resp.GetStats(), nil
}

View File

@ -79,6 +79,7 @@ filegroup(
"//pkg/kubelet/util/cache:all-srcs",
"//pkg/kubelet/util/format:all-srcs",
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/logreduction:all-srcs",
"//pkg/kubelet/util/manager:all-srcs",
"//pkg/kubelet/util/pluginwatcher:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",

View File

@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["logreduction.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/logreduction",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["logreduction_test.go"],
embed = [":go_default_library"],
)

View File

@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logreduction
import (
"sync"
"time"
)
var nowfunc = func() time.Time { return time.Now() }
// LogReduction provides a filter for consecutive identical log messages;
// a message will be printed no more than once per interval.
// If a string of messages is interrupted by a different message,
// the interval timer will be reset.
type LogReduction struct {
lastError map[string]string
errorPrinted map[string]time.Time
errorMapLock sync.Mutex
identicalErrorDelay time.Duration
}
// NewLogReduction returns an initialized LogReduction
func NewLogReduction(identicalErrorDelay time.Duration) *LogReduction {
l := new(LogReduction)
l.lastError = make(map[string]string)
l.errorPrinted = make(map[string]time.Time)
l.identicalErrorDelay = identicalErrorDelay
return l
}
func (l *LogReduction) cleanupErrorTimeouts() {
for name, timeout := range l.errorPrinted {
if nowfunc().Sub(timeout) >= l.identicalErrorDelay {
delete(l.errorPrinted, name)
delete(l.lastError, name)
}
}
}
// ShouldMessageBePrinted determines whether a message should be printed based
// on how long ago this particular message was last printed
func (l *LogReduction) ShouldMessageBePrinted(message string, parentID string) bool {
l.errorMapLock.Lock()
defer l.errorMapLock.Unlock()
l.cleanupErrorTimeouts()
lastMsg, ok := l.lastError[parentID]
lastPrinted, ok1 := l.errorPrinted[parentID]
if !ok || !ok1 || message != lastMsg || nowfunc().Sub(lastPrinted) >= l.identicalErrorDelay {
l.errorPrinted[parentID] = nowfunc()
l.lastError[parentID] = message
return true
}
return false
}
// ClearID clears out log reduction records pertaining to a particular parent
// (e. g. container ID)
func (l *LogReduction) ClearID(parentID string) {
l.errorMapLock.Lock()
defer l.errorMapLock.Unlock()
delete(l.lastError, parentID)
delete(l.errorPrinted, parentID)
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logreduction
import (
"testing"
"time"
)
var time0 = time.Unix(1000, 0)
var time1 = time.Unix(1001, 0)
var time2 = time.Unix(1012, 0)
var identicalErrorDelay = 10 * time.Second
var testCount = 0
const (
mesg1 = "This is a message"
mesg2 = "This is not a message"
id1 = "Container1"
id2 = "Container2"
)
func checkThat(t *testing.T, r *LogReduction, m, id string) {
testCount++
if !r.ShouldMessageBePrinted(m, id) {
t.Errorf("Case %d failed (%s/%s should be printed)", testCount, m, id)
}
}
func checkThatNot(t *testing.T, r *LogReduction, m, id string) {
testCount++
if r.ShouldMessageBePrinted(m, id) {
t.Errorf("Case %d failed (%s/%s should not be printed)", testCount, m, id)
}
}
func TestLogReduction(t *testing.T) {
var timeToReturn = time0
nowfunc = func() time.Time { return timeToReturn }
r := NewLogReduction(identicalErrorDelay)
checkThat(t, r, mesg1, id1) // 1
checkThatNot(t, r, mesg1, id1) // 2
checkThat(t, r, mesg1, id2) // 3
checkThatNot(t, r, mesg1, id1) // 4
timeToReturn = time1
checkThatNot(t, r, mesg1, id1) // 5
timeToReturn = time2
checkThat(t, r, mesg1, id1) // 6
checkThatNot(t, r, mesg1, id1) // 7
checkThat(t, r, mesg2, id1) // 8
checkThat(t, r, mesg1, id1) // 9
checkThat(t, r, mesg1, id2) // 10
r.ClearID(id1)
checkThat(t, r, mesg1, id1) // 11
checkThatNot(t, r, mesg1, id2) // 12
}