Add Collection Timeout

Signed-off-by: Mitanshu Mittal <mitanshu.sheo@gmail.com>
pull/3270/head
Mitanshu Mittal 2025-03-16 16:56:22 +05:30 committed by GitHub
parent 0c10545e8d
commit c2bb15b146
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 60 additions and 4 deletions

View File

@ -15,6 +15,7 @@
package collector
import (
"context"
"errors"
"fmt"
"log/slog"
@ -27,6 +28,7 @@ import (
// Namespace defines the common namespace to be used by all metrics.
const namespace = "node"
const collectorTimeout = 20 * time.Second
var (
scrapeDurationDesc = prometheus.NewDesc(
@ -157,12 +159,56 @@ func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
func execute(name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) {
begin := time.Now()
err := c.Update(ch)
// Create a context with a 20-second timeout
ctx, cancel := context.WithTimeout(context.Background(), collectorTimeout)
defer cancel()
// Create a separate channel for the collector to write to
collectorCh := make(chan prometheus.Metric, 100) // Buffered channel to reduce blocking
// Create a channel to receive the error from Update
errCh := make(chan error, 1)
// Run Update in a goroutine
go func() {
defer close(collectorCh)
errCh <- c.Update(collectorCh)
}()
// Forward metrics from the collector to the channel
var err error
doForwarding := true
for doForwarding {
select {
case metric, ok := <-collectorCh:
if !ok {
// Channel closed, get the error from Update
err = <-errCh
doForwarding = false
break
}
sendMetricSafely(ch, metric, logger, name)
case <-ctx.Done():
err = ctx.Err()
doForwarding = false
// Start a goroutine to drain collectorCh without blocking
go func() {
for range collectorCh {
// Drain collectorCh
}
}()
}
}
duration := time.Since(begin)
var success float64
if err != nil {
if IsNoDataError(err) {
if err == context.DeadlineExceeded {
logger.Error("collector timed out", "name", name, "duration_seconds", duration.Seconds(), "err", err)
} else if IsNoDataError(err) {
logger.Debug("collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
} else {
logger.Error("collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
@ -172,8 +218,18 @@ func execute(name string, c Collector, ch chan<- prometheus.Metric, logger *slog
logger.Debug("collector succeeded", "name", name, "duration_seconds", duration.Seconds())
success = 1
}
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
// Send the final metrics
sendMetricSafely(ch, prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name), logger, name)
sendMetricSafely(ch, prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name), logger, name)
}
// sendMetricSafely sends a metric to the channel, logging a warning if it fails
func sendMetricSafely(ch chan<- prometheus.Metric, metric prometheus.Metric, logger *slog.Logger, name string) {
select {
case ch <- metric:
default:
logger.Warn("msg", "failed to send metric", "name", name)
}
}
// Collector is the interface a collector has to implement.