Merge pull request #70884 from lavalamp/workqueue

add a metric that can be used to notice stuck worker threads
pull/58/head
k8s-ci-robot 2018-11-13 14:59:27 -08:00 committed by GitHub
commit bc6aee19b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 452 additions and 31 deletions

View File

@ -71,6 +71,30 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su
return workDuration return workDuration
} }
// NewUnfinishedWorkSecondsMetric creates and registers a gauge reporting how
// many seconds of in-progress work the named workqueue has accumulated.
func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem: name,
		Name:      "unfinished_work_seconds",
		Help: "How many seconds of work " + name + " has done that " +
			"is in progress and hasn't been observed by work_duration. Large " +
			"values indicate stuck threads. One can deduce the number of stuck " +
			"threads by observing the rate at which this increases.",
	})
	// NOTE(review): the error from Register (e.g. duplicate registration) is
	// ignored here, matching the sibling constructors — confirm this is intended.
	prometheus.Register(gauge)
	return gauge
}
// NewLongestRunningProcessorMicrosecondsMetric creates and registers a gauge
// reporting, in microseconds, how long the longest currently-running processor
// for the named workqueue has been running.
func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
	// Fix: the local was named "unfinished" (copy-pasted from the gauge
	// above); rename it to describe what it actually measures.
	longest := prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem: name,
		Name:      "longest_running_processor_microseconds",
		Help: "How many microseconds has the longest running " +
			"processor for " + name + " been running.",
	})
	// Registration errors (e.g. duplicate registration) are ignored, matching
	// the sibling constructors.
	prometheus.Register(longest)
	return longest
}
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{ retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name, Subsystem: name,

View File

@ -11,6 +11,7 @@ go_test(
srcs = [ srcs = [
"default_rate_limiters_test.go", "default_rate_limiters_test.go",
"delaying_queue_test.go", "delaying_queue_test.go",
"metrics_test.go",
"queue_test.go", "queue_test.go",
"rate_limitting_queue_test.go", "rate_limitting_queue_test.go",
], ],

View File

@ -19,6 +19,8 @@ package workqueue
import ( import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
) )
// This file provides abstractions for setting the provider (e.g., prometheus) // This file provides abstractions for setting the provider (e.g., prometheus)
@ -28,6 +30,7 @@ type queueMetrics interface {
add(item t) add(item t)
get(item t) get(item t)
done(item t) done(item t)
updateUnfinishedWork()
} }
// GaugeMetric represents a single numerical value that can arbitrarily go up // GaugeMetric represents a single numerical value that can arbitrarily go up
@ -37,6 +40,12 @@ type GaugeMetric interface {
Dec() Dec()
} }
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SettableGaugeMetric interface {
	// Set replaces the gauge's current value with the given one.
	Set(float64)
}
// CounterMetric represents a single numerical value that only ever // CounterMetric represents a single numerical value that only ever
// goes up. // goes up.
type CounterMetric interface { type CounterMetric interface {
@ -52,9 +61,13 @@ type noopMetric struct{}
func (noopMetric) Inc() {} func (noopMetric) Inc() {}
func (noopMetric) Dec() {} func (noopMetric) Dec() {}
func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {} func (noopMetric) Observe(float64) {}
// defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics struct { type defaultQueueMetrics struct {
clock clock.Clock
// current depth of a workqueue // current depth of a workqueue
depth GaugeMetric depth GaugeMetric
// total number of adds handled by a workqueue // total number of adds handled by a workqueue
@ -65,6 +78,10 @@ type defaultQueueMetrics struct {
workDuration SummaryMetric workDuration SummaryMetric
addTimes map[t]time.Time addTimes map[t]time.Time
processingStartTimes map[t]time.Time processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
} }
func (m *defaultQueueMetrics) add(item t) { func (m *defaultQueueMetrics) add(item t) {
@ -75,7 +92,7 @@ func (m *defaultQueueMetrics) add(item t) {
m.adds.Inc() m.adds.Inc()
m.depth.Inc() m.depth.Inc()
if _, exists := m.addTimes[item]; !exists { if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now() m.addTimes[item] = m.clock.Now()
} }
} }
@ -85,9 +102,9 @@ func (m *defaultQueueMetrics) get(item t) {
} }
m.depth.Dec() m.depth.Dec()
m.processingStartTimes[item] = time.Now() m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists { if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime)) m.latency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item) delete(m.addTimes, item)
} }
} }
@ -98,14 +115,39 @@ func (m *defaultQueueMetrics) done(item t) {
} }
if startTime, exists := m.processingStartTimes[item]; exists { if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime)) m.workDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item) delete(m.processingStartTimes, item)
} }
} }
// updateUnfinishedWork publishes the total in-progress work (in seconds) and
// the age of the longest-running processor (in microseconds). The caller is
// expected to hold the queue lock.
func (m *defaultQueueMetrics) updateUnfinishedWork() {
	// Note that a summary metric would be better for this, but prometheus
	// doesn't seem to have non-hacky ways to reset the summary metrics.
	var totalMicros, longestMicros float64
	for _, started := range m.processingStartTimes {
		elapsed := m.sinceInMicroseconds(started)
		totalMicros += elapsed
		if elapsed > longestMicros {
			longestMicros = elapsed
		}
	}
	// Convert to seconds; microseconds is unhelpfully granular for this.
	m.unfinishedWorkSeconds.Set(totalMicros / 1000000)
	m.longestRunningProcessor.Set(longestMicros) // in microseconds.
}
// noMetrics is a queueMetrics implementation that does nothing. It is used
// for unnamed queues and when no metrics provider has been configured.
type noMetrics struct{}

