Merge pull request #589 from prometheus/beorn7/persistence

Redesign series maintenance and chunk persistence.
Björn Rabenstein 2015-03-19 17:56:46 +01:00
commit bf5fc720d3
8 changed files with 865 additions and 646 deletions

main.go

@ -57,9 +57,9 @@ var (
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 32*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 1024*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.")
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
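The defaults above encode some rough arithmetic: the dirty-series limit targets sub-minute crash recovery on spinning disks, and the persistence-queue capacity bounds how far the checkpoint can grow. A hypothetical back-of-the-envelope sketch (the ~10ms seek time is an assumption; the 1KiB chunk size and 1Mi queue capacity come from the flag help texts above):

```go
package main

import "fmt"

func main() {
	const (
		dirtySeriesLimit = 5000        // storage.local.checkpoint-dirty-series-limit
		seekSeconds      = 0.010       // assumed cost of one recovery disk seek (spinning disk)
		queueCapacity    = 1024 * 1024 // storage.local.persistence-queue-capacity
		chunkBytes       = 1024        // "the size of a chunk is 1kiB"
	)
	// One recovery seek per dirty series: 5000 * 10ms ~= 50s, below 1min.
	fmt.Printf("worst-case recovery: ~%.0fs\n", dirtySeriesLimit*seekSeconds)
	// Chunks waiting for persistence end up in the checkpoint:
	// 1Mi chunks * 1KiB ~= 1GiB of extra checkpoint size.
	fmt.Printf("worst-case checkpoint growth: ~%dMiB\n", queueCapacity*chunkBytes/(1024*1024))
}
```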
@ -82,7 +82,7 @@ var (
)
type prometheus struct {
unwrittenSamples chan clientmodel.Samples
incomingSamples chan clientmodel.Samples
ruleManager manager.RuleManager
targetManager retrieval.TargetManager
@ -103,12 +103,12 @@ func NewPrometheus() *prometheus {
glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
unwrittenSamples := make(chan clientmodel.Samples, *samplesQueueCapacity)
incomingSamples := make(chan clientmodel.Samples, *samplesQueueCapacity)
ingester := &retrieval.MergeLabelsIngester{
Labels: conf.GlobalLabels(),
CollisionPrefix: clientmodel.ExporterLabelPrefix,
Ingester: retrieval.ChannelIngester(unwrittenSamples),
Ingester: retrieval.ChannelIngester(incomingSamples),
}
targetManager := retrieval.NewTargetManager(ingester)
targetManager.AddTargetsFromConfig(conf)
@ -130,7 +130,7 @@ func NewPrometheus() *prometheus {
}
ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
Results: unwrittenSamples,
Results: incomingSamples,
NotificationHandler: notificationHandler,
EvaluationInterval: conf.EvaluationInterval(),
Storage: memStorage,
@ -183,7 +183,7 @@ func NewPrometheus() *prometheus {
}
p := &prometheus{
unwrittenSamples: unwrittenSamples,
incomingSamples: incomingSamples,
ruleManager: ruleManager,
targetManager: targetManager,
@ -217,7 +217,7 @@ func (p *prometheus) Serve() {
}
}()
for samples := range p.unwrittenSamples {
for samples := range p.incomingSamples {
p.storage.AppendSamples(samples)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Queue(samples)
@ -225,7 +225,7 @@ func (p *prometheus) Serve() {
}
// The following shut-down operations have to happen after
// unwrittenSamples is drained. So do not move them into close().
// incomingSamples is drained. So do not move them into close().
if err := p.storage.Stop(); err != nil {
glog.Error("Error stopping local storage: ", err)
}
@ -257,9 +257,9 @@ func (p *prometheus) close() {
p.targetManager.Stop()
p.ruleManager.Stop()
close(p.unwrittenSamples)
close(p.incomingSamples)
// Note: Before closing the remaining subsystems (storage, ...), we have
// to wait until p.unwrittenSamples is actually drained. Therefore,
// to wait until p.incomingSamples is actually drained. Therefore,
// remaining shut-downs happen in Serve().
}
@ -279,12 +279,12 @@ func (p *prometheus) Collect(ch chan<- registry.Metric) {
ch <- registry.MustNewConstMetric(
samplesQueueCapDesc,
registry.GaugeValue,
float64(cap(p.unwrittenSamples)),
float64(cap(p.incomingSamples)),
)
ch <- registry.MustNewConstMetric(
samplesQueueLenDesc,
registry.GaugeValue,
float64(len(p.unwrittenSamples)),
float64(len(p.incomingSamples)),
)
p.notificationHandler.Collect(ch)
p.storage.Collect(ch)

storage/local/crashrecovery.go

@ -19,6 +19,7 @@ import (
"os"
"path"
"strings"
"sync/atomic"
"github.com/golang/glog"
@ -74,9 +75,11 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
for fp, s := range fingerprintToSeries {
if _, seen := fpsSeen[fp]; !seen {
// fp exists in fingerprintToSeries, but has no representation on disk.
if s.headChunkPersisted {
// Oops, head chunk was persisted, but nothing on disk.
// Thus, we lost that series completely. Clean up the remnants.
if s.headChunkClosed {
// Oops, everything including the head chunk was
// already persisted, but nothing on disk.
// Thus, we lost that series completely. Clean
// up the remnants.
delete(fingerprintToSeries, fp)
if err := p.purgeArchivedMetric(fp); err != nil {
// Purging the archived metric didn't work, so try
@ -86,10 +89,10 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
glog.Warningf("Lost series detected: fingerprint %v, metric %v.", fp, s.metric)
continue
}
// If we are here, the only chunk we have is the head chunk.
// If we are here, the only chunks we have are the chunks in the checkpoint.
// Adjust things accordingly.
if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 {
minLostChunks := len(s.chunkDescs) + s.chunkDescsOffset - 1
if s.persistWatermark > 0 || s.chunkDescsOffset != 0 {
minLostChunks := s.persistWatermark + s.chunkDescsOffset
if minLostChunks <= 0 {
glog.Warningf(
"Possible loss of chunks for fingerprint %v, metric %v.",
@ -101,7 +104,12 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
minLostChunks, fp, s.metric,
)
}
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-s.persistWatermark),
s.chunkDescs[s.persistWatermark:]...,
)
numMemChunkDescs.Sub(float64(s.persistWatermark))
s.persistWatermark = 0
s.chunkDescsOffset = 0
}
fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
@ -139,15 +147,17 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
// - A file that is empty (after truncation) is deleted.
//
// - A series that is not archived (i.e. it is in the fingerprintToSeries map)
// is checked for consistency of its various parameters (like head-chunk
// persistence state, offset of chunkDescs etc.). In particular, overlap
// between an in-memory head chunk with the most recent persisted chunk is
// is checked for consistency of its various parameters (like persist
// watermark, offset of chunkDescs etc.). In particular, overlap between an
// in-memory head chunk with the most recent persisted chunk is
// checked. Inconsistencies are rectified.
//
// - A series that is archived (i.e. it is not in the fingerprintToSeries map)
// is checked for its presence in the index of archived series. If it cannot
// be found there, it is moved into the orphaned directory.
func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
func (p *persistence) sanitizeSeries(
dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries,
) (clientmodel.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
purge := func() {
var err error
@ -211,35 +221,38 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
if s == nil {
panic("fingerprint mapped to nil pointer")
}
if bytesToTrim == 0 && s.chunkDescsOffset != -1 &&
((s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)) ||
(!s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)-1)) {
if bytesToTrim == 0 && s.chunkDescsOffset != -1 && chunksInFile == s.chunkDescsOffset+s.persistWatermark {
// Everything is consistent. We are good.
return fp, true
}
// If we are here, something's fishy.
if s.headChunkPersisted {
// This is the easy case as we don't have a head chunk
// in heads.db. Treat this series as a freshly
// unarchived one. No chunks or chunkDescs in memory, no
// current head chunk.
// If we are here, we cannot be sure the series file is
// consistent with the checkpoint, so we have to take a closer
// look.
if s.headChunkClosed {
// This is the easy case as we don't have any chunks in
// heads.db. Treat this series as a freshly unarchived
// one. No chunks or chunkDescs in memory, no current
// head chunk.
glog.Warningf(
"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
s.metric, fp, chunksInFile,
)
s.chunkDescs = nil
s.chunkDescsOffset = -1
s.persistWatermark = 0
return fp, true
}
// This is the tricky one: We have a head chunk from heads.db,
// but the very same head chunk might already be in the series
// file. Strategy: Check the first time of both. If it is the
// same or newer, assume the latest chunk in the series file
// is the most recent head chunk. If not, keep the head chunk
// we got from heads.db.
// First, assume the head chunk is not yet persisted.
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
s.chunkDescsOffset = -1
// This is the tricky one: We have chunks from heads.db, but
// some of those chunks might already be in the series
// file. Strategy: Take the last time of the most recent chunk
// in the series file. Then find the oldest chunk among those
// from heads.db that has a first time later or equal to the
// last time from the series file. Throw away the older chunks
// from heads.db and stitch the parts together.
// First, throw away the chunkDescs without chunks.
s.chunkDescs = s.chunkDescs[s.persistWatermark:]
numMemChunkDescs.Sub(float64(s.persistWatermark))
// Load all the chunk descs (which assumes we have none from the future).
cds, err := p.loadChunkDescs(fp, clientmodel.Now())
if err != nil {
@ -250,21 +263,35 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
purge()
return fp, false
}
if cds[len(cds)-1].firstTime().Before(s.head().firstTime()) {
s.chunkDescs = append(cds, s.chunkDescs...)
glog.Warningf(
"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered head chunk from checkpoint.",
s.metric, fp, chunksInFile,
)
} else {
glog.Warningf(
"Recovered metric %v, fingerprint %v: head chunk found among the %d recovered chunks in series file.",
s.metric, fp, chunksInFile,
)
s.chunkDescs = cds
s.headChunkPersisted = true
}
s.persistWatermark = len(cds)
s.chunkDescsOffset = 0
lastTime := cds[len(cds)-1].lastTime()
keepIdx := -1
for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime {
keepIdx = i
break
}
}
if keepIdx == -1 {
glog.Warningf(
"Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.",
s.metric, fp, chunksInFile,
)
numMemChunkDescs.Sub(float64(len(s.chunkDescs)))
atomic.AddInt64(&numMemChunks, int64(-len(s.chunkDescs)))
s.chunkDescs = cds
s.headChunkClosed = true
return fp, true
}
glog.Warningf(
"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.",
s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx,
)
numMemChunkDescs.Sub(float64(keepIdx))
atomic.AddInt64(&numMemChunks, int64(-keepIdx))
s.chunkDescs = append(cds, s.chunkDescs[keepIdx:]...)
return fp, true
}
// This series is supposed to be archived.
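Returning to the stitching strategy in the previous hunk: a hypothetical, self-contained sketch with invented timestamps. The real code compares chunkDesc.firstTime() against the last time read from the most recent chunk in the series file:

```go
package main

import "fmt"

func main() {
	// Invented example: the most recent chunk in the series file ends at t=40;
	// heads.db contributed four chunks with these first times.
	fileLastTime := int64(40)
	headsFirstTimes := []int64{10, 25, 40, 55}

	// Find the oldest heads.db chunk whose first time is >= the file's last
	// time; everything older duplicates data already in the series file.
	keepIdx := -1
	for i, ft := range headsFirstTimes {
		if ft >= fileLastTime {
			keepIdx = i
			break
		}
	}
	fmt.Println(keepIdx) // 2: drop the first two chunks, stitch the rest after the file's chunks
}
```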
@ -348,6 +375,7 @@ func (p *persistence) cleanUpArchiveIndexes(
}
series.chunkDescs = cds
series.chunkDescsOffset = 0
series.persistWatermark = len(cds)
fpToSeries[clientmodel.Fingerprint(fp)] = series
return nil
}); err != nil {

storage/local/persistence.go

@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
@ -48,10 +49,11 @@ const (
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 1
headsMagicString = "PrometheusHeads"
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 2
headsFormatLegacyVersion = 1 // Can read, but will never write.
headsMagicString = "PrometheusHeads"
dirtyFileName = "DIRTY"
@ -326,7 +328,13 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) (int, error) {
func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) (index int, err error) {
defer func() {
if err != nil {
glog.Error("Error persisting chunks: ", err)
p.setDirty(true)
}
}()
f, err := p.openChunkFileForWriting(fp)
if err != nil {
@ -334,27 +342,16 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk)
}
defer f.Close()
b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+chunkLen))
for _, c := range chunks {
err = writeChunkHeader(b, c)
if err != nil {
return -1, err
}
err = c.marshal(b)
if err != nil {
return -1, err
}
if err := writeChunks(f, chunks); err != nil {
return -1, err
}
// Determine index within the file.
b.Flush()
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return -1, err
}
index, err := p.chunkIndexForOffset(offset)
index, err = chunkIndexForOffset(offset)
if err != nil {
return -1, err
}
@ -377,7 +374,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
chunks := make([]chunk, 0, len(indexes))
typeBuf := make([]byte, 1)
for _, idx := range indexes {
_, err := f.Seek(p.offsetForChunkIndex(idx+indexOffset), os.SEEK_SET)
_, err := f.Seek(offsetForChunkIndex(idx+indexOffset), os.SEEK_SET)
if err != nil {
return nil, err
}
@ -398,6 +395,8 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
chunk.unmarshal(f)
chunks = append(chunks, chunk)
}
chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
atomic.AddInt64(&numMemChunks, int64(len(chunks)))
return chunks, nil
}
@ -430,7 +429,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
numChunks := int(fi.Size()) / totalChunkLen
cds := make([]*chunkDesc, 0, numChunks)
for i := 0; i < numChunks; i++ {
_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
if err != nil {
return nil, err
}
@ -456,10 +455,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
}
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
// and all open (non-full) head chunks. Do not call concurrently with
// loadSeriesMapAndHeads.
// and all non persisted chunks. Do not call concurrently with
// loadSeriesMapAndHeads. This method will only write heads format v2, but
// loadSeriesMapAndHeads can also understand v1.
//
// Description of the file format:
// Description of the file format (for both v1 and v2):
//
// (1) Magic string (const headsMagicString).
//
@ -469,33 +469,33 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
//
// (4) Repeated once per series:
//
// (4.1) A flag byte, see flag constants above.
// (4.1) A flag byte, see flag constants above. (Present but unused in v2.)
//
// (4.2) The fingerprint as big-endian uint64.
//
// (4.3) The metric as defined by codable.Metric.
//
// (4.4) The varint-encoded chunkDescsOffset.
// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
//
// (4.5) The varint-encoded savedFirstTime.
// (4.5) The varint-encoded chunkDescsOffset.
//
// (4.6) The varint-encoded number of chunk descriptors.
// (4.6) The varint-encoded savedFirstTime.
//
// (4.7) Repeated once per chunk descriptor, oldest to most recent:
// (4.7) The varint-encoded number of chunk descriptors.
//
// (4.7.1) The varint-encoded first time.
// (4.8) Repeated once per chunk descriptor, oldest to most recent, either
// variant 4.8.1 (if index < persistWatermark) or variant 4.8.2 (if index >=
// persistWatermark). In v1, everything is variant 4.8.1 except for a
// non-persisted head-chunk (determined by the flags).
//
// (4.7.2) The varint-encoded last time.
// (4.8.1.1) The varint-encoded first time.
// (4.8.1.2) The varint-encoded last time.
//
// (4.8) Exception to 4.7: If the most recent chunk is a non-persisted head chunk,
// the following is persisted instead of the most recent chunk descriptor:
//
// (4.8.1) A byte defining the chunk type.
//
// (4.8.2) The head chunk itself, marshaled with the marshal() method.
// (4.8.2.1) A byte defining the chunk type.
// (4.8.2.2) The chunk itself, marshaled with the marshal() method.
//
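Read as a whole, a single v2 series record could be described by a struct like the one below. This is purely illustrative: the actual code streams the fields with varint/codable encoding and never declares such a type.

```go
package heads // hypothetical package, for illustration only

// Hypothetical shape of one v2 series record, mirroring (4.1)-(4.8) above.
type headsV2SeriesRecord struct {
	flags            byte   // (4.1) present but unused in v2
	fingerprint      uint64 // (4.2) big-endian
	metric           []byte // (4.3) as defined by codable.Metric
	persistWatermark int64  // (4.4) varint; missing in v1
	chunkDescsOffset int64  // (4.5) varint
	savedFirstTime   int64  // (4.6) varint
	numChunkDescs    int64  // (4.7) varint
	// (4.8) chunk descriptors, oldest to most recent:
	//   index <  persistWatermark: varint first time + varint last time (4.8.1)
	//   index >= persistWatermark: one chunk-type byte + marshaled chunk data (4.8.2)
}
```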
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
glog.Info("Checkpointing in-memory metrics and head chunks...")
glog.Info("Checkpointing in-memory metrics and chunks...")
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
@ -515,7 +515,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
err = os.Rename(p.headsTempFileName(), p.headsFileName())
duration := time.Since(begin)
p.checkpointDuration.Set(float64(duration) / float64(time.Millisecond))
glog.Infof("Done checkpointing in-memory metrics and head chunks in %v.", duration)
glog.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
}()
w := bufio.NewWriterSize(f, fileBufSize)
@ -553,11 +553,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
realNumberOfSeries++
var seriesFlags byte
if m.series.headChunkPersisted {
seriesFlags |= flagHeadChunkPersisted
}
if err = w.WriteByte(seriesFlags); err != nil {
// seriesFlags left empty in v2.
if err = w.WriteByte(0); err != nil {
return
}
if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
@ -569,6 +566,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
w.Write(buf)
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
return
}
@ -579,7 +579,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
for i, chunkDesc := range m.series.chunkDescs {
if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 {
if i < m.series.persistWatermark {
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return
}
@ -596,6 +596,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
}
}
// Series is checkpointed now, so declare it clean.
m.series.dirty = false
}()
if err != nil {
return
@ -618,12 +620,14 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
// open (non-full) head chunks. If recoverable corruption is detected, or if the
// dirty flag was set from the beginning, crash recovery is run, which might
// take a while. If an unrecoverable error is encountered, it is returned. Call
// this method during start-up while nothing else is running in storage
// land. This method is utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
// the chunks contained in the checkpoint (and thus not yet persisted to series
// files). The method is capable of loading the checkpoint format v1 and v2. If
// recoverable corruption is detected, or if the dirty flag was set from the
// beginning, crash recovery is run, which might take a while. If an
// unrecoverable error is encountered, it is returned. Call this method during
// start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen int64, err error) {
var chunkDescsTotal int64
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries}
@ -643,7 +647,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
f, err := os.Open(p.headsFileName())
if os.IsNotExist(err) {
return sm, nil
return sm, 0, nil
}
if err != nil {
glog.Warning("Could not open heads file:", err)
@ -657,7 +661,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
if _, err := io.ReadFull(r, buf); err != nil {
glog.Warning("Could not read from heads file:", err)
p.dirty = true
return sm, nil
return sm, 0, nil
}
magic := string(buf)
if magic != headsMagicString {
@ -668,16 +672,17 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
p.dirty = true
return
}
if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
version, err := binary.ReadVarint(r)
if (version != headsFormatVersion && version != headsFormatLegacyVersion) || err != nil {
glog.Warningf("unknown heads format version, want %d", headsFormatVersion)
p.dirty = true
return sm, nil
return sm, 0, nil
}
numSeries, err := codable.DecodeUint64(r)
if err != nil {
glog.Warning("Could not decode number of series:", err)
p.dirty = true
return sm, nil
return sm, 0, nil
}
for ; numSeries > 0; numSeries-- {
@ -685,173 +690,276 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
if err != nil {
glog.Warning("Could not read series flags:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codable.DecodeUint64(r)
if err != nil {
glog.Warning("Could not decode fingerprint:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil {
glog.Warning("Could not decode metric:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
var persistWatermark int64
if version != headsFormatLegacyVersion {
// persistWatermark only present in v2.
persistWatermark, err = binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode persist watermark:", err)
p.dirty = true
return sm, persistQueueLen, nil
}
}
chunkDescsOffset, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode chunk descriptor offset:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
savedFirstTime, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode saved first time:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
numChunkDescs, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode number of chunk descriptors:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
if version == headsFormatLegacyVersion {
if headChunkPersisted {
persistWatermark = numChunkDescs
} else {
persistWatermark = numChunkDescs - 1
}
}
for i := int64(0); i < numChunkDescs; i++ {
if headChunkPersisted || i < numChunkDescs-1 {
if i < persistWatermark {
firstTime, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode first time:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode last time:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
chunkDescs[i] = &chunkDesc{
chunkFirstTime: clientmodel.Timestamp(firstTime),
chunkLastTime: clientmodel.Timestamp(lastTime),
}
chunkDescsTotal++
} else {
// Non-persisted head chunk.
// Non-persisted chunk.
encoding, err := r.ReadByte()
if err != nil {
glog.Warning("Could not decode chunk type:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
chunk := newChunkForEncoding(chunkEncoding(encoding))
if err := chunk.unmarshal(r); err != nil {
glog.Warning("Could not decode chunk type:", err)
p.dirty = true
return sm, nil
return sm, persistQueueLen, nil
}
chunkDescs[i] = newChunkDesc(chunk)
persistQueueLen++
}
}
chunkDescsTotal += numChunkDescs
if !headChunkPersisted {
// In this case, we have created a chunkDesc with
// newChunkDesc, which will count itself automatically.
// Correct for that by decrementing the count.
chunkDescsTotal--
}
fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
headChunkPersisted: headChunkPersisted,
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,
persistWatermark: int(persistWatermark),
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
headChunkClosed: persistWatermark >= numChunkDescs,
}
}
return sm, nil
return sm, persistQueueLen, nil
}
// dropChunks deletes all chunks from a series whose last sample time is before
// beforeTime. It returns the timestamp of the first sample in the oldest chunk
// _not_ dropped, the number of deleted chunks, and true if all chunks of the
// series have been deleted (in which case the returned timestamp will be 0 and
// must be ignored). It is the caller's responsibility to make sure nothing is
// persisted or loaded for the same fingerprint concurrently.
func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (
// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
// of the first sample in the oldest chunk _not_ dropped, the offset within the
// series file of the first chunk persisted (out of the provided chunks), the
// number of deleted chunks, and true if all chunks of the series have been
// deleted (in which case the returned timestamp will be 0 and must be ignored).
// It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently.
func (p *persistence) dropAndPersistChunks(
fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp, chunks []chunk,
) (
firstTimeNotDropped clientmodel.Timestamp,
offset int,
numDropped int,
allDropped bool,
err error,
) {
// Style note: With the many return values, it was decided to use naked
// returns in this method. They make the method more readable, but
// please handle with care!
defer func() {
if err != nil {
glog.Error("Error dropping and/or persisting chunks: ", err)
p.setDirty(true)
}
}()
if len(chunks) > 0 {
// We have chunks to persist. First check if those are already
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
for ; i < len(chunks) && chunks[i].lastTime().Before(beforeTime); i++ {
}
if i < len(chunks) {
firstTimeNotDropped = chunks[i].firstTime()
}
if i > 0 || firstTimeNotDropped.Before(beforeTime) {
// Series file has to go.
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
numDropped += i
if i == len(chunks) {
allDropped = true
return
}
// Now simply persist what has to be persisted to a new file.
_, err = p.persistChunks(fp, chunks[i:])
return
}
}
// If we are here, we have to check the series file itself.
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
return 0, 0, true, nil
// No series file. Only need to create new file with chunks to
// persist, if there are any.
if len(chunks) == 0 {
allDropped = true
err = nil // Do not report not-exist err.
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return 0, 0, false, err
return
}
defer f.Close()
// Find the first chunk that should be kept.
var i int
var firstTime clientmodel.Timestamp
for ; ; i++ {
_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
// Find the first chunk in the file that should be kept.
for ; ; numDropped++ {
_, err = f.Seek(offsetForChunkIndex(numDropped), os.SEEK_SET)
if err != nil {
return 0, 0, false, err
return
}
timeBuf := make([]byte, 16)
_, err = io.ReadAtLeast(f, timeBuf, 16)
headerBuf := make([]byte, chunkHeaderLen)
_, err = io.ReadAtLeast(f, headerBuf, chunkHeaderLen)
if err == io.EOF {
// We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file.
chunkOps.WithLabelValues(drop).Add(float64(i))
if err := os.Remove(f.Name()); err != nil {
return 0, 0, true, err
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
return 0, i, true, nil
if len(chunks) == 0 {
allDropped = true
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return 0, 0, false, err
return
}
lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:]))
lastTime := clientmodel.Timestamp(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
)
if !lastTime.Before(beforeTime) {
firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf))
chunkOps.WithLabelValues(drop).Add(float64(i))
firstTimeNotDropped = clientmodel.Timestamp(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
break
}
}
// We've found the first chunk that should be kept. Seek backwards to the
// beginning of its header and start copying everything from there into a new
// file.
_, err = f.Seek(-(chunkHeaderFirstTimeOffset + 16), os.SEEK_CUR)
// We've found the first chunk that should be kept. If it is the first
// one, just append the chunks.
if numDropped == 0 {
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
}
return
}
// Otherwise, seek backwards to the beginning of its header and start
// copying everything from there into a new file. Then append the chunks
// to the new file.
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
if err != nil {
return 0, 0, false, err
return
}
temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return 0, 0, false, err
return
}
defer temp.Close()
defer func() {
temp.Close()
if err == nil {
err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
}
}()
if _, err := io.Copy(temp, f); err != nil {
return 0, 0, false, err
written, err := io.Copy(temp, f)
if err != nil {
return
}
offset = int(written / (chunkHeaderLen + chunkLen))
if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil {
return 0, 0, false, err
if len(chunks) > 0 {
if err = writeChunks(temp, chunks); err != nil {
return
}
}
return firstTime, i, false, nil
return
}
// deleteSeriesFile deletes a series file belonging to the provided
// fingerprint. It returns the number of chunks that were contained in the
// deleted file.
func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error) {
fname := p.fileNameForFingerprint(fp)
fi, err := os.Stat(fname)
if os.IsNotExist(err) {
// Great. The file is already gone.
return 0, nil
}
if err != nil {
return -1, err
}
numChunks := int(fi.Size() / (chunkHeaderLen + chunkLen))
if err := os.Remove(fname); err != nil {
return -1, err
}
chunkOps.WithLabelValues(drop).Add(float64(numChunks))
return numChunks, nil
}
// indexMetric queues the given metric for addition to the indexes needed by
@ -1083,7 +1191,7 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
// NOTE: Although the file was opened for append,
// f.Seek(0, os.SEEK_CUR)
// would now return '0, nil', so we cannot check for a consistent file length right now.
// However, the chunkIndexForOffset method is doing that check, so a wrong file length
// However, the chunkIndexForOffset function is doing that check, so a wrong file length
// would still be detected.
}
@ -1091,29 +1199,6 @@ func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.F
return os.Open(p.fileNameForFingerprint(fp))
}
func writeChunkHeader(w io.Writer, c chunk) error {
header := make([]byte, chunkHeaderLen)
header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()))
binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime()))
_, err := w.Write(header)
return err
}
func (p *persistence) offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + chunkLen))
}
func (p *persistence) chunkIndexForOffset(offset int64) (int, error) {
if int(offset)%(chunkHeaderLen+chunkLen) != 0 {
return -1, fmt.Errorf(
"offset %d is not a multiple of on-disk chunk length %d",
offset, chunkHeaderLen+chunkLen,
)
}
return int(offset) / (chunkHeaderLen + chunkLen), nil
}
func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName)
}
@ -1224,3 +1309,40 @@ loop:
}
close(p.indexingStopped)
}
func offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + chunkLen))
}
func chunkIndexForOffset(offset int64) (int, error) {
if int(offset)%(chunkHeaderLen+chunkLen) != 0 {
return -1, fmt.Errorf(
"offset %d is not a multiple of on-disk chunk length %d",
offset, chunkHeaderLen+chunkLen,
)
}
return int(offset) / (chunkHeaderLen + chunkLen), nil
}
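Both helpers rely on the fixed-size record layout of series files: every chunk occupies exactly chunkHeaderLen+chunkLen bytes, so index and byte offset convert by pure arithmetic. A sketch with assumed sizes (the 17-byte header follows from a 1-byte chunk type plus two 8-byte timestamps; the 1KiB chunk size comes from the flag help text):

```go
package main

import "fmt"

// Assumed constants, not copied from the diff:
const (
	chunkHeaderLen = 17   // 1 byte chunk type + 8 bytes first time + 8 bytes last time
	chunkLen       = 1024 // fixed chunk payload size
)

func offsetForChunkIndex(i int) int64 {
	return int64(i * (chunkHeaderLen + chunkLen))
}

func main() {
	// Chunk #3 starts 3*(17+1024) = 3123 bytes into the series file.
	fmt.Println(offsetForChunkIndex(3))
}
```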
func writeChunkHeader(w io.Writer, c chunk) error {
header := make([]byte, chunkHeaderLen)
header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()))
binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime()))
_, err := w.Write(header)
return err
}
func writeChunks(w io.Writer, chunks []chunk) error {
b := bufio.NewWriterSize(w, len(chunks)*(chunkHeaderLen+chunkLen))
for _, chunk := range chunks {
if err := writeChunkHeader(b, chunk); err != nil {
return err
}
if err := chunk.marshal(b); err != nil {
return err
}
}
return b.Flush()
}

storage/local/persistence_test.go

@ -29,6 +29,8 @@ var (
m1 = clientmodel.Metric{"label": "value1"}
m2 = clientmodel.Metric{"label": "value2"}
m3 = clientmodel.Metric{"label": "value3"}
m4 = clientmodel.Metric{"label": "value4"}
m5 = clientmodel.Metric{"label": "value5"}
)
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
@ -83,14 +85,22 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
fpToChunks := buildTestChunks(encoding)
for fp, chunks := range fpToChunks {
for i, c := range chunks {
index, err := p.persistChunks(fp, []chunk{c})
if err != nil {
t.Fatal(err)
}
if i != index {
t.Errorf("Want chunk index %d, got %d.", i, index)
}
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, clientmodel.Earliest, chunks)
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, clientmodel.Timestamp(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
@ -139,10 +149,13 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
}
// Drop half of the chunks.
for fp, expectedChunks := range fpToChunks {
firstTime, numDropped, allDropped, err := p.dropChunks(fp, 5)
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 5, nil)
if err != nil {
t.Fatal(err)
}
if offset != 5 {
t.Errorf("want offset 5, got %d", offset)
}
if firstTime != 5 {
t.Errorf("want first time 5, got %d", firstTime)
}
@ -168,13 +181,16 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
}
// Drop all the chunks.
for fp := range fpToChunks {
firstTime, numDropped, allDropped, err := p.dropChunks(fp, 100)
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 100, nil)
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if numDropped != 5 {
t.Errorf("want 5 dropped chunks, got %v", numDropped)
}
@ -182,6 +198,144 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Error("not all chunks dropped")
}
}
// Re-add first two of the chunks.
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, clientmodel.Earliest, chunks[:2])
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, clientmodel.Timestamp(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
// Drop the first of the chunks while adding two more.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[2:4])
if err != nil {
t.Fatal(err)
}
if offset != 1 {
t.Errorf("want offset 1, got %d", offset)
}
if firstTime != 1 {
t.Errorf("want first time 1, got %d", firstTime)
}
if numDropped != 1 {
t.Errorf("want 1 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[1:4]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// Drop all the chunks while adding two more.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 4, chunks[4:6])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 4 {
t.Errorf("want first time 4, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[4:6]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// While adding two more, drop all but one of the added ones.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 7, chunks[6:8])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 7 {
t.Errorf("want first time 7, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[7:8]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// While adding two more, drop all chunks including the added ones.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 10, chunks[8:])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if !allDropped {
t.Error("not all chunks dropped")
}
}
}
func TestPersistLoadDropChunksType0(t *testing.T) {
@ -201,23 +355,41 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
s1 := newMemorySeries(m1, true, 0)
s2 := newMemorySeries(m2, false, 0)
s3 := newMemorySeries(m3, false, 0)
s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkPersisted = true
s4 := newMemorySeries(m4, true, 0)
s5 := newMemorySeries(m5, true, 0)
s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true
s3.persistWatermark = 1
for i := 0; i < 10000; i++ {
s4.add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(i) / 2,
})
s5.add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(i * i),
})
}
s5.persistWatermark = 3
chunkCountS4 := len(s4.chunkDescs)
chunkCountS5 := len(s5.chunkDescs)
sm.put(m1.Fingerprint(), s1)
sm.put(m2.Fingerprint(), s2)
sm.put(m3.Fingerprint(), s3)
sm.put(m4.Fingerprint(), s4)
sm.put(m5.Fingerprint(), s5)
if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
t.Fatal(err)
}
loadedSM, err := p.loadSeriesMapAndHeads()
loadedSM, _, err := p.loadSeriesMapAndHeads()
if err != nil {
t.Fatal(err)
}
if loadedSM.length() != 2 {
t.Errorf("want 2 series in map, got %d", loadedSM.length())
if loadedSM.length() != 4 {
t.Errorf("want 4 series in map, got %d", loadedSM.length())
}
if loadedS1, ok := loadedSM.get(m1.Fingerprint()); ok {
if !reflect.DeepEqual(loadedS1.metric, m1) {
@ -229,8 +401,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS1.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS1.chunkDescsOffset)
}
if loadedS1.headChunkPersisted {
t.Error("headChunkPersisted is true")
if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true")
}
} else {
t.Errorf("couldn't find %v in loaded map", m1)
@ -245,11 +417,61 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS3.chunkDescsOffset != -1 {
t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset)
}
if !loadedS3.headChunkPersisted {
t.Error("headChunkPersisted is false")
if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false")
}
} else {
t.Errorf("couldn't find %v in loaded map", m1)
t.Errorf("couldn't find %v in loaded map", m3)
}
if loadedS4, ok := loadedSM.get(m4.Fingerprint()); ok {
if !reflect.DeepEqual(loadedS4.metric, m4) {
t.Errorf("want metric %v, got %v", m4, loadedS4.metric)
}
if got, want := len(loadedS4.chunkDescs), chunkCountS4; got != want {
t.Errorf("got %d chunkDescs, want %d", got, want)
}
if got, want := loadedS4.persistWatermark, 0; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if loadedS4.chunkDescs[2].isEvicted() {
t.Error("3rd chunk evicted")
}
if loadedS4.chunkDescs[3].isEvicted() {
t.Error("4th chunk evicted")
}
if loadedS4.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS4.chunkDescsOffset)
}
if loadedS4.headChunkClosed {
t.Error("headChunkClosed is true")
}
} else {
t.Errorf("couldn't find %v in loaded map", m4)
}
if loadedS5, ok := loadedSM.get(m5.Fingerprint()); ok {
if !reflect.DeepEqual(loadedS5.metric, m5) {
t.Errorf("want metric %v, got %v", m5, loadedS5.metric)
}
if got, want := len(loadedS5.chunkDescs), chunkCountS5; got != want {
t.Errorf("got %d chunkDescs, want %d", got, want)
}
if got, want := loadedS5.persistWatermark, 3; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if !loadedS5.chunkDescs[2].isEvicted() {
t.Error("3rd chunk not evicted")
}
if loadedS5.chunkDescs[3].isEvicted() {
t.Error("4th chunk evicted")
}
if loadedS5.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS5.chunkDescsOffset)
}
if loadedS5.headChunkClosed {
t.Error("headChunkClosed is true")
}
} else {
t.Errorf("couldn't find %v in loaded map", m5)
}
}

storage/local/series.go

@ -16,19 +16,23 @@ package local
import (
"sort"
"sync"
"sync/atomic"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
// chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed
// to evictions of chunks, see method evictOlderThan). A chunk takes about 20x
// more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more
// than a third of the total memory taken by a series will be used for
// chunkDescs.
const chunkDescEvictionFactor = 10
const (
// chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed
// to evictions of chunks, see method evictOlderThan). A chunk takes about 20x
// more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more
// than a third of the total memory taken by a series will be used for
// chunkDescs.
chunkDescEvictionFactor = 10
headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
)
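A quick sanity check of the "not more than a third" claim in the comment above, under the stated assumptions (a chunk ~20x the size of a chunkDesc, eviction keeping at most 10 chunkDescs per non-evicted chunk):

```go
package main

import "fmt"

func main() {
	const (
		evictionFactor = 10.0 // chunkDescEvictionFactor
		chunkDescUnits = 1.0  // memory of one chunkDesc, arbitrary unit
		chunkUnits     = 20.0 // "a chunk takes about 20x more memory than a chunkDesc"
	)
	// Per non-evicted chunk: 20 units of chunk data plus at most 10 units of
	// chunkDescs, so chunkDescs account for at most 10/30 of series memory.
	fmt.Printf("chunkDesc share <= %.2f\n",
		evictionFactor*chunkDescUnits/(evictionFactor*chunkDescUnits+chunkUnits)) // 0.33
}
```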
// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
type fingerprintSeriesPair struct {
@ -135,29 +139,36 @@ type memorySeries struct {
metric clientmodel.Metric
// Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs []*chunkDesc
// The index (within chunkDescs above) of the first chunkDesc that
// points to a non-persisted chunk. If all chunks are persisted, then
// persistWatermark == len(chunkDescs).
persistWatermark int
// The chunkDescs in memory might not have all the chunkDescs for the
// chunks that are persisted to disk. The missing chunkDescs are all
// contiguous and at the tail end. chunkDescsOffset is the index of the
// chunk on disk that corresponds to the first chunkDesc in memory. If
// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
// special case: There are chunks on disk, but the offset to the
// chunkDescs in memory is unknown. Also, there is no overlap between
// chunks on disk and chunks in memory (implying that upon first
// persisting of a chunk in memory, the offset has to be set).
// chunkDescs in memory is unknown. Also, in this special case, there is
// no overlap between chunks on disk and chunks in memory (implying that
// upon first persisting of a chunk in memory, the offset has to be
// set).
chunkDescsOffset int
// The savedFirstTime field is used as a fallback when the
// chunkDescsOffset is not 0. It can be used to save the firstTime of the
// first chunk before its chunk desc is evicted. In doubt, this field is
// just set to the oldest possible timestamp.
savedFirstTime clientmodel.Timestamp
// Whether the current head chunk has already been scheduled to be
// persisted. If true, the current head chunk must not be modified
// anymore.
headChunkPersisted bool
// Whether the current head chunk has already been finished. If true,
// the current head chunk must not be modified anymore.
headChunkClosed bool
// Whether the current head chunk is used by an iterator. In that case,
// a non-persisted head chunk has to be cloned before more samples are
// a non-closed head chunk has to be cloned before more samples are
// appended.
headChunkUsedByIterator bool
// Whether the series is inconsistent with the last checkpoint in a way
// that would require a disk seek during crash recovery.
dirty bool
}
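The interplay of persistWatermark and chunkDescsOffset documented above can be pictured with a small standalone sketch (the series type is a hypothetical stand-in, and all values are invented):

```go
package main

import "fmt"

// Minimal stand-in for memorySeries, for illustration only.
type series struct {
	numChunkDescs    int // len(chunkDescs)
	persistWatermark int
	chunkDescsOffset int
}

func main() {
	// Invented state: five chunks c0..c4 sit in the series file; the
	// chunkDescs for c0 and c1 were evicted; two further chunks exist only
	// in memory, the last of them being the open head chunk.
	s := series{numChunkDescs: 5, persistWatermark: 3, chunkDescsOffset: 2}

	// On-disk chunk i is described by chunkDescs[i-chunkDescsOffset]:
	fmt.Println(4 - s.chunkDescsOffset) // 2: on-disk chunk #4 -> chunkDescs[2]
	// Chunks still waiting for persistence:
	fmt.Println(s.numChunkDescs - s.persistWatermark) // 2
}
```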
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
@ -171,13 +182,13 @@ func newMemorySeries(
reallyNew bool,
firstTime clientmodel.Timestamp,
) *memorySeries {
if reallyNew {
if !reallyNew {
firstTime = clientmodel.Earliest
}
s := memorySeries{
metric: m,
headChunkPersisted: !reallyNew,
savedFirstTime: firstTime,
metric: m,
headChunkClosed: !reallyNew,
savedFirstTime: firstTime,
}
if !reallyNew {
s.chunkDescsOffset = -1
@ -185,14 +196,15 @@ func newMemorySeries(
return &s
}
// add adds a sample pair to the series.
// It returns chunkDescs that must be queued to be persisted.
// add adds a sample pair to the series. It returns the number of newly
// completed chunks (which are now eligible for persistence).
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
if len(s.chunkDescs) == 0 || s.headChunkPersisted {
func (s *memorySeries) add(v *metric.SamplePair) int {
if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := newChunkDesc(newChunk())
s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkPersisted = false
s.headChunkClosed = false
} else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 {
// We only need to clone the head chunk if the current head
// chunk was used in an iterator at all and if the refCount is
@ -213,19 +225,29 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*
chunks := s.head().add(v)
s.head().chunk = chunks[0]
var chunkDescsToPersist []*chunkDesc
if len(chunks) > 1 {
chunkDescsToPersist = append(chunkDescsToPersist, s.head())
for i, c := range chunks[1:] {
cd := newChunkDesc(c)
s.chunkDescs = append(s.chunkDescs, cd)
// The last chunk is still growing.
if i < len(chunks[1:])-1 {
chunkDescsToPersist = append(chunkDescsToPersist, cd)
}
}
for _, c := range chunks[1:] {
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
}
return chunkDescsToPersist
return len(chunks) - 1
}
// maybeCloseHeadChunk closes the head chunk if it has not been touched for the
// duration of headChunkTimeout. It returns whether the head chunk was closed.
// If the head chunk is already closed, the method is a no-op and returns false.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) maybeCloseHeadChunk() bool {
if s.headChunkClosed {
return false
}
if time.Now().Sub(s.head().lastTime().Time()) > headChunkTimeout {
s.headChunkClosed = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
s.headChunkUsedByIterator = false
return true
}
return false
}
// evictChunkDescs evicts chunkDescs if there are chunkDescEvictionFactor times
@ -237,20 +259,20 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
s.savedFirstTime = s.firstTime()
lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted
s.persistWatermark -= lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
numMemChunkDescs.Sub(float64(lenEvicted))
s.chunkDescs = append(
make([]*chunkDesc, 0, lenToKeep),
s.chunkDescs[lenEvicted:]...,
)
s.dirty = true
}
}
// dropChunks removes chunkDescs older than t. It returns the number of dropped
// chunkDescs and true if all chunkDescs have been dropped.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
// dropChunks removes chunkDescs older than t. The caller must have locked the
// fingerprint of the series.
func (s *memorySeries) dropChunks(t clientmodel.Timestamp) {
keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) {
@ -259,10 +281,20 @@ func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
}
}
if keepIdx > 0 {
s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]...,
)
s.persistWatermark -= keepIdx
if s.persistWatermark < 0 {
panic("dropped unpersisted chunks from memory")
}
if s.chunkDescsOffset != -1 {
s.chunkDescsOffset += keepIdx
}
numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
}
return keepIdx, len(s.chunkDescs) == 0
}
// preloadChunks is an internal helper method.
@ -296,10 +328,7 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([
for i, c := range chunks {
s.chunkDescs[loadIndexes[i]].setChunk(c)
}
chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
atomic.AddInt64(&numMemChunks, int64(len(chunks)))
}
return pinnedChunkDescs, nil
}
@ -351,6 +380,7 @@ func (s *memorySeries) preloadChunksForRange(
}
s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsOffset = 0
s.persistWatermark += len(cds)
}
if len(s.chunkDescs) == 0 {
@ -385,7 +415,7 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
chunks := make([]chunk, 0, len(s.chunkDescs))
for i, cd := range s.chunkDescs {
if chunk := cd.getChunk(); chunk != nil {
if i == len(s.chunkDescs)-1 && !s.headChunkPersisted {
if i == len(s.chunkDescs)-1 && !s.headChunkClosed {
s.headChunkUsedByIterator = true
}
chunks = append(chunks, chunk)
@ -415,6 +445,26 @@ func (s *memorySeries) firstTime() clientmodel.Timestamp {
return s.savedFirstTime
}
// getChunksToPersist returns a slice of chunkDescs eligible for
// persistence. It's the caller's responsibility to actually persist the
// returned chunks afterwards. The method sets the persistWatermark and the
// dirty flag accordingly.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) getChunksToPersist() []*chunkDesc {
newWatermark := len(s.chunkDescs)
if !s.headChunkClosed {
newWatermark--
}
if newWatermark == s.persistWatermark {
return nil
}
cds := s.chunkDescs[s.persistWatermark:newWatermark]
s.dirty = true
s.persistWatermark = newWatermark
return cds
}
// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
lock, unlock func()

storage/local/storage.go

@ -34,29 +34,22 @@ const (
// See waitForNextFP.
fpMaxWaitDuration = 10 * time.Second
fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time.
fpMaxSweepTime = 6 * time.Hour
maxEvictInterval = time.Minute
headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
appendWorkers = 8 // Should be enough to not make appending a bottleneck.
appendWorkers = 16 // Should be enough to not make appending samples a bottleneck.
appendQueueCap = 2 * appendWorkers
)
type storageState uint
const (
storageStarting storageState = iota
storageServing
storageStopping
var (
persistQueueLengthDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_length"),
"The current number of chunks waiting in the persist queue.",
nil, nil,
)
)
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
}
type evictRequest struct {
cd *chunkDesc
evict bool
@ -76,9 +69,10 @@ type memorySeriesStorage struct {
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
persistQueue chan persistRequest
persistQueueCap int // Not actually the cap of above channel. See handlePersistQueue.
persistStopped chan struct{}
persistQueueLen int64 // The number of chunks that need persistence.
persistQueueCap int // If persistQueueLen reaches this threshold, ingestion will stall.
// Note that internally, the chunks to persist are not organized in a queue-like data structure,
// but handled in a more sophisticated way (see maintainMemorySeries).
persistence *persistence
@ -88,10 +82,8 @@ type memorySeriesStorage struct {
evictRequests chan evictRequest
evictStopping, evictStopped chan struct{}
persistLatency prometheus.Summary
persistErrors prometheus.Counter
persistQueueCapacity prometheus.Metric
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
@ -119,7 +111,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
return nil, err
}
glog.Info("Loading series map and head chunks...")
fpToSeries, err := p.loadSeriesMapAndHeads()
fpToSeries, persistQueueLen, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
@ -146,12 +138,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
// The actual buffering happens within handlePersistQueue, so
// cap of persistQueue just has to be enough to not block while
// handlePersistQueue is writing to disk (20ms or so).
persistQueue: make(chan persistRequest, 1024),
persistQueueLen: persistQueueLen,
persistQueueCap: o.PersistenceQueueCapacity,
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 100),
@ -161,12 +149,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_latency_microseconds",
Help: "A summary of latencies for persisting each chunk.",
}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -181,12 +163,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
),
prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
),
persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_queue_length",
Help: "The current number of chunks waiting in the persist queue.",
}),
numSeries: numSeries,
seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{
@ -226,7 +202,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
// Start implements Storage.
func (s *memorySeriesStorage) Start() {
go s.handleEvictList()
go s.handlePersistQueue()
go s.loop()
}
@ -243,10 +218,6 @@ func (s *memorySeriesStorage) Stop() error {
close(s.loopStopping)
<-s.loopStopped
glog.Info("Stopping persist queue...")
close(s.persistQueue)
<-s.persistStopped
glog.Info("Stopping chunk eviction...")
close(s.evictStopping)
<-s.evictStopped
@ -395,6 +366,13 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
// AppendSamples implements Storage.
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
for _, sample := range samples {
if s.getPersistQueueLen() >= s.persistQueueCap {
glog.Warningf("%d chunks waiting for persistence, sample ingestion suspended.", s.getPersistQueueLen())
for s.getPersistQueueLen() >= s.persistQueueCap {
time.Sleep(time.Second)
}
glog.Warning("Sample ingestion resumed.")
}
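As an aside, the suspension above is a simple poll-and-sleep backpressure gate on an atomically read counter. A self-contained sketch of the same pattern, with invented names and a toy drain goroutine standing in for series maintenance:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var queueLen int64

const queueCap = 3

func ingest(sample int) {
	if atomic.LoadInt64(&queueLen) >= queueCap {
		fmt.Println("ingestion suspended")
		for atomic.LoadInt64(&queueLen) >= queueCap {
			time.Sleep(10 * time.Millisecond)
		}
		fmt.Println("ingestion resumed")
	}
	atomic.AddInt64(&queueLen, 1) // One more chunk awaiting persistence.
}

func main() {
	// Drain slowly in the background, as series maintenance would.
	go func() {
		for range time.Tick(25 * time.Millisecond) {
			if atomic.LoadInt64(&queueLen) > 0 {
				atomic.AddInt64(&queueLen, -1)
			}
		}
	}()
	for i := 0; i < 10; i++ {
		ingest(i)
	}
}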
if sample.Timestamp != s.appendLastTimestamp {
// Timestamp has changed. We have to wait for processing
// of all appended samples before proceeding. Otherwise,
@ -414,27 +392,13 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp)
series := s.getOrCreateSeries(fp, sample.Metric)
chunkDescsToPersist := series.add(fp, &metric.SamplePair{
completedChunksCount := series.add(&metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc()
if len(chunkDescsToPersist) == 0 {
return
}
// Queue only outside of the locked area, processing the persistQueue
// requires the same lock!
for _, cd := range chunkDescsToPersist {
s.persistQueue <- persistRequest{fp, cd}
}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
s.incPersistQueueLen(completedChunksCount)
}
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
@ -572,111 +536,13 @@ func (s *memorySeriesStorage) maybeEvict() {
}()
}
func (s *memorySeriesStorage) handlePersistQueue() {
chunkMaps := chunkMaps{}
chunkCount := 0
persistMostConsecutiveChunks := func() {
fp, cds := chunkMaps.pop()
if err := s.persistChunks(fp, cds); err != nil {
// Need to put chunks back for retry.
for _, cd := range cds {
chunkMaps.add(fp, cd)
}
return
}
chunkCount -= len(cds)
s.persistQueueLength.Set(float64(chunkCount))
}
loop:
for {
if chunkCount >= s.persistQueueCap && chunkCount > 0 {
glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount)
persistMostConsecutiveChunks()
}
select {
case req, ok := <-s.persistQueue:
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
default:
if chunkCount > 0 {
persistMostConsecutiveChunks()
continue loop
}
// If we are here, there is nothing to do right now. So
// just wait for a persist request to come in.
req, ok := <-s.persistQueue
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
}
s.persistQueueLength.Set(float64(chunkCount))
}
// Drain all requests.
for _, m := range chunkMaps {
for fp, cds := range m {
if s.persistChunks(fp, cds) == nil {
chunkCount -= len(cds)
if (chunkCount+len(cds))/1000 > chunkCount/1000 {
glog.Infof(
"Still draining persist queue, %d chunks left to persist...",
chunkCount,
)
}
s.persistQueueLength.Set(float64(chunkCount))
}
}
}
glog.Info("Persist queue drained and stopped.")
close(s.persistStopped)
}
func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error {
start := time.Now()
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
s.fpLocker.Lock(fp)
offset, err := s.persistence.persistChunks(fp, chunks)
if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(fp)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunks: ", err)
s.persistence.setDirty(true)
return err
}
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
return nil
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
// this method will always wait for at least fpMinWaitDuration and never longer
// than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
// immediately. The estimation is based on the total number of fingerprints as
// passed in.
// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. If
// s.loopStopped is closed, it will return false immediately. The estimation is
// based on the total number of fingerprints as passed in.
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
d := fpMaxWaitDuration
if numberOfFPs != 0 {
@ -685,9 +551,6 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
sweepTime = fpMaxSweepTime
}
d = sweepTime / time.Duration(numberOfFPs)
if d < fpMinWaitDuration {
d = fpMinWaitDuration
}
if d > fpMaxWaitDuration {
d = fpMaxWaitDuration
}
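To make the estimation concrete, a small sketch that reproduces the arithmetic above with the constants from this file; the 40h retention and the fingerprint counts are invented example inputs:

package main

import (
	"fmt"
	"time"
)

const (
	fpMaxWaitDuration = 10 * time.Second
	fpMaxSweepTime    = 6 * time.Hour
)

func waitFor(numberOfFPs int, dropAfter time.Duration) time.Duration {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		d = sweepTime / time.Duration(numberOfFPs)
		if d > fpMaxWaitDuration {
			d = fpMaxWaitDuration
		}
	}
	return d
}

func main() {
	fmt.Println(waitFor(1000000, 40*time.Hour)) // 4h / 1e6 = 14.4ms per fingerprint
	fmt.Println(waitFor(100, 40*time.Hour))     // 4h / 100 = 144s, capped at 10s
	fmt.Println(waitFor(0, 40*time.Hour))       // no fingerprints: maximum wait
}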
@ -791,14 +654,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
func (s *memorySeriesStorage) loop() {
checkpointTimer := time.NewTimer(s.checkpointInterval)
// We take the number of head chunks persisted since the last checkpoint
// as an approximation for the number of series that are "dirty",
// i.e. whose head chunk is different from the one in the most recent
// checkpoint or for which the fact that the head chunk has been
// persisted is not reflected in the most recent checkpoint. This count
// could overestimate the number of dirty series, but it's good enough
// as a heuristic.
headChunksPersistedSinceLastCheckpoint := 0
dirtySeriesCount := 0
defer func() {
checkpointTimer.Stop()
@ -816,26 +672,30 @@ loop:
break loop
case <-checkpointTimer.C:
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
headChunksPersistedSinceLastCheckpoint = 0
dirtySeriesCount = 0
checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints:
s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) {
dirtySeriesCount++
// Check if we have enough "dirty" series to
// warrant an early checkpoint. However, if the
// persist queue is already at 90% capacity,
// checkpointing would be counterproductive: it
// would slow down chunk persisting even more,
// and in such a situation, where disk
// maintenance is clearly too slow, the best we
// can do for crash recovery is to work through
// the persist queue as quickly as possible. So
// only checkpoint if the persist queue is at
// most 90% full.
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
s.getPersistQueueLen() < s.persistQueueCap*9/10 {
checkpointTimer.Reset(0)
}
}
case fp := <-archivedFingerprints:
s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
case <-s.countPersistedHeadChunks:
headChunksPersistedSinceLastCheckpoint++
// Check if we have enough "dirty" series so that we need an early checkpoint.
// As described above, we take the headChunksPersistedSinceLastCheckpoint as a
// heuristic for "dirty" series. However, if we are already backlogging
// chunks to be persisted, creating a checkpoint would be counterproductive,
// as it would slow down chunk persisting even more, while in a situation like
// that, the best we can do for crash recovery is to work through the persist
// queue as quickly as possible. So only checkpoint if s.persistQueue is
// at most 20% full.
if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit &&
len(s.persistQueue) < cap(s.persistQueue)/5 {
checkpointTimer.Reset(0)
}
}
}
// Wait until both channels are closed.
@ -845,38 +705,60 @@ loop:
}
}
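The loop above combines a periodic checkpoint timer with an early trigger guarded by persist-queue pressure. A compact, runnable sketch of that pattern follows; the limits, the queue length, and the maintenance channel are stand-ins, not the real storage internals:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		dirtySeriesLimit = 3
		queueCap         = 100
	)
	checkpointTimer := time.NewTimer(time.Second)
	defer checkpointTimer.Stop()

	dirtySeriesCount := 0
	maintained := make(chan bool)        // true if a series became dirty
	queueLen := func() int { return 10 } // stand-in for getPersistQueueLen

	go func() {
		for i := 0; i < 5; i++ {
			maintained <- true
		}
	}()

	for i := 0; i < 6; i++ {
		select {
		case <-checkpointTimer.C:
			fmt.Println("checkpoint; dirty series:", dirtySeriesCount)
			dirtySeriesCount = 0
			checkpointTimer.Reset(time.Second)
		case dirty := <-maintained:
			if dirty {
				dirtySeriesCount++
				// Checkpoint early, but only if persistence is not backlogged.
				if dirtySeriesCount >= dirtySeriesLimit && queueLen() < queueCap*9/10 {
					checkpointTimer.Reset(0)
				}
			}
		}
	}
}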
// maintainMemorySeries first purges the series from old chunks. If the series
// still exists after that, it proceeds with the following steps: It closes the
// head chunk if it was not touched in a while. It archives a series if all
// chunks are evicted. It evicts chunkDescs if there are too many.
func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
var headChunkToPersist *chunkDesc
// maintainMemorySeries maintains a series that is in memory (i.e. not
// archived). It returns true if the series has changed from clean to dirty,
// i.e. it is now inconsistent with the latest checkpoint, so that after a
// crash a recovery operation requiring a disk seek would have to be applied.
//
// The method first closes the head chunk if it was not touched for the duration
// of headChunkTimeout.
//
// Then it determines the chunks that need to be purged and the chunks that need
// to be persisted. Depending on the result, it does the following:
//
// - If all chunks of a series need to be purged, the whole series is deleted
// for good and the method returns false. (Detecting non-existence of a series
// file does not require a disk seek.)
//
// - If any chunks need to be purged (but not all of them), it purges those
// chunks from memory and rewrites the series file on disk, leaving out the
// purged chunks and appending all chunks not yet persisted (with the exception
// of a still open head chunk).
//
// - If no chunks on disk need to be purged, but chunks need to be persisted,
// those chunks are simply appended to the existing series file (or the file is
// created if it does not exist yet).
//
// - If no chunks need to be purged and no chunks need to be persisted, nothing
// happens in this step.
//
// Next, the method checks if all chunks in the series are evicted. In that
// case, it archives the series and returns true.
//
// Finally, it evicts chunkDescs if there are too many.
func (s *memorySeriesStorage) maintainMemorySeries(
fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp,
) (becameDirty bool) {
s.fpLocker.Lock(fp)
defer func() {
s.fpLocker.Unlock(fp)
// Queue outside of lock!
if headChunkToPersist != nil {
s.persistQueue <- persistRequest{fp, headChunkToPersist}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
}
}()
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Series is actually not in memory, perhaps archived or dropped in the meantime.
return
return false
}
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
if s.purgeMemorySeries(fp, series, beforeTime) {
if series.maybeCloseHeadChunk() {
s.incPersistQueueLen(1)
}
seriesWasDirty := series.dirty
if s.writeMemorySeries(fp, series, beforeTime) {
// Series is gone now, we are done.
return
return false
}
iOldestNotEvicted := -1
@ -914,41 +796,80 @@ func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, b
return
}
// If we are here, the series is not archived, so check for chunkDesc
// eviction next and then if the head chunk needs to be persisted.
// eviction next.
series.evictChunkDescs(iOldestNotEvicted)
if !series.headChunkPersisted && time.Now().Sub(series.head().lastTime().Time()) > headChunkTimeout {
series.headChunkPersisted = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
series.headChunkUsedByIterator = false
headChunkToPersist = series.head()
}
return series.dirty && !seriesWasDirty
}
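The clean-to-dirty return value hinges on snapshotting the dirty flag before doing any work. A tiny sketch of that idiom with a stub series type:

package main

import "fmt"

type series struct{ dirty bool }

func maintain(s *series, makesDirty bool) (becameDirty bool) {
	wasDirty := s.dirty
	if makesDirty {
		s.dirty = true // e.g. getChunksToPersist moved the watermark
	}
	// Report a transition only: an already-dirty series is already
	// accounted for in dirtySeriesCount.
	return s.dirty && !wasDirty
}

func main() {
	s := &series{}
	fmt.Println(maintain(s, true))  // true: clean -> dirty
	fmt.Println(maintain(s, true))  // false: was dirty already
	fmt.Println(maintain(s, false)) // false: nothing changed
}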
// purgeMemorySeries drops chunks older than beforeTime from the provided memory
// series. The caller must have locked fp. If the series contains no chunks
// after dropping old chunks, it is purged entirely. In that case, the method
// returns true.
func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {
// writeMemorySeries (re-)writes a memory series file. While doing so, it drops
// chunks older than beforeTime from both the series file (if it exists) and
// from memory. The series's chunks eligible for persistence (as returned by
// getChunksToPersist) are appended to the newly written series file. If no
// chunks need to be purged, but there are chunks to persist, those chunks are
// simply appended to the existing series file. If the series contains no
// chunks after dropping old chunks, it is purged entirely. In that case, the
// method returns true.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeriesStorage) writeMemorySeries(
fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp,
) bool {
cds := series.getChunksToPersist()
defer func() {
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
s.incPersistQueueLen(-len(cds))
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
}()
// Get the actual chunks from underneath the chunkDescs.
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
if !series.firstTime().Before(beforeTime) {
// Oldest sample not old enough.
// Oldest sample not old enough, just append chunks, if any.
if len(cds) == 0 {
return false
}
offset, err := s.persistence.persistChunks(fp, chunks)
if err != nil {
s.persistErrors.Inc()
return false
}
if series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
return false
}
newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, err :=
s.persistence.dropAndPersistChunks(fp, beforeTime, chunks)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
s.persistErrors.Inc()
return false
}
numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
if allDroppedFromPersistence && allDroppedFromMemory {
series.dropChunks(beforeTime)
if len(series.chunkDescs) == 0 { // All chunks dropped from memory series.
if !allDroppedFromPersistence {
panic("all chunks dropped from memory but chunks left in persistence")
}
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(fp, series.metric)
return true
}
if series.chunkDescsOffset != -1 {
series.savedFirstTime = newFirstTime
series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
series.savedFirstTime = newFirstTime
if series.chunkDescsOffset == -1 {
series.chunkDescsOffset = offset
} else {
series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 {
panic("dropped more chunks from persistence than from memory")
}
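chunkDescsOffset counts the chunks of this series that exist only on disk, ahead of the first in-memory chunkDesc. A toy walk-through of the adjustment above; all numbers are invented:

package main

import "fmt"

func adjustOffset(chunkDescsOffset, offset, numDroppedFromPersistence int) int {
	if chunkDescsOffset == -1 {
		// The offset was unknown (chunkDescs not loaded from disk yet);
		// adopt the one reported back by persistence.
		return offset
	}
	chunkDescsOffset -= numDroppedFromPersistence
	if chunkDescsOffset < 0 {
		panic("dropped more chunks from persistence than from memory")
	}
	return chunkDescsOffset
}

func main() {
	// 6 on-disk chunks preceded the in-memory chunkDescs; 4 of them were
	// dropped as too old, so only 2 remain ahead of the in-memory ones.
	fmt.Println(adjustOffset(6, 0, 4)) // 2
	// Offset was unknown (-1); take the offset returned by persistence.
	fmt.Println(adjustOffset(-1, 3, 0)) // 3
}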
@ -974,7 +895,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint,
defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
newFirstTime, _, _, allDropped, err := s.persistence.dropAndPersistChunks(fp, beforeTime, nil)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
}
@ -999,14 +920,24 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
return s.persistence.loadChunkDescs(fp, beforeTime)
}
// getPersistQueueLen returns persistQueueLen in a goroutine-safe way.
func (s *memorySeriesStorage) getPersistQueueLen() int {
return int(atomic.LoadInt64(&s.persistQueueLen))
}
// incPersistQueueLen increments persistQueueLen in a goroutine-safe way. Use a
// negative 'by' to decrement.
func (s *memorySeriesStorage) incPersistQueueLen(by int) {
atomic.AddInt64(&s.persistQueueLen, int64(by))
}
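These two helpers are a plain atomic counter. A minimal concurrent usage sketch of the same idiom (the storage type here is a stub):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type storage struct{ persistQueueLen int64 }

func (s *storage) getPersistQueueLen() int {
	return int(atomic.LoadInt64(&s.persistQueueLen))
}

func (s *storage) incPersistQueueLen(by int) {
	atomic.AddInt64(&s.persistQueueLen, int64(by))
}

func main() {
	s := &storage{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.incPersistQueueLen(2)  // e.g. two chunks completed
			s.incPersistQueueLen(-1) // e.g. one chunk persisted
		}()
	}
	wg.Wait()
	fmt.Println(s.getPersistQueueLen()) // 100
}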
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)
ch <- s.persistLatency.Desc()
ch <- s.persistErrors.Desc()
ch <- s.persistQueueCapacity.Desc()
ch <- s.persistQueueLength.Desc()
ch <- persistQueueLengthDesc
ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
@ -1019,10 +950,13 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.persistence.Collect(ch)
ch <- s.persistLatency
ch <- s.persistErrors
ch <- s.persistQueueCapacity
ch <- s.persistQueueLength
ch <- prometheus.MustNewConstMetric(
persistQueueLengthDesc,
prometheus.GaugeValue,
float64(s.getPersistQueueLen()),
)
ch <- s.numSeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
@ -1031,54 +965,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
numMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.
// Each chunk map contains n consecutive chunks to persist, where
// n is the index+1.
type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc
// add adds a chunk to chunkMaps.
func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) {
// Runtime of this method is linear with the number of
// chunkMaps. However, we expect only ever very few maps.
numMaps := len(*cm)
for i, m := range *cm {
if cds, ok := m[fp]; ok {
// Found our fp! Add cd and level up.
cds = append(cds, cd)
delete(m, fp)
if i == numMaps-1 {
*cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{})
}
(*cm)[i+1][fp] = cds
return
}
}
// Our fp isn't contained in cm yet. Add it to the first map (and add a
// first map if there is none).
if numMaps == 0 {
*cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}}
}
(*cm)[0][fp] = []*chunkDesc{cd}
}
// pop retrieves and removes a fingerprint with all its chunks. It chooses one
// of the fingerprints with the most chunks. It panics if cm has no entries.
func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) {
m := (*cm)[len(*cm)-1]
for fp, cds := range m {
delete(m, fp)
// Prune empty maps from top level.
for len(m) == 0 {
*cm = (*cm)[:len(*cm)-1]
if len(*cm) == 0 {
break
}
m = (*cm)[len(*cm)-1]
}
return fp, cds
}
panic("popped from empty chunkMaps")
float64(atomic.LoadInt64(&numMemChunks)),
)
}
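Replacing the persistQueueLength Gauge field with a Desc plus MustNewConstMetric means the value is computed at scrape time rather than kept up to date with Set calls. A minimal collector using the same pattern; it assumes a recent github.com/prometheus/client_golang and uses invented metric names:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var queueLen int64

var queueLenDesc = prometheus.NewDesc(
	"example_persist_queue_length",
	"The current number of chunks waiting in the persist queue.",
	nil, nil,
)

type collector struct{}

func (c collector) Describe(ch chan<- *prometheus.Desc) { ch <- queueLenDesc }

func (c collector) Collect(ch chan<- prometheus.Metric) {
	// The value is read at scrape time; no Set() bookkeeping needed elsewhere.
	ch <- prometheus.MustNewConstMetric(
		queueLenDesc, prometheus.GaugeValue, float64(atomic.LoadInt64(&queueLen)),
	)
}

func main() {
	atomic.StoreInt64(&queueLen, 42)
	reg := prometheus.NewRegistry()
	reg.MustRegister(collector{})
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(mfs[0].GetName()) // example_persist_queue_length
}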


@ -16,7 +16,6 @@ package local
import (
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"time"
@ -159,6 +158,7 @@ func TestLoop(t *testing.T) {
MemoryChunks: 50,
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: 250 * time.Millisecond,
}
storage, err := NewMemorySeriesStorage(o)
@ -527,9 +527,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
// Persist head chunk so we can safely archive.
series.headChunkPersisted = true
ms.persistQueue <- persistRequest{fp, series.head()}
time.Sleep(time.Second) // Give time for persisting to happen.
series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics.
ms.fpToSeries.del(fp)
@ -639,12 +638,12 @@ func TestFuzzChunkType1(t *testing.T) {
// too. This benchmark will have a very long runtime (up to minutes). You can
// use it as an actual benchmark. Run it like this:
//
// go test -cpu 1,2,4,8 -test=NONE -bench BenchmarkFuzzChunkType -benchmem
// go test -cpu 1,2,4,8 -run=NONE -bench BenchmarkFuzzChunkType -benchmem
//
// You can also use it as a test for races. In that case, run it like this (will
// make things even slower):
//
// go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
*defaultChunkEncoding = int(encoding)
const samplesPerRun = 100000
@ -655,6 +654,7 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
MemoryChunks: 100,
PersistenceRetentionPeriod: time.Hour,
PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: time.Second,
}
s, err := NewMemorySeriesStorage(o)
@ -839,93 +839,3 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
}
return result
}
func TestChunkMaps(t *testing.T) {
cm := chunkMaps{}
cd1 := &chunkDesc{refCount: 1} // Abuse refCount as identifier.
cd21 := &chunkDesc{refCount: 21}
cd22 := &chunkDesc{refCount: 22}
cd31 := &chunkDesc{refCount: 31}
cd32 := &chunkDesc{refCount: 32}
cd33 := &chunkDesc{refCount: 33}
cd41 := &chunkDesc{refCount: 41}
cd42 := &chunkDesc{refCount: 42}
cd43 := &chunkDesc{refCount: 43}
cd44 := &chunkDesc{refCount: 44}
cd51 := &chunkDesc{refCount: 51}
cd52 := &chunkDesc{refCount: 52}
cd53 := &chunkDesc{refCount: 53}
cd54 := &chunkDesc{refCount: 54}
cd55 := &chunkDesc{refCount: 55}
cm.add(5, cd51)
cm.add(3, cd31)
cm.add(5, cd52)
cm.add(1, cd1)
cm.add(4, cd41)
cm.add(4, cd42)
cm.add(5, cd53)
cm.add(3, cd32)
cm.add(2, cd21)
cm.add(5, cd54)
cm.add(3, cd33)
cm.add(4, cd43)
cm.add(2, cd22)
cm.add(4, cd44)
cm.add(5, cd55)
var fpWant, fpGot clientmodel.Fingerprint
var cdsWant, cdsGot []*chunkDesc
fpWant = 5
cdsWant = []*chunkDesc{cd51, cd52, cd53, cd54, cd55}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 4
cdsWant = []*chunkDesc{cd41, cd42, cd43, cd44}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 3
cdsWant = []*chunkDesc{cd31, cd32, cd33}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 2
cdsWant = []*chunkDesc{cd21, cd22}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 1
cdsWant = []*chunkDesc{cd1}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
}


@ -44,6 +44,7 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
MemoryChunks: 1000000,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: time.Hour,
}
storage, err := NewMemorySeriesStorage(o)