mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
370 lines
9.7 KiB
370 lines
9.7 KiB
// Copyright 2023 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package main |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"math" |
|
"net/http" |
|
"net/url" |
|
"os" |
|
"sort" |
|
"strconv" |
|
"strings" |
|
"time" |
|
|
|
v1 "github.com/prometheus/client_golang/api/prometheus/v1" |
|
"github.com/prometheus/common/model" |
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
) |
|
|
|
// Sentinel errors returned by the bucket statistics calculations.
var (
	// errNotNativeHistogram is returned when a series carries no native
	// histogram samples at all.
	errNotNativeHistogram = errors.New("not a native histogram")
	// errNotEnoughData is returned when a series has too few samples to
	// compute statistics from.
	errNotEnoughData = errors.New("not enough data")
)

// Fixed text written before and after the per-series statistics.
var (
	outputHeader = `Bucket stats for each histogram series over time
------------------------------------------------
First the min, avg, and max number of populated buckets, followed by the total
number of buckets (only if different from the max number of populated buckets
which is typical for classic but not native histograms).`
	outputFooter = `Aggregated bucket stats
-----------------------
Each line shows min/avg/max over the series above.`
)
|
|
|
// QueryAnalyzeConfig holds the settings for a histogram bucket analysis run.
type QueryAnalyzeConfig struct {
	// metricType selects the kind of analysis; run rejects anything other
	// than "histogram".
	metricType string
	// duration is the range appended to every matcher to build the range
	// selector that is queried.
	duration time.Duration
	// time is the optional evaluation time of the query. When empty, run
	// uses the current time.
	time string
	// matchers are the series selectors to analyze, queried one at a time.
	matchers []string
}
|
|
|
// run retrieves metrics that look like conventional histograms (i.e. have _bucket |
|
// suffixes) or native histograms, depending on metricType flag. |
|
func (c *QueryAnalyzeConfig) run(url *url.URL, roundtripper http.RoundTripper) error { |
|
if c.metricType != "histogram" { |
|
return fmt.Errorf("analyze type is %s, must be 'histogram'", c.metricType) |
|
} |
|
|
|
ctx := context.Background() |
|
|
|
api, err := newAPI(url, roundtripper, nil) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
var endTime time.Time |
|
if c.time != "" { |
|
endTime, err = parseTime(c.time) |
|
if err != nil { |
|
return fmt.Errorf("error parsing time '%s': %w", c.time, err) |
|
} |
|
} else { |
|
endTime = time.Now() |
|
} |
|
|
|
return c.getStatsFromMetrics(ctx, api, endTime, os.Stdout, c.matchers) |
|
} |
|
|
|
func (c *QueryAnalyzeConfig) getStatsFromMetrics(ctx context.Context, api v1.API, endTime time.Time, out io.Writer, matchers []string) error { |
|
fmt.Fprintf(out, "%s\n\n", outputHeader) |
|
metastatsNative := newMetaStatistics() |
|
metastatsClassic := newMetaStatistics() |
|
for _, matcher := range matchers { |
|
seriesSel := seriesSelector(matcher, c.duration) |
|
matrix, err := querySamples(ctx, api, seriesSel, endTime) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
matrices := make(map[string]model.Matrix) |
|
for _, series := range matrix { |
|
// We do not handle mixed types. If there are float values, we assume it is a |
|
// classic histogram, otherwise we assume it is a native histogram, and we |
|
// ignore series with errors if they do not match the expected type. |
|
if len(series.Values) == 0 { |
|
stats, err := calcNativeBucketStatistics(series) |
|
if err != nil { |
|
if errors.Is(err, errNotNativeHistogram) || errors.Is(err, errNotEnoughData) { |
|
continue |
|
} |
|
return err |
|
} |
|
fmt.Fprintf(out, "- %s (native): %v\n", series.Metric, *stats) |
|
metastatsNative.update(stats) |
|
} else { |
|
lbs := model.LabelSet(series.Metric).Clone() |
|
if _, ok := lbs["le"]; !ok { |
|
continue |
|
} |
|
metricName := string(lbs[labels.MetricName]) |
|
if !strings.HasSuffix(metricName, "_bucket") { |
|
continue |
|
} |
|
delete(lbs, labels.MetricName) |
|
delete(lbs, "le") |
|
key := formatSeriesName(metricName, lbs) |
|
matrices[key] = append(matrices[key], series) |
|
} |
|
} |
|
|
|
for key, matrix := range matrices { |
|
stats, err := calcClassicBucketStatistics(matrix) |
|
if err != nil { |
|
if errors.Is(err, errNotEnoughData) { |
|
continue |
|
} |
|
return err |
|
} |
|
fmt.Fprintf(out, "- %s (classic): %v\n", key, *stats) |
|
metastatsClassic.update(stats) |
|
} |
|
} |
|
fmt.Fprintf(out, "\n%s\n", outputFooter) |
|
if metastatsNative.Count() > 0 { |
|
fmt.Fprintf(out, "\nNative %s\n", metastatsNative) |
|
} |
|
if metastatsClassic.Count() > 0 { |
|
fmt.Fprintf(out, "\nClassic %s\n", metastatsClassic) |
|
} |
|
return nil |
|
} |
|
|
|
// seriesSelector builds a range selector by appending the duration, in
// square brackets, to the given selector text.
func seriesSelector(metricName string, duration time.Duration) string {
	var sel strings.Builder
	for _, part := range [...]string{metricName, "[", duration.String(), "]"} {
		sel.WriteString(part)
	}
	return sel.String()
}
|
|
|
func formatSeriesName(metricName string, lbs model.LabelSet) string { |
|
builder := strings.Builder{} |
|
builder.WriteString(metricName) |
|
builder.WriteString(lbs.String()) |
|
return builder.String() |
|
} |
|
|
|
func querySamples(ctx context.Context, api v1.API, query string, end time.Time) (model.Matrix, error) { |
|
values, _, err := api.Query(ctx, query, end) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
matrix, ok := values.(model.Matrix) |
|
if !ok { |
|
return nil, errors.New("query of buckets resulted in non-Matrix") |
|
} |
|
|
|
return matrix, nil |
|
} |
|
|
|
// statistics describes the bucket usage of a single histogram series.
//
// minPop, avgPop and maxPop are the minimum, average and maximum number of
// populated (non-zero) buckets. total is the total number of buckets across
// all samples in the series, populated or not.
type statistics struct {
	minPop, maxPop, total int
	avgPop                float64
}

