prometheus/tsdb/exemplar.go

435 lines
13 KiB
Go

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"sort"
"sync"
"unicode/utf8"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
)
const (
// Indicates that there is no index entry for an exmplar.
noExemplar = -1
// Estimated number of exemplars per series, for sizing the index.
estimatedExemplarsPerSeries = 16
)
type CircularExemplarStorage struct {
lock sync.RWMutex
exemplars []*circularBufferEntry
nextIndex int
metrics *ExemplarMetrics
// Map of series labels as a string to index entry, which points to the first
// and last exemplar for the series in the exemplars circular buffer.
index map[string]*indexEntry
}
type indexEntry struct {
oldest int
newest int
seriesLabels labels.Labels
}
type circularBufferEntry struct {
exemplar exemplar.Exemplar
next int
ref *indexEntry
}
type ExemplarMetrics struct {
exemplarsAppended prometheus.Counter
exemplarsInStorage prometheus.Gauge
seriesWithExemplarsInStorage prometheus.Gauge
lastExemplarsTs prometheus.Gauge
maxExemplars prometheus.Gauge
outOfOrderExemplars prometheus.Counter
}
func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
m := ExemplarMetrics{
exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_exemplars_appended_total",
Help: "Total number of appended exemplars.",
}),
exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_exemplars_in_storage",
Help: "Number of exemplars currently in circular storage.",
}),
seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage",
Help: "Number of series with exemplars currently in circular storage.",
}),
lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds",
Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" +
"range the current exemplar buffer limit allows. This usually means the last timestamp" +
"for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.",
}),
outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total",
Help: "Total number of out of order exemplar ingestion failed attempts.",
}),
maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_max_exemplars",
Help: "Total number of exemplars the exemplar storage can store, resizeable.",
}),
}
if reg != nil {
reg.MustRegister(
m.exemplarsAppended,
m.exemplarsInStorage,
m.seriesWithExemplarsInStorage,
m.lastExemplarsTs,
m.outOfOrderExemplars,
m.maxExemplars,
)
}
return &m
}
// NewCircularExemplarStorage creates an circular in memory exemplar storage.
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
// 1GB of extra memory, accounting for the fact that this is heap allocated space.
// If len <= 0, then the exemplar storage is essentially a noop storage but can later be
// resized to store exemplars.
func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error) {
if length < 0 {
length = 0
}
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, length),
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries),
metrics: m,
}
c.metrics.maxExemplars.Set(float64(length))
return c, nil
}
func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error {
ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
return nil
}
func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage {
return ce
}
func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error) {
return ce, nil
}
func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) {
return ce, nil
}
// Select returns exemplars for a given set of label matchers.
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
ret := make([]exemplar.QueryResult, 0)
if len(ce.exemplars) == 0 {
return ret, nil
}
ce.lock.RLock()
defer ce.lock.RUnlock()
// Loop through each index entry, which will point us to first/last exemplar for each series.
for _, idx := range ce.index {
var se exemplar.QueryResult
e := ce.exemplars[idx.oldest]
if e.exemplar.Ts > end || ce.exemplars[idx.newest].exemplar.Ts < start {
continue
}
if !matchesSomeMatcherSet(idx.seriesLabels, matchers) {
continue
}
se.SeriesLabels = idx.seriesLabels
// Loop through all exemplars in the circular buffer for the current series.
for e.exemplar.Ts <= end {
if e.exemplar.Ts >= start {
se.Exemplars = append(se.Exemplars, e.exemplar)
}
if e.next == noExemplar {
break
}
e = ce.exemplars[e.next]
}
if len(se.Exemplars) > 0 {
ret = append(ret, se)
}
}
sort.Slice(ret, func(i, j int) bool {
return labels.Compare(ret[i].SeriesLabels, ret[j].SeriesLabels) < 0
})
return ret, nil
}
func matchesSomeMatcherSet(lbls labels.Labels, matchers [][]*labels.Matcher) bool {
Outer:
for _, ms := range matchers {
for _, m := range ms {
if !m.Matches(lbls.Get(m.Name)) {
continue Outer
}
}
return true
}
return false
}
func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
var buf [1024]byte
seriesLabels := l.Bytes(buf[:])
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.RLock()
defer ce.lock.RUnlock()
return ce.validateExemplar(seriesLabels, e, false)
}
// Not thread safe. The appended parameters tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, appended bool) error {
if len(ce.exemplars) == 0 {
return storage.ErrExemplarsDisabled
}
// Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
if err := e.Labels.Validate(func(l labels.Label) error {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
}
return nil
}); err != nil {
return err
}
idx, ok := ce.index[string(key)]
if !ok {
return nil
}
// Check for duplicate vs last stored exemplar for this series.
// NB these are expected, and appending them is a no-op.
if ce.exemplars[idx.newest].exemplar.Equals(e) {
return storage.ErrDuplicateExemplar
}
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
if appended {
ce.metrics.outOfOrderExemplars.Inc()
}
return storage.ErrOutOfOrderExemplar
}
return nil
}
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
func (ce *CircularExemplarStorage) Resize(l int64) int {
// Accept negative values as just 0 size.
if l <= 0 {
l = 0
}
if l == int64(len(ce.exemplars)) {
return 0
}
ce.lock.Lock()
defer ce.lock.Unlock()
oldBuffer := ce.exemplars
oldNextIndex := int64(ce.nextIndex)
ce.exemplars = make([]*circularBufferEntry, l)
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries)
ce.nextIndex = 0
// Replay as many entries as needed, starting with oldest first.
count := int64(len(oldBuffer))
if l < count {
count = l
}
migrated := 0
if l > 0 && len(oldBuffer) > 0 {
// Rewind previous next index by count with wrap-around.
// This math is essentially looking at nextIndex, where we would write the next exemplar to,
// and find the index in the old exemplar buffer that we should start migrating exemplars from.
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))
for i := int64(0); i < count; i++ {
idx := (startIndex + i) % int64(len(oldBuffer))
if entry := oldBuffer[idx]; entry != nil {
ce.migrate(entry)
migrated++
}
}
}
ce.computeMetrics()
ce.metrics.maxExemplars.Set(float64(l))
return migrated
}
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
var buf [1024]byte
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:])
idx, ok := ce.index[string(seriesLabels)]
if !ok {
idx = entry.ref
idx.oldest = ce.nextIndex
ce.index[string(seriesLabels)] = idx
} else {
entry.ref = idx
ce.exemplars[idx.newest].next = ce.nextIndex
}
idx.newest = ce.nextIndex
entry.next = noExemplar
ce.exemplars[ce.nextIndex] = entry
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
}
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
if len(ce.exemplars) == 0 {
return storage.ErrExemplarsDisabled
}
var buf [1024]byte
seriesLabels := l.Bytes(buf[:])
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.Lock()
defer ce.lock.Unlock()
err := ce.validateExemplar(seriesLabels, e, true)
if err != nil {
if err == storage.ErrDuplicateExemplar {
// Duplicate exemplar, noop.
return nil
}
return err
}
_, ok := ce.index[string(seriesLabels)]
if !ok {
ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
} else {
ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex
}
if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else {
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place
// for others.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar {
// Last item for this series, remove index entry.
delete(ce.index, string(prevLabels))
} else {
ce.index[string(prevLabels)].oldest = prev.next
}
}
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].next = noExemplar
ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)]
ce.index[string(seriesLabels)].newest = ce.nextIndex
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
ce.metrics.exemplarsAppended.Inc()
ce.computeMetrics()
return nil
}
func (ce *CircularExemplarStorage) computeMetrics() {
ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
if len(ce.exemplars) == 0 {
ce.metrics.exemplarsInStorage.Set(float64(0))
ce.metrics.lastExemplarsTs.Set(float64(0))
return
}
if next := ce.exemplars[ce.nextIndex]; next != nil {
ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
return
}
// We did not yet fill the buffer.
ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex))
if ce.exemplars[0] != nil {
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
}
}
// IterateExemplars iterates through all the exemplars from oldest to newest appended and calls
// the given function on all of them till the end (or) till the first function call that returns an error.
func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error {
ce.lock.RLock()
defer ce.lock.RUnlock()
idx := ce.nextIndex
l := len(ce.exemplars)
for i := 0; i < l; i, idx = i+1, (idx+1)%l {
if ce.exemplars[idx] == nil {
continue
}
err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar)
if err != nil {
return err
}
}
return nil
}