func (noMetrics) add(item t) {}
func (noMetrics) get(item t) {}
func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds. // Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 { func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
} }
type retryMetrics interface { type retryMetrics interface {
@ -130,6 +172,8 @@ type MetricsProvider interface {
NewAddsMetric(name string) CounterMetric NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric NewRetriesMetric(name string) CounterMetric
} }
@ -151,29 +195,49 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{} return noopMetric{}
} }
// NewUnfinishedWorkSecondsMetric returns a no-op gauge; used when metrics are disabled.
func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
	return noopMetric{}
}

// NewLongestRunningProcessorMicrosecondsMetric returns a no-op gauge; used when metrics are disabled.
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
	return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
var metricsFactory = struct { var globalMetricsFactory = queueMetricsFactory{
metricsProvider MetricsProvider
setProviders sync.Once
}{
metricsProvider: noopMetricsProvider{}, metricsProvider: noopMetricsProvider{},
} }
func newQueueMetrics(name string) queueMetrics { type queueMetricsFactory struct {
var ret *defaultQueueMetrics metricsProvider MetricsProvider
if len(name) == 0 {
return ret onlyOnce sync.Once
}
// setProvider installs mp as this factory's metrics provider. Only the first
// call has any effect; subsequent calls are ignored (sync.Once).
func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
	f.onlyOnce.Do(func() {
		f.metricsProvider = mp
	})
}
func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{}
} }
return &defaultQueueMetrics{ return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name), clock: clock,
adds: metricsFactory.metricsProvider.NewAddsMetric(name), depth: mp.NewDepthMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name), adds: mp.NewAddsMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), latency: mp.NewLatencyMetric(name),
addTimes: map[t]time.Time{}, workDuration: mp.NewWorkDurationMetric(name),
processingStartTimes: map[t]time.Time{}, unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
} }
} }
@ -183,13 +247,12 @@ func newRetryMetrics(name string) retryMetrics {
return ret return ret
} }
return &defaultRetryMetrics{ return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name), retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
} }
} }
// SetProvider sets the metrics provider of the metricsFactory. // SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) { func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() { globalMetricsFactory.setProvider(metricsProvider)
metricsFactory.metricsProvider = metricsProvider
})
} }

View File

@ -0,0 +1,293 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// testMetrics is a fake queueMetrics that counts add/get/done calls and
// signals every updateUnfinishedWork invocation on updateCalled so tests can
// synchronize with the background update loop.
type testMetrics struct {
	added, gotten, finished int64

	updateCalled chan<- struct{}
}

