add metrics to record number of pending pods in different queues

k3s-v1.15.3
Wei Huang 2019-03-18 23:43:43 -07:00
parent 62f5fd4c6c
commit 7afbd68730
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
6 changed files with 233 additions and 9 deletions

View File

@ -42,6 +42,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -283,13 +284,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
clock: clock,
stop: stop,
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
activeQ: util.NewHeap(podInfoKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.run()
@ -777,16 +778,27 @@ type UnschedulablePodsMap struct {
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the podInfo.
podInfoMap map[string]*podInfo
keyFunc func(*v1.Pod) string
// metricRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
// addOrUpdate stores pInfo in the unschedulable podInfoMap under the key
// computed by keyFunc, replacing any entry already stored under that key.
// The metric recorder, when set, is incremented only if the key was not
// present before, so updating an existing pod does not count it twice.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
	podID := u.keyFunc(pInfo.pod)
	// The existence check has to run before the map is written; otherwise the
	// key is always found and the recorder is never incremented.
	if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
		u.metricRecorder.Inc()
	}
	u.podInfoMap[podID] = pInfo
}
// delete removes the entry stored under the pod's key from the unschedulable
// podInfoMap. The metric recorder, when set, is decremented only if an entry
// was actually present, so deleting an unknown pod leaves the count unchanged.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
	podID := u.keyFunc(pod)
	// Look the entry up before removing it; once it is gone the lookup always
	// misses and the recorder would never be decremented.
	if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
		u.metricRecorder.Dec()
	}
	delete(u.podInfoMap, podID)
}
// Get returns the podInfo if a pod with the same key as the key of the given "pod"
@ -802,13 +814,17 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
// clear drops every entry by replacing podInfoMap with a fresh empty map and,
// when a metric recorder is set, resets that recorder as well.
func (u *UnschedulablePodsMap) clear() {
	u.podInfoMap = map[string]*podInfo{}
	if u.metricRecorder == nil {
		return
	}
	u.metricRecorder.Clear()
}
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap() *UnschedulablePodsMap {
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
return &UnschedulablePodsMap{
podInfoMap: make(map[string]*podInfo),
keyFunc: util.GetPodFullName,
podInfoMap: make(map[string]*podInfo),
keyFunc: util.GetPodFullName,
metricRecorder: metricRecorder,
}
}

View File

@ -647,7 +647,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
upm := newUnschedulablePodsMap()
upm := newUnschedulablePodsMap(nil)
for _, p := range test.podsToAdd {
upm.addOrUpdate(newPodInfoNoTimestamp(p))
}

View File

@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
// MetricRecorder is the set of operations used to keep a count in step with a
// collection: one call per element added, one per element removed, and one to
// reset the count.
type MetricRecorder interface {
	// Inc raises the recorded count by one.
	Inc()
	// Dec lowers the recorded count by one.
	Dec()
	// Clear resets the recorded count.
	Clear()
}
// Compile-time check that *PendingPodsRecorder satisfies MetricRecorder.
var _ MetricRecorder = (*PendingPodsRecorder)(nil)

// PendingPodsRecorder is an implementation of MetricRecorder that forwards
// each operation to the Prometheus gauge it wraps.
type PendingPodsRecorder struct {
	// recorder is the gauge updated by Inc, Dec and Clear.
	recorder prometheus.Gauge
}
// NewActivePodsRecorder returns a PendingPodsRecorder whose operations update
// the ActivePods gauge.
func NewActivePodsRecorder() *PendingPodsRecorder {
	return &PendingPodsRecorder{recorder: ActivePods}
}
// NewUnschedulablePodsRecorder returns a PendingPodsRecorder whose operations
// update the UnschedulablePods gauge.
func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
	return &PendingPodsRecorder{recorder: UnschedulablePods}
}
// NewBackoffPodsRecorder returns a PendingPodsRecorder whose operations update
// the BackoffPods gauge.
func NewBackoffPodsRecorder() *PendingPodsRecorder {
	return &PendingPodsRecorder{recorder: BackoffPods}
}
// Inc raises the wrapped gauge by 1. Concurrency safety is whatever the
// underlying prometheus.Gauge provides.
func (r *PendingPodsRecorder) Inc() { r.recorder.Inc() }
// Dec lowers the wrapped gauge by 1. Concurrency safety is whatever the
// underlying prometheus.Gauge provides.
func (r *PendingPodsRecorder) Dec() { r.recorder.Dec() }
// Clear sets the wrapped gauge to 0. Concurrency safety is whatever the
// underlying prometheus.Gauge provides.
func (r *PendingPodsRecorder) Clear() {
	// The untyped constant converts to the float64 that Set expects.
	r.recorder.Set(0)
}

View File

@ -0,0 +1,103 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"sync/atomic"
"testing"
)
// Compile-time check that *fakePodsRecorder satisfies MetricRecorder.
var _ MetricRecorder = (*fakePodsRecorder)(nil)

// fakePodsRecorder is an in-memory MetricRecorder for tests. Its methods
// modify counter only through sync/atomic operations.
type fakePodsRecorder struct {
	counter int64
}

