diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 81b2cf4acf..6f5cf38f09 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -71,6 +71,30 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su return workDuration } +func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "unfinished_work_seconds", + Help: "How many seconds of work " + name + " has done that " + + "is in progress and hasn't been observed by work_duration. Large " + + "values indicate stuck threads. One can deduce the number of stuck " + + "threads by observing the rate at which this increases.", + }) + prometheus.Register(unfinished) + return unfinished +} + +func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "longest_running_processor_microseconds", + Help: "How many microseconds has the longest running " + + "processor for " + name + " been running.", + }) + prometheus.Register(unfinished) + return unfinished +} + func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { retries := prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: name, diff --git a/staging/src/k8s.io/client-go/util/workqueue/BUILD b/staging/src/k8s.io/client-go/util/workqueue/BUILD index c139977c1c..ff0e87a7c7 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/BUILD +++ b/staging/src/k8s.io/client-go/util/workqueue/BUILD @@ -11,6 +11,7 @@ go_test( srcs = [ "default_rate_limiters_test.go", "delaying_queue_test.go", + "metrics_test.go", "queue_test.go", "rate_limitting_queue_test.go", ], diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index a481bdfb26..d4c03d8378 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -19,6 +19,8 @@ package workqueue import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) // This file provides abstractions for setting the provider (e.g., prometheus) @@ -28,6 +30,7 @@ type queueMetrics interface { add(item t) get(item t) done(item t) + updateUnfinishedWork() } // GaugeMetric represents a single numerical value that can arbitrarily go up @@ -37,6 +40,12 @@ type GaugeMetric interface { Dec() } +// SettableGaugeMetric represents a single numerical value that can arbitrarily go up +// and down. (Separate from GaugeMetric to preserve backwards compatibility.) +type SettableGaugeMetric interface { + Set(float64) +} + // CounterMetric represents a single numerical value that only ever // goes up. type CounterMetric interface { @@ -52,9 +61,13 @@ type noopMetric struct{} func (noopMetric) Inc() {} func (noopMetric) Dec() {} +func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} +// defaultQueueMetrics expects the caller to lock before setting any metrics. type defaultQueueMetrics struct { + clock clock.Clock + // current depth of a workqueue depth GaugeMetric // total number of adds handled by a workqueue @@ -65,6 +78,10 @@ type defaultQueueMetrics struct { workDuration SummaryMetric addTimes map[t]time.Time processingStartTimes map[t]time.Time + + // how long have current threads been working? + unfinishedWorkSeconds SettableGaugeMetric + longestRunningProcessor SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -75,7 +92,7 @@ func (m *defaultQueueMetrics) add(item t) { m.adds.Inc() m.depth.Inc() if _, exists := m.addTimes[item]; !exists { - m.addTimes[item] = time.Now() + m.addTimes[item] = m.clock.Now() } } @@ -85,9 +102,9 @@ func (m *defaultQueueMetrics) get(item t) { } m.depth.Dec() - m.processingStartTimes[item] = time.Now() + m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { - m.latency.Observe(sinceInMicroseconds(startTime)) + m.latency.Observe(m.sinceInMicroseconds(startTime)) delete(m.addTimes, item) } } @@ -98,14 +115,39 @@ func (m *defaultQueueMetrics) done(item t) { } if startTime, exists := m.processingStartTimes[item]; exists { - m.workDuration.Observe(sinceInMicroseconds(startTime)) + m.workDuration.Observe(m.sinceInMicroseconds(startTime)) delete(m.processingStartTimes, item) } } +func (m *defaultQueueMetrics) updateUnfinishedWork() { + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. + var total float64 + var oldest float64 + for _, t := range m.processingStartTimes { + age := m.sinceInMicroseconds(t) + total += age + if age > oldest { + oldest = age + } + } + // Convert to seconds; microseconds is unhelpfully granular for this. + total /= 1000000 + m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) // in microseconds. +} + +type noMetrics struct{} + +func (noMetrics) add(item t) {} +func (noMetrics) get(item t) {} +func (noMetrics) done(item t) {} +func (noMetrics) updateUnfinishedWork() {} + // Gets the time since the specified start in microseconds. -func sinceInMicroseconds(start time.Time) float64 { - return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 { + return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) } type retryMetrics interface { @@ -130,6 +172,8 @@ type MetricsProvider interface { NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric + NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric + NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -151,29 +195,49 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { return noopMetric{} } +func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + +func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -var metricsFactory = struct { - metricsProvider MetricsProvider - setProviders sync.Once -}{ +var globalMetricsFactory = queueMetricsFactory{ metricsProvider: noopMetricsProvider{}, } -func newQueueMetrics(name string) queueMetrics { - var ret *defaultQueueMetrics - if len(name) == 0 { - return ret +type queueMetricsFactory struct { + metricsProvider MetricsProvider + + onlyOnce sync.Once +} + +func (f *queueMetricsFactory) setProvider(mp MetricsProvider) { + f.onlyOnce.Do(func() { + f.metricsProvider = mp + }) +} + +func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { + mp := f.metricsProvider + if len(name) == 0 || mp == (noopMetricsProvider{}) { + return noMetrics{} } return &defaultQueueMetrics{ - depth: metricsFactory.metricsProvider.NewDepthMetric(name), - adds: metricsFactory.metricsProvider.NewAddsMetric(name), - latency: metricsFactory.metricsProvider.NewLatencyMetric(name), - workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } @@ -183,13 +247,12 @@ func newRetryMetrics(name string) retryMetrics { return ret } return &defaultRetryMetrics{ - retries: metricsFactory.metricsProvider.NewRetriesMetric(name), + retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), } } -// SetProvider sets the metrics provider of the metricsFactory. +// SetProvider sets the metrics provider for all subsequently created work +// queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { - metricsFactory.setProviders.Do(func() { - metricsFactory.metricsProvider = metricsProvider - }) + globalMetricsFactory.setProvider(metricsProvider) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go new file mode 100644 index 0000000000..117f90801f --- /dev/null +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -0,0 +1,293 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +type testMetrics struct { + added, gotten, finished int64 + + updateCalled chan<- struct{} +} + +func (m *testMetrics) add(item t) { m.added++ } +func (m *testMetrics) get(item t) { m.gotten++ } +func (m *testMetrics) done(item t) { m.finished++ } +func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } + +func TestMetricShutdown(t *testing.T) { + ch := make(chan struct{}) + m := &testMetrics{ + updateCalled: ch, + } + c := clock.NewFakeClock(time.Now()) + q := newQueue(c, m, time.Millisecond) + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + c.Step(time.Millisecond) + <-ch + q.ShutDown() + + c.Step(time.Hour) + select { + default: + return + case <-ch: + t.Errorf("Unexpected update after shutdown was called.") + } +} + +type testMetric struct { + inc int64 + dec int64 + set float64 + + observedValue float64 + observedCount int + + notifyCh chan<- struct{} + + lock sync.Mutex +} + +func (m *testMetric) Inc() { + m.lock.Lock() + defer m.lock.Unlock() + m.inc++ + m.notify() +} + +func (m *testMetric) Dec() { + m.lock.Lock() + defer m.lock.Unlock() + m.dec++ + m.notify() +} + +func (m *testMetric) Set(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.set = f + m.notify() +} + +func (m *testMetric) Observe(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.observedValue = f + m.observedCount++ + m.notify() +} + +func (m *testMetric) gaugeValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() + if m.set != 0 { + return m.set + } + return float64(m.inc - m.dec) +} + +func (m *testMetric) observationValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedValue +} + +func (m *testMetric) observationCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedCount +} + +func (m *testMetric) notify() { + if m.notifyCh != nil { + m.notifyCh <- struct{}{} + } +} + +type testMetricsProvider struct { + depth testMetric + adds testMetric + latency testMetric + duration testMetric + unfinished testMetric + longest testMetric + retries testMetric +} + +func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return &m.depth +} + +func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric { + return &m.adds +} + +func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric { + return &m.latency +} + +func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { + return &m.duration +} + +func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { + return &m.unfinished +} + +func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return &m.longest +} + +func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return &m.retries +} + +func TestSinceInMicroseconds(t *testing.T) { + mp := testMetricsProvider{} + c := clock.NewFakeClock(time.Now()) + mf := queueMetricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + dqm := m.(*defaultQueueMetrics) + + for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} { + n := c.Now() + c.Step(time.Duration(i) * time.Microsecond) + if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestMetrics(t *testing.T) { + mp := testMetricsProvider{} + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + mf := queueMetricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + q := newQueue(c, m, time.Millisecond) + defer q.ShutDown() + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + q.Add("foo") + if e, a := 1.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(50 * time.Microsecond) + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 50.0, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 0.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Add it back while processing; multiple adds of the same item are + // de-duped. + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + if e, a := 2.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(25 * time.Microsecond) + + // Finish it up + q.Done(i) + + if e, a := 25.0, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 25.0, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // use a channel to ensure we don't look at the metric before it's + // been set. + ch := make(chan struct{}, 1) + mp.unfinished.notifyCh = ch + c.Step(time.Millisecond) + <-ch + mp.unfinished.notifyCh = nil + if e, a := .001, mp.unfinished.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1000.0, mp.longest.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Finish that one up + q.Done(i) + if e, a := 1000.0, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index dc9a7cc7b7..39009b8e79 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -18,6 +18,9 @@ package workqueue import ( "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" ) type Interface interface { @@ -35,14 +38,29 @@ func New() *Type { } func NewNamed(name string) *Type { - return &Type{ - dirty: set{}, - processing: set{}, - cond: sync.NewCond(&sync.Mutex{}), - metrics: newQueueMetrics(name), - } + rc := clock.RealClock{} + return newQueue( + rc, + globalMetricsFactory.newQueueMetrics(name, rc), + defaultUnfinishedWorkUpdatePeriod, + ) } +func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { + t := &Type{ + clock: c, + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + metrics: metrics, + unfinishedWorkUpdatePeriod: updatePeriod, + } + go t.updateUnfinishedWorkLoop() + return t +} + +const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond + // Type is a work queue (see the package comment). type Type struct { // queue defines the order in which we will work on items. Every @@ -64,6 +82,9 @@ type Type struct { shuttingDown bool metrics queueMetrics + + unfinishedWorkUpdatePeriod time.Duration + clock clock.Clock } type empty struct{} @@ -170,3 +191,22 @@ func (q *Type) ShuttingDown() bool { return q.shuttingDown } + +func (q *Type) updateUnfinishedWorkLoop() { + t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) + defer t.Stop() + for range t.C() { + if !func() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if !q.shuttingDown { + q.metrics.updateUnfinishedWork() + return true + } + return false + + }() { + return + } + } +}