mirror of https://github.com/prometheus/prometheus
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
448 lines
14 KiB
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
package tsdb |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"slices" |
|
"sync" |
|
"unicode/utf8" |
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
"github.com/prometheus/prometheus/config" |
|
"github.com/prometheus/prometheus/model/exemplar" |
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
) |
|
|
|
const (
	// noExemplar indicates that there is no index entry for an exemplar
	// (used as the "nil" value of circularBufferEntry.next).
	noExemplar = -1
	// estimatedExemplarsPerSeries is the estimated number of exemplars per
	// series, used only to pre-size the series index map.
	estimatedExemplarsPerSeries = 16
)
|
|
|
// CircularExemplarStorage keeps exemplars in a fixed-size circular buffer.
// When the buffer is full, appending a new exemplar overwrites the oldest one.
// All fields are guarded by lock unless noted otherwise.
type CircularExemplarStorage struct {
	lock      sync.RWMutex
	exemplars []circularBufferEntry
	// nextIndex is the position in exemplars that the next append writes to.
	nextIndex int
	metrics   *ExemplarMetrics

	// Map of series labels as a string to index entry, which points to the first
	// and last exemplar for the series in the exemplars circular buffer.
	index map[string]*indexEntry
}
|
|
|
// indexEntry records, for one series, the buffer positions of its oldest and
// newest exemplars plus the series labels themselves.
type indexEntry struct {
	// oldest is the buffer index of the series' oldest stored exemplar.
	oldest int
	// newest is the buffer index of the series' newest stored exemplar.
	newest       int
	seriesLabels labels.Labels
}
|
|
|
// circularBufferEntry is one slot of the circular buffer: an exemplar, the
// buffer index of the series' next-newer exemplar (noExemplar if this is the
// newest), and a pointer back to the series' index entry.
type circularBufferEntry struct {
	exemplar exemplar.Exemplar
	// next links to the next exemplar of the same series; noExemplar terminates the chain.
	next int
	// ref points to the owning series' index entry; nil marks an unused slot.
	ref *indexEntry
}
|
|
|
// ExemplarMetrics bundles the Prometheus metrics exported by the exemplar
// storage. Construct with NewExemplarMetrics.
type ExemplarMetrics struct {
	exemplarsAppended            prometheus.Counter
	exemplarsInStorage           prometheus.Gauge
	seriesWithExemplarsInStorage prometheus.Gauge
	// lastExemplarsTs tracks the timestamp (in seconds) of the oldest exemplar in storage.
	lastExemplarsTs prometheus.Gauge
	// maxExemplars reflects the configured buffer capacity (updated on Resize).
	maxExemplars        prometheus.Gauge
	outOfOrderExemplars prometheus.Counter
}
|
|
|
func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics { |
|
m := ExemplarMetrics{ |
|
exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_exemplar_exemplars_appended_total", |
|
Help: "Total number of appended exemplars.", |
|
}), |
|
exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_exemplar_exemplars_in_storage", |
|
Help: "Number of exemplars currently in circular storage.", |
|
}), |
|
seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage", |
|
Help: "Number of series with exemplars currently in circular storage.", |
|
}), |
|
lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds", |
|
Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" + |
|
"range the current exemplar buffer limit allows. This usually means the last timestamp" + |
|
"for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.", |
|
}), |
|
outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", |
|
Help: "Total number of out of order exemplar ingestion failed attempts.", |
|
}), |
|
maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_exemplar_max_exemplars", |
|
Help: "Total number of exemplars the exemplar storage can store, resizeable.", |
|
}), |
|
} |
|
|
|
if reg != nil { |
|
reg.MustRegister( |
|
m.exemplarsAppended, |
|
m.exemplarsInStorage, |
|
m.seriesWithExemplarsInStorage, |
|
m.lastExemplarsTs, |
|
m.outOfOrderExemplars, |
|
m.maxExemplars, |
|
) |
|
} |
|
|
|
return &m |
|
} |
|
|
|
// NewCircularExemplarStorage creates a circular in memory exemplar storage. |
|
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in |
|
// 1GB of extra memory, accounting for the fact that this is heap allocated space. |
|
// If len <= 0, then the exemplar storage is essentially a noop storage but can later be |
|
// resized to store exemplars. |
|
func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error) { |
|
if length < 0 { |
|
length = 0 |
|
} |
|
c := &CircularExemplarStorage{ |
|
exemplars: make([]circularBufferEntry, length), |
|
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries), |
|
metrics: m, |
|
} |
|
|
|
c.metrics.maxExemplars.Set(float64(length)) |
|
|
|
return c, nil |
|
} |
|
|
|
// ApplyConfig resizes the exemplar buffer to the configured maximum exemplar
// count. It always returns nil.
func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error {
	ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
	return nil
}
|
|
|
// Appender returns the storage itself; CircularExemplarStorage acts as its own
// exemplar appender.
func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage {
	return ce
}
|
|
|
// ExemplarQuerier returns the storage itself as a storage.ExemplarQuerier; the
// context is unused and the error is always nil.
func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error) {
	return ce, nil
}
|
|
|
// Querier returns the storage itself as a storage.ExemplarQuerier; the context
// is unused and the error is always nil.
func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) {
	return ce, nil
}
|
|
|
// Select returns exemplars for a given set of label matchers. |
|
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { |
|
ret := make([]exemplar.QueryResult, 0) |
|
|
|
if len(ce.exemplars) == 0 { |
|
return ret, nil |
|
} |
|
|
|
ce.lock.RLock() |
|
defer ce.lock.RUnlock() |
|
|
|
// Loop through each index entry, which will point us to first/last exemplar for each series. |
|
for _, idx := range ce.index { |
|
var se exemplar.QueryResult |
|
e := ce.exemplars[idx.oldest] |
|
if e.exemplar.Ts > end || ce.exemplars[idx.newest].exemplar.Ts < start { |
|
continue |
|
} |
|
if !matchesSomeMatcherSet(idx.seriesLabels, matchers) { |
|
continue |
|
} |
|
se.SeriesLabels = idx.seriesLabels |
|
|
|
// Loop through all exemplars in the circular buffer for the current series. |
|
for e.exemplar.Ts <= end { |
|
if e.exemplar.Ts >= start { |
|
se.Exemplars = append(se.Exemplars, e.exemplar) |
|
} |
|
if e.next == noExemplar { |
|
break |
|
} |
|
e = ce.exemplars[e.next] |
|
} |
|
if len(se.Exemplars) > 0 { |
|
ret = append(ret, se) |
|
} |
|
} |
|
|
|
slices.SortFunc(ret, func(a, b exemplar.QueryResult) int { |
|
return labels.Compare(a.SeriesLabels, b.SeriesLabels) |
|
}) |
|
|
|
return ret, nil |
|
} |
|
|
|
func matchesSomeMatcherSet(lbls labels.Labels, matchers [][]*labels.Matcher) bool { |
|
Outer: |
|
for _, ms := range matchers { |
|
for _, m := range ms { |
|
if !m.Matches(lbls.Get(m.Name)) { |
|
continue Outer |
|
} |
|
} |
|
return true |
|
} |
|
return false |
|
} |
|
|
|
func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { |
|
var buf [1024]byte |
|
seriesLabels := l.Bytes(buf[:]) |
|
|
|
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. |
|
// Optimize by moving the lock to be per series (& benchmark it). |
|
ce.lock.RLock() |
|
defer ce.lock.RUnlock() |
|
return ce.validateExemplar(ce.index[string(seriesLabels)], e, false) |
|
} |
|
|
|
// validateExemplar returns nil when exemplar e may be stored for the series
// described by idx (idx may be nil when the series has no stored exemplars).
// Not thread safe. The appended parameter tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.Exemplar, appended bool) error {
	// A zero-length buffer means exemplar storage is disabled.
	if len(ce.exemplars) == 0 {
		return storage.ErrExemplarsDisabled
	}

	// Exemplar label length does not include chars involved in text rendering such as quotes
	// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
	labelSetLen := 0
	if err := e.Labels.Validate(func(l labels.Label) error {
		// Count runes, not bytes, for the label length budget.
		labelSetLen += utf8.RuneCountInString(l.Name)
		labelSetLen += utf8.RuneCountInString(l.Value)

		if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
			return storage.ErrExemplarLabelLength
		}
		return nil
	}); err != nil {
		return err
	}

	// No previous exemplars for this series: nothing further to check.
	if idx == nil {
		return nil
	}

	// Check for duplicate vs last stored exemplar for this series.
	// NB these are expected, and appending them is a no-op.
	// For floats and classic histograms, there is only 1 exemplar per series,
	// so this is sufficient. For native histograms with multiple exemplars per series,
	// we have another check below.
	newestExemplar := ce.exemplars[idx.newest].exemplar
	if newestExemplar.Equals(e) {
		return storage.ErrDuplicateExemplar
	}

	// Since during the scrape the exemplars are sorted first by timestamp, then value, then labels,
	// if any of these conditions are true, we know that the exemplar is either a duplicate
	// of a previous one (but not the most recent one as that is checked above) or out of order.
	// We now allow exemplars with duplicate timestamps as long as they have different values and/or labels
	// since that can happen for different buckets of a native histogram.
	// We do not distinguish between duplicates and out of order as iterating through the exemplars
	// to check for that would be expensive (versus just comparing with the most recent one) especially
	// since this is run under a lock, and not worth it as we just need to return an error so we do not
	// append the exemplar.
	if e.Ts < newestExemplar.Ts ||
		(e.Ts == newestExemplar.Ts && e.Value < newestExemplar.Value) ||
		(e.Ts == newestExemplar.Ts && e.Value == newestExemplar.Value && e.Labels.Hash() < newestExemplar.Labels.Hash()) {
		// Only count the failure when called from AddExemplar (appended=true).
		if appended {
			ce.metrics.outOfOrderExemplars.Inc()
		}
		return storage.ErrOutOfOrderExemplar
	}
	return nil
}
|
|
|
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. |
|
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed. |
|
func (ce *CircularExemplarStorage) Resize(l int64) int { |
|
// Accept negative values as just 0 size. |
|
if l <= 0 { |
|
l = 0 |
|
} |
|
|
|
if l == int64(len(ce.exemplars)) { |
|
return 0 |
|
} |
|
|
|
ce.lock.Lock() |
|
defer ce.lock.Unlock() |
|
|
|
oldBuffer := ce.exemplars |
|
oldNextIndex := int64(ce.nextIndex) |
|
|
|
ce.exemplars = make([]circularBufferEntry, l) |
|
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries) |
|
ce.nextIndex = 0 |
|
|
|
// Replay as many entries as needed, starting with oldest first. |
|
count := int64(len(oldBuffer)) |
|
if l < count { |
|
count = l |
|
} |
|
|
|
migrated := 0 |
|
|
|
if l > 0 && len(oldBuffer) > 0 { |
|
// Rewind previous next index by count with wrap-around. |
|
// This math is essentially looking at nextIndex, where we would write the next exemplar to, |
|
// and find the index in the old exemplar buffer that we should start migrating exemplars from. |
|
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars. |
|
startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer)) |
|
|
|
var buf [1024]byte |
|
for i := int64(0); i < count; i++ { |
|
idx := (startIndex + i) % int64(len(oldBuffer)) |
|
if oldBuffer[idx].ref != nil { |
|
ce.migrate(&oldBuffer[idx], buf[:]) |
|
migrated++ |
|
} |
|
} |
|
} |
|
|
|
ce.computeMetrics() |
|
ce.metrics.maxExemplars.Set(float64(l)) |
|
|
|
return migrated |
|
} |
|
|
|
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) {
	// Serialize the series labels into buf to use as the index map key.
	seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0])

	idx, ok := ce.index[string(seriesLabels)]
	if !ok {
		// First exemplar migrated for this series: reuse the old index entry
		// and point its oldest slot at the position we are about to fill.
		idx = entry.ref
		idx.oldest = ce.nextIndex
		ce.index[string(seriesLabels)] = idx
	} else {
		// Series already present: link the previous newest exemplar to the
		// slot we are about to fill.
		entry.ref = idx
		ce.exemplars[idx.newest].next = ce.nextIndex
	}
	idx.newest = ce.nextIndex

	// This entry becomes the tail of the series' exemplar chain.
	entry.next = noExemplar
	ce.exemplars[ce.nextIndex] = *entry

	ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
}
|
|
|
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { |
|
if len(ce.exemplars) == 0 { |
|
return storage.ErrExemplarsDisabled |
|
} |
|
|
|
var buf [1024]byte |
|
seriesLabels := l.Bytes(buf[:]) |
|
|
|
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. |
|
// Optimize by moving the lock to be per series (& benchmark it). |
|
ce.lock.Lock() |
|
defer ce.lock.Unlock() |
|
|
|
idx, ok := ce.index[string(seriesLabels)] |
|
err := ce.validateExemplar(idx, e, true) |
|
if err != nil { |
|
if errors.Is(err, storage.ErrDuplicateExemplar) { |
|
// Duplicate exemplar, noop. |
|
return nil |
|
} |
|
return err |
|
} |
|
|
|
if !ok { |
|
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} |
|
ce.index[string(seriesLabels)] = idx |
|
} else { |
|
ce.exemplars[idx.newest].next = ce.nextIndex |
|
} |
|
|
|
if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil { |
|
// There exists an exemplar already on this ce.nextIndex entry, |
|
// drop it, to make place for others. |
|
if prev.next == noExemplar { |
|
// Last item for this series, remove index entry. |
|
var buf [1024]byte |
|
prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) |
|
delete(ce.index, string(prevLabels)) |
|
} else { |
|
prev.ref.oldest = prev.next |
|
} |
|
} |
|
|
|
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) |
|
// since this is the first exemplar stored for this series. |
|
ce.exemplars[ce.nextIndex].next = noExemplar |
|
ce.exemplars[ce.nextIndex].exemplar = e |
|
ce.exemplars[ce.nextIndex].ref = idx |
|
idx.newest = ce.nextIndex |
|
|
|
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) |
|
|
|
ce.metrics.exemplarsAppended.Inc() |
|
ce.computeMetrics() |
|
return nil |
|
} |
|
|
|
func (ce *CircularExemplarStorage) computeMetrics() { |
|
ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) |
|
|
|
if len(ce.exemplars) == 0 { |
|
ce.metrics.exemplarsInStorage.Set(float64(0)) |
|
ce.metrics.lastExemplarsTs.Set(float64(0)) |
|
return |
|
} |
|
|
|
if ce.exemplars[ce.nextIndex].ref != nil { |
|
ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars))) |
|
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[ce.nextIndex].exemplar.Ts) / 1000) |
|
return |
|
} |
|
|
|
// We did not yet fill the buffer. |
|
ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex)) |
|
if ce.exemplars[0].ref != nil { |
|
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) |
|
} |
|
} |
|
|
|
// IterateExemplars iterates through all the exemplars from oldest to newest appended and calls |
|
// the given function on all of them till the end (or) till the first function call that returns an error. |
|
func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error { |
|
ce.lock.RLock() |
|
defer ce.lock.RUnlock() |
|
|
|
idx := ce.nextIndex |
|
l := len(ce.exemplars) |
|
for i := 0; i < l; i, idx = i+1, (idx+1)%l { |
|
if ce.exemplars[idx].ref == nil { |
|
continue |
|
} |
|
err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
}
|
|
|