// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"

	"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)

// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
	app  storage.Appender
	head *Head
}

var _ storage.GetRef = &initAppender{}
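
// SetOptions forwards the append options to the wrapped appender, if one has
// already been created.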
func (a *initAppender) SetOptions(opts *storage.AppendOptions) {
	if a.app != nil {
		a.app.SetOptions(opts)
	}
}
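
// Append initializes the head's time bounds from the first sample it sees,
// creates the real appender, and delegates to it.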
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	if a.app != nil {
		return a.app.Append(ref, lset, t, v)
	}

	a.head.initTime(t)
	a.app = a.head.appender()

	return a.app.Append(ref, lset, t, v)
}

func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	// Check if exemplar storage is enabled.
	if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
		return 0, nil
	}

	if a.app != nil {
		return a.app.AppendExemplar(ref, l, e)
	}
	// We should never reach here given we would call Append before AppendExemplar
	// and we probably want to always base head/WAL min time on sample times.
	a.head.initTime(e.Ts)
	a.app = a.head.appender()

	return a.app.AppendExemplar(ref, l, e)
}

func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if a.app != nil {
		return a.app.AppendHistogram(ref, l, t, h, fh)
	}
	a.head.initTime(t)
	a.app = a.head.appender()

	return a.app.AppendHistogram(ref, l, t, h, fh)
}

func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if a.app != nil {
		return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
	}
	a.head.initTime(t)
	a.app = a.head.appender()

	return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}

func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	if a.app != nil {
		return a.app.UpdateMetadata(ref, l, m)
	}

	a.app = a.head.appender()
	return a.app.UpdateMetadata(ref, l, m)
}

func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
	if a.app != nil {
		return a.app.AppendCTZeroSample(ref, lset, t, ct)
	}
	a.head.initTime(t)
	a.app = a.head.appender()

	return a.app.AppendCTZeroSample(ref, lset, t, ct)
}

// initTime initializes a head with the first timestamp. This only needs to be called
// for a completely fresh head with an empty WAL.
func (h *Head) initTime(t int64) {
	if !h.minTime.CompareAndSwap(math.MaxInt64, t) {
		return
	}
	// Ensure that max time is initialized to at least the min time we just set.
	// Concurrent appenders may already have set it to a higher value.
	h.maxTime.CompareAndSwap(math.MinInt64, t)
}

func (a *initAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
	if g, ok := a.app.(storage.GetRef); ok {
		return g.GetRef(lset, hash)
	}
	return 0, labels.EmptyLabels()
}

func (a *initAppender) Commit() error {
	if a.app == nil {
		a.head.metrics.activeAppenders.Dec()
		return nil
	}
	return a.app.Commit()
}

func (a *initAppender) Rollback() error {
	if a.app == nil {
		a.head.metrics.activeAppenders.Dec()
		return nil
	}
	return a.app.Rollback()
}

// Appender returns a new Appender on the database.
func (h *Head) Appender(_ context.Context) storage.Appender {
	h.metrics.activeAppenders.Inc()

	// The head cache might not have a starting point yet. The init appender
	// picks up the first appended timestamp as the base.
	if !h.initialized() {
		return &initAppender{
			head: h,
		}
	}
	return h.appender()
}
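
// The typical appender lifecycle, as a minimal sketch. This is hypothetical
// caller code, not part of this package: lbls, ts and v are placeholders, and
// any append error simply triggers a Rollback.
//
//	app := h.Appender(context.Background())
//	lbls := labels.FromStrings("__name__", "demo_metric")
//	if _, err := app.Append(0, lbls, ts, v); err != nil {
//		_ = app.Rollback()
//		return err
//	}
//	return app.Commit()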

func (h *Head) appender() *headAppender {
	minValidTime := h.appendableMinValidTime()
	appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.

	// Allocate the exemplars buffer only if exemplars are enabled.
	var exemplarsBuf []exemplarWithSeriesRef
	if h.opts.EnableExemplarStorage {
		exemplarsBuf = h.getExemplarBuffer()
	}

	return &headAppender{
		head:                  h,
		minValidTime:          minValidTime,
		mint:                  math.MaxInt64,
		maxt:                  math.MinInt64,
		headMaxt:              h.MaxTime(),
		oooTimeWindow:         h.opts.OutOfOrderTimeWindow.Load(),
		samples:               h.getAppendBuffer(),
		sampleSeries:          h.getSeriesBuffer(),
		exemplars:             exemplarsBuf,
		histograms:            h.getHistogramBuffer(),
		floatHistograms:       h.getFloatHistogramBuffer(),
		metadata:              h.getMetadataBuffer(),
		appendID:              appendID,
		cleanupAppendIDsBelow: cleanupAppendIDsBelow,
	}
}

// appendableMinValidTime returns the minimum valid timestamp for appends,
// such that samples stay ahead of prior blocks and the head compaction window.
func (h *Head) appendableMinValidTime() int64 {
	// This boundary ensures that no samples will be added to the compaction window.
	// This allows race-free, concurrent appending and compaction.
	cwEnd := h.MaxTime() - h.chunkRange.Load()/2

	// This boundary ensures that we avoid overlapping timeframes from one block to the next.
	// While not necessary for correctness, it means we're not required to use vertical compaction.
	minValid := h.minValidTime.Load()

	return max(cwEnd, minValid)
}

// AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head.
// It returns false if the Head hasn't been initialized yet, in which case the
// minimum time is not known.
func (h *Head) AppendableMinValidTime() (int64, bool) {
	if !h.initialized() {
		return 0, false
	}

	return h.appendableMinValidTime(), true
}
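
// The get*/put* helpers below recycle the appender's scratch buffers through
// pools on the Head, so each appender does not allocate fresh slices for every
// batch. Buffers are truncated to zero length before being returned, and
// entries that could retain data (exemplar labels, series pointers) are zeroed
// out first.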
func (h *Head) getAppendBuffer() []record.RefSample {
	b := h.appendPool.Get()
	if b == nil {
		return make([]record.RefSample, 0, 512)
	}
	return b
}

func (h *Head) putAppendBuffer(b []record.RefSample) {
	h.appendPool.Put(b[:0])
}

func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
	b := h.exemplarsPool.Get()
	if b == nil {
		return make([]exemplarWithSeriesRef, 0, 512)
	}
	return b
}

func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
	if b == nil {
		return
	}
	for i := range b { // Zero out to avoid retaining label data.
		b[i].exemplar.Labels = labels.EmptyLabels()
	}

	h.exemplarsPool.Put(b[:0])
}

func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
	b := h.histogramsPool.Get()
	if b == nil {
		return make([]record.RefHistogramSample, 0, 512)
	}
	return b
}

func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) {
	h.histogramsPool.Put(b[:0])
}

func (h *Head) getFloatHistogramBuffer() []record.RefFloatHistogramSample {
	b := h.floatHistogramsPool.Get()
	if b == nil {
		return make([]record.RefFloatHistogramSample, 0, 512)
	}
	return b
}

func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) {
	h.floatHistogramsPool.Put(b[:0])
}

func (h *Head) getMetadataBuffer() []record.RefMetadata {
	b := h.metadataPool.Get()
	if b == nil {
		return make([]record.RefMetadata, 0, 512)
	}
	return b
}

func (h *Head) putMetadataBuffer(b []record.RefMetadata) {
	h.metadataPool.Put(b[:0])
}

func (h *Head) getSeriesBuffer() []*memSeries {
	b := h.seriesPool.Get()
	if b == nil {
		return make([]*memSeries, 0, 512)
	}
	return b
}

func (h *Head) putSeriesBuffer(b []*memSeries) {
	for i := range b { // Zero out to avoid retaining data.
		b[i] = nil
	}
	h.seriesPool.Put(b[:0])
}

func (h *Head) getBytesBuffer() []byte {
	b := h.bytesPool.Get()
	if b == nil {
		return make([]byte, 0, 1024)
	}
	return b
}

func (h *Head) putBytesBuffer(b []byte) {
	h.bytesPool.Put(b[:0])
}
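
// exemplarWithSeriesRef pairs an exemplar with the reference of the series it
// belongs to, so exemplars can be buffered until Commit.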
type exemplarWithSeriesRef struct {
	ref      storage.SeriesRef
	exemplar exemplar.Exemplar
}
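
// headAppender is the Head's actual appender. It buffers all new series,
// samples, exemplars and metadata in memory and only writes them to the WAL
// and to the in-memory series on Commit.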
type headAppender struct {
	head          *Head
	minValidTime  int64 // No samples below this timestamp are allowed.
	mint, maxt    int64
	headMaxt      int64 // We track it here to not take the lock for every sample appended.
	oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.

	series               []record.RefSeries               // New series held by this appender.
	samples              []record.RefSample               // New float samples held by this appender.
	sampleSeries         []*memSeries                     // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
	histograms           []record.RefHistogramSample      // New histogram samples held by this appender.
	histogramSeries      []*memSeries                     // Histogram series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
	floatHistograms      []record.RefFloatHistogramSample // New float histogram samples held by this appender.
	floatHistogramSeries []*memSeries                     // Float histogram series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
	metadata             []record.RefMetadata             // New metadata held by this appender.
	metadataSeries       []*memSeries                     // Series corresponding to the metadata held by this appender.
	exemplars            []exemplarWithSeriesRef          // New exemplars held by this appender.

	appendID, cleanupAppendIDsBelow uint64
	closed                          bool
	hints                           *storage.AppendOptions
}

func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
	a.hints = opts
}
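
// Append queues a float sample for the given series, creating the series if
// it does not exist yet. The sample only becomes visible (and durable) once
// Commit is called.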
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	// Fail fast if OOO is disabled and the sample is out of bounds.
	// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
	if a.oooTimeWindow == 0 && t < a.minValidTime {
		a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
		return 0, storage.ErrOutOfBounds
	}

	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		var err error
		s, _, err = a.getOrCreate(lset)
		if err != nil {
			return 0, err
		}
	}

	s.Lock()
	if value.IsStaleNaN(v) {
		// TODO(krajorama): reorganize Commit() to handle samples in append order
		// not floats first and then histograms. Then we could do this conversion
		// in commit. This code should move into Commit().
		switch {
		case s.lastHistogramValue != nil:
			s.Unlock()
			return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
		case s.lastFloatHistogramValue != nil:
			s.Unlock()
			return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v})
		}
	}

	defer s.Unlock()
	// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
	// to skip that sample from the WAL and write only in the WBL.
	isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
	if err == nil {
		if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
			a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
			return 0, storage.ErrOutOfOrderSample
		}
		s.pendingCommit = true
	}
	if delta > 0 {
		a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
	}
	if err != nil {
		switch {
		case errors.Is(err, storage.ErrOutOfOrderSample):
			a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
		case errors.Is(err, storage.ErrTooOldSample):
			a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
		}
		return 0, err
	}

	if t < a.mint {
		a.mint = t
	}
	if t > a.maxt {
		a.maxt = t
	}

	a.samples = append(a.samples, record.RefSample{
		Ref: s.ref,
		T:   t,
		V:   v,
	})
	a.sampleSeries = append(a.sampleSeries, s)
	return storage.SeriesRef(s.ref), nil
}

// AppendCTZeroSample appends a synthetic zero sample at the ct timestamp.
// It returns an error when the sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
	if ct >= t {
		return 0, storage.ErrCTNewerThanSample
	}

	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		var err error
		s, _, err = a.getOrCreate(lset)
		if err != nil {
			return 0, err
		}
	}

	// Check if the CT sample would be OOO vs samples we already might have for this series.
	// NOTE(bwplotka): This will often be hit, as it's expected for long-living
	// counters to share the same CT.
	s.Lock()
	isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
	if err == nil {
		s.pendingCommit = true
	}
	s.Unlock()
	if err != nil {
		return 0, err
	}
	if isOOO {
		return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT
	}

	if ct > a.maxt {
		a.maxt = ct
	}
	a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
	a.sampleSeries = append(a.sampleSeries, s)
	return storage.SeriesRef(s.ref), nil
}
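
// getOrCreate looks up the memSeries for lset after validating the label set,
// creating the series if needed. Newly created series are also queued on
// a.series so that they are written to the WAL on Commit.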
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
	// Ensure no empty labels have gotten through.
	lset = lset.WithoutEmpty()
	if lset.IsEmpty() {
		return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
	}
	if l, dup := lset.HasDuplicateLabelNames(); dup {
		return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
	}
	s, created, err = a.head.getOrCreate(lset.Hash(), lset)
	if err != nil {
		return nil, false, err
	}
	if created {
		a.series = append(a.series, record.RefSeries{
			Ref:    s.ref,
			Labels: lset,
		})
	}
	return s, created, nil
}

// appendable checks whether the given sample is valid for appending to the series.
// If the sample is valid and in-order, it returns false with no error.
// If the sample belongs to the out-of-order chunk, it returns true with no error.
// If the sample cannot be handled, it returns an error.
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOOO bool, oooDelta int64, err error) {
	// Check if we can append in the in-order chunk.
	if t >= minValidTime {
		if s.headChunks == nil {
			// The series has no sample and was freshly created.
			return false, 0, nil
		}
		msMaxt := s.maxTime()
		if t > msMaxt {
			return false, 0, nil
		}
		if t == msMaxt {
			// We are allowing exact duplicates as we can encounter them in valid cases
			// like federation and erroring out at that time would be extremely noisy.
			// This only checks against the latest in-order sample.
			// The OOO headchunk has its own method to detect these duplicates.
			if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
				return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
			}
			if math.Float64bits(s.lastValue) != math.Float64bits(v) {
				return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
			}
			// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
			return false, 0, nil
		}
	}

	// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
	if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
		return true, headMaxt - t, nil
	}

	// The sample cannot go in both in-order and out-of-order chunk.
	if oooTimeWindow > 0 {
		return true, headMaxt - t, storage.ErrTooOldSample
	}
	if t < minValidTime {
		return false, headMaxt - t, storage.ErrOutOfBounds
	}
	return false, headMaxt - t, storage.ErrOutOfOrderSample
}
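
// Summarizing the checks above, leaving the error cases aside:
//
//	t newer than the series' max time               -> in-order append
//	t equal to the max time with an identical value -> accepted duplicate
//	t within oooTimeWindow of the head's max time   -> out-of-order append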

// appendableHistogram checks whether the given histogram sample is valid for appending to the series.
// If the sample is valid and in-order, it returns false with no error.
// If the sample belongs to the out-of-order chunk, it returns true with no error.
// If the sample cannot be handled, it returns an error.
func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
	// Check if we can append in the in-order chunk.
	if t >= minValidTime {
		if s.headChunks == nil {
			// The series has no sample and was freshly created.
			return false, 0, nil
		}
		msMaxt := s.maxTime()
		if t > msMaxt {
			return false, 0, nil
		}
		if t == msMaxt {
			// We are allowing exact duplicates as we can encounter them in valid cases
			// like federation and erroring out at that time would be extremely noisy.
			// This only checks against the latest in-order sample.
			// The OOO headchunk has its own method to detect these duplicates.
			if !h.Equals(s.lastHistogramValue) {
				return false, 0, storage.ErrDuplicateSampleForTimestamp
			}
			// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
			return false, 0, nil
		}
	}

	// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
	if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
		if !oooHistogramsEnabled {
			return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
		}
		return true, headMaxt - t, nil
	}

	// The sample cannot go in both in-order and out-of-order chunk.
	if oooTimeWindow > 0 {
		return true, headMaxt - t, storage.ErrTooOldSample
	}
	if t < minValidTime {
		return false, headMaxt - t, storage.ErrOutOfBounds
	}
	return false, headMaxt - t, storage.ErrOutOfOrderSample
}

// appendableFloatHistogram checks whether the given float histogram sample is valid for appending to the series.
// If the sample is valid and in-order, it returns false with no error.
// If the sample belongs to the out-of-order chunk, it returns true with no error.
// If the sample cannot be handled, it returns an error.
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
	// Check if we can append in the in-order chunk.
	if t >= minValidTime {
		if s.headChunks == nil {
			// The series has no sample and was freshly created.
			return false, 0, nil
		}
		msMaxt := s.maxTime()
		if t > msMaxt {
			return false, 0, nil
		}
		if t == msMaxt {
			// We are allowing exact duplicates as we can encounter them in valid cases
			// like federation and erroring out at that time would be extremely noisy.
			// This only checks against the latest in-order sample.
			// The OOO headchunk has its own method to detect these duplicates.
			if !fh.Equals(s.lastFloatHistogramValue) {
				return false, 0, storage.ErrDuplicateSampleForTimestamp
			}
			// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
			return false, 0, nil
		}
	}

	// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
	if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
		if !oooHistogramsEnabled {
			return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
		}
		return true, headMaxt - t, nil
	}

	// The sample cannot go in both in-order and out-of-order chunk.
	if oooTimeWindow > 0 {
		return true, headMaxt - t, storage.ErrTooOldSample
	}
	if t < minValidTime {
		return false, headMaxt - t, storage.ErrOutOfBounds
	}
	return false, headMaxt - t, storage.ErrOutOfOrderSample
}

// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset validity checks that Append does.
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	// Check if exemplar storage is enabled.
	if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
		return 0, nil
	}

	// Get the series.
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		s = a.head.series.getByHash(lset.Hash(), lset)
		if s != nil {
			ref = storage.SeriesRef(s.ref)
		}
	}
	if s == nil {
		return 0, fmt.Errorf("unknown HeadSeriesRef when trying to add exemplar: %d", ref)
	}

	// Ensure no empty labels have gotten through.
	e.Labels = e.Labels.WithoutEmpty()

	err := a.head.exemplars.ValidateExemplar(s.labels(), e)
	if err != nil {
		if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
			// Duplicate, don't return an error but don't accept the exemplar.
			return 0, nil
		}
		return 0, err
	}

	a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})

	return storage.SeriesRef(s.ref), nil
}
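
// AppendHistogram queues a native histogram sample (an integer histogram h or
// a float histogram fh) for the given series, creating the series if needed.
// The histogram is validated here; it is written out on Commit.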
func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if !a.head.opts.EnableNativeHistograms.Load() {
		return 0, storage.ErrNativeHistogramsDisabled
	}

	// Fail fast if OOO is disabled and the sample is out of bounds.
	// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
	if (a.oooTimeWindow == 0 || !a.head.opts.EnableOOONativeHistograms.Load()) && t < a.minValidTime {
		a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
		return 0, storage.ErrOutOfBounds
	}

	if h != nil {
		if err := h.Validate(); err != nil {
			return 0, err
		}
	}

	if fh != nil {
		if err := fh.Validate(); err != nil {
			return 0, err
		}
	}

	var created bool
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		var err error
		s, created, err = a.getOrCreate(lset)
		if err != nil {
			return 0, err
		}
	}
	switch {
	case h != nil:
		s.Lock()

		// TODO(krajorama): reorganize Commit() to handle samples in append order
		// not floats first and then histograms. Then we would not need to do this.
		// This whole "if" should be removed.
		if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
			s.lastHistogramValue = &histogram.Histogram{}
		}

		// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
		// to skip that sample from the WAL and write only in the WBL.
		_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
		if err == nil {
			s.pendingCommit = true
		}
		s.Unlock()
		if delta > 0 {
			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
		}
		if err != nil {
			switch {
			case errors.Is(err, storage.ErrOutOfOrderSample):
				fallthrough
			case errors.Is(err, storage.ErrOOONativeHistogramsDisabled):
				a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
			case errors.Is(err, storage.ErrTooOldSample):
				a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
			}
			return 0, err
		}
		a.histograms = append(a.histograms, record.RefHistogramSample{
			Ref: s.ref,
			T:   t,
			H:   h,
		})
		a.histogramSeries = append(a.histogramSeries, s)
	case fh != nil:
		s.Lock()

		// TODO(krajorama): reorganize Commit() to handle samples in append order
		// not floats first and then histograms. Then we would not need to do this.
		// This whole "if" should be removed.
		if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
			s.lastFloatHistogramValue = &histogram.FloatHistogram{}
		}

		// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
		// to skip that sample from the WAL and write only in the WBL.
		_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
		if err == nil {
			s.pendingCommit = true
		}
		s.Unlock()
		if delta > 0 {
			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
		}
		if err != nil {
			switch {
			case errors.Is(err, storage.ErrOutOfOrderSample):
				fallthrough
			case errors.Is(err, storage.ErrOOONativeHistogramsDisabled):
				a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
			case errors.Is(err, storage.ErrTooOldSample):
				a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
			}
			return 0, err
		}
		a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
			Ref: s.ref,
			T:   t,
			FH:  fh,
		})
		a.floatHistogramSeries = append(a.floatHistogramSeries, s)
	}

	if t < a.mint {
		a.mint = t
	}
	if t > a.maxt {
		a.maxt = t
	}

	return storage.SeriesRef(s.ref), nil
}
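
// AppendHistogramCTZeroSample appends a synthetic zero histogram sample at the
// ct timestamp, analogous to AppendCTZeroSample for floats. It returns an
// error when the sample can't be appended.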
func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if !a.head.opts.EnableNativeHistograms.Load() {
		return 0, storage.ErrNativeHistogramsDisabled
	}
	if ct >= t {
		return 0, storage.ErrCTNewerThanSample
	}

	var created bool
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		var err error
		s, created, err = a.getOrCreate(lset)
		if err != nil {
			return 0, err
		}
	}

	switch {
	case h != nil:
		zeroHistogram := &histogram.Histogram{}
		s.Lock()

		// TODO(krajorama): reorganize Commit() to handle samples in append order
		// not floats first and then histograms. Then we would not need to do this.
		// This whole "if" should be removed.
		if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
			s.lastHistogramValue = zeroHistogram
		}

		// Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
		// We set it to true to make this implementation as close as possible to the float implementation.
		isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
		if err != nil {
			s.Unlock()
			if errors.Is(err, storage.ErrOutOfOrderSample) {
				return 0, storage.ErrOutOfOrderCT
			}
			return 0, err
		}

		// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
		// This is to prevent the injected zero from being marked as OOO forever.
		if isOOO {
			s.Unlock()
			return 0, storage.ErrOutOfOrderCT
		}
		s.pendingCommit = true
		s.Unlock()
		a.histograms = append(a.histograms, record.RefHistogramSample{
			Ref: s.ref,
			T:   ct,
			H:   zeroHistogram,
		})
		a.histogramSeries = append(a.histogramSeries, s)
	case fh != nil:
		zeroFloatHistogram := &histogram.FloatHistogram{}
		s.Lock()

		// TODO(krajorama): reorganize Commit() to handle samples in append order
		// not floats first and then histograms. Then we would not need to do this.
		// This whole "if" should be removed.
		if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
			s.lastFloatHistogramValue = zeroFloatHistogram
		}

		// Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
		// We set it to true to make this implementation as close as possible to the float implementation.
		isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
		if err != nil {
			s.Unlock()
			if errors.Is(err, storage.ErrOutOfOrderSample) {
				return 0, storage.ErrOutOfOrderCT
			}
			return 0, err
		}

		// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
		// This is to prevent the injected zero from being marked as OOO forever.
		if isOOO {
			s.Unlock()
			return 0, storage.ErrOutOfOrderCT
		}
		s.pendingCommit = true
		s.Unlock()
		a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
			Ref: s.ref,
			T:   ct,
			FH:  zeroFloatHistogram,
		})
		a.floatHistogramSeries = append(a.floatHistogramSeries, s)
	}

	if ct > a.maxt {
		a.maxt = ct
	}
	return storage.SeriesRef(s.ref), nil
}

// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		s = a.head.series.getByHash(lset.Hash(), lset)
		if s != nil {
			ref = storage.SeriesRef(s.ref)
		}
	}
	if s == nil {
		return 0, fmt.Errorf("unknown series when trying to add metadata with HeadSeriesRef: %d and labels: %s", ref, lset)
	}

	s.Lock()
	hasNewMetadata := s.meta == nil || *s.meta != meta
	s.Unlock()

	if hasNewMetadata {
		a.metadata = append(a.metadata, record.RefMetadata{
			Ref:  s.ref,
			Type: record.GetMetricType(meta.Type),
			Unit: meta.Unit,
			Help: meta.Help,
		})
		a.metadataSeries = append(a.metadataSeries, s)
	}

	return ref, nil
}

var _ storage.GetRef = &headAppender{}

func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
	s := a.head.series.getByHash(hash, lset)
	if s == nil {
		return 0, labels.EmptyLabels()
	}
	// The returned labels must be suitable to pass to Append().
	return storage.SeriesRef(s.ref), s.labels()
}

// log writes all headAppender's data to the WAL.
func (a *headAppender) log() error {
	if a.head.wal == nil {
		return nil
	}

	buf := a.head.getBytesBuffer()
	defer func() { a.head.putBytesBuffer(buf) }()

	var rec []byte
	var enc record.Encoder

	if len(a.series) > 0 {
		rec = enc.Series(a.series, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log series: %w", err)
		}
	}
	if len(a.metadata) > 0 {
		rec = enc.Metadata(a.metadata, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log metadata: %w", err)
		}
	}
	if len(a.samples) > 0 {
		rec = enc.Samples(a.samples, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log samples: %w", err)
		}
	}
	if len(a.histograms) > 0 {
		rec = enc.HistogramSamples(a.histograms, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log histograms: %w", err)
		}
	}
	if len(a.floatHistograms) > 0 {
		rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log float histograms: %w", err)
		}
	}
	// Exemplars should be logged after samples (float/native histogram/etc),
	// otherwise it might happen that we send the exemplars in a remote write
	// batch before the samples, which in turn means the exemplar is rejected
	// for missing series, since series are created due to samples.
	if len(a.exemplars) > 0 {
		rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return fmt.Errorf("log exemplars: %w", err)
		}
	}
	return nil
}
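
// exemplarsForEncoding converts the appender's buffered exemplars into the
// WAL record representation.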
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
	ret := make([]record.RefExemplar, 0, len(es))
	for _, e := range es {
		ret = append(ret, record.RefExemplar{
			Ref:    chunks.HeadSeriesRef(e.ref),
			T:      e.exemplar.Ts,
			V:      e.exemplar.Value,
			Labels: e.exemplar.Labels,
		})
	}
	return ret
}
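
// appenderCommitContext bundles the counters, time bounds and WBL buffers
// that Commit accumulates while flushing a headAppender.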
type appenderCommitContext struct {
	floatsAppended     int
	histogramsAppended int
	// Number of samples out of order but accepted: with ooo enabled and within time window.
	oooFloatsAccepted    int
	oooHistogramAccepted int
	// Number of samples rejected due to: out of order but OOO support disabled.
	floatOOORejected int
	histoOOORejected int
	// Number of samples rejected due to: out of order but too old (OOO support enabled, but outside time window).
	floatTooOldRejected int
	histoTooOldRejected int
	// Number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled).
	floatOOBRejected int
	histoOOBRejected int

	inOrderMint         int64
	inOrderMaxt         int64
	oooMinT             int64
	oooMaxT             int64
	wblSamples          []record.RefSample
	wblHistograms       []record.RefHistogramSample
	wblFloatHistograms  []record.RefFloatHistogramSample
	oooMmapMarkers      map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
	oooMmapMarkersCount int
	oooRecords          [][]byte
	oooCapMax           int64
	appendChunkOpts     chunkOpts
	enc                 record.Encoder
}

// commitExemplars adds all exemplars from headAppender to the head's exemplar storage.
func (a *headAppender) commitExemplars() {
	// No errors logging to WAL, so pass the exemplars along to the in memory storage.
	for _, e := range a.exemplars {
		s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
		if s == nil {
			// This is very unlikely to happen, but we have seen it in the wild.
			// It means that the series was truncated between AppendExemplar and Commit.
			// See TestHeadCompactionWhileAppendAndCommitExemplar.
			continue
		}
		// We don't instrument exemplar appends here, all is instrumented by storage.
		if err := a.head.exemplars.AddExemplar(s.labels(), e.exemplar); err != nil {
			if errors.Is(err, storage.ErrOutOfOrderExemplar) {
				continue
			}
			a.head.logger.Debug("Unknown error while adding exemplar", "err", err)
		}
	}
}
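
// collectOOORecords encodes the out-of-order samples and m-map markers
// gathered so far into WBL records and resets the per-batch buffers.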
func (acc *appenderCommitContext) collectOOORecords(a *headAppender) {
	if a.head.wbl == nil {
		// WBL is not enabled. So no need to collect.
		acc.wblSamples = nil
		acc.wblHistograms = nil
		acc.wblFloatHistograms = nil
		acc.oooMmapMarkers = nil
		acc.oooMmapMarkersCount = 0
		return
	}

	// The m-map happens before adding a new sample. So we collect
	// the m-map markers first, and then samples.
	// WBL Graphically:
	//	WBL Before this Commit(): [old samples before this commit for chunk 1]
	//	WBL After this Commit():  [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
	if acc.oooMmapMarkers != nil {
		markers := make([]record.RefMmapMarker, 0, acc.oooMmapMarkersCount)
		for ref, mmapRefs := range acc.oooMmapMarkers {
			for _, mmapRef := range mmapRefs {
				markers = append(markers, record.RefMmapMarker{
					Ref:     ref,
					MmapRef: mmapRef,
				})
			}
		}
		r := acc.enc.MmapMarkers(markers, a.head.getBytesBuffer())
		acc.oooRecords = append(acc.oooRecords, r)
	}

	if len(acc.wblSamples) > 0 {
		r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer())
		acc.oooRecords = append(acc.oooRecords, r)
	}
	if len(acc.wblHistograms) > 0 {
		r := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer())
		acc.oooRecords = append(acc.oooRecords, r)
	}
	if len(acc.wblFloatHistograms) > 0 {
		r := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer())
		acc.oooRecords = append(acc.oooRecords, r)
	}
	acc.wblSamples = nil
	acc.wblHistograms = nil
	acc.wblFloatHistograms = nil
	acc.oooMmapMarkers = nil
}

// handleAppendableError processes errors encountered during sample appending and updates
// the provided counters accordingly.
//
// Parameters:
//   - err: The error encountered during appending.
//   - appended: Pointer to the counter tracking the number of successfully appended samples.
//   - oooRejected: Pointer to the counter tracking the number of out-of-order samples rejected.
//   - oobRejected: Pointer to the counter tracking the number of out-of-bounds samples rejected.
//   - tooOldRejected: Pointer to the counter tracking the number of too-old samples rejected.
func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOldRejected *int) {
	switch {
	case errors.Is(err, storage.ErrOutOfOrderSample):
		*appended--
		*oooRejected++
	case errors.Is(err, storage.ErrOutOfBounds):
		*appended--
		*oobRejected++
	case errors.Is(err, storage.ErrTooOldSample):
		*appended--
		*tooOldRejected++
	default:
		*appended--
	}
}

// commitSamples processes and commits the samples in the headAppender to the series.
// It handles both in-order and out-of-order samples, updating the appenderCommitContext
// with the results of the append operations.
//
// The function iterates over the samples in the headAppender and attempts to append each sample
// to its corresponding series. It handles various error cases such as out-of-order samples,
// out-of-bounds samples, and too-old samples, updating the appenderCommitContext accordingly.
//
// For out-of-order samples, it checks if the sample can be inserted into the series and updates
// the out-of-order mmap markers if necessary. It also updates the write-behind log (WBL) samples
// and the minimum and maximum timestamps for out-of-order samples.
//
// For in-order samples, it attempts to append the sample to the series and updates the minimum
// and maximum timestamps for in-order samples.
//
// The function also increments the chunk metrics if a new chunk is created and performs cleanup
// operations on the series after appending the samples.
//
// There are also specific functions to commit histograms and float histograms.
func (a *headAppender) commitSamples(acc *appenderCommitContext) {
	var ok, chunkCreated bool
	var series *memSeries

	for i, s := range a.samples {
		series = a.sampleSeries[i]
		series.Lock()

		oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
		if err != nil {
			handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
		}
		switch {
		case err != nil:
			// Do nothing here.
		case oooSample:
			// Sample is OOO and OOO handling is enabled
			// and the delta is within the OOO tolerance.
			var mmapRefs []chunks.ChunkDiskMapperRef
			ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
			if chunkCreated {
				r, ok := acc.oooMmapMarkers[series.ref]
				if !ok || r != nil {
					// !ok means there are no markers collected for these samples yet. So we first flush the samples
					// before setting this m-map marker.

					// r != nil means we have already m-mapped a chunk for this series in the same Commit().
					// Hence, before we m-map again, we should add the samples and m-map markers
					// seen till now to the WBL records.
					acc.collectOOORecords(a)
				}

				if acc.oooMmapMarkers == nil {
					acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
				}
				if len(mmapRefs) > 0 {
					acc.oooMmapMarkers[series.ref] = mmapRefs
					acc.oooMmapMarkersCount += len(mmapRefs)
				} else {
					// No chunk was written to disk, so we need to set an initial marker for this series.
					acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
					acc.oooMmapMarkersCount++
				}
			}
			if ok {
				acc.wblSamples = append(acc.wblSamples, s)
				if s.T < acc.oooMinT {
					acc.oooMinT = s.T
				}
				if s.T > acc.oooMaxT {
					acc.oooMaxT = s.T
				}
				acc.oooFloatsAccepted++
			} else {
				// Sample is an exact duplicate of the last sample.
				// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
				// not with samples in already flushed OOO chunks.
				// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
				acc.floatsAppended--
			}
		default:
			ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
			if ok {
				if s.T < acc.inOrderMint {
					acc.inOrderMint = s.T
				}
				if s.T > acc.inOrderMaxt {
					acc.inOrderMaxt = s.T
				}
			} else {
				// The sample is an exact duplicate, and should be silently dropped.
				acc.floatsAppended--
			}
		}

		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}

		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}
}

// For details on the commitHistograms function, see the commitSamples docs.
func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
	var ok, chunkCreated bool
	var series *memSeries

	for i, s := range a.histograms {
		series = a.histogramSeries[i]
		series.Lock()

		oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
		if err != nil {
			handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
		}

		switch {
		case err != nil:
			// Do nothing here.
		case oooSample:
			// Sample is OOO and OOO handling is enabled
			// and the delta is within the OOO tolerance.
			var mmapRefs []chunks.ChunkDiskMapperRef
			ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
			if chunkCreated {
				r, ok := acc.oooMmapMarkers[series.ref]
				if !ok || r != nil {
					// !ok means there are no markers collected for these samples yet. So we first flush the samples
					// before setting this m-map marker.

					// r != nil means we have already m-mapped a chunk for this series in the same Commit().
					// Hence, before we m-map again, we should add the samples and m-map markers
					// seen till now to the WBL records.
					acc.collectOOORecords(a)
				}

				if acc.oooMmapMarkers == nil {
					acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
				}
				if len(mmapRefs) > 0 {
					acc.oooMmapMarkers[series.ref] = mmapRefs
					acc.oooMmapMarkersCount += len(mmapRefs)
				} else {
					// No chunk was written to disk, so we need to set an initial marker for this series.
					acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
					acc.oooMmapMarkersCount++
				}
			}
			if ok {
				acc.wblHistograms = append(acc.wblHistograms, s)
				if s.T < acc.oooMinT {
					acc.oooMinT = s.T
				}
				if s.T > acc.oooMaxT {
					acc.oooMaxT = s.T
				}
				acc.oooHistogramAccepted++
			} else {
				// Sample is an exact duplicate of the last sample.
				// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
				// not with samples in already flushed OOO chunks.
				// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
				acc.histogramsAppended--
			}
		default:
			ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
			if ok {
				if s.T < acc.inOrderMint {
					acc.inOrderMint = s.T
				}
				if s.T > acc.inOrderMaxt {
					acc.inOrderMaxt = s.T
				}
			} else {
				acc.histogramsAppended--
				acc.histoOOORejected++
			}
		}

		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}

		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}
}
// For details on the commitFloatHistograms function, see the commitSamples docs.
func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
	var ok, chunkCreated bool
	var series *memSeries

	for i, s := range a.floatHistograms {
		series = a.floatHistogramSeries[i]
		series.Lock()

		oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
		if err != nil {
			handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
		}

		switch {
		case err != nil:
			// Do nothing here.
		case oooSample:
			// Sample is OOO and OOO handling is enabled
			// and the delta is within the OOO tolerance.
			var mmapRefs []chunks.ChunkDiskMapperRef
			ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
			if chunkCreated {
				r, ok := acc.oooMmapMarkers[series.ref]
				if !ok || r != nil {
					// !ok means there are no markers collected for these samples yet. So we first flush the samples
					// before setting this m-map marker.

					// r != nil means we have already m-mapped a chunk for this series in the same Commit().
					// Hence, before we m-map again, we should add the samples and m-map markers
					// seen till now to the WBL records.
					acc.collectOOORecords(a)
				}
				if acc.oooMmapMarkers == nil {
					acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
				}
				if len(mmapRefs) > 0 {
					acc.oooMmapMarkers[series.ref] = mmapRefs
					acc.oooMmapMarkersCount += len(mmapRefs)
				} else {
					// No chunk was written to disk, so we need to set an initial marker for this series.
					acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
					acc.oooMmapMarkersCount++
				}
			}
			if ok {
				acc.wblFloatHistograms = append(acc.wblFloatHistograms, s)
				if s.T < acc.oooMinT {
					acc.oooMinT = s.T
				}
				if s.T > acc.oooMaxT {
					acc.oooMaxT = s.T
				}
				acc.oooHistogramAccepted++
			} else {
				// Sample is an exact duplicate of the last sample.
				// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
				// not with samples in already flushed OOO chunks.
				// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
				acc.histogramsAppended--
			}
		default:
			ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
			if ok {
				if s.T < acc.inOrderMint {
					acc.inOrderMint = s.T
				}
				if s.T > acc.inOrderMaxt {
					acc.inOrderMaxt = s.T
				}
			} else {
				acc.histogramsAppended--
				acc.histoOOORejected++
			}
		}

		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}

		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}
}

// commitMetadata commits the metadata for each series in the headAppender.
// It iterates over the metadata slice and updates the corresponding series
// with the new metadata information. The series is locked during the update
// to ensure thread safety.
func (a *headAppender) commitMetadata() {
	var series *memSeries
	for i, m := range a.metadata {
		series = a.metadataSeries[i]
		series.Lock()
		series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
		series.Unlock()
	}
}

// Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) {
	if a.closed {
		return ErrAppenderClosed
	}
	defer func() { a.closed = true }()

	if err := a.log(); err != nil {
		_ = a.Rollback() // Most likely the same error will happen again.
		return fmt.Errorf("write to WAL: %w", err)
	}

	if a.head.writeNotified != nil {
		a.head.writeNotified.Notify()
	}

	a.commitExemplars()

	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.putAppendBuffer(a.samples)
	defer a.head.putSeriesBuffer(a.sampleSeries)
	defer a.head.putExemplarBuffer(a.exemplars)
	defer a.head.putHistogramBuffer(a.histograms)
	defer a.head.putFloatHistogramBuffer(a.floatHistograms)
	defer a.head.putMetadataBuffer(a.metadata)
	defer a.head.iso.closeAppend(a.appendID)

	acc := &appenderCommitContext{
		floatsAppended:     len(a.samples),
		histogramsAppended: len(a.histograms) + len(a.floatHistograms),
		inOrderMint:        math.MaxInt64,
		inOrderMaxt:        math.MinInt64,
		oooMinT:            math.MaxInt64,
		oooMaxT:            math.MinInt64,
		oooCapMax:          a.head.opts.OutOfOrderCapMax.Load(),
		appendChunkOpts: chunkOpts{
			chunkDiskMapper: a.head.chunkDiskMapper,
			chunkRange:      a.head.chunkRange.Load(),
			samplesPerChunk: a.head.opts.SamplesPerChunk,
		},
	}

	defer func() {
		for i := range acc.oooRecords {
			a.head.putBytesBuffer(acc.oooRecords[i][:0])
		}
	}()

	a.commitSamples(acc)
	a.commitHistograms(acc)
	a.commitFloatHistograms(acc)
	a.commitMetadata()

	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
	a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
	a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
	a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
	a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
	a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
	a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
	a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
	a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)

	acc.collectOOORecords(a)
	if a.head.wbl != nil {
		if err := a.head.wbl.Log(acc.oooRecords...); err != nil {
			// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
			// until we have found what samples become OOO. We can try having a metric for this failure.
			// Returning the error here is not correct because we have already put the samples into the memory,
			// hence the append/insert was a success.
			a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err)
		}
	}
	return nil
}

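// To summarize the Commit flow above (a reading aid, not new behavior):
// (1) a.log() persists all pending data to the WAL, rolling back on failure;
// (2) commitSamples/commitHistograms/commitFloatHistograms/commitMetadata
// move the buffered data into the in-memory head, classifying each sample as
// in-order, out-of-order, or rejected; (3) metrics and the head's min/max
// times are updated from the counts accumulated in acc; (4) out-of-order
// records are logged to the WBL on a best-effort basis, with failures only
// logged rather than returned.
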
// insert is like append, except it inserts out-of-order samples into the
// series' OOO head chunk instead of the regular head chunk.
func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
	if s.ooo == nil {
		s.ooo = &memSeriesOOOFields{}
	}
	c := s.ooo.oooHeadChunk
	if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
		// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
		c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger)
		chunkCreated = true
	}

	ok := c.chunk.Insert(t, v, h, fh)
	if ok {
		if chunkCreated || t < c.minTime {
			c.minTime = t
		}
		if chunkCreated || t > c.maxTime {
			c.maxTime = t
		}
	}
	return ok, chunkCreated, mmapRefs
}

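// As an illustration of the capacity check above (a sketch, assuming the
// default OutOfOrderCapMax of 32): the first 32 OOO samples of a series land
// in one in-memory OOO head chunk; the 33rd insert finds
// c.chunk.NumSamples() == 32, so cutNewOOOHeadChunk first m-maps the full
// chunk to disk and then starts an empty one before the Insert proceeds.
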
// chunkOpts are chunk-level options that are passed when appending to a memSeries.
type chunkOpts struct {
	chunkDiskMapper *chunks.ChunkDiskMapper
	chunkRange      int64
	samplesPerChunk int
}

// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
	if !sampleInOrder {
		return sampleInOrder, chunkCreated
	}
	s.app.Append(t, v)

	c.maxTime = t

	s.lastValue = v
	s.lastHistogramValue = nil
	s.lastFloatHistogramValue = nil

	if appendID > 0 {
		s.txs.add(appendID)
	}

	return true, chunkCreated
}

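// A minimal usage sketch for append (a hypothetical caller shown for
// illustration; the real call sites are the commit* methods above, and the
// concrete option values below are assumptions based on the TSDB defaults):
//
//	series.Lock()
//	ok, created := series.append(ts, val, appendID, chunkOpts{
//		chunkDiskMapper: cdm,                // a *chunks.ChunkDiskMapper
//		chunkRange:      2 * 60 * 60 * 1000, // 2h chunk range in milliseconds
//		samplesPerChunk: 120,                // default target samples per chunk
//	})
//	series.Unlock()
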
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
	// Head controls the execution of recoding, so that we own the proper
	// chunk reference afterwards and mmap used up chunks.
	// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
	prevApp, _ := s.app.(*chunkenc.HistogramAppender)

	c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o)
	if !sampleInOrder {
		return sampleInOrder, chunkCreated
	}

	var (
		newChunk chunkenc.Chunk
		recoded  bool
	)

	if !chunkCreated {
		// Ignore the previous appender if we continue the current chunk.
		prevApp = nil
	}

	newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, t, h, false) // false=request a new chunk if needed

	s.lastHistogramValue = h
	s.lastFloatHistogramValue = nil

	if appendID > 0 {
		s.txs.add(appendID)
	}

	if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk.
		c.maxTime = t
		return true, chunkCreated
	}

	if recoded { // The appender needed to recode the chunk.
		c.maxTime = t
		c.chunk = newChunk
		return true, false
	}
	s.headChunks = &memChunk{
		chunk:   newChunk,
		minTime: t,
		maxTime: t,
		prev: s.headChunks,
	}
	s.nextAt = rangeForTimestamp(t, o.chunkRange)
	return true, true
}

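// To summarize the three outcomes of AppendHistogram in the function above
// (a reading aid): newChunk == nil means the sample fit into the current
// chunk (or was the first sample of a freshly cut one); recoded == true means
// the existing chunk was rewritten in place with a wider bucket layout, so no
// chunk counts as created; otherwise a brand-new head chunk was cut and
// linked in front of the headChunks list.
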
// appendFloatHistogram adds the float histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped.
// To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total
// consistent, we return chunkCreated=false in this case.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
	// Head controls the execution of recoding, so that we own the proper
	// chunk reference afterwards and mmap used up chunks.
	// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
	prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)

	c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o)
	if !sampleInOrder {
		return sampleInOrder, chunkCreated
	}

	var (
		newChunk chunkenc.Chunk
		recoded  bool
	)

	if !chunkCreated {
		// Ignore the previous appender if we continue the current chunk.
		prevApp = nil
	}

	newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, t, fh, false) // false=request a new chunk if needed

	s.lastHistogramValue = nil
	s.lastFloatHistogramValue = fh

	if appendID > 0 {
		s.txs.add(appendID)
	}

	if newChunk == nil { // Sample was appended to existing chunk or is the first sample in a new chunk.
		c.maxTime = t
		return true, chunkCreated
	}

	if recoded { // The appender needed to recode the chunk.
		c.maxTime = t
		c.chunk = newChunk
		return true, false
	}
	s.headChunks = &memChunk{
		chunk:   newChunk,
		minTime: t,
		maxTime: t,
		prev: s.headChunks,
	}
	s.nextAt = rangeForTimestamp(t, o.chunkRange)
	return true, true
}

// appendPreprocessor takes care of cutting new XOR chunks and m-mapping old ones. XOR chunks are cut based on the
// number of samples they contain, with a soft cap in bytes.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
	// We target chunkenc.MaxBytesPerXORChunk as a hard limit for the size of an XOR chunk. We must determine whether to cut
	// a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a
	// maximally-sized sample (19 bytes).
	const maxBytesPerXORChunk = chunkenc.MaxBytesPerXORChunk - 19

	c = s.headChunks

	if c == nil {
		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
			return c, false, false
		}
		// There is no head chunk in this series yet, create the first chunk for the sample.
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	// Out of order sample.
	if c.maxTime >= t {
		return c, false, chunkCreated
	}

	// Unless we just created the chunk, check its size; if the chunk is too large, cut a new one.
	if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	if c.chunk.Encoding() != e {
		// The chunk encoding expected by this append is different than the head chunk's
		// encoding. So we cut a new chunk with the expected encoding.
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	numSamples := c.chunk.NumSamples()
	if numSamples == 0 {
		// It could be the new chunk created after reading the chunk snapshot,
		// hence we fix the minTime of the chunk here.
		c.minTime = t
		s.nextAt = rangeForTimestamp(c.minTime, o.chunkRange)
	}

	// If we reach 25% of a chunk's desired sample count, predict an end time
	// for this chunk that will try to make samples equally distributed within
	// the remaining chunks in the current chunk range.
	// At the latest it must happen at the timestamp set when the chunk was cut.
	if numSamples == o.samplesPerChunk/4 {
		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt, 4)
	}
	// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
	// most likely because the sample rate has changed and samples are now arriving more frequently.
	// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
	// as we expect more chunks to come.
	// Note that the next chunk will have its nextAt recalculated for the new rate.
	if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	return c, true, chunkCreated
}

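// A worked example of the 25% prediction above (a sketch, assuming the
// default samplesPerChunk of 120, a 2h chunk range starting at t=0, and a 15s
// scrape interval): the check fires at numSamples == 30, when c.maxTime is
// roughly 435s. computeChunkEndTime(0, 435000, 7200000, 4) estimates that
// n = 7200000/(435001*4) ≈ 4.1 chunks fit into the range, floors that to 4,
// and returns 7200000/4 = 1800000, i.e. s.nextAt moves to the 30min mark, so
// each of the 4 chunks in the 2h range ends up holding ~120 samples.
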
// histogramsAppendPreprocessor takes care of cutting new histogram chunks and m-mapping old ones. Histogram chunks are
// cut based on their size in bytes.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
	c = s.headChunks

	if c == nil {
		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
			return c, false, false
		}
		// There is no head chunk in this series yet, create the first chunk for the sample.
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	// Out of order sample.
	if c.maxTime >= t {
		return c, false, chunkCreated
	}

	if c.chunk.Encoding() != e {
		// The chunk encoding expected by this append is different than the head chunk's
		// encoding. So we cut a new chunk with the expected encoding.
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	numSamples := c.chunk.NumSamples()
	targetBytes := chunkenc.TargetBytesPerHistogramChunk
	numBytes := len(c.chunk.Bytes())

	if numSamples == 0 {
		// It could be the new chunk created after reading the chunk snapshot,
		// hence we fix the minTime of the chunk here.
		c.minTime = t
		s.nextAt = rangeForTimestamp(c.minTime, o.chunkRange)
	}

	// Below, we will enforce chunkenc.MinSamplesPerHistogramChunk. There are, however, two cases that supersede it:
	// - The current chunk range is ending before chunkenc.MinSamplesPerHistogramChunk will be satisfied.
	// - s.nextAt was set while loading a chunk snapshot with the intent that a new chunk be cut on the next append.
	var nextChunkRangeStart int64
	if s.histogramChunkHasComputedEndTime {
		nextChunkRangeStart = rangeForTimestamp(c.minTime, o.chunkRange)
	} else {
		// If we haven't yet computed an end time, s.nextAt is either set to
		// rangeForTimestamp(c.minTime, o.chunkRange) or was set while loading a chunk snapshot. Either way, we want to
		// skip enforcing chunkenc.MinSamplesPerHistogramChunk.
		nextChunkRangeStart = s.nextAt
	}

	// If we reach 25% of a chunk's desired maximum size, predict an end time
	// for this chunk that will try to make samples equally distributed within
	// the remaining chunks in the current chunk range.
	// At the latest it must happen at the timestamp set when the chunk was cut.
	if !s.histogramChunkHasComputedEndTime && numBytes >= targetBytes/4 {
		ratioToFull := float64(targetBytes) / float64(numBytes)
		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt, ratioToFull)
		s.histogramChunkHasComputedEndTime = true
	}
	// If numBytes > targetBytes*2 then our previous prediction was invalid. This could happen if the sample rate has
	// increased or if the bucket/span count has increased.
	// Note that the next chunk will have its nextAt recalculated for the new rate.
	if (t >= s.nextAt || numBytes >= targetBytes*2) && (numSamples >= chunkenc.MinSamplesPerHistogramChunk || t >= nextChunkRangeStart) {
		c = s.cutNewHeadChunk(t, e, o.chunkRange)
		chunkCreated = true
	}

	// The new chunk will also need a new computed end time.
	if chunkCreated {
		s.histogramChunkHasComputedEndTime = false
	}

	return c, true, chunkCreated
}

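// A worked example of the byte-based prediction above (a sketch, assuming
// chunkenc.TargetBytesPerHistogramChunk is 1024): once the open chunk holds,
// say, 300 bytes (>= 1024/4), ratioToFull = 1024/300 ≈ 3.4, i.e. the chunk is
// assumed to be ~1/3.4 full, and computeChunkEndTime scales the elapsed time
// accordingly to pick an end time at which the chunk should reach roughly the
// target size.
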
// computeChunkEndTime estimates the end timestamp based on the beginning of a
// chunk, its current timestamp and the upper bound up to which we insert data.
// It assumes that the chunk is currently 1/ratioToFull full.
// Assuming that the samples will keep arriving at the same rate, it will make the
// remaining n chunks within this chunk range (before max) equally sized.
func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 {
	n := float64(maxT-start) / (float64(cur-start+1) * ratioToFull)
	if n <= 1 {
		return maxT
	}
	return int64(float64(start) + float64(maxT-start)/math.Floor(n))
}

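// For instance (a worked example, not taken from a test): with start=0,
// cur=100000, maxT=1000000 and ratioToFull=4, n = 1000000/(100001*4) ≈ 2.5,
// so floor(n) = 2 and the function returns 0 + 1000000/2 = 500000: two
// equally sized chunks fill the remaining range. If cur were 250000 instead,
// n ≈ 1.0 and the chunk simply runs until maxT.
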
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
	// When cutting a new head chunk we create a new memChunk instance with .prev
	// pointing at the current .headChunks, so it forms a linked list.
	// All but the first element of the headChunks list will be m-mapped as soon as
	// possible, so this is a single-element list most of the time.
	s.headChunks = &memChunk{
		minTime: mint,
		maxTime: math.MinInt64,
		prev: s.headChunks,
	}

	if chunkenc.IsValidEncoding(e) {
		var err error
		s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e)
		if err != nil {
			panic(err) // This should never happen.
		}
	} else {
		s.headChunks.chunk = chunkenc.NewXORChunk()
	}

	// Set upper bound on when the next chunk must be started. An earlier timestamp
	// may be chosen dynamically at a later point.
	s.nextAt = rangeForTimestamp(mint, chunkRange)

	app, err := s.headChunks.chunk.Appender()
	if err != nil {
		panic(err)
	}
	s.app = app

	return s.headChunks
}

// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s is locked and s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
	ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger)

	s.ooo.oooHeadChunk = &oooHeadChunk{
		chunk:   NewOOOChunk(),
		minTime: mint,
		maxTime: math.MinInt64,
	}

	return s.ooo.oooHeadChunk, ref
}

// s must be locked when calling.
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef {
	if s.ooo == nil || s.ooo.oooHeadChunk == nil {
		// OOO is not enabled or there is no head chunk, so nothing to m-map here.
		return nil
	}
	chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
	if err != nil {
		handleChunkWriteError(err)
		return nil
	}
	chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks))
	for _, memchunk := range chks {
		if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) {
			logger.Error("Too many OOO chunks, dropping data", "series", s.lset.String())
			break
		}
		chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
		chunkRefs = append(chunkRefs, chunkRef)
		s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
			ref:        chunkRef,
			numSamples: uint16(memchunk.chunk.NumSamples()),
			minTime:    memchunk.minTime,
			maxTime:    memchunk.maxTime,
		})
	}
	s.ooo.oooHeadChunk = nil
	return chunkRefs
}

// mmapChunks will m-map all but the first chunk on the s.headChunks list.
func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count int) {
	if s.headChunks == nil || s.headChunks.prev == nil {
		// There is none or only one head chunk, so nothing to m-map here.
		return
	}

	// Write chunks starting from the oldest one and stop before we get to current s.headChunks.
	// If we have this chain: s.headChunks{t4} -> t3 -> t2 -> t1 -> t0
	// then we need to write chunks t0 to t3, but skip s.headChunks.
	for i := s.headChunks.len() - 1; i > 0; i-- {
		chk := s.headChunks.atOffset(i)
		chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false, handleChunkWriteError)
		s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
			ref:        chunkRef,
			numSamples: uint16(chk.chunk.NumSamples()),
			minTime:    chk.minTime,
			maxTime:    chk.maxTime,
		})
		count++
	}

	// Once we've written out all chunks except s.headChunks we need to unlink these from s.headChunks.
	s.headChunks.prev = nil

	return count
}

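// A reading aid for the loop above (a sketch; len() and atOffset() are the
// memChunk list helpers defined elsewhere in this package): with the chain
// s.headChunks{t4} -> t3 -> t2 -> t1 -> t0, len() is 5, so the loop visits
// offsets 4, 3, 2, 1, i.e. t0 first and t3 last, writing the oldest chunks to
// disk first and leaving only the open t4 chunk in memory.
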
func handleChunkWriteError(err error) {
	if err != nil && !errors.Is(err, chunks.ErrChunkDiskMapperClosed) {
		panic(err)
	}
}

// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
func (a *headAppender) Rollback() (err error) {
	if a.closed {
		return ErrAppenderClosed
	}
	defer func() { a.closed = true }()
	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.iso.closeAppend(a.appendID)
	defer a.head.putSeriesBuffer(a.sampleSeries)

	var series *memSeries
	for i := range a.samples {
		series = a.sampleSeries[i]
		series.Lock()
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}
	for i := range a.histograms {
		series = a.histogramSeries[i]
		series.Lock()
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}
	a.head.putAppendBuffer(a.samples)
	a.head.putExemplarBuffer(a.exemplars)
	a.head.putHistogramBuffer(a.histograms)
	a.head.putFloatHistogramBuffer(a.floatHistograms)
	a.head.putMetadataBuffer(a.metadata)
	a.samples = nil
	a.exemplars = nil
	a.histograms = nil
	a.metadata = nil

	// Series are created in the head memory regardless of rollback. Thus we have
	// to log them to the WAL in any case.
	return a.log()
}