func (m *testMetrics) add(item t) { m.added++ }
func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
// TestMetricShutdown verifies that the background unfinished-work update loop
// fires while the queue is live and stops firing after ShutDown.
func TestMetricShutdown(t *testing.T) {
	updates := make(chan struct{})
	metrics := &testMetrics{
		updateCalled: updates,
	}
	fakeClock := clock.NewFakeClock(time.Now())
	q := newQueue(fakeClock, metrics, time.Millisecond)
	// Wait for the go routine to call NewTicker()
	for !fakeClock.HasWaiters() {
		time.Sleep(time.Millisecond)
	}

	fakeClock.Step(time.Millisecond)
	<-updates
	q.ShutDown()

	fakeClock.Step(time.Hour)
	select {
	case <-updates:
		t.Errorf("Unexpected update after shutdown was called.")
	default:
		return
	}
}
// testMetric is a fake implementation of the individual metric interfaces
// (GaugeMetric, SettableGaugeMetric, SummaryMetric, CounterMetric) that
// records the operations performed on it.
type testMetric struct {
	inc int64 // number of Inc calls
	dec int64 // number of Dec calls
	set float64 // last value passed to Set

	observedValue float64 // last value passed to Observe
	observedCount int // number of Observe calls

	notifyCh chan<- struct{} // if non-nil, receives a signal after every mutation

	lock sync.Mutex
}
// Inc records one increment and signals notifyCh if configured.
func (m *testMetric) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.inc++
	m.notify()
}

// Dec records one decrement and signals notifyCh if configured.
func (m *testMetric) Dec() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.dec++
	m.notify()
}

// Set records the value and signals notifyCh if configured.
func (m *testMetric) Set(f float64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.set = f
	m.notify()
}

// Observe records the value and bumps the observation count, then signals
// notifyCh if configured.
func (m *testMetric) Observe(f float64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.observedValue = f
	m.observedCount++
	m.notify()
}

// gaugeValue returns the last Set value if one was recorded, otherwise the
// net inc-dec count. NOTE(review): Set(0) is indistinguishable from "never
// set" here and falls through to inc-dec — acceptable for these tests.
func (m *testMetric) gaugeValue() float64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.set != 0 {
		return m.set
	}
	return float64(m.inc - m.dec)
}

// observationValue returns the most recently observed value.
func (m *testMetric) observationValue() float64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.observedValue
}

// observationCount returns how many times Observe has been called.
func (m *testMetric) observationCount() int {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.observedCount
}

// notify signals notifyCh if one is configured. Callers hold m.lock, so the
// send happens under the lock.
func (m *testMetric) notify() {
	if m.notifyCh != nil {
		m.notifyCh <- struct{}{}
	}
}
// testMetricsProvider implements MetricsProvider, handing out pointers to its
// own testMetric fields so tests can inspect what each metric recorded.
type testMetricsProvider struct {
	depth testMetric
	adds testMetric
	latency testMetric
	duration testMetric
	unfinished testMetric
	longest testMetric
	retries testMetric
}

func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
	return &m.depth
}

func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
	return &m.adds
}

func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
	return &m.latency
}

func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
	return &m.duration
}

func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
	return &m.unfinished
}

func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
	return &m.longest
}

