The Prometheus monitoring system and time series database.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

370 lines
9.7 KiB

// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
var (
errNotNativeHistogram = errors.New("not a native histogram")
errNotEnoughData = errors.New("not enough data")
outputHeader = `Bucket stats for each histogram series over time
------------------------------------------------
First the min, avg, and max number of populated buckets, followed by the total
number of buckets (only if different from the max number of populated buckets
which is typical for classic but not native histograms).`
outputFooter = `Aggregated bucket stats
-----------------------
Each line shows min/avg/max over the series above.`
)
type QueryAnalyzeConfig struct {
metricType string
duration time.Duration
time string
matchers []string
}
// run retrieves metrics that look like conventional histograms (i.e. have _bucket
// suffixes) or native histograms, depending on metricType flag.
func (c *QueryAnalyzeConfig) run(url *url.URL, roundtripper http.RoundTripper) error {
if c.metricType != "histogram" {
return fmt.Errorf("analyze type is %s, must be 'histogram'", c.metricType)
}
ctx := context.Background()
api, err := newAPI(url, roundtripper, nil)
if err != nil {
return err
}
var endTime time.Time
if c.time != "" {
endTime, err = parseTime(c.time)
if err != nil {
return fmt.Errorf("error parsing time '%s': %w", c.time, err)
}
} else {
endTime = time.Now()
}
return c.getStatsFromMetrics(ctx, api, endTime, os.Stdout, c.matchers)
}
func (c *QueryAnalyzeConfig) getStatsFromMetrics(ctx context.Context, api v1.API, endTime time.Time, out io.Writer, matchers []string) error {
fmt.Fprintf(out, "%s\n\n", outputHeader)
metastatsNative := newMetaStatistics()
metastatsClassic := newMetaStatistics()
for _, matcher := range matchers {
seriesSel := seriesSelector(matcher, c.duration)
matrix, err := querySamples(ctx, api, seriesSel, endTime)
if err != nil {
return err
}
matrices := make(map[string]model.Matrix)
for _, series := range matrix {
// We do not handle mixed types. If there are float values, we assume it is a
// classic histogram, otherwise we assume it is a native histogram, and we
// ignore series with errors if they do not match the expected type.
if len(series.Values) == 0 {
stats, err := calcNativeBucketStatistics(series)
if err != nil {
if errors.Is(err, errNotNativeHistogram) || errors.Is(err, errNotEnoughData) {
continue
}
return err
}
fmt.Fprintf(out, "- %s (native): %v\n", series.Metric, *stats)
metastatsNative.update(stats)
} else {
lbs := model.LabelSet(series.Metric).Clone()
if _, ok := lbs["le"]; !ok {
continue
}
metricName := string(lbs[labels.MetricName])
if !strings.HasSuffix(metricName, "_bucket") {
continue
}
delete(lbs, labels.MetricName)
delete(lbs, "le")
key := formatSeriesName(metricName, lbs)
matrices[key] = append(matrices[key], series)
}
}
for key, matrix := range matrices {
stats, err := calcClassicBucketStatistics(matrix)
if err != nil {
if errors.Is(err, errNotEnoughData) {
continue
}
return err
}
fmt.Fprintf(out, "- %s (classic): %v\n", key, *stats)
metastatsClassic.update(stats)
}
}
fmt.Fprintf(out, "\n%s\n", outputFooter)
if metastatsNative.Count() > 0 {
fmt.Fprintf(out, "\nNative %s\n", metastatsNative)
}
if metastatsClassic.Count() > 0 {
fmt.Fprintf(out, "\nClassic %s\n", metastatsClassic)
}
return nil
}
func seriesSelector(metricName string, duration time.Duration) string {
builder := strings.Builder{}
builder.WriteString(metricName)
builder.WriteRune('[')
builder.WriteString(duration.String())
builder.WriteRune(']')
return builder.String()
}
func formatSeriesName(metricName string, lbs model.LabelSet) string {
builder := strings.Builder{}
builder.WriteString(metricName)
builder.WriteString(lbs.String())
return builder.String()
}
func querySamples(ctx context.Context, api v1.API, query string, end time.Time) (model.Matrix, error) {
values, _, err := api.Query(ctx, query, end)
if err != nil {
return nil, err
}
matrix, ok := values.(model.Matrix)
if !ok {
return nil, errors.New("query of buckets resulted in non-Matrix")
}
return matrix, nil
}
// minPop/avgPop/maxPop is for the number of populated (non-zero) buckets.
// total is the total number of buckets across all samples in the series,
// populated or not.
type statistics struct {
minPop, maxPop, total int
avgPop float64
}
func (s statistics) String() string {
if s.maxPop == s.total {
return fmt.Sprintf("%d/%.3f/%d", s.minPop, s.avgPop, s.maxPop)
}
return fmt.Sprintf("%d/%.3f/%d/%d", s.minPop, s.avgPop, s.maxPop, s.total)
}
func calcClassicBucketStatistics(matrix model.Matrix) (*statistics, error) {
numBuckets := len(matrix)
stats := &statistics{
minPop: math.MaxInt,
total: numBuckets,
}
if numBuckets == 0 || len(matrix[0].Values) < 2 {
return stats, errNotEnoughData
}
numSamples := len(matrix[0].Values)
sortMatrix(matrix)
totalPop := 0
for timeIdx := 0; timeIdx < numSamples; timeIdx++ {
curr, err := getBucketCountsAtTime(matrix, numBuckets, timeIdx)
if err != nil {
return stats, err
}
countPop := 0
for _, b := range curr {
if b != 0 {
countPop++
}
}
totalPop += countPop
if stats.minPop > countPop {
stats.minPop = countPop
}
if stats.maxPop < countPop {
stats.maxPop = countPop
}
}
stats.avgPop = float64(totalPop) / float64(numSamples)
return stats, nil
}
func sortMatrix(matrix model.Matrix) {
sort.SliceStable(matrix, func(i, j int) bool {
return getLe(matrix[i]) < getLe(matrix[j])
})
}
func getLe(series *model.SampleStream) float64 {
lbs := model.LabelSet(series.Metric)
le, _ := strconv.ParseFloat(string(lbs["le"]), 64)
return le
}
func getBucketCountsAtTime(matrix model.Matrix, numBuckets, timeIdx int) ([]int, error) {
counts := make([]int, numBuckets)
if timeIdx >= len(matrix[0].Values) {
// Just return zeroes instead of erroring out so we can get partial results.
return counts, nil
}
counts[0] = int(matrix[0].Values[timeIdx].Value)
for i, bucket := range matrix[1:] {
if timeIdx >= len(bucket.Values) {
// Just return zeroes instead of erroring out so we can get partial results.
return counts, nil
}
curr := bucket.Values[timeIdx]
prev := matrix[i].Values[timeIdx]
// Assume the results are nicely aligned.
if curr.Timestamp != prev.Timestamp {
return counts, errors.New("matrix result is not time aligned")
}
counts[i+1] = int(curr.Value - prev.Value)
}
return counts, nil
}
type bucketBounds struct {
boundaries int32
upper, lower float64
}
func makeBucketBounds(b *model.HistogramBucket) bucketBounds {
return bucketBounds{
boundaries: b.Boundaries,
upper: float64(b.Upper),
lower: float64(b.Lower),
}
}
func calcNativeBucketStatistics(series *model.SampleStream) (*statistics, error) {
stats := &statistics{
minPop: math.MaxInt,
}
overall := make(map[bucketBounds]struct{})
totalPop := 0
if len(series.Histograms) == 0 {
return nil, errNotNativeHistogram
}
if len(series.Histograms) == 1 {
return nil, errNotEnoughData
}
for _, histogram := range series.Histograms {
for _, bucket := range histogram.Histogram.Buckets {
bb := makeBucketBounds(bucket)
overall[bb] = struct{}{}
}
countPop := len(histogram.Histogram.Buckets)
totalPop += countPop
if stats.minPop > countPop {
stats.minPop = countPop
}
if stats.maxPop < countPop {
stats.maxPop = countPop
}
}
stats.avgPop = float64(totalPop) / float64(len(series.Histograms))
stats.total = len(overall)
return stats, nil
}
type distribution struct {
min, max, count int
avg float64
}
func newDistribution() distribution {
return distribution{
min: math.MaxInt,
}
}
func (d *distribution) update(num int) {
if d.min > num {
d.min = num
}
if d.max < num {
d.max = num
}
d.count++
d.avg += float64(num)/float64(d.count) - d.avg/float64(d.count)
}
func (d distribution) String() string {
return fmt.Sprintf("%d/%.3f/%d", d.min, d.avg, d.max)
}
type metaStatistics struct {
minPop, avgPop, maxPop, total distribution
}
func newMetaStatistics() *metaStatistics {
return &metaStatistics{
minPop: newDistribution(),
avgPop: newDistribution(),
maxPop: newDistribution(),
total: newDistribution(),
}
}
func (ms metaStatistics) Count() int {
return ms.minPop.count
}
func (ms metaStatistics) String() string {
if ms.maxPop == ms.total {
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop)
}
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v\n- total: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop, ms.total)
}
func (ms *metaStatistics) update(s *statistics) {
ms.minPop.update(s.minPop)
ms.avgPop.update(int(s.avgPop))
ms.maxPop.update(s.maxPop)
ms.total.update(s.total)
}