From 6195d1005d81eaa5dd49da744f5beab178340f5a Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Nov 2018 10:43:44 -0800 Subject: [PATCH 1/8] add a metric that can be used to notice stuck worker threads --- pkg/util/workqueue/prometheus/prometheus.go | 10 ++++ .../client-go/util/workqueue/metrics.go | 52 +++++++++++++++---- .../client-go/util/workqueue/metrics_test.go | 49 +++++++++++++++++ .../k8s.io/client-go/util/workqueue/queue.go | 47 ++++++++++++++--- 4 files changed, 143 insertions(+), 15 deletions(-) create mode 100644 staging/src/k8s.io/client-go/util/workqueue/metrics_test.go diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 81b2cf4acf..460c3d4dff 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -71,6 +71,16 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su return workDuration } +func (prometheusMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "unfinished_work_microseconds", + Help: "How many microseconds of work has " + name + " done that is still in progress and hasn't yet been observed by work_duration.", + }) + prometheus.Register(unfinished) + return unfinished +} + func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { retries := prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: name, diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index a481bdfb26..a55ff5c210 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -28,6 +28,7 @@ type queueMetrics interface { add(item t) get(item t) done(item t) + updateUnfinishedWork() } // GaugeMetric represents a single numerical value that can arbitrarily go up @@ -37,6 +38,12 @@ type GaugeMetric interface { Dec() } +// SettableGaugeMetric represents a single numerical value that can arbitrarily go up +// and down. (Separate from GaugeMetric to preserve backwards compatibility.) +type SettableGaugeMetric interface { + Set(float64) +} + // CounterMetric represents a single numerical value that only ever // goes up. type CounterMetric interface { @@ -52,6 +59,7 @@ type noopMetric struct{} func (noopMetric) Inc() {} func (noopMetric) Dec() {} +func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} type defaultQueueMetrics struct { @@ -65,6 +73,9 @@ type defaultQueueMetrics struct { workDuration SummaryMetric addTimes map[t]time.Time processingStartTimes map[t]time.Time + + // how long have current threads been working? + unfinishedWorkMicroseconds SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -103,6 +114,23 @@ func (m *defaultQueueMetrics) done(item t) { } } +func (m *defaultQueueMetrics) updateUnfinishedWork() { + var total float64 + if m.processingStartTimes != nil { + for _, t := range m.processingStartTimes { + total += sinceInMicroseconds(t) + } + } + m.unfinishedWorkMicroseconds.Set(total) +} + +type noMetrics struct{} + +func (noMetrics) add(item t) {} +func (noMetrics) get(item t) {} +func (noMetrics) done(item t) {} +func (noMetrics) updateUnfinishedWork() {} + // Gets the time since the specified start in microseconds. func sinceInMicroseconds(start time.Time) float64 { return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) @@ -130,6 +158,7 @@ type MetricsProvider interface { NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric + NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -151,6 +180,10 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { return noopMetric{} } +func (_ noopMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } @@ -163,17 +196,18 @@ var metricsFactory = struct { } func newQueueMetrics(name string) queueMetrics { - var ret *defaultQueueMetrics - if len(name) == 0 { - return ret + mp := metricsFactory.metricsProvider + if len(name) == 0 || mp == (noopMetricsProvider{}) { + return noMetrics{} } return &defaultQueueMetrics{ - depth: metricsFactory.metricsProvider.NewDepthMetric(name), - adds: metricsFactory.metricsProvider.NewAddsMetric(name), - latency: metricsFactory.metricsProvider.NewLatencyMetric(name), - workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkMicroseconds: mp.NewUnfinishedWorkMicrosecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go new file mode 100644 index 0000000000..e8576cf92e --- /dev/null +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" +) + +type testMetrics struct { + added, gotten, finished int64 + + updateCalled chan<- struct{} +} + +func (m *testMetrics) add(item t) { m.added++ } +func (m *testMetrics) get(item t) { m.gotten++ } +func (m *testMetrics) done(item t) { m.finished++ } +func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } + +func TestMetrics(t *testing.T) { + ch := make(chan struct{}) + m := &testMetrics{ + updateCalled: ch, + } + q := newQueue("test", m, time.Millisecond) + <-ch + q.ShutDown() + select { + case <-time.After(time.Second): + return + case <-ch: + t.Errorf("Unexpected update after shutdown was called.") + } +} diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index dc9a7cc7b7..66118cd00f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -18,6 +18,7 @@ package workqueue import ( "sync" + "time" ) type Interface interface { @@ -35,14 +36,27 @@ func New() *Type { } func NewNamed(name string) *Type { - return &Type{ - dirty: set{}, - processing: set{}, - cond: sync.NewCond(&sync.Mutex{}), - metrics: newQueueMetrics(name), - } + return newQueue( + name, + newQueueMetrics(name), + defaultUnfinishedWorkUpdatePeriod, + ) } +func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type { + t := &Type{ + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + metrics: metrics, + unfinishedWorkUpdatePeriod: updatePeriod, + } + go t.updateUnfinishedWorkLook() + return t +} + +const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond + // Type is a work queue (see the package comment). type Type struct { // queue defines the order in which we will work on items. Every @@ -64,6 +78,8 @@ type Type struct { shuttingDown bool metrics queueMetrics + + unfinishedWorkUpdatePeriod time.Duration } type empty struct{} @@ -170,3 +186,22 @@ func (q *Type) ShuttingDown() bool { return q.shuttingDown } + +func (q *Type) updateUnfinishedWorkLook() { + t := time.NewTicker(q.unfinishedWorkUpdatePeriod) + defer t.Stop() + for range t.C { + if !func() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if !q.shuttingDown { + q.metrics.updateUnfinishedWork() + return true + } + return false + + }() { + return + } + } +} From 5a8444ceec9e28e8a7dbf36bfd7cb55554c5b865 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Nov 2018 16:12:11 -0800 Subject: [PATCH 2/8] Test workqueue metrics --- .../client-go/util/workqueue/metrics.go | 52 +++-- .../client-go/util/workqueue/metrics_test.go | 201 +++++++++++++++++- .../k8s.io/client-go/util/workqueue/queue.go | 19 +- 3 files changed, 241 insertions(+), 31 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index a55ff5c210..af224238a7 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -19,6 +19,8 @@ package workqueue import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) // This file provides abstractions for setting the provider (e.g., prometheus) @@ -63,6 +65,8 @@ func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} type defaultQueueMetrics struct { + clock clock.Clock + // current depth of a workqueue depth GaugeMetric // total number of adds handled by a workqueue @@ -86,7 +90,7 @@ func (m *defaultQueueMetrics) add(item t) { m.adds.Inc() m.depth.Inc() if _, exists := m.addTimes[item]; !exists { - m.addTimes[item] = time.Now() + m.addTimes[item] = m.clock.Now() } } @@ -96,9 +100,9 @@ func (m *defaultQueueMetrics) get(item t) { } m.depth.Dec() - m.processingStartTimes[item] = time.Now() + m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { - m.latency.Observe(sinceInMicroseconds(startTime)) + m.latency.Observe(m.sinceInMicroseconds(startTime)) delete(m.addTimes, item) } } @@ -109,17 +113,15 @@ func (m *defaultQueueMetrics) done(item t) { } if startTime, exists := m.processingStartTimes[item]; exists { - m.workDuration.Observe(sinceInMicroseconds(startTime)) + m.workDuration.Observe(m.sinceInMicroseconds(startTime)) delete(m.processingStartTimes, item) } } func (m *defaultQueueMetrics) updateUnfinishedWork() { var total float64 - if m.processingStartTimes != nil { - for _, t := range m.processingStartTimes { - total += sinceInMicroseconds(t) - } + for _, t := range m.processingStartTimes { + total += m.sinceInMicroseconds(t) } m.unfinishedWorkMicroseconds.Set(total) } @@ -132,8 +134,8 @@ func (noMetrics) done(item t) {} func (noMetrics) updateUnfinishedWork() {} // Gets the time since the specified start in microseconds. -func sinceInMicroseconds(start time.Time) float64 { - return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 { + return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) } type retryMetrics interface { @@ -188,19 +190,28 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -var metricsFactory = struct { - metricsProvider MetricsProvider - setProviders sync.Once -}{ +var globalMetricsFactory = metricsFactory{ metricsProvider: noopMetricsProvider{}, } -func newQueueMetrics(name string) queueMetrics { - mp := metricsFactory.metricsProvider +type metricsFactory struct { + metricsProvider MetricsProvider + setProviders sync.Once +} + +func (f *metricsFactory) set(mp MetricsProvider) { + f.setProviders.Do(func() { + f.metricsProvider = mp + }) +} + +func (f *metricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { + mp := f.metricsProvider if len(name) == 0 || mp == (noopMetricsProvider{}) { return noMetrics{} } return &defaultQueueMetrics{ + clock: clock, depth: mp.NewDepthMetric(name), adds: mp.NewAddsMetric(name), latency: mp.NewLatencyMetric(name), @@ -217,13 +228,12 @@ func newRetryMetrics(name string) retryMetrics { return ret } return &defaultRetryMetrics{ - retries: metricsFactory.metricsProvider.NewRetriesMetric(name), + retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), } } -// SetProvider sets the metrics provider of the metricsFactory. +// SetProvider sets the metrics provider for all subsequently created work +// queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { - metricsFactory.setProviders.Do(func() { - metricsFactory.metricsProvider = metricsProvider - }) + globalMetricsFactory.set(metricsProvider) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index e8576cf92e..64305acd59 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -19,6 +19,8 @@ package workqueue import ( "testing" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) type testMetrics struct { @@ -32,18 +34,211 @@ func (m *testMetrics) get(item t) { m.gotten++ } func (m *testMetrics) done(item t) { m.finished++ } func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } -func TestMetrics(t *testing.T) { +func TestMetricShutdown(t *testing.T) { ch := make(chan struct{}) m := &testMetrics{ updateCalled: ch, } - q := newQueue("test", m, time.Millisecond) + c := clock.NewFakeClock(time.Now()) + q := newQueue(c, m, time.Millisecond) + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + c.Step(time.Millisecond) <-ch q.ShutDown() + + c.Step(time.Hour) select { - case <-time.After(time.Second): + default: return case <-ch: t.Errorf("Unexpected update after shutdown was called.") } } + +type testMetric struct { + inc int64 + dec int64 + set float64 + + observedValue float64 + observedCount int + + notifyCh chan<- struct{} +} + +func (m *testMetric) Inc() { m.inc++; m.notify() } +func (m *testMetric) Dec() { m.dec++; m.notify() } +func (m *testMetric) Set(f float64) { m.set = f; m.notify() } +func (m *testMetric) Observe(f float64) { m.observedValue = f; m.observedCount++; m.notify() } + +func (m *testMetric) gaugeValue() float64 { + if m.set != 0 { + return m.set + } + return float64(m.inc - m.dec) +} + +func (m *testMetric) notify() { + if m.notifyCh != nil { + m.notifyCh <- struct{}{} + } +} + +type testMetricsProvider struct { + depth testMetric + adds testMetric + latency testMetric + duration testMetric + unfinished testMetric + retries testMetric +} + +func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return &m.depth +} + +func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric { + return &m.adds +} + +func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric { + return &m.latency +} + +func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { + return &m.duration +} + +func (m *testMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { + return &m.unfinished +} + +func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return &m.retries +} + +func TestSinceInMicroseconds(t *testing.T) { + mp := testMetricsProvider{} + c := clock.NewFakeClock(time.Now()) + mf := metricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + dqm := m.(*defaultQueueMetrics) + + for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} { + n := c.Now() + c.Step(time.Duration(i) * time.Microsecond) + if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestMetrics(t *testing.T) { + mp := testMetricsProvider{} + t0 := time.Unix(0, 0) + c := clock.NewFakeClock(t0) + mf := metricsFactory{metricsProvider: &mp} + m := mf.newQueueMetrics("test", c) + q := newQueue(c, m, time.Millisecond) + defer q.ShutDown() + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + q.Add("foo") + if e, a := 1.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(50 * time.Microsecond) + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 50.0, mp.latency.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.latency.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 0.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Add it back while processing; multiple adds of the same item are + // de-duped. + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + if e, a := 2.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(25 * time.Microsecond) + + // Finish it up + q.Done(i) + + if e, a := 25.0, mp.duration.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.duration.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 25.0, mp.latency.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.latency.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // use a channel to ensure we don't look at the metric before it's + // been set. + ch := make(chan struct{}, 1) + mp.unfinished.notifyCh = ch + c.Step(time.Millisecond) + <-ch + mp.unfinished.notifyCh = nil + if e, a := 1000.0, mp.unfinished.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Finish that one up + q.Done(i) + if e, a := 1000.0, mp.duration.observedValue; e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.duration.observedCount; e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index 66118cd00f..39009b8e79 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -19,6 +19,8 @@ package workqueue import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) type Interface interface { @@ -36,22 +38,24 @@ func New() *Type { } func NewNamed(name string) *Type { + rc := clock.RealClock{} return newQueue( - name, - newQueueMetrics(name), + rc, + globalMetricsFactory.newQueueMetrics(name, rc), defaultUnfinishedWorkUpdatePeriod, ) } -func newQueue(name string, metrics queueMetrics, updatePeriod time.Duration) *Type { +func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ + clock: c, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, } - go t.updateUnfinishedWorkLook() + go t.updateUnfinishedWorkLoop() return t } @@ -80,6 +84,7 @@ type Type struct { metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration + clock clock.Clock } type empty struct{} @@ -187,10 +192,10 @@ func (q *Type) ShuttingDown() bool { return q.shuttingDown } -func (q *Type) updateUnfinishedWorkLook() { - t := time.NewTicker(q.unfinishedWorkUpdatePeriod) +func (q *Type) updateUnfinishedWorkLoop() { + t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) defer t.Stop() - for range t.C { + for range t.C() { if !func() bool { q.cond.L.Lock() defer q.cond.L.Unlock() From 74c50c0ad334ec2c1cc5b8fb9677315ef5aa8f44 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Nov 2018 16:13:15 -0800 Subject: [PATCH 3/8] generated files --- staging/src/k8s.io/client-go/util/workqueue/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/staging/src/k8s.io/client-go/util/workqueue/BUILD b/staging/src/k8s.io/client-go/util/workqueue/BUILD index c139977c1c..ff0e87a7c7 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/BUILD +++ b/staging/src/k8s.io/client-go/util/workqueue/BUILD @@ -11,6 +11,7 @@ go_test( srcs = [ "default_rate_limiters_test.go", "delaying_queue_test.go", + "metrics_test.go", "queue_test.go", "rate_limitting_queue_test.go", ], From 44a87baf09aec4f2e8e159de4c409d27aebbb1da Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sat, 10 Nov 2018 18:24:59 -0800 Subject: [PATCH 4/8] fixup! Test workqueue metrics --- .../k8s.io/client-go/util/workqueue/metrics.go | 16 +++++++++------- .../client-go/util/workqueue/metrics_test.go | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index af224238a7..43c0ab5e9e 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -64,6 +64,7 @@ func (noopMetric) Dec() {} func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} +// defaultQueueMetrics expects the caller to lock before setting any metrics. type defaultQueueMetrics struct { clock clock.Clock @@ -190,22 +191,23 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -var globalMetricsFactory = metricsFactory{ +var globalMetricsFactory = queueMetricsFactory{ metricsProvider: noopMetricsProvider{}, } -type metricsFactory struct { +type queueMetricsFactory struct { metricsProvider MetricsProvider - setProviders sync.Once + + onlyOnce sync.Once } -func (f *metricsFactory) set(mp MetricsProvider) { - f.setProviders.Do(func() { +func (f *queueMetricsFactory) setProvider(mp MetricsProvider) { + f.onlyOnce.Do(func() { f.metricsProvider = mp }) } -func (f *metricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { +func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { mp := f.metricsProvider if len(name) == 0 || mp == (noopMetricsProvider{}) { return noMetrics{} @@ -235,5 +237,5 @@ func newRetryMetrics(name string) retryMetrics { // SetProvider sets the metrics provider for all subsequently created work // queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { - globalMetricsFactory.set(metricsProvider) + globalMetricsFactory.setProvider(metricsProvider) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 64305acd59..065e2c40b1 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -124,7 +124,7 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { func TestSinceInMicroseconds(t *testing.T) { mp := testMetricsProvider{} c := clock.NewFakeClock(time.Now()) - mf := metricsFactory{metricsProvider: &mp} + mf := queueMetricsFactory{metricsProvider: &mp} m := mf.newQueueMetrics("test", c) dqm := m.(*defaultQueueMetrics) @@ -141,7 +141,7 @@ func TestMetrics(t *testing.T) { mp := testMetricsProvider{} t0 := time.Unix(0, 0) c := clock.NewFakeClock(t0) - mf := metricsFactory{metricsProvider: &mp} + mf := queueMetricsFactory{metricsProvider: &mp} m := mf.newQueueMetrics("test", c) q := newQueue(c, m, time.Millisecond) defer q.ShutDown() From 578962d934df19cb2cb7ec0536dcb76f53951e68 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sat, 10 Nov 2018 18:46:43 -0800 Subject: [PATCH 5/8] fixup! Test workqueue metrics change units to seconds --- pkg/util/workqueue/prometheus/prometheus.go | 9 ++++--- .../client-go/util/workqueue/metrics.go | 26 ++++++++++--------- .../client-go/util/workqueue/metrics_test.go | 4 +-- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 460c3d4dff..7c1f270f6f 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -71,11 +71,14 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su return workDuration } -func (prometheusMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { +func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ Subsystem: name, - Name: "unfinished_work_microseconds", - Help: "How many microseconds of work has " + name + " done that is still in progress and hasn't yet been observed by work_duration.", + Name: "unfinished_work_seconds", + Help: "How many seconds of work " + name + " has done that " + + "is in progress and hasn't been observed by work_duration. Large " + + "values indicate stuck threads. One can deduce the number of stuck " + + "threads by observing the rate at which this increases.", }) prometheus.Register(unfinished) return unfinished diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index 43c0ab5e9e..51b5f2426d 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -80,7 +80,7 @@ type defaultQueueMetrics struct { processingStartTimes map[t]time.Time // how long have current threads been working? - unfinishedWorkMicroseconds SettableGaugeMetric + unfinishedWorkSeconds SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -124,7 +124,9 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() { for _, t := range m.processingStartTimes { total += m.sinceInMicroseconds(t) } - m.unfinishedWorkMicroseconds.Set(total) + // Convert to seconds; microseconds is unhelpfully granular for this. + total /= 1000000 + m.unfinishedWorkSeconds.Set(total) } type noMetrics struct{} @@ -161,7 +163,7 @@ type MetricsProvider interface { NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric - NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric + NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -183,7 +185,7 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { return noopMetric{} } -func (_ noopMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { +func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { return noopMetric{} } @@ -213,14 +215,14 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu return noMetrics{} } return &defaultQueueMetrics{ - clock: clock, - depth: mp.NewDepthMetric(name), - adds: mp.NewAddsMetric(name), - latency: mp.NewLatencyMetric(name), - workDuration: mp.NewWorkDurationMetric(name), - unfinishedWorkMicroseconds: mp.NewUnfinishedWorkMicrosecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 065e2c40b1..90b2cf25e7 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -113,7 +113,7 @@ func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { return &m.duration } -func (m *testMetricsProvider) NewUnfinishedWorkMicrosecondsMetric(name string) SettableGaugeMetric { +func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { return &m.unfinished } @@ -229,7 +229,7 @@ func TestMetrics(t *testing.T) { c.Step(time.Millisecond) <-ch mp.unfinished.notifyCh = nil - if e, a := 1000.0, mp.unfinished.gaugeValue(); e != a { + if e, a := .001, mp.unfinished.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } From fd77aa5a41bbce7490dd4538c0d5743cb59b2be4 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 12 Nov 2018 10:52:18 -0800 Subject: [PATCH 6/8] add longest_running_processor_microseconds metric --- pkg/util/workqueue/prometheus/prometheus.go | 11 ++++++ .../client-go/util/workqueue/metrics.go | 35 +++++++++++++------ .../client-go/util/workqueue/metrics_test.go | 8 +++++ 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index 7c1f270f6f..c2f2cfce52 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -84,6 +84,17 @@ func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) wor return unfinished } +func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "longest_running_procesor_microseconds", + Help: "How many microseconds has the longest running " + + "processor for " + name + " been running.", + }) + prometheus.Register(unfinished) + return unfinished +} + func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { retries := prometheus.NewCounter(prometheus.CounterOpts{ Subsystem: name, diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index 51b5f2426d..d4c03d8378 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -80,7 +80,8 @@ type defaultQueueMetrics struct { processingStartTimes map[t]time.Time // how long have current threads been working? - unfinishedWorkSeconds SettableGaugeMetric + unfinishedWorkSeconds SettableGaugeMetric + longestRunningProcessor SettableGaugeMetric } func (m *defaultQueueMetrics) add(item t) { @@ -120,13 +121,21 @@ func (m *defaultQueueMetrics) done(item t) { } func (m *defaultQueueMetrics) updateUnfinishedWork() { + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. var total float64 + var oldest float64 for _, t := range m.processingStartTimes { - total += m.sinceInMicroseconds(t) + age := m.sinceInMicroseconds(t) + total += age + if age > oldest { + oldest = age + } } // Convert to seconds; microseconds is unhelpfully granular for this. total /= 1000000 m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) // in microseconds. } type noMetrics struct{} @@ -164,6 +173,7 @@ type MetricsProvider interface { NewLatencyMetric(name string) SummaryMetric NewWorkDurationMetric(name string) SummaryMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric + NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric } @@ -189,6 +199,10 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl return noopMetric{} } +func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } @@ -215,14 +229,15 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu return noMetrics{} } return &defaultQueueMetrics{ - clock: clock, - depth: mp.NewDepthMetric(name), - adds: mp.NewAddsMetric(name), - latency: mp.NewLatencyMetric(name), - workDuration: mp.NewWorkDurationMetric(name), - unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 90b2cf25e7..ceacabf55f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -94,6 +94,7 @@ type testMetricsProvider struct { latency testMetric duration testMetric unfinished testMetric + longest testMetric retries testMetric } @@ -117,6 +118,10 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab return &m.unfinished } +func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { + return &m.longest +} + func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric { return &m.retries } @@ -232,6 +237,9 @@ func TestMetrics(t *testing.T) { if e, a := .001, mp.unfinished.gaugeValue(); e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := 1000.0, mp.longest.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } // Finish that one up q.Done(i) From 680ddd49d8d23a88744f9de720f266022effd409 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 12 Nov 2018 12:57:42 -0800 Subject: [PATCH 7/8] fixup! add longest_running_processor_microseconds metric fix data race --- .../client-go/util/workqueue/metrics_test.go | 65 +++++++++++++++---- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index ceacabf55f..117f90801f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -17,6 +17,7 @@ limitations under the License. package workqueue import ( + "sync" "testing" "time" @@ -68,20 +69,60 @@ type testMetric struct { observedCount int notifyCh chan<- struct{} + + lock sync.Mutex } -func (m *testMetric) Inc() { m.inc++; m.notify() } -func (m *testMetric) Dec() { m.dec++; m.notify() } -func (m *testMetric) Set(f float64) { m.set = f; m.notify() } -func (m *testMetric) Observe(f float64) { m.observedValue = f; m.observedCount++; m.notify() } +func (m *testMetric) Inc() { + m.lock.Lock() + defer m.lock.Unlock() + m.inc++ + m.notify() +} + +func (m *testMetric) Dec() { + m.lock.Lock() + defer m.lock.Unlock() + m.dec++ + m.notify() +} + +func (m *testMetric) Set(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.set = f + m.notify() +} + +func (m *testMetric) Observe(f float64) { + m.lock.Lock() + defer m.lock.Unlock() + m.observedValue = f + m.observedCount++ + m.notify() +} func (m *testMetric) gaugeValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() if m.set != 0 { return m.set } return float64(m.inc - m.dec) } +func (m *testMetric) observationValue() float64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedValue +} + +func (m *testMetric) observationCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.observedCount +} + func (m *testMetric) notify() { if m.notifyCh != nil { m.notifyCh <- struct{}{} @@ -172,10 +213,10 @@ func TestMetrics(t *testing.T) { t.Errorf("Expected %v, got %v", "foo", i) } - if e, a := 50.0, mp.latency.observedValue; e != a { + if e, a := 50.0, mp.latency.observationValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1, mp.latency.observedCount; e != a { + if e, a := 1, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } if e, a := 0.0, mp.depth.gaugeValue(); e != a { @@ -202,10 +243,10 @@ func TestMetrics(t *testing.T) { // Finish it up q.Done(i) - if e, a := 25.0, mp.duration.observedValue; e != a { + if e, a := 25.0, mp.duration.observationValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 1, mp.duration.observedCount; e != a { + if e, a := 1, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -220,10 +261,10 @@ func TestMetrics(t *testing.T) { t.Errorf("Expected %v, got %v", "foo", i) } - if e, a := 25.0, mp.latency.observedValue; e != a { + if e, a := 25.0, mp.latency.observationValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 2, mp.latency.observedCount; e != a { + if e, a := 2, mp.latency.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -243,10 +284,10 @@ func TestMetrics(t *testing.T) { // Finish that one up q.Done(i) - if e, a := 1000.0, mp.duration.observedValue; e != a { + if e, a := 1000.0, mp.duration.observationValue(); e != a { t.Errorf("expected %v, got %v", e, a) } - if e, a := 2, mp.duration.observedCount; e != a { + if e, a := 2, mp.duration.observationCount(); e != a { t.Errorf("expected %v, got %v", e, a) } } From 980242c2092df3594ed38540a80c4bf172c5a68d Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 12 Nov 2018 21:57:57 -0800 Subject: [PATCH 8/8] fixup! add longest_running_processor_microseconds metric --- pkg/util/workqueue/prometheus/prometheus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/workqueue/prometheus/prometheus.go b/pkg/util/workqueue/prometheus/prometheus.go index c2f2cfce52..6f5cf38f09 100644 --- a/pkg/util/workqueue/prometheus/prometheus.go +++ b/pkg/util/workqueue/prometheus/prometheus.go @@ -87,7 +87,7 @@ func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) wor func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ Subsystem: name, - Name: "longest_running_procesor_microseconds", + Name: "longest_running_processor_microseconds", Help: "How many microseconds has the longest running " + "processor for " + name + " been running.", })