mirror of https://github.com/prometheus/prometheus
Merge pull request #110 from prometheus/julius-query-interpolation
Implement sample interpolation in query layer.pull/111/head
commit
b912f75d11
|
@ -27,33 +27,72 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal
|
|||
var queryStorage metric.Storage = nil
|
||||
|
||||
type viewAdapter struct {
|
||||
view metric.View
|
||||
// TODO: use this.
|
||||
view metric.View
|
||||
stalenessPolicy *metric.StalenessPolicy
|
||||
}
|
||||
|
||||
func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) {
|
||||
var minDelta time.Duration
|
||||
for _, candidate := range samples {
|
||||
// Ignore samples outside of staleness policy window.
|
||||
delta := candidate.Timestamp.Sub(*timestamp)
|
||||
if delta < 0 {
|
||||
delta = -delta
|
||||
}
|
||||
if delta > v.stalenessPolicy.DeltaAllowance {
|
||||
continue
|
||||
}
|
||||
// interpolateSamples interpolates a value at a target time between two
|
||||
// provided sample pairs.
|
||||
func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *model.SamplePair {
|
||||
dv := second.Value - first.Value
|
||||
dt := second.Timestamp.Sub(first.Timestamp)
|
||||
|
||||
// Skip sample if we've seen a closer one before this.
|
||||
if sample != nil {
|
||||
if delta > minDelta {
|
||||
dDt := dv / model.SampleValue(dt)
|
||||
offset := model.SampleValue(timestamp.Sub(first.Timestamp))
|
||||
|
||||
return &model.SamplePair{
|
||||
Value: first.Value + (offset * dDt),
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// chooseClosestSample chooses the closest sample of a list of samples
|
||||
// surrounding a given target time. If samples are found both before and after
|
||||
// the target time, the sample value is interpolated between these. Otherwise,
|
||||
// the single closest sample is returned verbatim.
|
||||
func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) {
|
||||
var closestBefore *model.SamplePair
|
||||
var closestAfter *model.SamplePair
|
||||
for _, candidate := range samples {
|
||||
delta := candidate.Timestamp.Sub(*timestamp)
|
||||
// Samples before target time.
|
||||
if delta < 0 {
|
||||
// Ignore samples outside of staleness policy window.
|
||||
if -delta > v.stalenessPolicy.DeltaAllowance {
|
||||
continue
|
||||
}
|
||||
// Ignore samples that are farther away than what we've seen before.
|
||||
if closestBefore != nil && candidate.Timestamp.Before(closestBefore.Timestamp) {
|
||||
continue
|
||||
}
|
||||
sample := candidate
|
||||
closestBefore = &sample
|
||||
}
|
||||
|
||||
sample = &candidate
|
||||
minDelta = delta
|
||||
// Samples after target time.
|
||||
if delta >= 0 {
|
||||
// Ignore samples outside of staleness policy window.
|
||||
if delta > v.stalenessPolicy.DeltaAllowance {
|
||||
continue
|
||||
}
|
||||
// Ignore samples that are farther away than samples we've seen before.
|
||||
if closestAfter != nil && candidate.Timestamp.After(closestAfter.Timestamp) {
|
||||
continue
|
||||
}
|
||||
sample := candidate
|
||||
closestAfter = &sample
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case closestBefore != nil && closestAfter != nil:
|
||||
sample = interpolateSamples(closestBefore, closestAfter, *timestamp)
|
||||
case closestBefore != nil:
|
||||
sample = closestBefore
|
||||
default:
|
||||
sample = closestAfter
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue