// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"fmt"
"math"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
//
// It is handed out by Head.Appender while the head's min time is still unset.
// The real appender is created lazily on the first Append/AppendExemplar call.
type initAppender struct {
// app is the underlying appender; it stays nil until the first
// Append/AppendExemplar call creates it via head.appender().
app storage . Appender
// head is the Head whose time bounds get initialized and which supplies
// the underlying appender.
head * Head
}
// Compile-time check that *initAppender satisfies storage.GetRef.
var _ storage.GetRef = (*initAppender)(nil)
// Append forwards the sample to the underlying appender. If no underlying
// appender exists yet, the head's time bounds are first initialized from t
// and a real head appender is created.
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	if a.app == nil {
		// First call on this appender: seed the head time bounds with the
		// sample timestamp before creating the real appender.
		a.head.initTime(t)
		a.app = a.head.appender()
	}
	return a.app.Append(ref, lset, t, v)
}
// AppendExemplar forwards the exemplar to the underlying appender, creating
// that appender first if needed. It is a no-op returning (0, nil) when
// exemplar storage is disabled or the exemplar limit is not positive.
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	// Check if exemplar storage is enabled.
	exemplarsEnabled := a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0
	if !exemplarsEnabled {
		return 0, nil
	}

	if a.app == nil {
		// We should never reach here given we would call Append before AppendExemplar
		// and we probably want to always base head/WAL min time on sample times.
		a.head.initTime(e.Ts)
		a.app = a.head.appender()
	}
	return a.app.AppendExemplar(ref, l, e)
}
// initTime initializes a head with the first timestamp. This only needs to be called
// for a completely fresh head with an empty WAL.
//
// The min time is only set if it still holds its math.MaxInt64 sentinel; if
// it was already set, nothing is changed.
func (h *Head) initTime(t int64) {
	if h.minTime.CAS(math.MaxInt64, t) {
		// We won the race to set min time, so make sure max time is at least
		// that value. Concurrent appenders may already have set it to a
		// higher value, in which case this swap leaves it untouched.
		h.maxTime.CAS(math.MinInt64, t)
	}
}
// GetRef delegates to the underlying appender when it implements
// storage.GetRef. It returns (0, nil) otherwise, including when no
// underlying appender has been created yet.
func (a *initAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) {
	getter, ok := a.app.(storage.GetRef)
	if !ok {
		return 0, nil
	}
	return getter.GetRef(lset)
}
// Commit commits the underlying appender. If nothing was ever appended (no
// underlying appender exists), it only decrements the active appender metric.
func (a *initAppender) Commit() error {
	if a.app != nil {
		return a.app.Commit()
	}
	// No underlying appender was created, so undo the increment done in
	// Head.Appender here.
	a.head.metrics.activeAppenders.Dec()
	return nil
}
// Rollback rolls back the underlying appender. If nothing was ever appended
// (no underlying appender exists), it only decrements the active appender metric.
func (a *initAppender) Rollback() error {
	if a.app != nil {
		return a.app.Rollback()
	}
	// No underlying appender was created, so undo the increment done in
	// Head.Appender here.
	a.head.metrics.activeAppenders.Dec()
	return nil
}
// Appender returns a new Appender on the database.
//
// Every call increments the active appender metric. The context argument is
// ignored.
func (h *Head) Appender(_ context.Context) storage.Appender {
	h.metrics.activeAppenders.Inc()

	if h.MinTime() != math.MaxInt64 {
		return h.appender()
	}
	// The head cache might not have a starting point yet. The init appender
	// picks up the first appended timestamp as the base.
	return &initAppender{head: h}
}
// appender builds a headAppender with a fresh append ID, the current minimum
// valid timestamp, and sample/series buffers obtained from the head's buffer
// helpers. mint/maxt start at their sentinel extremes so the first appended
// sample sets both.
func (h *Head) appender() *headAppender {
	// Every appender gets an ID that is cleared upon commit/rollback.
	id, cleanupBelow := h.iso.newAppendID()

	// Allocate the exemplars buffer only if exemplars are enabled.
	var exemplars []exemplarWithSeriesRef
	if h.opts.EnableExemplarStorage {
		exemplars = h.getExemplarBuffer()
	}

	app := new(headAppender)
	app.head = h
	app.minValidTime = h.appendableMinValidTime()
	app.mint, app.maxt = math.MaxInt64, math.MinInt64
	app.samples = h.getAppendBuffer()
	app.sampleSeries = h.getSeriesBuffer()
	app.exemplars = exemplars
	app.appendID = id
	app.cleanupAppendIDsBelow = cleanupBelow
	return app
}
// appendableMinValidTime returns the lowest timestamp a new appender may accept.
func (h *Head) appendableMinValidTime() int64 {
	headMinValid := h.minValidTime.Load()
	// Lower edge of the compaction window: half a chunk range below the head max time.
	compactionBound := h.MaxTime() - h.chunkRange.Load()/2

	// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window,
	// ensures that no samples will be added within the compaction window to avoid races.
	return max(headMinValid, compactionBound)
}
// AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head.
// Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.
func (h *Head) AppendableMinValidTime() (int64, bool) {
	if initialized := h.MinTime() != math.MaxInt64; initialized {
		return h.appendableMinValidTime(), true
	}
	return 0, false
}
// max returns the larger of a and b.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
// getAppendBuffer returns a sample buffer from the append pool, or a new
// empty one with capacity 512 when the pool has none.
func (h *Head) getAppendBuffer() []record.RefSample {
	if pooled := h.appendPool.Get(); pooled != nil {
		return pooled.([]record.RefSample)
	}
	return make([]record.RefSample, 0, 512)
}
// putAppendBuffer hands a sample buffer back to the append pool, truncated to
// zero length so its capacity can be reused.
func (h *Head) putAppendBuffer(b []record.RefSample) {
	emptied := b[:0]
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.appendPool.Put(emptied)
}
// getExemplarBuffer returns an exemplar buffer from the exemplars pool, or a
// new empty one with capacity 512 when the pool has none.
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
	if pooled := h.exemplarsPool.Get(); pooled != nil {
		return pooled.([]exemplarWithSeriesRef)
	}
	return make([]exemplarWithSeriesRef, 0, 512)
}
// putExemplarBuffer hands an exemplar buffer back to the exemplars pool,
// truncated to zero length. A nil buffer (as used when exemplar storage is
// disabled) is ignored.
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
	if b != nil {
		//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
		h.exemplarsPool.Put(b[:0])
	}
}
// getSeriesBuffer returns a series-pointer buffer from the series pool, or a
// new empty one with capacity 512 when the pool has none.
func (h *Head) getSeriesBuffer() []*memSeries {
	if pooled := h.seriesPool.Get(); pooled != nil {
		return pooled.([]*memSeries)
	}
	return make([]*memSeries, 0, 512)
}
// putSeriesBuffer hands a series-pointer buffer back to the series pool,
// truncated to zero length so its capacity can be reused.
//
// The elements are set to nil first: truncating with b[:0] alone leaves the
// *memSeries pointers in the backing array, so a pooled buffer would keep
// those series reachable for as long as it sits in the pool. Callers must not
// read b after this call.
func (h *Head) putSeriesBuffer(b []*memSeries) {
	for i := range b {
		b[i] = nil
	}
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.seriesPool.Put(b[:0])
}
// getBytesBuffer returns a byte buffer from the bytes pool, or a new empty
// one with capacity 1024 when the pool has none.
func (h *Head) getBytesBuffer() []byte {
	if pooled := h.bytesPool.Get(); pooled != nil {
		return pooled.([]byte)
	}
	return make([]byte, 0, 1024)
}
// putBytesBuffer hands a byte buffer back to the bytes pool, truncated to
// zero length so its capacity can be reused.
func (h *Head) putBytesBuffer(b []byte) {
	emptied := b[:0]
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
	h.bytesPool.Put(emptied)
}
// exemplarWithSeriesRef pairs an exemplar with the reference of the series it
// belongs to. headAppender buffers these until Commit.
type exemplarWithSeriesRef struct {
// ref is the reference of the series the exemplar was appended for.
ref storage . SeriesRef
// exemplar is the exemplar itself.
exemplar exemplar . Exemplar
}
// headAppender buffers new series, samples and exemplars for a Head until
// they are either committed or rolled back.
type headAppender struct {
// head is the Head this appender writes to.
head * Head
minValidTime int64 // No samples below this timestamp are allowed.
// mint and maxt track the smallest and largest sample timestamps accepted
// by Append; Commit passes them to the head's updateMinMaxTime.
mint , maxt int64
series [ ] record . RefSeries // New series held by this appender.
samples [ ] record . RefSample // New samples held by this appender.
exemplars [ ] exemplarWithSeriesRef // New exemplars held by this appender.
sampleSeries [ ] * memSeries // Series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
// appendID and cleanupAppendIDsBelow are the two values obtained from the
// head's isolation state when the appender is created.
appendID , cleanupAppendIDsBelow uint64
// closed is set once Commit or Rollback has run; later calls return
// ErrAppenderClosed.
closed bool
}
// Append buffers the sample (t, v) for the series identified by ref, or by
// lset when ref does not resolve to a known series (the series is created if
// needed). The sample only becomes part of the head on Commit.
//
// It returns storage.ErrOutOfBounds for timestamps below the appender's
// minimum valid time, a wrapped ErrInvalidSample for an empty label set or
// duplicate label names, and whatever error the series' appendable check
// reports. On success the series reference is returned.
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	if t < a.minValidTime {
		a.head.metrics.outOfBoundSamples.Inc()
		return 0, storage.ErrOutOfBounds
	}

	series := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if series == nil {
		// Ensure no empty labels have gotten through.
		lset = lset.WithoutEmpty()
		if len(lset) == 0 {
			return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
		}
		if name, dup := lset.HasDuplicateLabelNames(); dup {
			return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(` label name "%s" is not unique `, name))
		}

		var (
			isNew bool
			err   error
		)
		series, isNew, err = a.head.getOrCreate(lset.Hash(), lset)
		if err != nil {
			return 0, err
		}
		if isNew {
			// Remember newly created series so they get written to the WAL.
			a.series = append(a.series, record.RefSeries{Ref: series.ref, Labels: lset})
		}
	}

	// Validate the sample and mark the series as having a pending commit
	// under the series lock.
	series.Lock()
	err := series.appendable(t, v)
	if err == nil {
		series.pendingCommit = true
	}
	series.Unlock()
	if err != nil {
		if err == storage.ErrOutOfOrderSample {
			a.head.metrics.outOfOrderSamples.Inc()
		}
		return 0, err
	}

	// Track the time range covered by this appender.
	if t < a.mint {
		a.mint = t
	}
	if t > a.maxt {
		a.maxt = t
	}

	// samples and sampleSeries are kept index-aligned.
	a.samples = append(a.samples, record.RefSample{Ref: series.ref, T: t, V: v})
	a.sampleSeries = append(a.sampleSeries, series)
	return storage.SeriesRef(series.ref), nil
}
// appendable checks whether the given sample is valid for appending to the series.
//
// A sample is accepted when the series has no head chunk or when t is newer
// than the head chunk's max time. An older t yields
// storage.ErrOutOfOrderSample. A t equal to the max time is accepted only if
// v is bit-identical to the last buffered sample value; otherwise
// storage.ErrDuplicateSampleForTimestamp is returned.
func (s *memSeries) appendable(t int64, v float64) error {
	hc := s.head()
	switch {
	case hc == nil:
		return nil
	case t > hc.maxTime:
		return nil
	case t < hc.maxTime:
		return storage.ErrOutOfOrderSample
	}

	// We are allowing exact duplicates as we can encounter them in valid cases
	// like federation and erroring out at that time would be extremely noisy.
	lastBits := math.Float64bits(s.sampleBuf[3].v)
	if lastBits == math.Float64bits(v) {
		return nil
	}
	return storage.ErrDuplicateSampleForTimestamp
}
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
//
// The series is looked up by ref first and by label set second; if neither
// finds it an error is returned. The call is a no-op returning (0, nil) when
// exemplar storage is disabled, the exemplar limit is not positive, or
// validation reports a duplicate exemplar or disabled exemplars. Accepted
// exemplars are buffered until Commit.
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	// Check if exemplar storage is enabled.
	exemplarsEnabled := a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0
	if !exemplarsEnabled {
		return 0, nil
	}

	// Get Series
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		// Fall back to a lookup by labels.
		s = a.head.series.getByHash(lset.Hash(), lset)
		if s == nil {
			return 0, fmt.Errorf("unknown HeadSeriesRef when trying to add exemplar: %d", ref)
		}
		ref = storage.SeriesRef(s.ref)
	}

	// Ensure no empty labels have gotten through.
	e.Labels = e.Labels.WithoutEmpty()

	switch err := a.head.exemplars.ValidateExemplar(s.lset, e); err {
	case nil:
		// Valid, fall through to buffering below.
	case storage.ErrDuplicateExemplar, storage.ErrExemplarsDisabled:
		// Duplicate, don't return an error but don't accept the exemplar.
		return 0, nil
	default:
		return 0, err
	}

	a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref: ref, exemplar: e})
	return storage.SeriesRef(s.ref), nil
}
// Compile-time check that *headAppender satisfies storage.GetRef.
var _ storage.GetRef = (*headAppender)(nil)
// GetRef looks the series up by its label set and returns its reference
// together with the labels stored on the series. It returns (0, nil) when the
// head has no such series.
func (a *headAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) {
	if s := a.head.series.getByHash(lset.Hash(), lset); s != nil {
		// returned labels must be suitable to pass to Append()
		return storage.SeriesRef(s.ref), s.lset
	}
	return 0, nil
}
// log writes all headAppender's data to the WAL.
//
// Series, samples and exemplars are each encoded into one record and logged
// in that order; empty categories are skipped. It is a no-op when the head
// has no WAL. The first logging error is returned wrapped.
func (a *headAppender) log() error {
	if a.head.wal == nil {
		return nil
	}

	scratch := a.head.getBytesBuffer()
	// Deferred through a closure so that whatever slice scratch refers to at
	// return time is the one put back.
	defer func() { a.head.putBytesBuffer(scratch) }()

	var enc record.Encoder

	if len(a.series) > 0 {
		rec := enc.Series(a.series, scratch)
		scratch = rec[:0] // Reuse the encoded record's storage for the next one.
		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log series")
		}
	}
	if len(a.samples) > 0 {
		rec := enc.Samples(a.samples, scratch)
		scratch = rec[:0]
		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log samples")
		}
	}
	if len(a.exemplars) > 0 {
		rec := enc.Exemplars(exemplarsForEncoding(a.exemplars), scratch)
		scratch = rec[:0]
		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log exemplars")
		}
	}
	return nil
}
// exemplarsForEncoding converts the appender's buffered exemplars into the
// record.RefExemplar form passed to the record encoder, preserving order.
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
	out := make([]record.RefExemplar, len(es))
	for i := range es {
		src := &es[i]
		out[i] = record.RefExemplar{
			Ref:    chunks.HeadSeriesRef(src.ref),
			T:      src.exemplar.Ts,
			V:      src.exemplar.Value,
			Labels: src.exemplar.Labels,
		}
	}
	return out
}
// Commit writes to the WAL and adds the data to the Head.
//
// It returns ErrAppenderClosed if the appender was already committed or
// rolled back. If writing to the WAL fails, the appender is rolled back and
// the WAL error is returned wrapped. Otherwise the buffered exemplars are
// passed to the exemplar storage, the buffered samples are appended to their
// series, the append metrics are updated and the head's min/max time is
// extended. The appender is marked closed in every case.
func (a *headAppender) Commit() (err error) {
	if a.closed {
		return ErrAppenderClosed
	}
	defer func() { a.closed = true }()

	if err := a.log(); err != nil {
		_ = a.Rollback() // Most likely the same error will happen again.
		return errors.Wrap(err, "write to WAL")
	}

	// No errors logging to WAL, so pass the exemplars along to the in memory storage.
	for _, e := range a.exemplars {
		s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
		if s == nil {
			// getByID can return nil (Append and AppendExemplar both handle
			// that case). Presumably the series can be dropped from the head
			// between AppendExemplar and Commit; skip the exemplar instead of
			// dereferencing a nil series.
			continue
		}
		// We don't instrument exemplar appends here, all is instrumented by storage.
		if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
			if err == storage.ErrOutOfOrderExemplar {
				continue
			}
			level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
		}
	}

	// Release the buffers and the append ID once the samples below have been
	// applied. These are registered only after the WAL write succeeded; on
	// the failure path above Rollback does the equivalent cleanup.
	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.putAppendBuffer(a.samples)
	defer a.head.putSeriesBuffer(a.sampleSeries)
	defer a.head.putExemplarBuffer(a.exemplars)
	defer a.head.iso.closeAppend(a.appendID)

	total := len(a.samples)
	var series *memSeries
	for i, s := range a.samples {
		// sampleSeries is index-aligned with samples.
		series = a.sampleSeries[i]
		series.Lock()
		ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()

		if !ok {
			// The series rejected the sample as out of order; don't count it.
			total--
			a.head.metrics.outOfOrderSamples.Inc()
		}
		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}
	}

	a.head.metrics.samplesAppended.Add(float64(total))
	a.head.updateMinMaxTime(a.mint, a.maxt)

	return nil
}
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
//
// sampleInOrder is false when the sample was rejected as out of order;
// chunkCreated reports whether a new head chunk was cut during the call.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
	// Based on Gorilla white papers this offers near-optimal compression ratio
	// so anything bigger that this has diminishing returns and increases
	// the time range within which we have to decompress all samples.
	const samplesPerChunk = 120

	hc := s.head()
	if hc == nil {
		if last := len(s.mmappedChunks) - 1; last >= 0 && s.mmappedChunks[last].maxTime >= t {
			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
			return false, false
		}
		// There is no chunk in this series yet, create the first chunk for the sample.
		hc = s.cutNewHeadChunk(t, chunkDiskMapper)
		chunkCreated = true
	}

	// Out of order sample.
	if hc.maxTime >= t {
		return false, chunkCreated
	}

	switch n := hc.chunk.NumSamples(); n {
	case 0:
		// It could be the new chunk created after reading the chunk snapshot,
		// hence we fix the minTime of the chunk here.
		hc.minTime = t
		s.nextAt = rangeForTimestamp(hc.minTime, s.chunkRange)
	case samplesPerChunk / 4:
		// If we reach 25% of a chunk's desired sample count, predict an end time
		// for this chunk that will try to make samples equally distributed within
		// the remaining chunks in the current chunk range.
		// At latest it must happen at the timestamp set when the chunk was cut.
		s.nextAt = computeChunkEndTime(hc.minTime, hc.maxTime, s.nextAt)
	}

	if t >= s.nextAt {
		hc = s.cutNewHeadChunk(t, chunkDiskMapper)
		chunkCreated = true
	}

	s.app.Append(t, v)
	hc.maxTime = t

	// Shift the last-samples buffer down by one and store the new sample in
	// the final slot.
	s.sampleBuf[0], s.sampleBuf[1], s.sampleBuf[2] = s.sampleBuf[1], s.sampleBuf[2], s.sampleBuf[3]
	s.sampleBuf[3] = sample{t: t, v: v}

	if appendID != 0 {
		s.txs.add(appendID)
	}

	return true, chunkCreated
}
// computeChunkEndTime estimates the end timestamp based the beginning of a
// chunk, its current timestamp and the upper bound up to which we insert data.
// It assumes that the time range is 1/4 full.
// Assuming that the samples will keep arriving at the same rate, it will make the
// remaining n chunks within this chunk range (before max) equally sized.
func computeChunkEndTime(start, cur, max int64) int64 {
	span := max - start
	// Time covered so far, scaled by 4 because the chunk is assumed to be a
	// quarter full.
	projected := (cur - start + 1) * 4

	chunksInRange := span / projected
	if chunksInRange > 1 {
		return start + span/chunksInRange
	}
	return max
}
// cutNewHeadChunk m-maps the current head chunk (if there is one) and replaces
// it with a fresh, empty XOR chunk whose min time is mint. The series'
// chunk appender and nextAt boundary are reset for the new chunk, which is
// returned. It panics if the new chunk cannot provide an appender.
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
	s.mmapCurrentHeadChunk(chunkDiskMapper)

	fresh := &memChunk{
		chunk:   chunkenc.NewXORChunk(),
		minTime: mint,
		maxTime: math.MinInt64,
	}
	s.headChunk = fresh

	// Set upper bound on when the next chunk must be started. An earlier timestamp
	// may be chosen dynamically at a later point.
	s.nextAt = rangeForTimestamp(mint, s.chunkRange)

	chunkApp, err := fresh.chunk.Appender()
	if err != nil {
		panic(err)
	}
	s.app = chunkApp
	return fresh
}
// mmapCurrentHeadChunk writes the current head chunk through the chunk disk
// mapper and records it in s.mmappedChunks. It does nothing when the series
// has no head chunk. Any write error other than
// chunks.ErrChunkDiskMapperClosed causes a panic.
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
	hc := s.headChunk
	if hc == nil {
		// There is no head chunk, so nothing to m-map here.
		return
	}

	chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, hc.minTime, hc.maxTime, hc.chunk)
	if err != nil && err != chunks.ErrChunkDiskMapperClosed {
		panic(err)
	}

	mapped := &mmappedChunk{
		ref:        chunkRef,
		numSamples: uint16(hc.chunk.NumSamples()),
		minTime:    hc.minTime,
		maxTime:    hc.maxTime,
	}
	s.mmappedChunks = append(s.mmappedChunks, mapped)
}
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
//
// It returns ErrAppenderClosed if the appender was already committed or
// rolled back; otherwise it returns the result of logging the remaining data
// to the WAL. The appender is marked closed afterwards.
func (a *headAppender) Rollback() (err error) {
	if a.closed {
		return ErrAppenderClosed
	}
	defer func() { a.closed = true }()
	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.iso.closeAppend(a.appendID)
	defer a.head.putSeriesBuffer(a.sampleSeries)

	// Clear the pending-commit mark on every series that had a sample
	// buffered by this appender.
	for i := range a.samples {
		ms := a.sampleSeries[i]
		ms.Lock()
		ms.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		ms.pendingCommit = false
		ms.Unlock()
	}

	// Give the buffers back and forget them so the log call below has no
	// samples or exemplars to write.
	a.head.putAppendBuffer(a.samples)
	a.samples = nil
	a.head.putExemplarBuffer(a.exemplars)
	a.exemplars = nil

	// Series are created in the head memory regardless of rollback. Thus we have
	// to log them to the WAL in any case.
	return a.log()
}