tsdb: Add support for ingestion of out-of-order native histogram samples (#14546)

Add support for ingesting OOO native histograms

* Add flag for enabling and disabling OOO native histogram ingestion
* Update OOO querying tests to include native histogram samples
* Add OOO head tests
* Add test for OOO native histogram counter reset headers

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Co-authored by: Carrie Edwards <edwrdscarrie@gmail.com>
Co-authored by: Jeanette Tan <jeanette.tan@grafana.com>
Co-authored by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Co-authored by: Fiona Liao <fiona.liao@grafana.com>
pull/14505/head
Carrie Edwards 2 months ago committed by GitHub
parent 919dc0cbc6
commit 14e3c05ce8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -230,6 +230,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "ooo-native-histograms":
c.tsdb.EnableOOONativeHistograms = true
level.Info(logger).Log("msg", "Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true")
case "created-timestamp-zero-ingestion":
c.scrape.EnableCreatedTimestampZeroIngestion = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
@ -1735,6 +1738,7 @@ type tsdbOptions struct {
EnableNativeHistograms bool
EnableDelayedCompaction bool
EnableOverlappingCompaction bool
EnableOOONativeHistograms bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@ -1754,6 +1758,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
MaxExemplars: opts.MaxExemplars,
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
EnableNativeHistograms: opts.EnableNativeHistograms,
EnableOOONativeHistograms: opts.EnableOOONativeHistograms,
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
EnableDelayedCompaction: opts.EnableDelayedCompaction,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,

@ -43,6 +43,7 @@ var (
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled")
ErrOOONativeHistogramsDisabled = fmt.Errorf("out-of-order native histogram ingestion is disabled")
// ErrOutOfOrderCT indicates failed append of CT to the storage
// due to CT being older the then newer sample.

@ -173,6 +173,12 @@ type Options struct {
// EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms bool
// EnableOOONativeHistograms enables the ingestion of OOO native histograms.
// It will only take effect if EnableNativeHistograms is set to true and the
// OutOfOrderTimeWindow is > 0. This flag will be removed after testing of
// OOO Native Histogram ingestion is complete.
EnableOOONativeHistograms bool
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
// This can change during run-time, so this value from here should only be used
// while initialising.
@ -948,6 +954,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
headOpts.EnableOOONativeHistograms.Store(opts.EnableOOONativeHistograms)
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.EnableSharding = opts.EnableSharding
@ -1172,6 +1179,16 @@ func (db *DB) DisableNativeHistograms() {
db.head.DisableNativeHistograms()
}
// EnableOOONativeHistograms enables the ingestion of out-of-order native histograms.
func (db *DB) EnableOOONativeHistograms() {
db.head.EnableOOONativeHistograms()
}
// DisableOOONativeHistograms disables the ingestion of out-of-order native histograms.
func (db *DB) DisableOOONativeHistograms() {
db.head.DisableOOONativeHistograms()
}
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {

File diff suppressed because it is too large Load Diff

@ -150,6 +150,11 @@ type HeadOptions struct {
// EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms atomic.Bool
// EnableOOONativeHistograms enables the ingestion of OOO native histograms.
// It will only take effect if EnableNativeHistograms is set to true and the
// OutOfOrderTimeWindow is > 0
EnableOOONativeHistograms atomic.Bool
// EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
@ -1018,6 +1023,16 @@ func (h *Head) DisableNativeHistograms() {
h.opts.EnableNativeHistograms.Store(false)
}
// EnableOOONativeHistograms enables the ingestion of out-of-order native histograms.
func (h *Head) EnableOOONativeHistograms() {
h.opts.EnableOOONativeHistograms.Store(true)
}
// DisableOOONativeHistograms disables the ingestion of out-of-order native histograms.
func (h *Head) DisableOOONativeHistograms() {
h.opts.EnableOOONativeHistograms.Store(false)
}
// PostingsCardinalityStats returns highest cardinality stats by label and value names.
func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats {
cacheKey := statsByLabelName + ";" + strconv.Itoa(limit)

@ -321,8 +321,8 @@ type headAppender struct {
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work.
// Fail fast if OOO is disabled and the sample is out of bounds.
// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
if a.oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfBounds
@ -493,46 +493,94 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
// appendableHistogram checks whether the given histogram is valid for appending to the series.
func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
if s.headChunks == nil {
return nil
// appendableHistogram checks whether the given histogram sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.
func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
// Check if we can append in the in-order chunk.
if t >= minValidTime {
if s.headChunks == nil {
// The series has no sample and was freshly created.
return false, 0, nil
}
msMaxt := s.maxTime()
if t > msMaxt {
return false, 0, nil
}
if t == msMaxt {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !h.Equals(s.lastHistogramValue) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}
if t > s.headChunks.maxTime {
return nil
}
if t < s.headChunks.maxTime {
return storage.ErrOutOfOrderSample
// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
if !oooHistogramsEnabled {
return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
}
return true, headMaxt - t, nil
}
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if !h.Equals(s.lastHistogramValue) {
return storage.ErrDuplicateSampleForTimestamp
// The sample cannot go in both in-order and out-of-order chunk.
if oooTimeWindow > 0 {
return true, headMaxt - t, storage.ErrTooOldSample
}
return nil
if t < minValidTime {
return false, headMaxt - t, storage.ErrOutOfBounds
}
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
// appendableFloatHistogram checks whether the given float histogram is valid for appending to the series.
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error {
if s.headChunks == nil {
return nil
// appendableFloatHistogram checks whether the given float histogram sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) {
// Check if we can append in the in-order chunk.
if t >= minValidTime {
if s.headChunks == nil {
// The series has no sample and was freshly created.
return false, 0, nil
}
msMaxt := s.maxTime()
if t > msMaxt {
return false, 0, nil
}
if t == msMaxt {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
// This only checks against the latest in-order sample.
// The OOO headchunk has its own method to detect these duplicates.
if !fh.Equals(s.lastFloatHistogramValue) {
return false, 0, storage.ErrDuplicateSampleForTimestamp
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}
if t > s.headChunks.maxTime {
return nil
}
if t < s.headChunks.maxTime {
return storage.ErrOutOfOrderSample
// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
if !oooHistogramsEnabled {
return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled
}
return true, headMaxt - t, nil
}
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if !fh.Equals(s.lastFloatHistogramValue) {
return storage.ErrDuplicateSampleForTimestamp
// The sample cannot go in both in-order and out-of-order chunk.
if oooTimeWindow > 0 {
return true, headMaxt - t, storage.ErrTooOldSample
}
return nil
if t < minValidTime {
return false, headMaxt - t, storage.ErrOutOfBounds
}
return false, headMaxt - t, storage.ErrOutOfOrderSample
}
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
@ -577,7 +625,9 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return 0, storage.ErrNativeHistogramsDisabled
}
if t < a.minValidTime {
// Fail fast if OOO is disabled and the sample is out of bounds.
// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
if (a.oooTimeWindow == 0 || !a.head.opts.EnableOOONativeHistograms.Load()) && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
return 0, storage.ErrOutOfBounds
}
@ -629,15 +679,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
switch {
case h != nil:
s.Lock()
if err := s.appendableHistogram(t, h); err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err != nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
fallthrough
case errors.Is(err, storage.ErrOOONativeHistogramsDisabled):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case errors.Is(err, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
}
s.pendingCommit = true
s.Unlock()
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
@ -646,15 +708,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
a.histogramSeries = append(a.histogramSeries, s)
case fh != nil:
s.Lock()
if err := s.appendableFloatHistogram(t, fh); err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
fallthrough
case errors.Is(err, storage.ErrOOONativeHistogramsDisabled):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case errors.Is(err, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
}
s.pendingCommit = true
s.Unlock()
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
@ -841,20 +915,24 @@ func (a *headAppender) Commit() (err error) {
floatsAppended = len(a.samples)
histogramsAppended = len(a.histograms) + len(a.floatHistograms)
// number of samples out of order but accepted: with ooo enabled and within time window
oooFloatsAccepted int
oooFloatsAccepted int
oooHistogramAccepted int
// number of samples rejected due to: out of order but OOO support disabled.
floatOOORejected int
histoOOORejected int
// number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
floatTooOldRejected int
histoTooOldRejected int
// number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
floatOOBRejected int
floatOOBRejected int
histoOOBRejected int
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
oooMinT int64 = math.MaxInt64
oooMaxT int64 = math.MinInt64
wblSamples []record.RefSample
wblHistograms []record.RefHistogramSample
wblFloatHistograms []record.RefFloatHistogramSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
@ -876,6 +954,8 @@ func (a *headAppender) Commit() (err error) {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
wblSamples = nil
wblHistograms = nil
wblFloatHistograms = nil
oooMmapMarkers = nil
oooMmapMarkersCount = 0
return
@ -903,8 +983,18 @@ func (a *headAppender) Commit() (err error) {
r := enc.Samples(wblSamples, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblHistograms) > 0 {
r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
if len(wblFloatHistograms) > 0 {
r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
wblSamples = nil
wblHistograms = nil
wblFloatHistograms = nil
oooMmapMarkers = nil
}
for i, s := range a.samples {
@ -1006,51 +1096,193 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblHistograms = append(wblHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load())
switch {
case err == nil:
// Do nothing.
case errors.Is(err, storage.ErrOutOfOrderSample):
histogramsAppended--
histoOOORejected++
case errors.Is(err, storage.ErrOutOfBounds):
histogramsAppended--
histoOOBRejected++
case errors.Is(err, storage.ErrTooOldSample):
histogramsAppended--
histoTooOldRejected++
default:
histogramsAppended--
}
var ok, chunkCreated bool
switch {
case err != nil:
// Do nothing here.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
}
if ok {
wblFloatHistograms = append(wblFloatHistograms, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
oooHistogramAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
histogramsAppended--
}
default:
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsAppended--
histoOOORejected++
}
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i, m := range a.metadata {
@ -1067,6 +1299,7 @@ func (a *headAppender) Commit() (err error) {
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)

@ -2723,15 +2723,32 @@ func TestIsolationWithoutAdd(t *testing.T) {
func TestOutOfOrderSamplesMetric(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOutOfOrderSamplesMetric(t, scenario)
options := DefaultOptions()
options.EnableNativeHistograms = true
options.EnableOOONativeHistograms = true
testOutOfOrderSamplesMetric(t, scenario, options, storage.ErrOutOfOrderSample)
})
}
}
func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
func TestOutOfOrderSamplesMetricNativeHistogramOOODisabled(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
if scenario.sampleType != "histogram" {
continue
}
t.Run(name, func(t *testing.T) {
options := DefaultOptions()
options.OutOfOrderTimeWindow = (1000 * time.Minute).Milliseconds()
options.EnableNativeHistograms = true
options.EnableOOONativeHistograms = false
testOutOfOrderSamplesMetric(t, scenario, options, storage.ErrOOONativeHistogramsDisabled)
})
}
}
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, options *Options, expectOutOfOrderError error) {
dir := t.TempDir()
db, err := Open(dir, nil, nil, options, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
@ -2755,15 +2772,15 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
app = db.Appender(ctx)
_, err = appendSample(app, 2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = appendSample(app, 3)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = appendSample(app, 4)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
require.NoError(t, app.Commit())
@ -2798,15 +2815,15 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
// Test out of order metric.
app = db.Appender(ctx)
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+3)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+4)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, expectOutOfOrderError, err)
require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
require.NoError(t, app.Commit())
}
@ -4657,10 +4674,172 @@ func TestHistogramCounterResetHeader(t *testing.T) {
}
}
func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head.opts.OutOfOrderCapMax.Store(5)
head.opts.EnableOOONativeHistograms.Store(true)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
appendHistogram := func(ts int64, h *histogram.Histogram) {
app := head.Appender(context.Background())
var err error
if floatHisto {
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat(nil))
} else {
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil)
}
require.NoError(t, err)
require.NoError(t, app.Commit())
}
type expOOOMmappedChunks struct {
header chunkenc.CounterResetHeader
mint, maxt int64
numSamples uint16
}
var expChunks []expOOOMmappedChunks
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
expChunks = append(expChunks, newChunks...)
ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)
require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))
for i, mmapChunk := range ms.ooo.oooMmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
if floatHisto {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expChunks[i].mint, mmapChunk.minTime)
require.Equal(t, expChunks[i].maxt, mmapChunk.maxTime)
require.Equal(t, expChunks[i].numSamples, mmapChunk.numSamples)
}
}
// Append an in-order histogram, so the rest of the samples can be detected as OOO.
appendHistogram(1000, tsdbutil.GenerateTestHistogram(1000))
// OOO histogram
for i := 1; i <= 5; i++ {
appendHistogram(100+int64(i), tsdbutil.GenerateTestHistogram(1000+i))
}
// Nothing mmapped yet.
checkOOOExpCounterResetHeader()
// 6th observation (which triggers a head chunk mmapping).
appendHistogram(int64(112), tsdbutil.GenerateTestHistogram(1002))
// One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 101,
maxt: 105,
numSamples: 5,
})
// Add more samples, there's a counter reset at ts 122.
appendHistogram(int64(110), tsdbutil.GenerateTestHistogram(1001))
appendHistogram(int64(124), tsdbutil.GenerateTestHistogram(904))
appendHistogram(int64(123), tsdbutil.GenerateTestHistogram(903))
appendHistogram(int64(122), tsdbutil.GenerateTestHistogram(902))
// New samples not mmapped yet.
checkOOOExpCounterResetHeader()
// 11th observation (which triggers another head chunk mmapping).
appendHistogram(int64(200), tsdbutil.GenerateTestHistogram(2000))
// Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)].
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 110,
maxt: 112,
numSamples: 2,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 122,
maxt: 124,
numSamples: 3,
},
)
// Count is lower than previous sample at ts 200, and NotCounterReset is always ignored on append.
appendHistogram(int64(205), tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(1000)))
appendHistogram(int64(210), tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(2010)))
appendHistogram(int64(220), tsdbutil.GenerateTestHistogram(2020))
appendHistogram(int64(215), tsdbutil.GenerateTestHistogram(2005))
// 16th observation (which triggers another head chunk mmapping).
appendHistogram(int64(350), tsdbutil.GenerateTestHistogram(4000))
// Four new mmapped chunks: [(200, 2000)] [(205, 1000)], [(210, 2010)], [(215, 2015), (220, 2020)]
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 200,
maxt: 200,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 205,
maxt: 205,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 210,
maxt: 210,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 215,
maxt: 220,
numSamples: 2,
},
)
// Adding five more samples (21 in total), so another mmapped chunk is created.
appendHistogram(300, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(3000)))
for i := 1; i <= 4; i++ {
appendHistogram(300+int64(i), tsdbutil.GenerateTestHistogram(3000+i))
}
// One mmapped chunk with (ts, val) [(300, 3000), (301, 3001), (302, 3002), (303, 3003), (350, 4000)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 300,
maxt: 350,
numSamples: 5,
})
})
}
}
func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir()
opts := DefaultOptions()
opts.EnableNativeHistograms = true
opts.EnableOOONativeHistograms = true
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
@ -4931,6 +5110,8 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
opts.EnableNativeHistograms.Store(true)
opts.EnableOOONativeHistograms.Store(true)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -4940,13 +5121,12 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
l := labels.FromStrings("foo", "bar")
appendSample := func(mins int64, val float64, isOOO bool) {
app := h.Appender(context.Background())
ts, v := mins*time.Minute.Milliseconds(), val
_, err := app.Append(0, l, ts, v)
_, s, err := scenario.appendFunc(app, l, mins*time.Minute.Milliseconds(), mins)
require.NoError(t, err)
require.NoError(t, app.Commit())
if isOOO {
expOOOSamples = append(expOOOSamples, sample{t: ts, f: v})
expOOOSamples = append(expOOOSamples, s)
}
}
@ -5025,6 +5205,8 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
opts.ChunkDirRoot = dir
opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
opts.EnableNativeHistograms.Store(true)
opts.EnableOOONativeHistograms.Store(true)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5326,6 +5508,8 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
opts.ChunkDirRoot = dir
opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds())
opts.EnableNativeHistograms.Store(true)
opts.EnableOOONativeHistograms.Store(true)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -5399,7 +5583,9 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
func TestHeadMinOOOTimeUpdate(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testHeadMinOOOTimeUpdate(t, scenario)
if scenario.sampleType == sampleMetricTypeFloat {
testHeadMinOOOTimeUpdate(t, scenario)
}
})
}
}
@ -5414,6 +5600,8 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
opts.EnableNativeHistograms.Store(true)
opts.EnableOOONativeHistograms.Store(true)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)

