|
|
|
@ -488,26 +488,28 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
|
|
|
|
batch := leveldb.NewBatch()
|
|
|
|
|
defer batch.Close()
|
|
|
|
|
|
|
|
|
|
mutationCount := 0
|
|
|
|
|
value := &dto.MetricHighWatermark{}
|
|
|
|
|
for fingerprint, samples := range groups {
|
|
|
|
|
value := &dto.MetricHighWatermark{}
|
|
|
|
|
newestSampleTimestamp := samples[len(samples)-1].Timestamp
|
|
|
|
|
value.Reset()
|
|
|
|
|
present, err := l.MetricHighWatermarks.Get(fingerprint.ToDTO(), value)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
newestSampleTimestamp := samples[len(samples)-1].Timestamp
|
|
|
|
|
|
|
|
|
|
if !present {
|
|
|
|
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
|
|
|
|
batch.Put(fingerprint.ToDTO(), value)
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// BUG(matt): Repace this with watermark management.
|
|
|
|
|
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
|
|
|
|
|
continue
|
|
|
|
|
if !newestSampleTimestamp.Before(time.Unix(value.GetTimestamp(), 0)) {
|
|
|
|
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
|
|
|
|
batch.Put(fingerprint.ToDTO(), value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
|
|
|
|
batch.Put(fingerprint.ToDTO(), value)
|
|
|
|
|
mutationCount++
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = l.MetricHighWatermarks.Commit(batch)
|
|
|
|
|