Merge pull request #1365 from prometheus/appenderr

Return error on sample appending
pull/1373/head
Fabian Reinartz 2016-02-03 12:13:49 +01:00
commit 0dd3a23510
7 changed files with 55 additions and 26 deletions

View File

@ -21,7 +21,8 @@ import (
type nopAppender struct{} type nopAppender struct{}
func (a nopAppender) Append(*model.Sample) { func (a nopAppender) Append(*model.Sample) error {
return nil
} }
func (a nopAppender) NeedsThrottling() bool { func (a nopAppender) NeedsThrottling() bool {
@ -33,13 +34,14 @@ type collectResultAppender struct {
throttled bool throttled bool
} }
func (a *collectResultAppender) Append(s *model.Sample) { func (a *collectResultAppender) Append(s *model.Sample) error {
for ln, lv := range s.Metric { for ln, lv := range s.Metric {
if len(lv) == 0 { if len(lv) == 0 {
delete(s.Metric, ln) delete(s.Metric, ln)
} }
} }
a.result = append(a.result, s) a.result = append(a.result, s)
return nil
} }
func (a *collectResultAppender) NeedsThrottling() bool { func (a *collectResultAppender) NeedsThrottling() bool {

View File

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
@ -449,16 +450,31 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
}, },
} }
var samples model.Vector var (
samples model.Vector
numOutOfOrder int
logger = log.With("target", t.InstanceIdentifier())
)
for { for {
if err = sdec.Decode(&samples); err != nil { if err = sdec.Decode(&samples); err != nil {
break break
} }
for _, s := range samples { for _, s := range samples {
appender.Append(s) err := appender.Append(s)
if err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
logger.With("sample", s).Warnf("Error inserting sample: %s", err)
} }
} }
}
}
if numOutOfOrder > 0 {
logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
}
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
@ -472,7 +488,7 @@ type ruleLabelsAppender struct {
labels model.LabelSet labels model.LabelSet
} }
func (app ruleLabelsAppender) Append(s *model.Sample) { func (app ruleLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels { for ln, lv := range app.labels {
if v, ok := s.Metric[ln]; ok && v != "" { if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[model.ExportedLabelPrefix+ln] = v s.Metric[model.ExportedLabelPrefix+ln] = v
@ -480,7 +496,7 @@ func (app ruleLabelsAppender) Append(s *model.Sample) {
s.Metric[ln] = lv s.Metric[ln] = lv
} }
app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
type honorLabelsAppender struct { type honorLabelsAppender struct {
@ -491,14 +507,14 @@ type honorLabelsAppender struct {
// Merges the sample's metric with the given labels if the label is not // Merges the sample's metric with the given labels if the label is not
// already present in the metric. // already present in the metric.
// This also considers labels explicitly set to the empty string. // This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Append(s *model.Sample) { func (app honorLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels { for ln, lv := range app.labels {
if _, ok := s.Metric[ln]; !ok { if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv s.Metric[ln] = lv
} }
} }
app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// Applies a set of relabel configurations to the sample's metric // Applies a set of relabel configurations to the sample's metric
@ -508,19 +524,18 @@ type relabelAppender struct {
relabelings []*config.RelabelConfig relabelings []*config.RelabelConfig
} }
func (app relabelAppender) Append(s *model.Sample) { func (app relabelAppender) Append(s *model.Sample) error {
labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
if err != nil { if err != nil {
log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err)
return
} }
// Check if the timeseries was dropped. // Check if the timeseries was dropped.
if labels == nil { if labels == nil {
return return nil
} }
s.Metric = model.Metric(labels) s.Metric = model.Metric(labels)
app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// URL returns a copy of the target's URL. // URL returns a copy of the target's URL.

View File

@ -33,7 +33,7 @@ type Storage interface {
// processing.) The implementation might remove labels with empty value // processing.) The implementation might remove labels with empty value
// from the provided Sample as those labels are considered equivalent to // from the provided Sample as those labels are considered equivalent to
// a label not present at all. // a label not present at all.
Append(*model.Sample) Append(*model.Sample) error
// NeedsThrottling returns true if the Storage has too many chunks in memory // NeedsThrottling returns true if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence. // already or has too many chunks waiting for persistence.
NeedsThrottling() bool NeedsThrottling() bool

View File

@ -567,8 +567,10 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin
} }
} }
var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order")
// Append implements Storage. // Append implements Storage.
func (s *memorySeriesStorage) Append(sample *model.Sample) { func (s *memorySeriesStorage) Append(sample *model.Sample) error {
for ln, lv := range sample.Metric { for ln, lv := range sample.Metric {
if len(lv) == 0 { if len(lv) == 0 {
delete(sample.Metric, ln) delete(sample.Metric, ln)
@ -589,16 +591,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
series := s.getOrCreateSeries(fp, sample.Metric) series := s.getOrCreateSeries(fp, sample.Metric)
if sample.Timestamp <= series.lastTime { if sample.Timestamp <= series.lastTime {
s.fpLocker.Unlock(fp)
// Don't log and track equal timestamps, as they are a common occurrence // Don't log and track equal timestamps, as they are a common occurrence
// when using client-side timestamps (e.g. Pushgateway or federation). // when using client-side timestamps (e.g. Pushgateway or federation).
// It would be even better to also compare the sample values here, but // It would be even better to also compare the sample values here, but
// we don't have efficient access to a series's last value. // we don't have efficient access to a series's last value.
if sample.Timestamp != series.lastTime { if sample.Timestamp != series.lastTime {
log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime)
s.outOfOrderSamplesCount.Inc() s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample
} }
s.fpLocker.Unlock(fp) return nil
return
} }
completedChunksCount := series.add(&model.SamplePair{ completedChunksCount := series.add(&model.SamplePair{
Value: sample.Value, Value: sample.Value,
@ -607,6 +609,8 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc() s.ingestedSamplesCount.Inc()
s.incNumChunksToPersist(completedChunksCount) s.incNumChunksToPersist(completedChunksCount)
return nil
} }
// NeedsThrottling implements Storage. // NeedsThrottling implements Storage.

View File

@ -133,13 +133,15 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
// Append queues a sample to be sent to the remote storage. It drops the // Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full. // sample on the floor if the queue is full.
func (t *StorageQueueManager) Append(s *model.Sample) { // Always returns nil.
func (t *StorageQueueManager) Append(s *model.Sample) error {
select { select {
case t.queue <- s: case t.queue <- s:
default: default:
t.samplesCount.WithLabelValues(dropped).Inc() t.samplesCount.WithLabelValues(dropped).Inc()
log.Warn("Remote storage queue full, discarding sample.") log.Warn("Remote storage queue full, discarding sample.")
} }
return nil
} }
// Stop stops sending samples to the remote storage and waits for pending // Stop stops sending samples to the remote storage and waits for pending

View File

@ -104,8 +104,8 @@ func (s *Storage) Stop() {
} }
} }
// Append implements storage.SampleAppender. // Append implements storage.SampleAppender. Always returns nil.
func (s *Storage) Append(smpl *model.Sample) { func (s *Storage) Append(smpl *model.Sample) error {
s.mtx.RLock() s.mtx.RLock()
var snew model.Sample var snew model.Sample
@ -122,6 +122,7 @@ func (s *Storage) Append(smpl *model.Sample) {
for _, q := range s.queues { for _, q := range s.queues {
q.Append(&snew) q.Append(&snew)
} }
return nil
} }
// NeedsThrottling implements storage.SampleAppender. It will always return // NeedsThrottling implements storage.SampleAppender. It will always return

View File

@ -25,9 +25,8 @@ type SampleAppender interface {
// of the sample after Append has returned. Remote storage // of the sample after Append has returned. Remote storage
// implementation will simply drop samples if they cannot keep up with // implementation will simply drop samples if they cannot keep up with
// sending samples. Local storage implementations will only drop metrics // sending samples. Local storage implementations will only drop metrics
// upon unrecoverable errors. Reporting any errors is done via metrics // upon unrecoverable errors.
// and logs and not the concern of the caller. Append(*model.Sample) error
Append(*model.Sample)
// NeedsThrottling returns true if the underlying storage wishes to not // NeedsThrottling returns true if the underlying storage wishes to not
// receive any more samples. Append will still work but might lead to // receive any more samples. Append will still work but might lead to
// undue resource usage. It is recommended to call NeedsThrottling once // undue resource usage. It is recommended to call NeedsThrottling once
@ -53,11 +52,17 @@ type Fanout []SampleAppender
// Append implements SampleAppender. It appends the provided sample to all // Append implements SampleAppender. It appends the provided sample to all
// SampleAppenders in the Fanout slice and waits for each append to complete // SampleAppenders in the Fanout slice and waits for each append to complete
// before proceeding with the next. // before proceeding with the next.
func (f Fanout) Append(s *model.Sample) { // If any of the SampleAppenders returns an error, the first one is returned
// at the end.
func (f Fanout) Append(s *model.Sample) error {
var err error
for _, a := range f { for _, a := range f {
a.Append(s) if e := a.Append(s); e != nil && err == nil {
err = e
} }
} }
return err
}
// NeedsThrottling returns true if at least one of the SampleAppenders in the // NeedsThrottling returns true if at least one of the SampleAppenders in the
// Fanout slice is throttled. // Fanout slice is throttled.