fix memcg fd leak

David Ashpole 2018-06-11 11:37:50 -07:00
parent 796b31edcc
commit 93b6d026d9
11 changed files with 767 additions and 230 deletions

View File

@ -11,6 +11,8 @@ go_test(
srcs = [
"eviction_manager_test.go",
"helpers_test.go",
"memory_threshold_notifier_test.go",
"mock_threshold_notifier_test.go",
],
embed = [":go_default_library"],
deps = [
@ -20,6 +22,7 @@ go_test(
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -36,6 +39,7 @@ go_library(
"doc.go",
"eviction_manager.go",
"helpers.go",
"memory_threshold_notifier.go",
"types.go",
] + select({
"@io_bazel_rules_go//go/platform:android": [

View File

@ -19,7 +19,6 @@ package eviction
import (
"fmt"
"sort"
"strconv"
"sync"
"time"
@ -35,7 +34,6 @@ import (
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@ -85,10 +83,12 @@ type managerImpl struct {
signalToNodeReclaimFuncs map[evictionapi.Signal]nodeReclaimFuncs
// last observations from synchronize
lastObservations signalObservations
// notifierStopCh is a channel used to stop all thresholdNotifiers
notifierStopCh ThresholdStopCh
// dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
dedicatedImageFs *bool
// thresholdNotifiers is a list of memory threshold notifiers which each notify for a memory eviction threshold
thresholdNotifiers []ThresholdNotifier
// thresholdsLastUpdated is the last time the thresholdNotifiers were updated.
thresholdsLastUpdated time.Time
}
// ensure it implements the required interface
@ -116,8 +116,8 @@ func NewManager(
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
notifierStopCh: NewInitialStopCh(clock),
dedicatedImageFs: nil,
thresholdNotifiers: []ThresholdNotifier{},
}
return manager, manager
}
@ -163,6 +163,23 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
// Start starts the control loop to observe and respond to low compute resources.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
glog.Infof(message)
m.synchronize(diskInfoProvider, podFunc)
}
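// When kernel memcg notifications are enabled, create one notifier per memory
// eviction threshold; each kernel notification triggers an immediate synchronize pass.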
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
if err != nil {
glog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// start the eviction manager monitoring
go func() {
for {
@ -197,51 +214,6 @@ func (m *managerImpl) IsUnderPIDPressure() bool {
return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure)
}
func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard, allocatable bool, handler thresholdNotifierHandlerFunc) error {
for _, threshold := range m.config.Thresholds {
if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) {
continue
}
cgroups, err := cm.GetCgroupSubsystems()
if err != nil {
return err
}
cgpath, found := cgroups.MountPoints["memory"]
if !found || len(cgpath) == 0 {
return fmt.Errorf("memory cgroup mount point not found")
}
attribute := "memory.usage_in_bytes"
memoryStats := summary.Node.Memory
if allocatable {
cgpath += m.config.PodCgroupRoot
allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods)
if err != nil {
return err
}
memoryStats = allocatableContainer.Memory
}
if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil {
return fmt.Errorf("summary was incomplete")
}
// Set threshold on usage to capacity - eviction_hard + inactive_file,
// since we want to be notified when working_set = capacity - eviction_hard
inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI)
capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI)
evictionThresholdQuantity := evictionapi.GetThresholdQuantity(threshold.Value, capacity)
memcgThreshold := capacity.DeepCopy()
memcgThreshold.Sub(*evictionThresholdQuantity)
memcgThreshold.Add(*inactiveFile)
description := fmt.Sprintf("<%s available", formatThresholdValue(threshold.Value))
memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, strconv.FormatInt(memcgThreshold.Value(), 10), description, handler)
if err != nil {
return err
}
go memcgThresholdNotifier.Start(m.notifierStopCh)
return nil
}
return nil
}
// synchronize is the main control loop that enforces eviction thresholds.
// Returns the pods that were killed, or nil if no pods were killed.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
@ -272,41 +244,12 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
return nil
}
// attempt to create a threshold notifier to improve eviction response time
if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() {
glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api")
// start soft memory notification
err = m.startMemoryThresholdNotifier(summary, false, false, func(desc string) {
glog.Infof("soft memory eviction threshold crossed at %s", desc)
// TODO wait grace period for soft memory limit
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)
}
// start allocatable soft memory notification
err = m.startMemoryThresholdNotifier(summary, false, true, func(desc string) {
glog.Infof("soft allocatable memory eviction threshold crossed at %s", desc)
// TODO wait grace period for soft memory limit
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create allocatable soft memory threshold notifier: %v", err)
}
// start hard memory notification
err = m.startMemoryThresholdNotifier(summary, true, false, func(desc string) {
glog.Infof("hard memory eviction threshold crossed at %s", desc)
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err)
}
// start hard memory notification
err = m.startMemoryThresholdNotifier(summary, true, true, func(desc string) {
glog.Infof("hard allocatable memory eviction threshold crossed at %s", desc)
m.synchronize(diskInfoProvider, podFunc)
})
if err != nil {
glog.Warningf("eviction manager: failed to create hard allocatable memory threshold notifier: %v", err)
if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
m.thresholdsLastUpdated = m.clock.Now()
for _, notifier := range m.thresholdNotifiers {
if err := notifier.UpdateThreshold(summary); err != nil {
glog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
}
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package eviction
import (
"fmt"
"testing"
"time"
@ -1434,3 +1435,74 @@ func TestAllocatableMemoryPressure(t *testing.T) {
}
}
}
func TestUpdateMemcgThreshold(t *testing.T) {
activePodsFunc := func() []*v1.Pod {
return []*v1.Pod{}
}
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
diskGC := &mockDiskGC{err: nil}
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
config := Config{
MaxPodGracePeriodSeconds: 5,
PressureTransitionPeriod: time.Minute * 5,
Thresholds: []evictionapi.Threshold{
{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("1Gi"),
},
},
},
PodCgroupRoot: "kubepods",
}
summaryProvider := &fakeSummaryProvider{result: makeMemoryStats("2Gi", map[*v1.Pod]statsapi.PodStats{})}
thresholdNotifier := &MockThresholdNotifier{}
thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(nil).Twice()
manager := &managerImpl{
clock: fakeClock,
killPodFunc: podKiller.killPodNow,
imageGC: diskGC,
containerGC: diskGC,
config: config,
recorder: &record.FakeRecorder{},
summaryProvider: summaryProvider,
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
thresholdNotifiers: []ThresholdNotifier{thresholdNotifier},
}
manager.synchronize(diskInfoProvider, activePodsFunc)
// The UpdateThreshold method should have been called once, since this is the first run.
thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)
manager.synchronize(diskInfoProvider, activePodsFunc)
// The UpdateThreshold method should not have been called again, since not enough time has passed
thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)
fakeClock.Step(2 * notifierRefreshInterval)
manager.synchronize(diskInfoProvider, activePodsFunc)
// The UpdateThreshold method should be called again since enough time has passed
thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 2)
// new memory threshold notifier that returns an error
thresholdNotifier = &MockThresholdNotifier{}
thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(fmt.Errorf("error updating threshold"))
thresholdNotifier.On("Description").Return("mock thresholdNotifier").Once()
manager.thresholdNotifiers = []ThresholdNotifier{thresholdNotifier}
fakeClock.Step(2 * notifierRefreshInterval)
manager.synchronize(diskInfoProvider, activePodsFunc)
// The UpdateThreshold method should be called because at least notifierRefreshInterval time has passed.
// The Description method should be called because UpdateThreshold returned an error
thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1)
thresholdNotifier.AssertNumberOfCalls(t, "Description", 1)
}

View File

@ -21,13 +21,11 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@ -52,9 +50,6 @@ const (
emptyDirMessage = "Usage of EmptyDir volume %q exceeds the limit %q. "
// inodes, number. internal to this module, used to account for local disk inode consumption.
resourceInodes v1.ResourceName = "inodes"
// this prevents constantly updating the memcg notifier if synchronize
// is run frequently.
notifierRefreshInterval = 10 * time.Second
// OffendingContainersKey is the key in eviction event annotations for the list of container names which exceeded their requests
OffendingContainersKey = "offending_containers"
// OffendingContainersUsageKey is the key in eviction event annotations for the list of usage of containers which exceeded their requests
@ -1007,6 +1002,10 @@ func isHardEvictionThreshold(threshold evictionapi.Threshold) bool {
return threshold.GracePeriod == time.Duration(0)
}
func isAllocatableEvictionThreshold(threshold evictionapi.Threshold) bool {
return threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable
}
// buildSignalToRankFunc returns ranking functions associated with resources
func buildSignalToRankFunc(withImageFs bool) map[evictionapi.Signal]rankFunc {
signalToRankFunc := map[evictionapi.Signal]rankFunc{
@ -1097,38 +1096,3 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
annotations[StarvedResourceKey] = string(resourceToReclaim)
return
}
// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed
type thresholdStopCh struct {
lock sync.Mutex
ch chan struct{}
startTime time.Time
// used to track time
clock clock.Clock
}
// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately
func NewInitialStopCh(clock clock.Clock) ThresholdStopCh {
return &thresholdStopCh{ch: make(chan struct{}), clock: clock}
}
// implements ThresholdStopCh.Reset
func (t *thresholdStopCh) Reset() (closed bool) {
t.lock.Lock()
defer t.lock.Unlock()
closed = t.clock.Since(t.startTime) > notifierRefreshInterval
if closed {
// close the old channel and reopen a new one
close(t.ch)
t.startTime = t.clock.Now()
t.ch = make(chan struct{})
}
return
}
// implements ThresholdStopCh.Ch
func (t *thresholdStopCh) Ch() <-chan struct{} {
t.lock.Lock()
defer t.lock.Unlock()
return t.ch
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
@ -28,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@ -1948,34 +1946,3 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
}
return true
}
func TestThresholdStopCh(t *testing.T) {
var wg sync.WaitGroup
fakeClock := clock.NewFakeClock(time.Now())
stop := NewInitialStopCh(fakeClock)
// Should be able to reset the InitialStopCh right away
if !stop.Reset() {
t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
}
// Need to wait notifierRefreshInterval before closing
if stop.Reset() {
t.Errorf("Expected not to be able to close the initialStopCh, but was successful")
}
wg.Add(1)
ch := stop.Ch()
go func() {
defer wg.Done()
// wait for the channel to close
<-ch
}()
fakeClock.Step(2 * notifierRefreshInterval)
if !stop.Reset() {
t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
}
// ensure the Reset() closed the channel
wg.Wait()
}

View File

@ -0,0 +1,135 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)
const (
memoryUsageAttribute = "memory.usage_in_bytes"
// this prevents constantly updating the memcg notifier if synchronize
// is run frequently.
notifierRefreshInterval = 10 * time.Second
)
type memoryThresholdNotifier struct {
threshold evictionapi.Threshold
cgroupPath string
events chan struct{}
factory NotifierFactory
handler func(string)
notifier CgroupNotifier
}
var _ ThresholdNotifier = &memoryThresholdNotifier{}
// NewMemoryThresholdNotifier creates a ThresholdNotifier which is designed to respond to the given threshold.
// UpdateThreshold must be called once before the threshold will be active.
func NewMemoryThresholdNotifier(threshold evictionapi.Threshold, cgroupRoot string, factory NotifierFactory, handler func(string)) (ThresholdNotifier, error) {
cgroups, err := cm.GetCgroupSubsystems()
if err != nil {
return nil, err
}
cgpath, found := cgroups.MountPoints["memory"]
if !found || len(cgpath) == 0 {
return nil, fmt.Errorf("memory cgroup mount point not found")
}
if isAllocatableEvictionThreshold(threshold) {
// for allocatable thresholds, point the cgroup notifier at the allocatable cgroup
cgpath += cgroupRoot
}
return &memoryThresholdNotifier{
threshold: threshold,
cgroupPath: cgpath,
events: make(chan struct{}),
handler: handler,
factory: factory,
}, nil
}
func (m *memoryThresholdNotifier) Start() {
glog.Infof("eviction manager: created %s", m.Description())
for range m.events {
m.handler(fmt.Sprintf("eviction manager: %s crossed", m.Description()))
}
}
func (m *memoryThresholdNotifier) UpdateThreshold(summary *statsapi.Summary) error {
memoryStats := summary.Node.Memory
if isAllocatableEvictionThreshold(m.threshold) {
allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods)
if err != nil {
return err
}
memoryStats = allocatableContainer.Memory
}
if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil {
return fmt.Errorf("summary was incomplete. Expected MemoryStats and all subfields to be non-nil, but got %+v", memoryStats)
}
// Set threshold on usage to capacity - eviction_hard + inactive_file,
// since we want to be notified when working_set = capacity - eviction_hard
inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI)
capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI)
evictionThresholdQuantity := evictionapi.GetThresholdQuantity(m.threshold.Value, capacity)
memcgThreshold := capacity.DeepCopy()
memcgThreshold.Sub(*evictionThresholdQuantity)
memcgThreshold.Add(*inactiveFile)
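// Worked example with hypothetical numbers: capacity=10Gi, eviction threshold=1Gi,
// inactive_file=2Gi. The memcg threshold is set at 10-1+2 = 11Gi of usage, so the
// notification fires when working_set (usage - inactive_file) reaches 9Gi,
// which is capacity - eviction threshold.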
glog.V(3).Infof("eviction manager: setting %s to %s\n", m.Description(), memcgThreshold.String())
if m.notifier != nil {
m.notifier.Stop()
}
newNotifier, err := m.factory.NewCgroupNotifier(m.cgroupPath, memoryUsageAttribute, memcgThreshold.Value())
if err != nil {
return err
}
m.notifier = newNotifier
go m.notifier.Start(m.events)
return nil
}
func (m *memoryThresholdNotifier) Description() string {
var hard, allocatable string
if isHardEvictionThreshold(m.threshold) {
hard = "hard "
} else {
hard = "soft "
}
if isAllocatableEvictionThreshold(m.threshold) {
allocatable = "allocatable "
}
return fmt.Sprintf("%s%smemory eviction threshold", hard, allocatable)
}
var _ NotifierFactory = &CgroupNotifierFactory{}
// CgroupNotifierFactory knows how to make CgroupNotifiers which integrate with the kernel
type CgroupNotifierFactory struct{}
// NewCgroupNotifier implements the NotifierFactory interface
func (n *CgroupNotifierFactory) NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
return NewCgroupNotifier(path, attribute, threshold)
}

View File

@ -0,0 +1,270 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/api/resource"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)
const testCgroupPath = "/sys/fs/cgroups/memory"
func nodeSummary(available, workingSet, usage resource.Quantity, allocatable bool) *statsapi.Summary {
availableBytes := uint64(available.Value())
workingSetBytes := uint64(workingSet.Value())
usageBytes := uint64(usage.Value())
memoryStats := statsapi.MemoryStats{
AvailableBytes: &availableBytes,
WorkingSetBytes: &workingSetBytes,
UsageBytes: &usageBytes,
}
if allocatable {
return &statsapi.Summary{
Node: statsapi.NodeStats{
SystemContainers: []statsapi.ContainerStats{
{
Name: statsapi.SystemContainerPods,
Memory: &memoryStats,
},
},
},
}
}
return &statsapi.Summary{
Node: statsapi.NodeStats{
Memory: &memoryStats,
},
}
}
func newTestMemoryThresholdNotifier(threshold evictionapi.Threshold, factory NotifierFactory, handler func(string)) *memoryThresholdNotifier {
return &memoryThresholdNotifier{
threshold: threshold,
cgroupPath: testCgroupPath,
events: make(chan struct{}),
factory: factory,
handler: handler,
}
}
func TestUpdateThreshold(t *testing.T) {
testCases := []struct {
description string
available resource.Quantity
workingSet resource.Quantity
usage resource.Quantity
evictionThreshold evictionapi.Threshold
expectedThreshold resource.Quantity
updateThresholdErr error
expectErr bool
}{
{
description: "node level threshold",
available: resource.MustParse("3Gi"),
usage: resource.MustParse("2Gi"),
workingSet: resource.MustParse("1Gi"),
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("1Gi"),
},
},
expectedThreshold: resource.MustParse("4Gi"),
updateThresholdErr: nil,
expectErr: false,
},
{
description: "allocatable threshold",
available: resource.MustParse("4Gi"),
usage: resource.MustParse("3Gi"),
workingSet: resource.MustParse("1Gi"),
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalAllocatableMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("1Gi"),
},
},
expectedThreshold: resource.MustParse("6Gi"),
updateThresholdErr: nil,
expectErr: false,
},
{
description: "error updating node level threshold",
available: resource.MustParse("3Gi"),
usage: resource.MustParse("2Gi"),
workingSet: resource.MustParse("1Gi"),
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("1Gi"),
},
},
expectedThreshold: resource.MustParse("4Gi"),
updateThresholdErr: fmt.Errorf("unexpected error"),
expectErr: true,
},
}
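// Each expectedThreshold above follows the formula in UpdateThreshold:
// (available + workingSet) - evictionThreshold + (usage - workingSet).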
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
notifierFactory := &MockNotifierFactory{}
notifier := &MockCgroupNotifier{}
m := newTestMemoryThresholdNotifier(tc.evictionThreshold, notifierFactory, nil)
notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, tc.expectedThreshold.Value()).Return(notifier, tc.updateThresholdErr)
var events chan<- struct{}
events = m.events
notifier.On("Start", events).Return()
err := m.UpdateThreshold(nodeSummary(tc.available, tc.workingSet, tc.usage, isAllocatableEvictionThreshold(tc.evictionThreshold)))
if err != nil && !tc.expectErr {
t.Errorf("Unexpected error updating threshold: %v", err)
} else if err == nil && tc.expectErr {
t.Errorf("Expected error updating threshold, but got nil")
}
if !tc.expectErr {
notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1)
}
})
}
}
func TestStart(t *testing.T) {
noResources := resource.MustParse("0")
threshold := evictionapi.Threshold{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: &noResources,
},
}
notifier := &MockCgroupNotifier{}
notifierFactory := &MockNotifierFactory{}
var wg sync.WaitGroup
wg.Add(4)
m := newTestMemoryThresholdNotifier(threshold, notifierFactory, func(string) {
wg.Done()
})
notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, int64(0)).Return(notifier, nil)
var events chan<- struct{}
events = m.events
notifier.On("Start", events).Return()
notifier.On("Stop").Return()
err := m.UpdateThreshold(nodeSummary(noResources, noResources, noResources, isAllocatableEvictionThreshold(threshold)))
if err != nil {
t.Errorf("Unexpected error updating threshold: %v", err)
}
notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1)
go m.Start()
for i := 0; i < 4; i++ {
m.events <- struct{}{}
}
wg.Wait()
}
func TestThresholdDescription(t *testing.T) {
testCases := []struct {
description string
evictionThreshold evictionapi.Threshold
expectedSubstrings []string
omittedSubstrings []string
}{
{
description: "hard node level threshold",
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("2Gi"),
},
},
expectedSubstrings: []string{"hard"},
omittedSubstrings: []string{"allocatable", "soft"},
},
{
description: "soft node level threshold",
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("2Gi"),
},
GracePeriod: time.Minute * 2,
},
expectedSubstrings: []string{"soft"},
omittedSubstrings: []string{"allocatable", "hard"},
},
{
description: "hard allocatable threshold",
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalAllocatableMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("2Gi"),
},
GracePeriod: time.Minute * 2,
},
expectedSubstrings: []string{"soft", "allocatable"},
omittedSubstrings: []string{"hard"},
},
{
description: "soft allocatable threshold",
evictionThreshold: evictionapi.Threshold{
Signal: evictionapi.SignalAllocatableMemoryAvailable,
Operator: evictionapi.OpLessThan,
Value: evictionapi.ThresholdValue{
Quantity: quantityMustParse("2Gi"),
},
},
expectedSubstrings: []string{"hard", "allocatable"},
omittedSubstrings: []string{"soft"},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
m := &memoryThresholdNotifier{
notifier: &MockCgroupNotifier{},
threshold: tc.evictionThreshold,
cgroupPath: testCgroupPath,
}
desc := m.Description()
for _, expected := range tc.expectedSubstrings {
if !strings.Contains(desc, expected) {
t.Errorf("expected description for notifier with threshold %+v to contain %s, but it did not", tc.evictionThreshold, expected)
}
}
for _, omitted := range tc.omittedSubstrings {
if strings.Contains(desc, omitted) {
t.Errorf("expected description for notifier with threshold %+v NOT to contain %s, but it did", tc.evictionThreshold, omitted)
}
}
})
}
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
mock "github.com/stretchr/testify/mock"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// MockCgroupNotifier is a mock implementation of the CgroupNotifier interface
type MockCgroupNotifier struct {
mock.Mock
}
// Start implements the NotifierFactory interface
func (m *MockCgroupNotifier) Start(a0 chan<- struct{}) {
m.Called(a0)
}
// Stop implements the NotifierFactory interface
func (m *MockCgroupNotifier) Stop() {
m.Called()
}
// MockNotifierFactory is a mock of the NotifierFactory interface
type MockNotifierFactory struct {
mock.Mock
}
// NewCgroupNotifier implements the NotifierFactory interface
func (m *MockNotifierFactory) NewCgroupNotifier(a0, a1 string, a2 int64) (CgroupNotifier, error) {
ret := m.Called(a0, a1, a2)
var r0 CgroupNotifier
if rf, ok := ret.Get(0).(func(string, string, int64) CgroupNotifier); ok {
r0 = rf(a0, a1, a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(CgroupNotifier)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string, int64) error); ok {
r1 = rf(a0, a1, a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockThresholdNotifier is a mock implementation of the ThresholdNotifier interface
type MockThresholdNotifier struct {
mock.Mock
}
// Start implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) Start() {
m.Called()
}
// UpdateThreshold implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) UpdateThreshold(a0 *statsapi.Summary) error {
ret := m.Called(a0)
var r0 error
if rf, ok := ret.Get(0).(func(*statsapi.Summary) error); ok {
r0 = rf(a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Description implements the ThresholdNotifier interface
func (m *MockThresholdNotifier) Description() string {
ret := m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.String(0)
}
return r0
}

View File

@ -18,43 +18,47 @@ package eviction
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
"golang.org/x/sys/unix"
)
type memcgThresholdNotifier struct {
watchfd int
controlfd int
eventfd int
handler thresholdNotifierHandlerFunc
description string
const (
// eventSize is the number of bytes returned by a successful read from an eventfd
// see http://man7.org/linux/man-pages/man2/eventfd.2.html for more information
eventSize = 8
// numFdEvents is the number of events we can record at once.
// If EpollWait finds more than this, they will be missed.
numFdEvents = 6
)
type linuxCgroupNotifier struct {
eventfd int
epfd int
stop chan struct{}
stopLock sync.Mutex
}
var _ ThresholdNotifier = &memcgThresholdNotifier{}
var _ CgroupNotifier = &linuxCgroupNotifier{}
// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
watchfd, err := unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0)
// NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required
// to receive notifications from the cgroup when the threshold is crossed in either direction.
func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
var watchfd, eventfd, epfd, controlfd int
var err error
watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
unix.Close(watchfd)
}
}()
controlfd, err := unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0)
defer unix.Close(watchfd)
controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
unix.Close(controlfd)
}
}()
eventfd, err := unix.Eventfd(0, unix.EFD_CLOEXEC)
defer unix.Close(controlfd)
eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
if err != nil {
return nil, err
}
@ -63,55 +67,119 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
return nil, err
}
defer func() {
// Close eventfd if we get an error later in initialization
if err != nil {
unix.Close(eventfd)
}
}()
glog.V(3).Infof("eviction: setting notification threshold to %s", threshold)
config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold)
epfd, err = unix.EpollCreate1(0)
if err != nil {
return nil, err
}
if epfd < 0 {
err = fmt.Errorf("EpollCreate1 call failed")
return nil, err
}
defer func() {
// Close epfd if we get an error later in initialization
if err != nil {
unix.Close(epfd)
}
}()
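// Per the cgroup v1 memcg API, writing "<eventfd> <fd of memory.usage_in_bytes> <threshold in bytes>"
// to cgroup.event_control arms a threshold notification on the eventfd.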
config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
_, err = unix.Write(controlfd, []byte(config))
if err != nil {
return nil, err
}
return &memcgThresholdNotifier{
watchfd: watchfd,
controlfd: controlfd,
eventfd: eventfd,
handler: handler,
description: description,
return &linuxCgroupNotifier{
eventfd: eventfd,
epfd: epfd,
stop: make(chan struct{}),
}, nil
}
func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) {
func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
Fd: int32(n.eventfd),
Events: unix.EPOLLIN,
})
if err != nil {
glog.Warningf("eviction manager: error adding epoll eventfd: %v", err)
return
}
for {
buf := make([]byte, 8)
_, err := unix.Read(eventfd, buf)
select {
case <-n.stop:
return
default:
}
event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
if err != nil {
glog.Warningf("eviction manager: error while waiting for memcg events: %v", err)
return
} else if !event {
// Timeout on wait. This is expected if the threshold was not crossed
continue
}
// Consume the event from the eventfd
buf := make([]byte, eventSize)
_, err = unix.Read(n.eventfd, buf)
if err != nil {
glog.Warningf("eviction manager: error reading memcg events: %v", err)
return
}
select {
case eventCh <- struct{}{}:
case <-stop.Ch():
return
}
eventCh <- struct{}{}
}
}
func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) {
eventCh := make(chan struct{})
go getThresholdEvents(n.eventfd, eventCh, stop)
for {
select {
case <-stop.Ch():
glog.V(3).Infof("eviction: stopping threshold notifier")
unix.Close(n.watchfd)
unix.Close(n.controlfd)
unix.Close(n.eventfd)
return
case <-eventCh:
glog.V(2).Infof("eviction: threshold crossed")
n.handler(n.description)
// wait waits up to notifierRefreshInterval for an event on the Epoll FD for the
// eventfd we are concerned about. It returns an error if one occurs, and true
// if the consumer should read from the eventfd.
func wait(epfd, eventfd int, timeout time.Duration) (bool, error) {
events := make([]unix.EpollEvent, numFdEvents+1)
timeoutMS := int(timeout / time.Millisecond)
n, err := unix.EpollWait(epfd, events, timeoutMS)
if n == -1 {
if err == unix.EINTR {
// Interrupt, ignore the error
return false, nil
}
return false, err
}
if n == 0 {
// Timeout
return false, nil
}
if n > numFdEvents {
return false, fmt.Errorf("epoll_wait returned more events than we know what to do with")
}
for _, event := range events[:n] {
if event.Fd == int32(eventfd) {
if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 {
// EPOLLHUP: should not happen, but if it does, treat it as a wakeup.
// EPOLLERR: If an error is waiting on the file descriptor, we should pretend
// something is ready to read, and let unix.Read pick up the error.
// EPOLLIN: There is data to read.
return true, nil
}
}
}
// An event occurred that we don't care about.
return false, nil
}
func (n *linuxCgroupNotifier) Stop() {
n.stopLock.Lock()
defer n.stopLock.Unlock()
select {
case <-n.stop:
// the linuxCgroupNotifier is already stopped
return
default:
}
unix.Close(n.eventfd)
unix.Close(n.epfd)
close(n.stop)
}

View File

@ -18,10 +18,16 @@ limitations under the License.
package eviction
import "fmt"
import "github.com/golang/glog"
// NewMemCGThresholdNotifier sends notifications when a cgroup threshold
// is crossed (in either direction) for a given cgroup attribute
func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) {
return nil, fmt.Errorf("threshold notification not supported")
// NewCgroupNotifier creates a cgroup notifier that does nothing because cgroups do not exist on non-linux systems.
func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
glog.V(5).Infof("cgroup notifications not supported")
return &unsupportedThresholdNotifier{}, nil
}
type unsupportedThresholdNotifier struct{}
func (*unsupportedThresholdNotifier) Start(_ chan<- struct{}) {}
func (*unsupportedThresholdNotifier) Stop() {}

View File

@ -131,20 +131,30 @@ type nodeReclaimFunc func() error
// nodeReclaimFuncs is an ordered list of nodeReclaimFunc
type nodeReclaimFuncs []nodeReclaimFunc
// thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold
type thresholdNotifierHandlerFunc func(thresholdDescription string)
// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines.
// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions.
type ThresholdStopCh interface {
// Reset closes the channel if it can be closed, and returns true if it was closed.
// Reset also creates a new channel.
Reset() bool
// Ch returns the channel that is closed when Reset() is called
Ch() <-chan struct{}
// CgroupNotifier generates events from cgroup events
type CgroupNotifier interface {
// Start causes the CgroupNotifier to begin notifying on the eventCh
Start(eventCh chan<- struct{})
// Stop stops all processes and cleans up file descriptors associated with the CgroupNotifier
Stop()
}
// ThresholdNotifier notifies the user when an attribute crosses a threshold value
// NotifierFactory creates CgroupNotifiers
type NotifierFactory interface {
// NewCgroupNotifier creates a CgroupNotifier that creates events when the threshold
// on the attribute in the cgroup specified by the path is crossed.
NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error)
}
// ThresholdNotifier manages CgroupNotifiers based on memory eviction thresholds, and performs a function
// when memory eviction thresholds are crossed
type ThresholdNotifier interface {
Start(ThresholdStopCh)
// Start calls the notifier function when the CgroupNotifier notifies the ThresholdNotifier that an event occurred
Start()
// UpdateThreshold updates the memory cgroup threshold based on the metrics provided.
// Calling UpdateThreshold with recent metrics allows the ThresholdNotifier to trigger at the
// eviction threshold more accurately
UpdateThreshold(summary *statsapi.Summary) error
// Description produces a relevant string describing the memory threshold notifier
Description() string
}
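Taken together, the eviction manager wires these interfaces up roughly as follows. This is a minimal sketch based on the code in this diff; the surrounding variables (threshold, podCgroupRoot, clock, thresholdsLastUpdated, summary) are assumed to be in scope, and the eviction pass itself is elided.
notifier, err := NewMemoryThresholdNotifier(threshold, podCgroupRoot, &CgroupNotifierFactory{}, func(message string) {
	glog.Infof(message)
	// run an eviction synchronize pass here
})
if err != nil {
	glog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
	go notifier.Start()
}
// Later, on each synchronize pass, refresh the kernel threshold with fresh stats,
// rate-limited to once per notifierRefreshInterval:
if clock.Since(thresholdsLastUpdated) > notifierRefreshInterval {
	thresholdsLastUpdated = clock.Now()
	if err := notifier.UpdateThreshold(summary); err != nil {
		glog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
	}
}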