// String formats the statistics as "min/avg/max" and appends "/total" only
// when total differs from maxPop.
func (s statistics) String() string {
	populated := fmt.Sprintf("%d/%.3f/%d", s.minPop, s.avgPop, s.maxPop)
	if s.maxPop != s.total {
		return populated + fmt.Sprintf("/%d", s.total)
	}
	return populated
}
|
|
|
func calcClassicBucketStatistics(matrix model.Matrix) (*statistics, error) { |
|
numBuckets := len(matrix) |
|
|
|
stats := &statistics{ |
|
minPop: math.MaxInt, |
|
total: numBuckets, |
|
} |
|
|
|
if numBuckets == 0 || len(matrix[0].Values) < 2 { |
|
return stats, errNotEnoughData |
|
} |
|
|
|
numSamples := len(matrix[0].Values) |
|
|
|
sortMatrix(matrix) |
|
|
|
totalPop := 0 |
|
for timeIdx := 0; timeIdx < numSamples; timeIdx++ { |
|
curr, err := getBucketCountsAtTime(matrix, numBuckets, timeIdx) |
|
if err != nil { |
|
return stats, err |
|
} |
|
countPop := 0 |
|
for _, b := range curr { |
|
if b != 0 { |
|
countPop++ |
|
} |
|
} |
|
|
|
totalPop += countPop |
|
if stats.minPop > countPop { |
|
stats.minPop = countPop |
|
} |
|
if stats.maxPop < countPop { |
|
stats.maxPop = countPop |
|
} |
|
} |
|
stats.avgPop = float64(totalPop) / float64(numSamples) |
|
return stats, nil |
|
} |
|
|
|
func sortMatrix(matrix model.Matrix) { |
|
sort.SliceStable(matrix, func(i, j int) bool { |
|
return getLe(matrix[i]) < getLe(matrix[j]) |
|
}) |
|
} |
|
|
|
func getLe(series *model.SampleStream) float64 { |
|
lbs := model.LabelSet(series.Metric) |
|
le, _ := strconv.ParseFloat(string(lbs["le"]), 64) |
|
return le |
|
} |
|
|
|
func getBucketCountsAtTime(matrix model.Matrix, numBuckets, timeIdx int) ([]int, error) { |
|
counts := make([]int, numBuckets) |
|
if timeIdx >= len(matrix[0].Values) { |
|
// Just return zeroes instead of erroring out so we can get partial results. |
|
return counts, nil |
|
} |
|
counts[0] = int(matrix[0].Values[timeIdx].Value) |
|
for i, bucket := range matrix[1:] { |
|
if timeIdx >= len(bucket.Values) { |
|
// Just return zeroes instead of erroring out so we can get partial results. |
|
return counts, nil |
|
} |
|
curr := bucket.Values[timeIdx] |
|
prev := matrix[i].Values[timeIdx] |
|
// Assume the results are nicely aligned. |
|
if curr.Timestamp != prev.Timestamp { |
|
return counts, errors.New("matrix result is not time aligned") |
|
} |
|
counts[i+1] = int(curr.Value - prev.Value) |
|
} |
|
return counts, nil |
|
} |
|
|
|
type bucketBounds struct { |
|
boundaries int32 |
|
upper, lower float64 |
|
} |
|
|
|
func makeBucketBounds(b *model.HistogramBucket) bucketBounds { |
|
return bucketBounds{ |
|
boundaries: b.Boundaries, |
|
upper: float64(b.Upper), |
|
lower: float64(b.Lower), |
|
} |
|
} |
|
|
|
func calcNativeBucketStatistics(series *model.SampleStream) (*statistics, error) { |
|
stats := &statistics{ |
|
minPop: math.MaxInt, |
|
} |
|
|
|
overall := make(map[bucketBounds]struct{}) |
|
totalPop := 0 |
|
if len(series.Histograms) == 0 { |
|
return nil, errNotNativeHistogram |
|
} |
|
if len(series.Histograms) == 1 { |
|
return nil, errNotEnoughData |
|
} |
|
for _, histogram := range series.Histograms { |
|
for _, bucket := range histogram.Histogram.Buckets { |
|
bb := makeBucketBounds(bucket) |
|
overall[bb] = struct{}{} |
|
} |
|
countPop := len(histogram.Histogram.Buckets) |
|
|
|
totalPop += countPop |
|
if stats.minPop > countPop { |
|
stats.minPop = countPop |
|
} |
|
if stats.maxPop < countPop { |
|
stats.maxPop = countPop |
|
} |
|
} |
|
stats.avgPop = float64(totalPop) / float64(len(series.Histograms)) |
|
stats.total = len(overall) |
|
return stats, nil |
|
} |
|
|
|
// distribution tracks the minimum, maximum and running average of a stream of
// integer observations, along with the number of observations seen.
type distribution struct {
	min, max, count int
	avg             float64
}