// add atomically applies delta to the counter.
func (f *fakePodsRecorder) add(delta int64) {
	atomic.AddInt64(&f.counter, delta)
}

// Inc atomically adds 1 to the counter.
func (f *fakePodsRecorder) Inc() { f.add(1) }

// Dec atomically subtracts 1 from the counter.
func (f *fakePodsRecorder) Dec() { f.add(-1) }

// Clear atomically resets the counter to 0.
func (f *fakePodsRecorder) Clear() {
	atomic.StoreInt64(&f.counter, 0)
}
// TestInc checks that 100 concurrent Inc calls on a fresh fakePodsRecorder
// leave the counter at exactly 100.
func TestInc(t *testing.T) {
	const loops = 100
	recorder := fakePodsRecorder{}

	var wg sync.WaitGroup
	for i := 0; i < loops; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			recorder.Inc()
		}()
	}
	wg.Wait()

	if got := recorder.counter; got != int64(loops) {
		t.Errorf("Expected %v, got %v", loops, got)
	}
}
// TestDec checks that 100 concurrent Dec calls on a fakePodsRecorder that
// starts at 100 bring the counter down to exactly 0.
func TestDec(t *testing.T) {
	loops := 100
	// Seed the counter from loops so the start value and the number of
	// decrements cannot drift apart.
	fakeRecorder := fakePodsRecorder{counter: int64(loops)}
	var wg sync.WaitGroup
	wg.Add(loops)
	for i := 0; i < loops; i++ {
		go func() {
			fakeRecorder.Dec()
			wg.Done()
		}()
	}
	wg.Wait()
	// The expected final value is 0, so the failure message reports 0 rather
	// than the loop count.
	if fakeRecorder.counter != int64(0) {
		t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
	}
}
// TestClear runs 100 Inc and 80 Dec calls concurrently, checks that the
// counter ends at 20, and then checks that Clear resets it to 0.
func TestClear(t *testing.T) {
	const (
		incLoops = 100
		decLoops = 80
	)
	recorder := fakePodsRecorder{}

	var wg sync.WaitGroup
	// spawn registers `times` goroutines with wg and runs op once in each.
	spawn := func(times int, op func()) {
		wg.Add(times)
		for i := 0; i < times; i++ {
			go func() {
				defer wg.Done()
				op()
			}()
		}
	}
	spawn(incLoops, recorder.Inc)
	spawn(decLoops, recorder.Dec)
	wg.Wait()

	if got := recorder.counter; got != int64(incLoops-decLoops) {
		t.Errorf("Expected %v, got %v", incLoops-decLoops, got)
	}

	// verify Clear() works
	recorder.Clear()
	if got := recorder.counter; got != int64(0) {
		t.Errorf("Expected %v, got %v", 0, got)
	}
}

View File

@ -192,6 +192,16 @@ var (
Help: "Total preemption attempts in the cluster till now",
})
pendingPods = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: SchedulerSubsystem,
Name: "pending_pods_total",
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulableQ.",
}, []string{"queue"})
ActivePods = pendingPods.With(prometheus.Labels{"queue": "active"})
BackoffPods = pendingPods.With(prometheus.Labels{"queue": "backoff"})
UnschedulablePods = pendingPods.With(prometheus.Labels{"queue": "unschedulable"})
metricsList = []prometheus.Collector{
scheduleAttempts,
SchedulingLatency,
@ -210,6 +220,7 @@ var (
DeprecatedSchedulingAlgorithmPremptionEvaluationDuration,
PreemptionVictims,
PreemptionAttempts,
pendingPods,
}
)

View File

@ -25,6 +25,7 @@ import (
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// KeyFunc is a function type to get the key from an object.
@ -127,6 +128,9 @@ type Heap struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *heapData
// metricRecorder updates the counter when elements of a heap get added or
// removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
// Add inserts an item, and puts it in the queue. The item is updated if it
@ -141,6 +145,9 @@ func (h *Heap) Add(obj interface{}) error {
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
return nil
}
@ -154,6 +161,9 @@ func (h *Heap) AddIfNotPresent(obj interface{}) error {
}
if _, exists := h.data.items[key]; !exists {
heap.Push(h.data, &itemKeyValue{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
return nil
}
@ -172,6 +182,9 @@ func (h *Heap) Delete(obj interface{}) error {
}
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return nil
}
return fmt.Errorf("object not found")
@ -186,6 +199,9 @@ func (h *Heap) Peek() interface{} {
func (h *Heap) Pop() (interface{}, error) {
obj := heap.Pop(h.data)
if obj != nil {
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return obj, nil
}
return nil, fmt.Errorf("object was removed from heap data")
@ -225,6 +241,11 @@ func (h *Heap) Len() int {
// NewHeap returns a Heap which can be used to queue up items to process.
// It delegates to NewHeapWithRecorder with a nil metric recorder.
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
	var noRecorder metrics.MetricRecorder // nil interface value
	return NewHeapWithRecorder(keyFn, lessFn, noRecorder)
}
// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
return &Heap{
data: &heapData{
items: map[string]*heapItem{},
@ -232,5 +253,6 @@ func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
keyFunc: keyFn,
lessFunc: lessFn,
},
metricRecorder: metricRecorder,
}
}