Implement series eviction.

Change-Id: I7a503e0ba78aae3761d032851b06f2807122b085
pull/413/head
Bjoern Rabenstein 10 years ago
parent bbf49200ab
commit 3592dc2359

@ -54,10 +54,6 @@ var (
diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.")
deleteInterval = flag.Duration("delete.interval", 11*time.Hour, "The amount of time between deletion of old values.")
deleteAge = flag.Duration("delete.ageMaximum", 15*24*time.Hour, "The relative maximum age for values before they are deleted.")
memoryEvictionInterval = flag.Duration("storage.memory.evictionInterval", 15*time.Minute, "The period at which old data is evicted from memory.")
memoryRetentionPeriod = flag.Duration("storage.memory.retentionPeriod", time.Hour, "The period of time to retain in memory during evictions.")

@ -158,20 +158,21 @@ func (s *memorySeries) add(v *metric.SamplePair, persistQueue chan *persistReque
}
}
func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) {
func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) {
s.mtx.Lock()
defer s.mtx.Unlock()
// For now, always drop the entire range from oldest to t.
for _, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) {
break
return false
}
if cd.chunk == nil {
continue
}
cd.evictOnUnpin()
}
return true
}
// purgeOlderThan returns true if all chunks have been purged.
@ -347,6 +348,14 @@ func (s *memorySeries) values() metric.Values {
return values
}
func (s *memorySeries) firstTime() clientmodel.Timestamp {
return s.chunkDescs[0].firstTime()
}
func (s *memorySeries) lastTime() clientmodel.Timestamp {
return s.head().lastTime()
}
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
it.mtx.Lock()
defer it.mtx.Unlock()

@ -170,8 +170,19 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, series := range s.fingerprintToSeries {
series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl))
for fp, series := range s.fingerprintToSeries {
if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
if err := s.persistence.ArchiveMetric(
fp, series.metric, series.firstTime(), series.lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", series.metric, err)
}
delete(s.fingerprintToSeries, fp)
s.persistQueue <- &persistRequest{
fingerprint: fp,
chunkDesc: series.head(),
}
}
}
}
@ -436,8 +447,13 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
series, ok := s.fingerprintToSeries[fp]
if ok {
// TODO: Does this have to be a copy? Ask Julius!
return series.metric
// Copy required here because caller might mutate the returned
// metric.
m := make(clientmodel.Metric, len(series.metric))
for ln, lv := range series.metric {
m[ln] = lv
}
return m
}
metric, err := s.persistence.GetArchivedMetric(fp)
if err != nil {

Loading…
Cancel
Save