diff --git a/retrieval/scrape.go b/retrieval/scrape.go index fc6b5a116..e34482dd9 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -444,20 +444,31 @@ func (sl *scrapeLoop) stop() { } func (sl *scrapeLoop) append(samples model.Samples) { - numOutOfOrder := 0 + var ( + numOutOfOrder = 0 + numDuplicates = 0 + ) for _, s := range samples { if err := sl.appender.Append(s); err != nil { - if err == local.ErrOutOfOrderSample { + switch err { + case local.ErrOutOfOrderSample: numOutOfOrder++ - } else { - log.Warnf("Error inserting sample: %s", err) + log.With("sample", s).With("error", err).Debug("Sample discarded") + case local.ErrDuplicateSampleForTimestamp: + numDuplicates++ + log.With("sample", s).With("error", err).Debug("Sample discarded") + default: + log.With("sample", s).With("error", err).Warn("Sample discarded") } } } if numOutOfOrder > 0 { log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } + if numDuplicates > 0 { + log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") + } } func (sl *scrapeLoop) report(start time.Time, duration time.Duration, err error) { diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 6d43ebd39..271d85949 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -82,6 +82,12 @@ const ( // Maintenance types for maintainSeriesDuration. maintainInMemory = "memory" maintainArchived = "archived" + + discardReasonLabel = "reason" + + // Reasons to discard samples. + outOfOrderTimestamp = "timestamp_out_of_order" + duplicateSample = "multiple_values_for_timestamp" ) func init() { diff --git a/storage/local/storage.go b/storage/local/storage.go index a1b915d8e..20a73748b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -163,7 +163,7 @@ type memorySeriesStorage struct { numSeries prometheus.Gauge seriesOps *prometheus.CounterVec ingestedSamplesCount prometheus.Counter - outOfOrderSamplesCount prometheus.Counter + discardedSamplesCount *prometheus.CounterVec nonExistentSeriesMatchesCount prometheus.Counter maintainSeriesDuration *prometheus.SummaryVec persistenceUrgencyScore prometheus.Gauge @@ -242,12 +242,15 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Name: "ingested_samples_total", Help: "The total number of samples ingested.", }), - outOfOrderSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "out_of_order_samples_total", - Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", - }), + discardedSamplesCount: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "out_of_order_samples_total", + Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", + }, + []string{discardReasonLabel}, + ), nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -276,6 +279,25 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Help: "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.", }), } + + // Initialize metric vectors. + // TODO(beorn7): Rework once we have a utility function for it in client_golang. + s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp) + s.discardedSamplesCount.WithLabelValues(duplicateSample) + s.maintainSeriesDuration.WithLabelValues(maintainInMemory) + s.maintainSeriesDuration.WithLabelValues(maintainArchived) + s.seriesOps.WithLabelValues(create) + s.seriesOps.WithLabelValues(archive) + s.seriesOps.WithLabelValues(unarchive) + s.seriesOps.WithLabelValues(memoryPurge) + s.seriesOps.WithLabelValues(archivePurge) + s.seriesOps.WithLabelValues(requestedPurge) + s.seriesOps.WithLabelValues(memoryMaintenance) + s.seriesOps.WithLabelValues(archiveMaintenance) + s.seriesOps.WithLabelValues(completedQurantine) + s.seriesOps.WithLabelValues(droppedQuarantine) + s.seriesOps.WithLabelValues(failedQuarantine) + return s } @@ -577,9 +599,16 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } -// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest -// timestamp in the series it is appended to. -var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") +var ( + // ErrOutOfOrderSample is returned if a sample has a timestamp before the latest + // timestamp in the series it is appended to. + ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // ErrDuplicateSampleForTimestamp is returned if a sample has the same + // timestamp as the latest sample in the series it is appended to but a + // different value. (Appending an identical sample is a no-op and does + // not cause an error.) + ErrDuplicateSampleForTimestamp = fmt.Errorf("sample with repeated timestamp but different value") +) // Append implements Storage. func (s *memorySeriesStorage) Append(sample *model.Sample) error { @@ -604,7 +633,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { return err // getOrCreateSeries took care of quarantining already. } - if sample.Timestamp <= series.lastTime { + if sample.Timestamp == series.lastTime { // Don't report "no-op appends", i.e. where timestamp and sample // value are the same as for the last append, as they are a // common occurrence when using client-side timestamps @@ -614,7 +643,11 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { sample.Value == series.lastSampleValue { return nil } - s.outOfOrderSamplesCount.Inc() + s.discardedSamplesCount.WithLabelValues(duplicateSample).Inc() + return ErrDuplicateSampleForTimestamp // Caused by the caller. + } + if sample.Timestamp < series.lastTime { + s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc() return ErrOutOfOrderSample // Caused by the caller. } completedChunksCount, err := series.add(model.SamplePair{ @@ -1494,7 +1527,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.numSeries.Desc() s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() - ch <- s.outOfOrderSamplesCount.Desc() + s.discardedSamplesCount.Describe(ch) ch <- s.nonExistentSeriesMatchesCount.Desc() ch <- numMemChunksDesc s.maintainSeriesDuration.Describe(ch) @@ -1521,7 +1554,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { ch <- s.numSeries s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount - ch <- s.outOfOrderSamplesCount + s.discardedSamplesCount.Collect(ch) ch <- s.nonExistentSeriesMatchesCount ch <- prometheus.MustNewConstMetric( numMemChunksDesc,