refresh eviction interval periodically

pull/8/head
David Ashpole 2018-03-06 15:14:05 -08:00
parent 54cf14ffcc
commit 39d9fa60e8
6 changed files with 101 additions and 20 deletions

View File

@ -94,7 +94,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
] + select({

View File

@ -27,7 +27,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
@ -83,8 +82,8 @@ type managerImpl struct {
resourceToNodeReclaimFuncs map[v1.ResourceName]nodeReclaimFuncs
// last observations from synchronize
lastObservations signalObservations
// notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once)
notifiersInitialized bool
// notifierStopCh is a channel used to stop all thresholdNotifiers
notifierStopCh ThresholdStopCh
// dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
dedicatedImageFs *bool
}
@ -114,6 +113,7 @@ func NewManager(
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
notifierStopCh: NewInitialStopCh(clock),
dedicatedImageFs: nil,
}
return manager, manager
@ -184,8 +184,8 @@ func (m *managerImpl) IsUnderPIDPressure() bool {
return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure)
}
func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, summary *statsapi.Summary, hard bool, handler thresholdNotifierHandlerFunc) error {
for _, threshold := range thresholds {
func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard bool, handler thresholdNotifierHandlerFunc) error {
for _, threshold := range m.config.Thresholds {
if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) {
continue
}
@ -215,7 +215,7 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, summary *s
if err != nil {
return err
}
go memcgThresholdNotifier.Start(wait.NeverStop)
go memcgThresholdNotifier.Start(m.notifierStopCh)
return nil
}
return nil
@ -252,11 +252,10 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
}
// attempt to create a threshold notifier to improve eviction response time
if m.config.KernelMemcgNotification && !m.notifiersInitialized {
glog.Infof("eviction manager attempting to integrate with kernel memcg notification api")
m.notifiersInitialized = true
if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() {
glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api")
// start soft memory notification
err = startMemoryThresholdNotifier(m.config.Thresholds, summary, false, func(desc string) {
err = m.startMemoryThresholdNotifier(summary, false, func(desc string) {
glog.Infof("soft memory eviction threshold crossed at %s", desc)
// TODO wait grace period for soft memory limit
m.synchronize(diskInfoProvider, podFunc)
@ -265,7 +264,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)
}
// start hard memory notification
err = startMemoryThresholdNotifier(m.config.Thresholds, summary, true, func(desc string) {
err = m.startMemoryThresholdNotifier(summary, true, func(desc string) {
glog.Infof("hard memory eviction threshold crossed at %s", desc)
m.synchronize(diskInfoProvider, podFunc)
})

View File

@ -21,11 +21,13 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@ -52,6 +54,9 @@ const (
resourceNodeFs v1.ResourceName = "nodefs"
// nodefs inodes, number. internal to this module, used to account for local node root filesystem inodes.
resourceNodeFsInodes v1.ResourceName = "nodefsInodes"
// this prevents constantly updating the memcg notifier if synchronize
// is run frequently.
notifierRefreshInterval = 10 * time.Second
)
var (
@ -1080,3 +1085,38 @@ func buildResourceToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, w
}
return resourceToReclaimFunc
}
// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed
type thresholdStopCh struct {
lock sync.Mutex
ch chan struct{}
startTime time.Time
// used to track time
clock clock.Clock
}
// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately
func NewInitialStopCh(clock clock.Clock) ThresholdStopCh {
return &thresholdStopCh{ch: make(chan struct{}), clock: clock}
}
// implements ThresholdStopCh.Reset
func (t *thresholdStopCh) Reset() (closed bool) {
t.lock.Lock()
defer t.lock.Unlock()
closed = t.clock.Since(t.startTime) > notifierRefreshInterval
if closed {
// close the old channel and reopen a new one
close(t.ch)
t.startTime = t.clock.Now()
t.ch = make(chan struct{})
}
return
}
// implements ThresholdStopCh.Ch
func (t *thresholdStopCh) Ch() <-chan struct{} {
t.lock.Lock()
defer t.lock.Unlock()
return t.ch
}

View File

@ -19,6 +19,7 @@ package eviction
import (
"fmt"
"reflect"
"sync"
"testing"
"time"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
@ -1914,3 +1916,34 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
}
return true
}
func TestThresholdStopCh(t *testing.T) {
var wg sync.WaitGroup
fakeClock := clock.NewFakeClock(time.Now())
stop := NewInitialStopCh(fakeClock)
// Should be able to reset the InitialStopCh right away
if !stop.Reset() {
t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
}
// Need to wait notifierRefreshInterval before closing
if stop.Reset() {
t.Errorf("Expected not to be able to close the initialStopCh, but was successful")
}
wg.Add(1)
ch := stop.Ch()
go func() {
defer wg.Done()
// wait for the channel to close
<-ch
}()
fakeClock.Step(2 * notifierRefreshInterval)
if !stop.Reset() {
t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful")
}
// ensure the Reset() closed the channel
wg.Wait()
}

View File

@ -67,7 +67,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
unix.Close(eventfd)
}
}()
glog.V(2).Infof("eviction: setting notification threshold to %s", threshold)
glog.V(3).Infof("eviction: setting notification threshold to %s", threshold)
config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold)
_, err = unix.Write(controlfd, []byte(config))
if err != nil {
@ -82,7 +82,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h
}, nil
}
func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan struct{}) {
func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) {
for {
buf := make([]byte, 8)
_, err := unix.Read(eventfd, buf)
@ -92,19 +92,19 @@ func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan stru
select {
case eventCh <- struct{}{}:
case <-stopCh:
case <-stop.Ch():
return
}
}
}
func (n *memcgThresholdNotifier) Start(stopCh <-chan struct{}) {
func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) {
eventCh := make(chan struct{})
go getThresholdEvents(n.eventfd, eventCh, stopCh)
go getThresholdEvents(n.eventfd, eventCh, stop)
for {
select {
case <-stopCh:
glog.V(2).Infof("eviction: stopping threshold notifier")
case <-stop.Ch():
glog.V(3).Infof("eviction: stopping threshold notifier")
unix.Close(n.watchfd)
unix.Close(n.controlfd)
unix.Close(n.eventfd)

View File

@ -132,7 +132,17 @@ type nodeReclaimFuncs []nodeReclaimFunc
// thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold
type thresholdNotifierHandlerFunc func(thresholdDescription string)
// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines.
// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions.
type ThresholdStopCh interface {
// Reset closes the channel if it can be closed, and returns true if it was closed.
// Reset also creates a new channel.
Reset() bool
// Ch returns the channel that is closed when Reset() is called
Ch() <-chan struct{}
}
// ThresholdNotifier notifies the user when an attribute crosses a threshold value
type ThresholdNotifier interface {
Start(stopCh <-chan struct{})
Start(ThresholdStopCh)
}