Merge pull request #32270 from gmarek/metric

Automatic merge from submit-queue

Change the eviction metric type and fix rate-limited-timed-queue

People who know better convinced me that an aggregate counter is better than a gauge for the number-of-evictions metric. @Q-Lee

Per discussion with @pwittrock I'm adding a v1.4 label and a cherry-pick candidate label. This is a slightly bigger change than I expected, but it fixes a bug in the eviction logic, so it's also important.
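For reference, a minimal standalone sketch of the counter approach (the subsystem string and the read-back at the end are illustrative assumptions, not part of the change):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// evictionsNumber mirrors the EvictionsNumber metric from the diff; the
// subsystem string is an assumed stand-in for NodeControllerSubsystem.
var evictionsNumber = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: "node_collector",
		Name:      "evictions_number",
		Help:      "Number of Node evictions since this NodeController instance started.",
	},
	[]string{"zone"},
)

func main() {
	prometheus.MustRegister(evictionsNumber)

	// Touch the series when a zone first appears so it is exported as 0
	// instead of being absent until the first eviction.
	evictionsNumber.WithLabelValues("zone1").Add(0)

	// On each eviction, just increment; windowed views (last 10m, last 1h)
	// are left to rate()/increase() queries instead of in-process tracking.
	evictionsNumber.WithLabelValues("zone1").Inc()

	// Read the value back the way a test would, to show it is cumulative.
	m := &dto.Metric{}
	if err := evictionsNumber.WithLabelValues("zone1").Write(m); err == nil {
		fmt.Println(m.Counter.GetValue()) // 1
	}
}
```

The one-time `Add(0)` mirrors the zone-initialization in the diff: it makes the series appear at 0 as soon as a zone is seen, so dashboards and alerts don't confuse "no evictions yet" with "no data".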

cc @derekwaynecarr @smarterclayton @timothysc
Kubernetes Submit Queue 2016-09-08 06:59:43 -07:00 committed by GitHub
commit 23079c4569
5 changed files with 70 additions and 290 deletions

View File

@@ -18,11 +18,7 @@ package node
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
@@ -31,8 +27,7 @@ const (
ZoneHealthStatisticKey = "zone_health"
ZoneSizeKey = "zone_size"
ZoneNoUnhealthyNodesKey = "unhealty_nodes_in_zone"
EvictionsIn10MinutesKey = "10_minute_evictions"
EvictionsIn1HourKey = "1_hour_evictions"
EvictionsNumberKey = "evictions_number"
)
var (
@@ -60,19 +55,11 @@ var (
},
[]string{"zone"},
)
Evictions10Minutes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
EvictionsNumber = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: NodeControllerSubsystem,
Name: EvictionsIn10MinutesKey,
Help: "Gauge measuring number of Node evictions that happened in previous 10 minutes per zone.",
},
[]string{"zone"},
)
Evictions1Hour = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: NodeControllerSubsystem,
Name: EvictionsIn1HourKey,
Help: "Gauge measuring number of Node evictions that happened in previous hour per zone.",
Name: EvictionsNumberKey,
Help: "Number of Node evictions that happened since current instance of NodeController started.",
},
[]string{"zone"},
)
@@ -85,117 +72,6 @@ func Register() {
prometheus.MustRegister(ZoneHealth)
prometheus.MustRegister(ZoneSize)
prometheus.MustRegister(UnhealthyNodes)
prometheus.MustRegister(Evictions10Minutes)
prometheus.MustRegister(Evictions1Hour)
prometheus.MustRegister(EvictionsNumber)
})
}
type eviction struct {
node string
time unversioned.Time
}
type evictionData struct {
sync.Mutex
nodeEvictionCount map[string]map[string]int
nodeEvictionList []eviction
now func() unversioned.Time
windowSize time.Duration
}
func newEvictionData(windowSize time.Duration) *evictionData {
return &evictionData{
nodeEvictionCount: make(map[string]map[string]int),
nodeEvictionList: make([]eviction, 0),
now: unversioned.Now,
windowSize: windowSize,
}
}
func (e *evictionData) slideWindow() {
e.Lock()
defer e.Unlock()
now := e.now()
firstInside := 0
for _, v := range e.nodeEvictionList {
if v.time.Add(e.windowSize).Before(now.Time) {
firstInside++
zone := ""
for z := range e.nodeEvictionCount {
if _, ok := e.nodeEvictionCount[z][v.node]; ok {
zone = z
break
}
}
if zone == "" {
glog.Warningf("EvictionData corruption - unknown zone for node %v", v.node)
continue
}
if e.nodeEvictionCount[zone][v.node] > 1 {
e.nodeEvictionCount[zone][v.node] = e.nodeEvictionCount[zone][v.node] - 1
} else {
delete(e.nodeEvictionCount[zone], v.node)
}
} else {
break
}
}
e.nodeEvictionList = e.nodeEvictionList[firstInside:]
}
func (e *evictionData) registerEviction(node, zone string) {
e.Lock()
defer e.Unlock()
e.nodeEvictionList = append(e.nodeEvictionList, eviction{node: node, time: e.now()})
if _, ok := e.nodeEvictionCount[zone]; !ok {
e.nodeEvictionCount[zone] = make(map[string]int)
}
if _, ok := e.nodeEvictionCount[zone][node]; !ok {
e.nodeEvictionCount[zone][node] = 1
} else {
e.nodeEvictionCount[zone][node] = e.nodeEvictionCount[zone][node] + 1
}
}
func (e *evictionData) removeEviction(node, zone string) {
e.Lock()
defer e.Unlock()
// TODO: This may be inefficient, but hopefully will be rarely called. Verify that this is true.
for i := len(e.nodeEvictionList) - 1; i >= 0; i-- {
if e.nodeEvictionList[i].node == node {
e.nodeEvictionList = append(e.nodeEvictionList[:i], e.nodeEvictionList[i+1:]...)
break
}
}
if e.nodeEvictionCount[zone][node] > 1 {
e.nodeEvictionCount[zone][node] = e.nodeEvictionCount[zone][node] - 1
} else {
delete(e.nodeEvictionCount[zone], node)
}
}
func (e *evictionData) countEvictions(zone string) int {
e.Lock()
defer e.Unlock()
return len(e.nodeEvictionCount[zone])
}
func (e *evictionData) getZones() []string {
e.Lock()
defer e.Unlock()
zones := make([]string, 0, len(e.nodeEvictionCount))
for k := range e.nodeEvictionCount {
zones = append(zones, k)
}
return zones
}
func (e *evictionData) initZone(zone string) {
e.Lock()
defer e.Unlock()
e.nodeEvictionCount[zone] = make(map[string]int)
}
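With the counter in place, the sliding-window bookkeeping removed above becomes the monitoring system's job: any window can be derived at query time, e.g. with a Prometheus query along the lines of `increase(node_collector_evictions_number{zone="zone1"}[10m])` (the fully qualified metric name here is an assumption based on the subsystem and key in the diff).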

View File

@@ -1,129 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
)
func TestEvictionData(t *testing.T) {
evictionData := newEvictionData(time.Hour)
now := unversioned.Now()
evictionData.now = func() unversioned.Time {
return *(&now)
}
if evictionData.countEvictions("zone1") != 0 {
t.Fatalf("Invalid eviction count before doing anything")
}
evictionData.initZone("zone1")
if evictionData.countEvictions("zone1") != 0 {
t.Fatalf("Invalid eviction after zone initialization")
}
evictionData.registerEviction("first", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 1 {
t.Fatalf("Invalid eviction count after adding first Node")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.registerEviction("second", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 2 {
t.Fatalf("Invalid eviction count after adding second Node")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.registerEviction("second", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 2 {
t.Fatalf("Invalid eviction count after adding second Node second time")
}
if evictionData.countEvictions("zone2") != 0 {
t.Fatalf("Invalid eviction in nonexistent zone")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.registerEviction("third", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 3 {
t.Fatalf("Invalid eviction count after adding third Node first time")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.removeEviction("third", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 2 {
t.Fatalf("Invalid eviction count after remove third Node")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.removeEviction("third", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 2 {
t.Fatalf("Invalid eviction count after remove third Node second time")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.registerEviction("fourth", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 3 {
t.Fatalf("Invalid eviction count after adding fourth Node first time")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.registerEviction("fourth", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 3 {
t.Fatalf("Invalid eviction count after adding fourth Node second time")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.removeEviction("fourth", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 3 {
t.Fatalf("Invalid eviction count after remove fourth Node first time")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.removeEviction("fourth", "zone1")
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 2 {
t.Fatalf("Invalid eviction count after remove fourth Node second time")
}
now = unversioned.NewTime(now.Add(52 * time.Minute))
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 1 {
t.Fatalf("Invalid eviction count after first Node went out of scope")
}
now = unversioned.NewTime(now.Add(time.Minute))
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 1 {
t.Fatalf("Invalid eviction count after first occurence of the second Node went out of scope")
}
now = unversioned.NewTime(now.Add(time.Second))
evictionData.slideWindow()
if evictionData.countEvictions("zone1") != 0 {
t.Fatalf("Invalid eviction count after second occurence of the second Node went out of scope")
}
}

View File

@@ -46,8 +46,6 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
"github.com/prometheus/client_golang/prometheus"
)
func init() {
@@ -167,9 +165,6 @@ type NodeController struct {
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedIndexInformer
evictions10Minutes *evictionData
evictions1Hour *evictionData
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -241,8 +236,6 @@ func NewNodeController(
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
evictions10Minutes: newEvictionData(10 * time.Minute),
evictions1Hour: newEvictionData(time.Hour),
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@@ -417,7 +410,7 @@ func (nc *NodeController) Run() {
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
obj, exists, err := nc.nodeStore.Get(value.Value)
obj, exists, err := nc.nodeStore.GetByKey(value.Value)
if err != nil {
glog.Warningf("Failed to get Node %v from the nodeStore: %v", value.Value, err)
} else if !exists {
@@ -425,8 +418,7 @@ func (nc *NodeController) Run() {
} else {
node, _ := obj.(*api.Node)
zone := utilnode.GetZoneKey(node)
nc.evictions10Minutes.registerEviction(zone, value.Value)
nc.evictions1Hour.registerEviction(zone, value.Value)
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
@@ -506,8 +498,9 @@ func (nc *NodeController) monitorNodeStatus() error {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
nc.evictions10Minutes.initZone(zone)
nc.evictions1Hour.initZone(zone)
// Init the metric for the new zone.
glog.Infof("Initilizing eviction metric for zone: %v", zone)
EvictionsNumber.WithLabelValues(zone).Add(0)
}
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
@@ -607,20 +600,10 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}
nc.handleDisruption(zoneToNodeConditions, nodes)
nc.updateEvictionMetric(Evictions10Minutes, nc.evictions10Minutes)
nc.updateEvictionMetric(Evictions1Hour, nc.evictions1Hour)
return nil
}
func (nc *NodeController) updateEvictionMetric(metric *prometheus.GaugeVec, data *evictionData) {
data.slideWindow()
zones := data.getZones()
for _, z := range zones {
metric.WithLabelValues(z).Set(float64(data.countEvictions(z)))
}
}
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*api.NodeCondition, nodes *api.NodeList) {
newZoneStates := map[string]zoneState{}
allAreFullyDisrupted := true
@@ -924,8 +907,6 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
if wasDeleting || wasTerminating {
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
nc.evictions10Minutes.removeEviction(zone, node.Name)
nc.evictions1Hour.removeEviction(zone, node.Name)
return true
}
return false

View File

@@ -97,13 +97,15 @@ func (q *UniqueQueue) Replace(value TimedValue) bool {
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
// Removes the value from the queue, but keeps it in the set, so it won't be added second time.
// Returns true if something was removed.
func (q *UniqueQueue) RemoveFromQueue(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.set.Delete(value)
if !q.set.Has(value) {
return false
}
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
@@ -113,6 +115,25 @@ func (q *UniqueQueue) Remove(value string) bool {
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.set.Has(value) {
return false
}
q.set.Delete(value)
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
return true
}
}
return true
}
// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
@@ -174,7 +195,11 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
// time to execute the next item in the queue.
// time to execute the next item in the queue. The same value is processed only once unless
// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController
// when Node becomes Ready again)
// TODO: figure out a good way to do garbage collection for all Nodes that were removed from
// the cluster.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
@@ -196,7 +221,7 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val.ProcessAt = now.Add(wait + 1)
q.queue.Replace(val)
} else {
q.queue.Remove(val.Value)
q.queue.RemoveFromQueue(val.Value)
}
val, ok = q.queue.Head()
}
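To make the new contract concrete, here is a minimal, slice-backed model of the two removal operations (illustrative only: the real UniqueQueue is heap-backed, mutex-guarded, and stores TimedValues):

```go
package main

import "fmt"

// uniqueQueue is a toy stand-in for the real UniqueQueue: a FIFO queue
// plus a set that deduplicates values across their whole lifetime.
type uniqueQueue struct {
	set   map[string]bool
	queue []string
}

// Add enqueues value unless the set has already seen it.
func (q *uniqueQueue) Add(value string) bool {
	if q.set[value] {
		return false
	}
	q.set[value] = true
	q.queue = append(q.queue, value)
	return true
}

// RemoveFromQueue drops value from the queue but keeps it in the set,
// so the value is processed at most once (what Try now calls).
func (q *uniqueQueue) RemoveFromQueue(value string) bool {
	if !q.set[value] {
		return false
	}
	for i, v := range q.queue {
		if v == value {
			q.queue = append(q.queue[:i], q.queue[i+1:]...)
			return true
		}
	}
	return false
}

// Remove drops value from both queue and set, re-enabling a later Add
// (what cancelPodEviction calls when a Node turns Ready again).
func (q *uniqueQueue) Remove(value string) bool {
	if !q.set[value] {
		return false
	}
	delete(q.set, value)
	for i, v := range q.queue {
		if v == value {
			q.queue = append(q.queue[:i], q.queue[i+1:]...)
			break
		}
	}
	return true
}

func main() {
	q := &uniqueQueue{set: map[string]bool{}}
	q.Add("node-a")
	q.RemoveFromQueue("node-a")  // processed once: out of the queue, still in the set
	fmt.Println(q.Add("node-a")) // false: won't be processed a second time
	q.Remove("node-a")           // explicit cancellation clears the set entry
	fmt.Println(q.Add("node-a")) // true: eligible for eviction again
}
```

Running it prints false, then true: after RemoveFromQueue a value cannot re-enter the queue until Remove clears it from the set.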

View File

@@ -303,5 +303,32 @@ func TestSwapLimiter(t *testing.T) {
if qps != createdQPS {
t.Fatalf("QPS does not match create one: %v instead of %v", qps, createdQPS)
}
}
func TestAddAfterTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first", "11111")
evictor.Add("second", "22222")
evictor.Add("third", "33333")
evictor.Remove("second")
deletedMap := sets.NewString()
evictor.Try(func(value TimedValue) (bool, time.Duration) {
deletedMap.Insert(value.Value)
return true, 0
})
setPattern := sets.NewString("first", "third")
if len(deletedMap) != len(setPattern) {
t.Fatalf("Map %v should have length %d", evictor.queue.set, len(setPattern))
}
if !CheckSetEq(setPattern, deletedMap) {
t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
}
evictor.Add("first", "11111")
evictor.Try(func(value TimedValue) (bool, time.Duration) {
t.Errorf("We shouldn't process the same value if the explicit remove wasn't called.")
return true, 0
})
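What this test pins down: previously Try called Remove, which cleared a processed value from both the queue and the set, so the same node could be re-added and evicted repeatedly; with RemoveFromQueue the value stays in the set, the second Add of "first" is a no-op, and the callback must not fire again.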
}