// newDistribution returns an empty distribution. min starts at math.MaxInt so
// that the first observation always replaces it.
func newDistribution() distribution {
	var d distribution
	d.min = math.MaxInt
	return d
}

// update folds one observation into the distribution, adjusting min, max,
// count and the incrementally maintained average.
func (d *distribution) update(num int) {
	if num < d.min {
		d.min = num
	}
	if num > d.max {
		d.max = num
	}
	d.count++
	// Incremental mean: move the average by the new value's share minus the
	// old average's share, both relative to the new count.
	n := float64(d.count)
	d.avg += float64(num)/n - d.avg/n
}

// String formats the distribution as "min/avg/max" with the average shown to
// three decimal places.
func (d distribution) String() string {
	return fmt.Sprintf("%d/%.3f/%d", d.min, d.avg, d.max)
}
|
|
|
type metaStatistics struct { |
|
minPop, avgPop, maxPop, total distribution |
|
} |
|
|
|
func newMetaStatistics() *metaStatistics { |
|
return &metaStatistics{ |
|
minPop: newDistribution(), |
|
avgPop: newDistribution(), |
|
maxPop: newDistribution(), |
|
total: newDistribution(), |
|
} |
|
} |
|
|
|
func (ms metaStatistics) Count() int { |
|
return ms.minPop.count |
|
} |
|
|
|
func (ms metaStatistics) String() string { |
|
if ms.maxPop == ms.total { |
|
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop) |
|
} |
|
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v\n- total: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop, ms.total) |
|
} |
|
|
|
func (ms *metaStatistics) update(s *statistics) { |
|
ms.minPop.update(s.minPop) |
|
ms.avgPop.update(int(s.avgPop)) |
|
ms.maxPop.update(s.maxPop) |
|
ms.total.update(s.total) |
|
}
|
|
|