@ -646,9 +646,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
}
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, m-map markers, that referenced a series we don't know about
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
// for error reporting.
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
lastSeq, lastOff := lastMmapRef.Unpack()
// Start workers that each process samples for a partition of the series ID space.
@ -657,8 +657,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec = record.NewDecoder(syms)
shards = make([][]record.RefSample, concurrency)
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
@ -672,6 +673,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return []record.RefMmapMarker{}
},
}
histogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogramSample{}
},
}
floatHistogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
)
defer func() {
@ -692,8 +703,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
processors[i].setup()
go func(wp *wblSubsetProcessor) {
unknown := wp.processWBLSamples(h)
unknown, unknownHistograms := wp.processWBLSamples(h)
unknownRefs.Add(unknown)
unknownHistogramRefs.Add(unknownHistograms)
wg.Done()
}(&processors[i])
}
@ -727,6 +739,30 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decodedCh <- markers
case record.HistogramSamples:
hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- hists
case record.FloatHistogramSamples:
hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: fmt.Errorf("decode float histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- hists
default:
// Noop.
}
@ -791,6 +827,70 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
idx := uint64(ms.ref) % uint64(concurrency)
processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms}
}
case []record.RefHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < concurrency; i++ {
if histogramShards[i] == nil {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
}
for i := 0; i < concurrency; i++ {
if len(histogramShards[i]) > 0 {
processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}
}
samples = samples[m:]
}
histogramSamplesPool.Put(v) //nolint:staticcheck
case []record.RefFloatHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < concurrency; i++ {
if histogramShards[i] == nil {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
}
for i := 0; i < concurrency; i++ {
if len(histogramShards[i]) > 0 {
processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}
}
samples = samples[m:]
}
floatHistogramSamplesPool.Put(v) //nolint:staticcheck
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}
@ -833,17 +933,20 @@ func (e errLoadWbl) Unwrap() error {
}
type wblSubsetProcessor struct {
input chan wblSubsetProcessorInputItem
output chan []record.RefSample
input chan wblSubsetProcessorInputItem
output chan []record.RefSample
histogramsOutput chan []histogramRecord
}
type wblSubsetProcessorInputItem struct {
mmappedSeries *memSeries
samples []record.RefSample
mmappedSeries *memSeries
samples []record.RefSample
histogramSamples []histogramRecord
}
func (wp *wblSubsetProcessor) setup() {
wp.output = make(chan []record.RefSample, 300)
wp.histogramsOutput = make(chan []histogramRecord, 300)
wp.input = make(chan wblSubsetProcessorInputItem, 300)
}
@ -851,6 +954,8 @@ func (wp *wblSubsetProcessor) closeAndDrain() {
close(wp.input)
for range wp.output {
}
for range wp.histogramsOutput {
}
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
@ -863,10 +968,21 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
return nil
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord {
select {
case buf := <-wp.histogramsOutput:
return buf[:0]
default:
}
return nil
}
// processWBLSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) {
defer close(wp.output)
defer close(wp.histogramsOutput)
oooCapMax := h.opts.OutOfOrderCapMax.Load()
// We don't check for minValidTime for ooo samples.
@ -905,11 +1021,41 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
case wp.output <- in.samples:
default:
}
for _, s := range in.histogramSamples {
ms := h.series.getByID(s.ref)
if ms == nil {
unknownHistogramRefs++
continue
}
var chunkCreated bool
var ok bool
if s.h != nil {
ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
} else {
ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if ok {
if s.t > maxt {
maxt = s.t
}
if s.t < mint {
mint = s.t
}
}
}
select {
case wp.histogramsOutput <- in.histogramSamples:
default:
}
}
h.updateMinOOOMaxOOOTime(mint, maxt)
return unknownRefs
return unknownRefs, unknownHistogramRefs
}
const (

@ -389,6 +389,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
head.opts.EnableOOONativeHistograms.Store(true)
t.Cleanup(func() { require.NoError(t, head.Close()) })
ctx := context.Background()
@ -493,6 +494,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
opts.EnableNativeHistograms = true
opts.EnableOOONativeHistograms = true
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
@ -902,6 +905,8 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
opts.EnableNativeHistograms = true
opts.EnableOOONativeHistograms = true
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

@ -28,15 +28,14 @@ import (
const testMaxSize int = 32
// Formulas chosen to make testing easy.
func valEven(pos int) int { return pos*2 + 2 } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values
func valOdd(pos int) int { return pos*2 + 1 } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals.
func samplify(v int) sample { return sample{int64(v), float64(v), nil, nil} }
// Formulas chosen to make testing easy.
func valEven(pos int) int64 { return int64(pos*2 + 2) } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values
func valOdd(pos int) int64 { return int64(pos*2 + 1) } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals.
func makeEvenSampleSlice(n int) []sample {
func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample {
s := make([]sample, n)
for i := 0; i < n; i++ {
s[i] = samplify(valEven(i))
s[i] = sampleFunc(valEven(i))
}
return s
}
@ -45,8 +44,36 @@ func makeEvenSampleSlice(n int) []sample {
// - Number of pre-existing samples anywhere from 0 to testMaxSize-1.
// - Insert new sample before first pre-existing samples, after the last, and anywhere in between.
// - With a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves.
// Note: In all samples used, t always equals v in numeric value. when we talk about 'value' we just refer to a value that will be used for both sample.t and sample.v.
func TestOOOInsert(t *testing.T) {
scenarios := map[string]struct {
sampleFunc func(ts int64) sample
}{
"float": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, f: float64(ts)}
},
},
"integer histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
},
},
"float histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))}
},
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
testOOOInsert(t, scenario.sampleFunc)
})
}
}
func testOOOInsert(t *testing.T,
sampleFunc func(ts int64) sample,
) {
for numPreExisting := 0; numPreExisting <= testMaxSize; numPreExisting++ {
// For example, if we have numPreExisting 2, then:
// chunk.samples indexes filled 0 1
@ -56,20 +83,21 @@ func TestOOOInsert(t *testing.T) {
for insertPos := 0; insertPos <= numPreExisting; insertPos++ {
chunk := NewOOOChunk()
chunk.samples = makeEvenSampleSlice(numPreExisting)
newSample := samplify(valOdd(insertPos))
chunk.Insert(newSample.t, newSample.f, nil, nil)
chunk.samples = make([]sample, numPreExisting)
chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc)
newSample := sampleFunc(valOdd(insertPos))
chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh)
var expSamples []sample
// Our expected new samples slice, will be first the original samples.
for i := 0; i < insertPos; i++ {
expSamples = append(expSamples, samplify(valEven(i)))
expSamples = append(expSamples, sampleFunc(valEven(i)))
}
// Then the new sample.
expSamples = append(expSamples, newSample)
// Followed by any original samples that were pushed back by the new one.
for i := insertPos; i < numPreExisting; i++ {
expSamples = append(expSamples, samplify(valEven(i)))
expSamples = append(expSamples, sampleFunc(valEven(i)))
}
require.Equal(t, expSamples, chunk.samples, "numPreExisting %d, insertPos %d", numPreExisting, insertPos)
@ -81,17 +109,46 @@ func TestOOOInsert(t *testing.T) {
// pre-existing samples, with between 1 and testMaxSize pre-existing samples and
// with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves.
func TestOOOInsertDuplicate(t *testing.T) {
scenarios := map[string]struct {
sampleFunc func(ts int64) sample
}{
"float": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, f: float64(ts)}
},
},
"integer histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
},
},
"float histogram": {
sampleFunc: func(ts int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))}
},
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
testOOOInsertDuplicate(t, scenario.sampleFunc)
})
}
}
func testOOOInsertDuplicate(t *testing.T,
sampleFunc func(ts int64) sample,
) {
for num := 1; num <= testMaxSize; num++ {
for dupPos := 0; dupPos < num; dupPos++ {
chunk := NewOOOChunk()
chunk.samples = makeEvenSampleSlice(num)
chunk.samples = makeEvenSampleSlice(num, sampleFunc)
dupSample := chunk.samples[dupPos]
dupSample.f = 0.123
ok := chunk.Insert(dupSample.t, dupSample.f, nil, nil)
ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh)
expSamples := makeEvenSampleSlice(num) // We expect no change.
expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change.
require.False(t, ok)
require.Equal(t, expSamples, chunk.samples, "num %d, dupPos %d", num, dupPos)
}

@ -16,6 +16,8 @@ package tsdb
import (
"testing"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
@ -27,7 +29,11 @@ import (
)
const (
float = "float"
float = "float"
intHistogram = "integer histogram"
floatHistogram = "float histogram"
gaugeIntHistogram = "gauge int histogram"
gaugeFloatHistogram = "gauge float histogram"
)
type testValue struct {
@ -42,7 +48,6 @@ type sampleTypeScenario struct {
sampleFunc func(ts, value int64) sample
}
// TODO: native histogram sample types will be added as part of out-of-order native histogram support; see #11220.
var sampleTypeScenarios = map[string]sampleTypeScenario{
float: {
sampleType: sampleMetricTypeFloat,
@ -55,50 +60,50 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
return sample{t: ts, f: float64(value)}
},
},
// intHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
// },
// },
// floatHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
// },
// },
// gaugeIntHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
// },
// },
// gaugeFloatHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
// },
// },
intHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
},
},
floatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
},
},
gaugeIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
},
},
gaugeFloatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
},
},
}
// requireEqualSeries checks that the actual series are equal to the expected ones. It ignores the counter reset hints for histograms.

Loading…
Cancel
Save