mirror of https://github.com/prometheus/prometheus
Merge pull request #13607 from fionaliao/ooo-samples-appended-type
Add sample type label to outOfOrderSamplesAppended metricpull/13635/head
commit
c6c8f63516
|
@ -4945,7 +4945,7 @@ func Test_Querier_OOOQuery(t *testing.T) {
|
|||
require.NotNil(t, seriesSet[series1.String()])
|
||||
require.Len(t, seriesSet, 1)
|
||||
require.Equal(t, expSamples, seriesSet[series1.String()])
|
||||
require.GreaterOrEqual(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
|
||||
requireEqualOOOSamples(t, oooSamples, db)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -5028,7 +5028,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
|||
chks := queryChunks(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
|
||||
require.NotNil(t, chks[series1.String()])
|
||||
require.Len(t, chks, 1)
|
||||
require.Equal(t, float64(oooSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
|
||||
requireEqualOOOSamples(t, oooSamples, db)
|
||||
var gotSamples []chunks.Sample
|
||||
for _, chunk := range chks[series1.String()] {
|
||||
it := chunk.Chunk.Iterator(nil)
|
||||
|
@ -5107,7 +5107,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
|
|||
}
|
||||
}
|
||||
require.Equal(t, expSamples, seriesSet)
|
||||
require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
|
||||
requireEqualOOOSamples(t, totalSamples-2, db)
|
||||
}
|
||||
|
||||
verifyOOOMinMaxTimes := func(expMin, expMax int64) {
|
||||
|
@ -5216,7 +5216,7 @@ func TestOOODisabled(t *testing.T) {
|
|||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
|
||||
require.Equal(t, expSamples, seriesSet)
|
||||
require.Equal(t, float64(0), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
|
||||
requireEqualOOOSamples(t, 0, db)
|
||||
require.Equal(t, float64(failedSamples),
|
||||
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)),
|
||||
"number of ooo/oob samples mismatch")
|
||||
|
@ -6943,3 +6943,9 @@ Outer:
|
|||
|
||||
require.NoError(t, writerErr)
|
||||
}
|
||||
|
||||
func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
|
||||
require.Equal(t, float64(expectedSamples),
|
||||
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)),
|
||||
"number of ooo appended samples mismatch")
|
||||
}
|
||||
|
|
|
@ -340,7 +340,7 @@ type headMetrics struct {
|
|||
chunksRemoved prometheus.Counter
|
||||
gcDuration prometheus.Summary
|
||||
samplesAppended *prometheus.CounterVec
|
||||
outOfOrderSamplesAppended prometheus.Counter
|
||||
outOfOrderSamplesAppended *prometheus.CounterVec
|
||||
outOfBoundSamples *prometheus.CounterVec
|
||||
outOfOrderSamples *prometheus.CounterVec
|
||||
tooOldSamples *prometheus.CounterVec
|
||||
|
@ -420,10 +420,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
Name: "prometheus_tsdb_head_samples_appended_total",
|
||||
Help: "Total number of appended samples.",
|
||||
}, []string{"type"}),
|
||||
outOfOrderSamplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
outOfOrderSamplesAppended: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_head_out_of_order_samples_appended_total",
|
||||
Help: "Total number of appended out of order samples.",
|
||||
}),
|
||||
}, []string{"type"}),
|
||||
outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_out_of_bound_samples_total",
|
||||
Help: "Total number of out of bound samples ingestion failed attempts with out of order support disabled.",
|
||||
|
|
|
@ -835,11 +835,18 @@ func (a *headAppender) Commit() (err error) {
|
|||
defer a.head.iso.closeAppend(a.appendID)
|
||||
|
||||
var (
|
||||
samplesAppended = len(a.samples)
|
||||
oooAccepted int // number of samples out of order but accepted: with ooo enabled and within time window
|
||||
oooRejected int // number of samples rejected due to: out of order but OOO support disabled.
|
||||
tooOldRejected int // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
|
||||
oobRejected int // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
|
||||
floatsAppended = len(a.samples)
|
||||
histogramsAppended = len(a.histograms) + len(a.floatHistograms)
|
||||
// number of samples out of order but accepted: with ooo enabled and within time window
|
||||
floatOOOAccepted int
|
||||
// number of samples rejected due to: out of order but OOO support disabled.
|
||||
floatOOORejected int
|
||||
histoOOORejected int
|
||||
// number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
|
||||
floatTooOldRejected int
|
||||
// number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
|
||||
floatOOBRejected int
|
||||
|
||||
inOrderMint int64 = math.MaxInt64
|
||||
inOrderMaxt int64 = math.MinInt64
|
||||
ooomint int64 = math.MaxInt64
|
||||
|
@ -902,16 +909,16 @@ func (a *headAppender) Commit() (err error) {
|
|||
case err == nil:
|
||||
// Do nothing.
|
||||
case errors.Is(err, storage.ErrOutOfOrderSample):
|
||||
samplesAppended--
|
||||
oooRejected++
|
||||
floatsAppended--
|
||||
floatOOORejected++
|
||||
case errors.Is(err, storage.ErrOutOfBounds):
|
||||
samplesAppended--
|
||||
oobRejected++
|
||||
floatsAppended--
|
||||
floatOOBRejected++
|
||||
case errors.Is(err, storage.ErrTooOldSample):
|
||||
samplesAppended--
|
||||
tooOldRejected++
|
||||
floatsAppended--
|
||||
floatTooOldRejected++
|
||||
default:
|
||||
samplesAppended--
|
||||
floatsAppended--
|
||||
}
|
||||
|
||||
var ok, chunkCreated bool
|
||||
|
@ -949,13 +956,13 @@ func (a *headAppender) Commit() (err error) {
|
|||
if s.T > ooomaxt {
|
||||
ooomaxt = s.T
|
||||
}
|
||||
oooAccepted++
|
||||
floatOOOAccepted++
|
||||
} else {
|
||||
// Sample is an exact duplicate of the last sample.
|
||||
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
|
||||
// not with samples in already flushed OOO chunks.
|
||||
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
|
||||
samplesAppended--
|
||||
floatsAppended--
|
||||
}
|
||||
default:
|
||||
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
|
||||
|
@ -968,7 +975,7 @@ func (a *headAppender) Commit() (err error) {
|
|||
}
|
||||
} else {
|
||||
// The sample is an exact duplicate, and should be silently dropped.
|
||||
samplesAppended--
|
||||
floatsAppended--
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -982,8 +989,6 @@ func (a *headAppender) Commit() (err error) {
|
|||
series.Unlock()
|
||||
}
|
||||
|
||||
histogramsTotal := len(a.histograms)
|
||||
histoOOORejected := 0
|
||||
for i, s := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
series.Lock()
|
||||
|
@ -1000,7 +1005,7 @@ func (a *headAppender) Commit() (err error) {
|
|||
inOrderMaxt = s.T
|
||||
}
|
||||
} else {
|
||||
histogramsTotal--
|
||||
histogramsAppended--
|
||||
histoOOORejected++
|
||||
}
|
||||
if chunkCreated {
|
||||
|
@ -1009,7 +1014,6 @@ func (a *headAppender) Commit() (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
histogramsTotal += len(a.floatHistograms)
|
||||
for i, s := range a.floatHistograms {
|
||||
series = a.floatHistogramSeries[i]
|
||||
series.Lock()
|
||||
|
@ -1026,7 +1030,7 @@ func (a *headAppender) Commit() (err error) {
|
|||
inOrderMaxt = s.T
|
||||
}
|
||||
} else {
|
||||
histogramsTotal--
|
||||
histogramsAppended--
|
||||
histoOOORejected++
|
||||
}
|
||||
if chunkCreated {
|
||||
|
@ -1042,13 +1046,13 @@ func (a *headAppender) Commit() (err error) {
|
|||
series.Unlock()
|
||||
}
|
||||
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooRejected))
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOORejected))
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected))
|
||||
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oobRejected))
|
||||
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(tooOldRejected))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(samplesAppended))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsTotal))
|
||||
a.head.metrics.outOfOrderSamplesAppended.Add(float64(oooAccepted))
|
||||
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOBRejected))
|
||||
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
|
||||
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted))
|
||||
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
|
||||
a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt)
|
||||
|
||||
|
|
Loading…
Reference in New Issue