add a metric that can be used to notice stuck worker threads

pull/58/head
Daniel Smith 2018-11-09 10:43:44 -08:00
parent 06e737367d
commit 6195d1005d
4 changed files with 143 additions and 15 deletions

View File

@ -71,6 +71,16 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su
return workDuration
}
func (prometheusMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "unfinished_work_microseconds",
Help: "How many microseconds of work has " + name + " done that is still in progress and hasn't yet been observed by work_duration.",
})
prometheus.Register(unfinished)
return unfinished
}
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,

View File

@ -28,6 +28,7 @@ type queueMetrics interface {
add(item t)
get(item t)
done(item t)
updateUnfinishedWork()
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
@ -37,6 +38,12 @@ type GaugeMetric interface {
Dec()
}
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SettableGaugeMetric interface {
Set(float64)
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
@ -52,6 +59,7 @@ type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {}
type defaultQueueMetrics struct {
@ -65,6 +73,9 @@ type defaultQueueMetrics struct {
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkMicroseconds SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@ -103,6 +114,23 @@ func (m *defaultQueueMetrics) done(item t) {
}
}
func (m *defaultQueueMetrics) updateUnfinishedWork() {
var total float64
if m.processingStartTimes != nil {
for _, t := range m.processingStartTimes {
total += sinceInMicroseconds(t)
}
}
m.unfinishedWorkMicroseconds.Set(total)
}
type noMetrics struct{}
func (noMetrics) add(item t) {}
func (noMetrics) get(item t) {}
func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
@ -130,6 +158,7 @@ type MetricsProvider interface {
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
}
@ -151,6 +180,10 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
@ -163,17 +196,18 @@ var metricsFactory = struct {
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
mp := metricsFactory.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{}
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkMicroseconds: mp.NewUnfinishedWorkMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"testing"
"time"
)
type testMetrics struct {
added, gotten, finished int64
updateCalled chan<- struct{}
}
func (m *testMetrics) add(item t) { m.added++ }
func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetrics(t *testing.T) {
ch := make(chan struct{})
m := &testMetrics{
updateCalled: ch,
}
q := newQueue("test", m, time.Millisecond)
<-ch
q.ShutDown()
select {
case <-time.After(time.Second):
return
case <-ch:
t.Errorf("Unexpected update after shutdown was called.")
}
}

View File

@ -18,6 +18,7 @@ package workqueue
import (
"sync"
"time"
)
type Interface interface {
@ -35,14 +36,27 @@ func New() *Type {
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
return newQueue(
name,
newQueueMetrics(name),
defaultUnfinishedWorkUpdatePeriod,
)
}
func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
go t.updateUnfinishedWorkLook()
return t
}
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
@ -64,6 +78,8 @@ type Type struct {
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
}
type empty struct{}
@ -170,3 +186,22 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown
}
func (q *Type) updateUnfinishedWorkLook() {
t := time.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C {
if !func() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown {
q.metrics.updateUnfinishedWork()
return true
}
return false
}() {
return
}
}
}