mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
615 lines
17 KiB
615 lines
17 KiB
// Copyright 2021 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package tsdb |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"math" |
|
|
|
"github.com/go-kit/log/level" |
|
"github.com/pkg/errors" |
|
|
|
"github.com/prometheus/prometheus/model/exemplar" |
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/tsdb/chunkenc" |
|
"github.com/prometheus/prometheus/tsdb/chunks" |
|
"github.com/prometheus/prometheus/tsdb/record" |
|
) |
|
|
|
// initAppender is a helper to initialize the time bounds of the head |
|
// upon the first sample it receives. |
|
type initAppender struct { |
|
app storage.Appender |
|
head *Head |
|
} |
|
|
|
var _ storage.GetRef = &initAppender{} |
|
|
|
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { |
|
if a.app != nil { |
|
return a.app.Append(ref, lset, t, v) |
|
} |
|
|
|
a.head.initTime(t) |
|
a.app = a.head.appender() |
|
return a.app.Append(ref, lset, t, v) |
|
} |
|
|
|
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { |
|
// Check if exemplar storage is enabled. |
|
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { |
|
return 0, nil |
|
} |
|
|
|
if a.app != nil { |
|
return a.app.AppendExemplar(ref, l, e) |
|
} |
|
// We should never reach here given we would call Append before AppendExemplar |
|
// and we probably want to always base head/WAL min time on sample times. |
|
a.head.initTime(e.Ts) |
|
a.app = a.head.appender() |
|
|
|
return a.app.AppendExemplar(ref, l, e) |
|
} |
|
|
|
// initTime initializes a head with the first timestamp. This only needs to be called |
|
// for a completely fresh head with an empty WAL. |
|
func (h *Head) initTime(t int64) { |
|
if !h.minTime.CAS(math.MaxInt64, t) { |
|
return |
|
} |
|
// Ensure that max time is initialized to at least the min time we just set. |
|
// Concurrent appenders may already have set it to a higher value. |
|
h.maxTime.CAS(math.MinInt64, t) |
|
} |
|
|
|
func (a *initAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) { |
|
if g, ok := a.app.(storage.GetRef); ok { |
|
return g.GetRef(lset) |
|
} |
|
return 0, nil |
|
} |
|
|
|
func (a *initAppender) Commit() error { |
|
if a.app == nil { |
|
a.head.metrics.activeAppenders.Dec() |
|
return nil |
|
} |
|
return a.app.Commit() |
|
} |
|
|
|
func (a *initAppender) Rollback() error { |
|
if a.app == nil { |
|
a.head.metrics.activeAppenders.Dec() |
|
return nil |
|
} |
|
return a.app.Rollback() |
|
} |
|
|
|
// Appender returns a new Appender on the database. |
|
func (h *Head) Appender(_ context.Context) storage.Appender { |
|
h.metrics.activeAppenders.Inc() |
|
|
|
// The head cache might not have a starting point yet. The init appender |
|
// picks up the first appended timestamp as the base. |
|
if h.MinTime() == math.MaxInt64 { |
|
return &initAppender{ |
|
head: h, |
|
} |
|
} |
|
return h.appender() |
|
} |
|
|
|
func (h *Head) appender() *headAppender { |
|
appendID, cleanupAppendIDsBelow := h.iso.newAppendID() // Every appender gets an ID that is cleared upon commit/rollback. |
|
|
|
// Allocate the exemplars buffer only if exemplars are enabled. |
|
var exemplarsBuf []exemplarWithSeriesRef |
|
if h.opts.EnableExemplarStorage { |
|
exemplarsBuf = h.getExemplarBuffer() |
|
} |
|
|
|
return &headAppender{ |
|
head: h, |
|
minValidTime: h.appendableMinValidTime(), |
|
mint: math.MaxInt64, |
|
maxt: math.MinInt64, |
|
samples: h.getAppendBuffer(), |
|
sampleSeries: h.getSeriesBuffer(), |
|
exemplars: exemplarsBuf, |
|
appendID: appendID, |
|
cleanupAppendIDsBelow: cleanupAppendIDsBelow, |
|
} |
|
} |
|
|
|
func (h *Head) appendableMinValidTime() int64 { |
|
// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window, |
|
// ensures that no samples will be added within the compaction window to avoid races. |
|
return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2) |
|
} |
|
|
|
// AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. |
|
// Returns false if Head hasn't been initialized yet and the minimum time isn't known yet. |
|
func (h *Head) AppendableMinValidTime() (int64, bool) { |
|
if h.MinTime() == math.MaxInt64 { |
|
return 0, false |
|
} |
|
|
|
return h.appendableMinValidTime(), true |
|
} |
|
|
|
func max(a, b int64) int64 { |
|
if a > b { |
|
return a |
|
} |
|
return b |
|
} |
|
|
|
func (h *Head) getAppendBuffer() []record.RefSample { |
|
b := h.appendPool.Get() |
|
if b == nil { |
|
return make([]record.RefSample, 0, 512) |
|
} |
|
return b.([]record.RefSample) |
|
} |
|
|
|
func (h *Head) putAppendBuffer(b []record.RefSample) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.appendPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { |
|
b := h.exemplarsPool.Get() |
|
if b == nil { |
|
return make([]exemplarWithSeriesRef, 0, 512) |
|
} |
|
return b.([]exemplarWithSeriesRef) |
|
} |
|
|
|
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { |
|
if b == nil { |
|
return |
|
} |
|
|
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.exemplarsPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getSeriesBuffer() []*memSeries { |
|
b := h.seriesPool.Get() |
|
if b == nil { |
|
return make([]*memSeries, 0, 512) |
|
} |
|
return b.([]*memSeries) |
|
} |
|
|
|
func (h *Head) putSeriesBuffer(b []*memSeries) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.seriesPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getBytesBuffer() []byte { |
|
b := h.bytesPool.Get() |
|
if b == nil { |
|
return make([]byte, 0, 1024) |
|
} |
|
return b.([]byte) |
|
} |
|
|
|
func (h *Head) putBytesBuffer(b []byte) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.bytesPool.Put(b[:0]) |
|
} |
|
|
|
type exemplarWithSeriesRef struct { |
|
ref storage.SeriesRef |
|
exemplar exemplar.Exemplar |
|
} |
|
|
|
type headAppender struct { |
|
head *Head |
|
minValidTime int64 // No samples below this timestamp are allowed. |
|
mint, maxt int64 |
|
|
|
series []record.RefSeries // New series held by this appender. |
|
samples []record.RefSample // New samples held by this appender. |
|
exemplars []exemplarWithSeriesRef // New exemplars held by this appender. |
|
sampleSeries []*memSeries // Series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). |
|
|
|
appendID, cleanupAppendIDsBelow uint64 |
|
closed bool |
|
} |
|
|
|
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { |
|
if t < a.minValidTime { |
|
a.head.metrics.outOfBoundSamples.Inc() |
|
return 0, storage.ErrOutOfBounds |
|
} |
|
|
|
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) |
|
if s == nil { |
|
// Ensure no empty labels have gotten through. |
|
lset = lset.WithoutEmpty() |
|
if len(lset) == 0 { |
|
return 0, errors.Wrap(ErrInvalidSample, "empty labelset") |
|
} |
|
|
|
if l, dup := lset.HasDuplicateLabelNames(); dup { |
|
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) |
|
} |
|
|
|
var created bool |
|
var err error |
|
s, created, err = a.head.getOrCreate(lset.Hash(), lset) |
|
if err != nil { |
|
return 0, err |
|
} |
|
if created { |
|
a.series = append(a.series, record.RefSeries{ |
|
Ref: s.ref, |
|
Labels: lset, |
|
}) |
|
} |
|
} |
|
|
|
s.Lock() |
|
if err := s.appendable(t, v); err != nil { |
|
s.Unlock() |
|
if err == storage.ErrOutOfOrderSample { |
|
a.head.metrics.outOfOrderSamples.Inc() |
|
} |
|
return 0, err |
|
} |
|
s.pendingCommit = true |
|
s.Unlock() |
|
|
|
if t < a.mint { |
|
a.mint = t |
|
} |
|
if t > a.maxt { |
|
a.maxt = t |
|
} |
|
|
|
a.samples = append(a.samples, record.RefSample{ |
|
Ref: s.ref, |
|
T: t, |
|
V: v, |
|
}) |
|
a.sampleSeries = append(a.sampleSeries, s) |
|
return storage.SeriesRef(s.ref), nil |
|
} |
|
|
|
// appendable checks whether the given sample is valid for appending to the series. |
|
func (s *memSeries) appendable(t int64, v float64) error { |
|
c := s.head() |
|
if c == nil { |
|
return nil |
|
} |
|
|
|
if t > c.maxTime { |
|
return nil |
|
} |
|
if t < c.maxTime { |
|
return storage.ErrOutOfOrderSample |
|
} |
|
// We are allowing exact duplicates as we can encounter them in valid cases |
|
// like federation and erroring out at that time would be extremely noisy. |
|
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { |
|
return storage.ErrDuplicateSampleForTimestamp |
|
} |
|
return nil |
|
} |
|
|
|
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't |
|
// use getOrCreate or make any of the lset sanity checks that Append does. |
|
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { |
|
// Check if exemplar storage is enabled. |
|
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { |
|
return 0, nil |
|
} |
|
|
|
// Get Series |
|
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) |
|
if s == nil { |
|
s = a.head.series.getByHash(lset.Hash(), lset) |
|
if s != nil { |
|
ref = storage.SeriesRef(s.ref) |
|
} |
|
} |
|
if s == nil { |
|
return 0, fmt.Errorf("unknown HeadSeriesRef when trying to add exemplar: %d", ref) |
|
} |
|
|
|
// Ensure no empty labels have gotten through. |
|
e.Labels = e.Labels.WithoutEmpty() |
|
|
|
err := a.head.exemplars.ValidateExemplar(s.lset, e) |
|
if err != nil { |
|
if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled { |
|
// Duplicate, don't return an error but don't accept the exemplar. |
|
return 0, nil |
|
} |
|
return 0, err |
|
} |
|
|
|
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) |
|
|
|
return storage.SeriesRef(s.ref), nil |
|
} |
|
|
|
var _ storage.GetRef = &headAppender{} |
|
|
|
func (a *headAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) { |
|
s := a.head.series.getByHash(lset.Hash(), lset) |
|
if s == nil { |
|
return 0, nil |
|
} |
|
// returned labels must be suitable to pass to Append() |
|
return storage.SeriesRef(s.ref), s.lset |
|
} |
|
|
|
// log writes all headAppender's data to the WAL. |
|
func (a *headAppender) log() error { |
|
if a.head.wal == nil { |
|
return nil |
|
} |
|
|
|
buf := a.head.getBytesBuffer() |
|
defer func() { a.head.putBytesBuffer(buf) }() |
|
|
|
var rec []byte |
|
var enc record.Encoder |
|
|
|
if len(a.series) > 0 { |
|
rec = enc.Series(a.series, buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log series") |
|
} |
|
} |
|
if len(a.samples) > 0 { |
|
rec = enc.Samples(a.samples, buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log samples") |
|
} |
|
} |
|
if len(a.exemplars) > 0 { |
|
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log exemplars") |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { |
|
ret := make([]record.RefExemplar, 0, len(es)) |
|
for _, e := range es { |
|
ret = append(ret, record.RefExemplar{ |
|
Ref: chunks.HeadSeriesRef(e.ref), |
|
T: e.exemplar.Ts, |
|
V: e.exemplar.Value, |
|
Labels: e.exemplar.Labels, |
|
}) |
|
} |
|
return ret |
|
} |
|
|
|
// Commit writes to the WAL and adds the data to the Head. |
|
func (a *headAppender) Commit() (err error) { |
|
if a.closed { |
|
return ErrAppenderClosed |
|
} |
|
defer func() { a.closed = true }() |
|
|
|
if err := a.log(); err != nil { |
|
_ = a.Rollback() // Most likely the same error will happen again. |
|
return errors.Wrap(err, "write to WAL") |
|
} |
|
|
|
// No errors logging to WAL, so pass the exemplars along to the in memory storage. |
|
for _, e := range a.exemplars { |
|
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) |
|
// We don't instrument exemplar appends here, all is instrumented by storage. |
|
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil { |
|
if err == storage.ErrOutOfOrderExemplar { |
|
continue |
|
} |
|
level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) |
|
} |
|
} |
|
|
|
defer a.head.metrics.activeAppenders.Dec() |
|
defer a.head.putAppendBuffer(a.samples) |
|
defer a.head.putSeriesBuffer(a.sampleSeries) |
|
defer a.head.putExemplarBuffer(a.exemplars) |
|
defer a.head.iso.closeAppend(a.appendID) |
|
|
|
total := len(a.samples) |
|
var series *memSeries |
|
for i, s := range a.samples { |
|
series = a.sampleSeries[i] |
|
series.Lock() |
|
ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) |
|
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) |
|
series.pendingCommit = false |
|
series.Unlock() |
|
|
|
if !ok { |
|
total-- |
|
a.head.metrics.outOfOrderSamples.Inc() |
|
} |
|
if chunkCreated { |
|
a.head.metrics.chunks.Inc() |
|
a.head.metrics.chunksCreated.Inc() |
|
} |
|
} |
|
|
|
a.head.metrics.samplesAppended.Add(float64(total)) |
|
a.head.updateMinMaxTime(a.mint, a.maxt) |
|
|
|
return nil |
|
} |
|
|
|
// append adds the sample (t, v) to the series. The caller also has to provide |
|
// the appendID for isolation. (The appendID can be zero, which results in no |
|
// isolation for this append.) |
|
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. |
|
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { |
|
// Based on Gorilla white papers this offers near-optimal compression ratio |
|
// so anything bigger that this has diminishing returns and increases |
|
// the time range within which we have to decompress all samples. |
|
const samplesPerChunk = 120 |
|
|
|
c := s.head() |
|
|
|
if c == nil { |
|
if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { |
|
// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it. |
|
return false, false |
|
} |
|
// There is no chunk in this series yet, create the first chunk for the sample. |
|
c = s.cutNewHeadChunk(t, chunkDiskMapper) |
|
chunkCreated = true |
|
} |
|
|
|
// Out of order sample. |
|
if c.maxTime >= t { |
|
return false, chunkCreated |
|
} |
|
|
|
numSamples := c.chunk.NumSamples() |
|
if numSamples == 0 { |
|
// It could be the new chunk created after reading the chunk snapshot, |
|
// hence we fix the minTime of the chunk here. |
|
c.minTime = t |
|
s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange) |
|
} |
|
|
|
// If we reach 25% of a chunk's desired sample count, predict an end time |
|
// for this chunk that will try to make samples equally distributed within |
|
// the remaining chunks in the current chunk range. |
|
// At latest it must happen at the timestamp set when the chunk was cut. |
|
if numSamples == samplesPerChunk/4 { |
|
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) |
|
} |
|
if t >= s.nextAt { |
|
c = s.cutNewHeadChunk(t, chunkDiskMapper) |
|
chunkCreated = true |
|
} |
|
s.app.Append(t, v) |
|
|
|
c.maxTime = t |
|
|
|
s.sampleBuf[0] = s.sampleBuf[1] |
|
s.sampleBuf[1] = s.sampleBuf[2] |
|
s.sampleBuf[2] = s.sampleBuf[3] |
|
s.sampleBuf[3] = sample{t: t, v: v} |
|
|
|
if appendID > 0 && s.txs != nil { |
|
s.txs.add(appendID) |
|
} |
|
|
|
return true, chunkCreated |
|
} |
|
|
|
// computeChunkEndTime estimates the end timestamp based the beginning of a |
|
// chunk, its current timestamp and the upper bound up to which we insert data. |
|
// It assumes that the time range is 1/4 full. |
|
// Assuming that the samples will keep arriving at the same rate, it will make the |
|
// remaining n chunks within this chunk range (before max) equally sized. |
|
func computeChunkEndTime(start, cur, max int64) int64 { |
|
n := (max - start) / ((cur - start + 1) * 4) |
|
if n <= 1 { |
|
return max |
|
} |
|
return start + (max-start)/n |
|
} |
|
|
|
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { |
|
s.mmapCurrentHeadChunk(chunkDiskMapper) |
|
|
|
s.headChunk = &memChunk{ |
|
chunk: chunkenc.NewXORChunk(), |
|
minTime: mint, |
|
maxTime: math.MinInt64, |
|
} |
|
|
|
// Set upper bound on when the next chunk must be started. An earlier timestamp |
|
// may be chosen dynamically at a later point. |
|
s.nextAt = rangeForTimestamp(mint, s.chunkRange) |
|
|
|
app, err := s.headChunk.chunk.Appender() |
|
if err != nil { |
|
panic(err) |
|
} |
|
s.app = app |
|
return s.headChunk |
|
} |
|
|
|
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { |
|
if s.headChunk == nil { |
|
// There is no head chunk, so nothing to m-map here. |
|
return |
|
} |
|
|
|
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError) |
|
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ |
|
ref: chunkRef, |
|
numSamples: uint16(s.headChunk.chunk.NumSamples()), |
|
minTime: s.headChunk.minTime, |
|
maxTime: s.headChunk.maxTime, |
|
}) |
|
} |
|
|
|
func handleChunkWriteError(err error) { |
|
if err != nil && err != chunks.ErrChunkDiskMapperClosed { |
|
panic(err) |
|
} |
|
} |
|
|
|
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL. |
|
func (a *headAppender) Rollback() (err error) { |
|
if a.closed { |
|
return ErrAppenderClosed |
|
} |
|
defer func() { a.closed = true }() |
|
defer a.head.metrics.activeAppenders.Dec() |
|
defer a.head.iso.closeAppend(a.appendID) |
|
defer a.head.putSeriesBuffer(a.sampleSeries) |
|
|
|
var series *memSeries |
|
for i := range a.samples { |
|
series = a.sampleSeries[i] |
|
series.Lock() |
|
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) |
|
series.pendingCommit = false |
|
series.Unlock() |
|
} |
|
a.head.putAppendBuffer(a.samples) |
|
a.head.putExemplarBuffer(a.exemplars) |
|
a.samples = nil |
|
a.exemplars = nil |
|
|
|
// Series are created in the head memory regardless of rollback. Thus we have |
|
// to log them to the WAL in any case. |
|
return a.log() |
|
}
|
|
|