mirror of https://github.com/prometheus/prometheus
Move lastSamplePair method up to memorySeries
This implies a slight change of behavior as only samples added to the respective instance of a memorySeries are returned. However, this is most likely anyway what we want. Following cases: - Server has been restarted: Given the time it takes to cleanly shutdown and start up a server, the series are now stale anyway. An improved staleness handling (still to be implemented) will be based on tracking if a given target is continuing to expose samples for a given time series. In that case, we need a full scrape cycle to decide about staleness. So again, it makes sense to consider everything stale directly after a server restart. - Series unarchived due to a read request: The series is definitely stale so we don't want to return anything anyway. - Freshly created time series or series unarchived because of a sample append: That happens because appending a sample is imminent. Before the fingerprint lock is released, the series will have received a sample, and lastSamplePair will always returned the expected value.pull/1409/head
parent
1e13f89039
commit
b876f8e6a5
|
@ -81,13 +81,6 @@ const (
|
|||
// is populated upon creation of a chunkDesc, so it is alway safe to call
|
||||
// firstTime. The firstTime method is arguably not needed and only there for
|
||||
// consistency with lastTime.
|
||||
//
|
||||
// Yet another (deprecated) case is lastSamplePair. It's used in federation and
|
||||
// must be callable without pinning. Locking the fingerprint of the series is
|
||||
// still required (to avoid concurrent appends to the chunk). The call is
|
||||
// relatively expensive because of the required acquisition of the evict
|
||||
// mutex. It will go away, though, once tracking the lastSamplePair has been
|
||||
// moved into the series object.
|
||||
type chunkDesc struct {
|
||||
sync.Mutex // Protects pinning.
|
||||
c chunk // nil if chunk is evicted.
|
||||
|
@ -194,24 +187,6 @@ func (cd *chunkDesc) maybePopulateLastTime() {
|
|||
}
|
||||
}
|
||||
|
||||
// lastSamplePair returns the last sample pair of the underlying chunk, or
|
||||
// ZeroSampleValue if the chunk is evicted. For safe concurrent access, this
|
||||
// method requires the fingerprint of the time series to be locked.
|
||||
// TODO(beorn7): Move up into memorySeries.
|
||||
func (cd *chunkDesc) lastSamplePair() model.SamplePair {
|
||||
cd.Lock()
|
||||
defer cd.Unlock()
|
||||
|
||||
if cd.c == nil {
|
||||
return ZeroSamplePair
|
||||
}
|
||||
it := cd.c.newIterator()
|
||||
return model.SamplePair{
|
||||
Timestamp: it.lastTimestamp(),
|
||||
Value: it.lastSampleValue(),
|
||||
}
|
||||
}
|
||||
|
||||
// isEvicted returns whether the chunk is evicted. For safe concurrent access,
|
||||
// the caller must have locked the fingerprint of the series.
|
||||
func (cd *chunkDesc) isEvicted() bool {
|
||||
|
|
|
@ -162,9 +162,15 @@ type memorySeries struct {
|
|||
// first chunk before its chunk desc is evicted. In doubt, this field is
|
||||
// just set to the oldest possible timestamp.
|
||||
savedFirstTime model.Time
|
||||
// The timestamp of the last sample in this series. Needed for fast access to
|
||||
// ensure timestamp monotonicity during ingestion.
|
||||
// The timestamp of the last sample in this series. Needed for fast
|
||||
// access for federation and to ensure timestamp monotonicity during
|
||||
// ingestion.
|
||||
lastTime model.Time
|
||||
// The last ingested sample value. Needed for fast access for
|
||||
// federation.
|
||||
lastSampleValue model.SampleValue
|
||||
// Whether lastSampleValue has been set already.
|
||||
lastSampleValueSet bool
|
||||
// Whether the current head chunk has already been finished. If true,
|
||||
// the current head chunk must not be modified anymore.
|
||||
headChunkClosed bool
|
||||
|
@ -242,6 +248,8 @@ func (s *memorySeries) add(v *model.SamplePair) int {
|
|||
}
|
||||
|
||||
s.lastTime = v.Timestamp
|
||||
s.lastSampleValue = v.Value
|
||||
s.lastSampleValueSet = true
|
||||
return len(chunks) - 1
|
||||
}
|
||||
|
||||
|
@ -435,8 +443,9 @@ func (s *memorySeries) head() *chunkDesc {
|
|||
return s.chunkDescs[len(s.chunkDescs)-1]
|
||||
}
|
||||
|
||||
// firstTime returns the timestamp of the first sample in the series. The caller
|
||||
// must have locked the fingerprint of the memorySeries.
|
||||
// firstTime returns the timestamp of the first sample in the series.
|
||||
//
|
||||
// The caller must have locked the fingerprint of the memorySeries.
|
||||
func (s *memorySeries) firstTime() model.Time {
|
||||
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
|
||||
return s.chunkDescs[0].firstTime()
|
||||
|
@ -444,6 +453,23 @@ func (s *memorySeries) firstTime() model.Time {
|
|||
return s.savedFirstTime
|
||||
}
|
||||
|
||||
// lastSamplePair returns the last ingested SamplePair. It returns
|
||||
// ZeroSamplePair if this memorySeries has never received a sample (via the add
|
||||
// method), which is the case for freshly unarchived series or newly created
|
||||
// ones and also for all series after a server restart. However, in that case,
|
||||
// series will most likely be considered stale anyway.
|
||||
//
|
||||
// The caller must have locked the fingerprint of the memorySeries.
|
||||
func (s *memorySeries) lastSamplePair() model.SamplePair {
|
||||
if !s.lastSampleValueSet {
|
||||
return ZeroSamplePair
|
||||
}
|
||||
return model.SamplePair{
|
||||
Timestamp: s.lastTime,
|
||||
Value: s.lastSampleValue,
|
||||
}
|
||||
}
|
||||
|
||||
// chunksToPersist returns a slice of chunkDescs eligible for persistence. It's
|
||||
// the caller's responsibility to actually persist the returned chunks
|
||||
// afterwards. The method sets the persistWatermark and the dirty flag
|
||||
|
|
|
@ -354,7 +354,7 @@ func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint)
|
|||
if !ok {
|
||||
return ZeroSamplePair
|
||||
}
|
||||
return series.head().lastSamplePair()
|
||||
return series.lastSamplePair()
|
||||
}
|
||||
|
||||
// boundedIterator wraps a SeriesIterator and does not allow fetching
|
||||
|
@ -576,15 +576,17 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
|
|||
|
||||
if sample.Timestamp <= series.lastTime {
|
||||
s.fpLocker.Unlock(fp)
|
||||
// Don't log and track equal timestamps, as they are a common occurrence
|
||||
// when using client-side timestamps (e.g. Pushgateway or federation).
|
||||
// It would be even better to also compare the sample values here, but
|
||||
// we don't have efficient access to a series's last value.
|
||||
if sample.Timestamp != series.lastTime {
|
||||
s.outOfOrderSamplesCount.Inc()
|
||||
return ErrOutOfOrderSample
|
||||
// Don't report "no-op appends", i.e. where timestamp and sample
|
||||
// value are the same as for the last append, as they are a
|
||||
// common occurrence when using client-side timestamps
|
||||
// (e.g. Pushgateway or federation).
|
||||
if sample.Timestamp == series.lastTime &&
|
||||
series.lastSampleValueSet &&
|
||||
sample.Value == series.lastSampleValue {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
s.outOfOrderSamplesCount.Inc()
|
||||
return ErrOutOfOrderSample
|
||||
}
|
||||
completedChunksCount := series.add(&model.SamplePair{
|
||||
Value: sample.Value,
|
||||
|
|
Loading…
Reference in New Issue