func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
	return &m.retries
}
// TestSinceInMicroseconds checks the microsecond conversion against a fake
// clock over a range of step sizes.
func TestSinceInMicroseconds(t *testing.T) {
	provider := testMetricsProvider{}
	fakeClock := clock.NewFakeClock(time.Now())
	factory := queueMetricsFactory{metricsProvider: &provider}
	metrics := factory.newQueueMetrics("test", fakeClock).(*defaultQueueMetrics)
	for _, micros := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} {
		start := fakeClock.Now()
		fakeClock.Step(time.Duration(micros) * time.Microsecond)
		if e, a := float64(micros), metrics.sinceInMicroseconds(start); e != a {
			t.Errorf("Expected %v, got %v", e, a)
		}
	}
}
// TestMetrics walks one item through the full queue lifecycle (add, get,
// re-add while processing, done, re-get, done) on a fake clock, verifying the
// adds/depth gauges, latency/duration observations, and the unfinished-work
// gauges at each step. The assertions are strictly order-dependent: each
// clock.Step determines the next expected observation.
func TestMetrics(t *testing.T) {
	mp := testMetricsProvider{}
	t0 := time.Unix(0, 0)
	c := clock.NewFakeClock(t0)
	mf := queueMetricsFactory{metricsProvider: &mp}
	m := mf.newQueueMetrics("test", c)
	q := newQueue(c, m, time.Millisecond)
	defer q.ShutDown()
	for !c.HasWaiters() {
		// Wait for the go routine to call NewTicker()
		time.Sleep(time.Millisecond)
	}

	q.Add("foo")
	if e, a := 1.0, mp.adds.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	// Age the item by 50us so the latency observation below is exactly 50.
	c.Step(50 * time.Microsecond)

	// Start processing
	i, _ := q.Get()
	if i != "foo" {
		t.Errorf("Expected %v, got %v", "foo", i)
	}

	if e, a := 50.0, mp.latency.observationValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1, mp.latency.observationCount(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 0.0, mp.depth.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	// Add it back while processing; multiple adds of the same item are
	// de-duped.
	q.Add(i)
	q.Add(i)
	q.Add(i)
	q.Add(i)
	q.Add(i)
	if e, a := 2.0, mp.adds.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	// One thing remains in the queue
	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	c.Step(25 * time.Microsecond)

	// Finish it up
	q.Done(i)
	if e, a := 25.0, mp.duration.observationValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1, mp.duration.observationCount(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	// One thing remains in the queue
	if e, a := 1.0, mp.depth.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	// It should be back on the queue
	i, _ = q.Get()
	if i != "foo" {
		t.Errorf("Expected %v, got %v", "foo", i)
	}

	if e, a := 25.0, mp.latency.observationValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, mp.latency.observationCount(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	// use a channel to ensure we don't look at the metric before it's
	// been set.
	ch := make(chan struct{}, 1)
	mp.unfinished.notifyCh = ch
	// Advancing a full millisecond fires the update loop's ticker, which
	// publishes the unfinished-work gauges (0.001s total, 1000us longest).
	c.Step(time.Millisecond)
	<-ch
	mp.unfinished.notifyCh = nil
	if e, a := .001, mp.unfinished.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	// Finish that one up
	q.Done(i)
	if e, a := 1000.0, mp.duration.observationValue(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, mp.duration.observationCount(); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
}

View File

@ -18,6 +18,9 @@ package workqueue
import ( import (
"sync" "sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
) )
type Interface interface { type Interface interface {
@ -35,14 +38,29 @@ func New() *Type {
} }
func NewNamed(name string) *Type { func NewNamed(name string) *Type {
return &Type{ rc := clock.RealClock{}
dirty: set{}, return newQueue(
processing: set{}, rc,
cond: sync.NewCond(&sync.Mutex{}), globalMetricsFactory.newQueueMetrics(name, rc),
metrics: newQueueMetrics(name), defaultUnfinishedWorkUpdatePeriod,
} )
} }
// newQueue is the shared constructor: it takes an injectable clock, metrics
// implementation, and unfinished-work reporting period, and starts the
// background goroutine that periodically publishes unfinished-work metrics.
// That goroutine exits when the queue is shut down (see
// updateUnfinishedWorkLoop), so every queue created here must eventually have
// ShutDown called on it.
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
	t := &Type{
		clock: c,
		dirty: set{},
		processing: set{},
		cond: sync.NewCond(&sync.Mutex{}),
		metrics: metrics,
		unfinishedWorkUpdatePeriod: updatePeriod,
	}
	go t.updateUnfinishedWorkLoop()
	return t
}
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment). // Type is a work queue (see the package comment).
type Type struct { type Type struct {
// queue defines the order in which we will work on items. Every // queue defines the order in which we will work on items. Every
@ -64,6 +82,9 @@ type Type struct {
shuttingDown bool shuttingDown bool
metrics queueMetrics metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
} }
type empty struct{} type empty struct{}
@ -170,3 +191,22 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown return q.shuttingDown
} }
// updateUnfinishedWorkLoop periodically publishes the unfinished-work metrics
// under the queue lock, and returns (ending the goroutine) once the queue is
// shutting down.
func (q *Type) updateUnfinishedWorkLoop() {
	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
	defer t.Stop()
	for range t.C() {
		// The closure exists so the deferred unlock covers a possible panic
		// in updateUnfinishedWork; it reports whether we are shutting down.
		shuttingDown := func() bool {
			q.cond.L.Lock()
			defer q.cond.L.Unlock()
			if q.shuttingDown {
				return true
			}
			q.metrics.updateUnfinishedWork()
			return false
		}()
		if shuttingDown {
			return
		}
	}
}