From 6774a73878a453ad51eb61cbe2f74fef5bce9491 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 7 Sep 2015 18:08:23 +0200 Subject: [PATCH] Fix error checking and logging around checkpointing. --- storage/local/persistence.go | 36 ++++++++++++++++++++++++------------ storage/local/storage.go | 8 ++++++-- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 8b3358e62..2dda63a77 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -540,15 +540,19 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap begin := time.Now() f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { - return + return err } defer func() { - f.Sync() + syncErr := f.Sync() closeErr := f.Close() if err != nil { return } + err = syncErr + if err != nil { + return + } err = closeErr if err != nil { return @@ -562,18 +566,18 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap w := bufio.NewWriterSize(f, fileBufSize) if _, err = w.WriteString(headsMagicString); err != nil { - return + return err } var numberOfSeriesOffset int if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil { - return + return err } numberOfSeriesOffset += len(headsMagicString) numberOfSeriesInHeader := uint64(fingerprintToSeries.length()) // We have to write the number of series as uint64 because we might need // to overwrite it later, and a varint might change byte width then. if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil { - return + return err } iter := fingerprintToSeries.iter() @@ -606,7 +610,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if err != nil { return } - w.Write(buf) + if _, err = w.Write(buf); err != nil { + return + } if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { return } @@ -646,27 +652,33 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } } - // Series is checkpointed now, so declare it clean. + // Series is checkpointed now, so declare it clean. In case the entire + // checkpoint fails later on, this is fine, as the storage's series + // maintenance will mark these series newly dirty again, continuously + // increasing the total number of dirty series as seen by the storage. + // This has the effect of triggering a new checkpoint attempt even + // earlier than if we hadn't incorrectly set "dirty" to "false" here + // already. m.series.dirty = false }() if err != nil { - return + return err } } if err = w.Flush(); err != nil { - return + return err } if realNumberOfSeries != numberOfSeriesInHeader { // The number of series has changed in the meantime. // Rewrite it in the header. if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil { - return + return err } if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil { - return + return err } } - return + return err } // loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all diff --git a/storage/local/storage.go b/storage/local/storage.go index 7c1ae9726..afe5dc001 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -875,8 +875,12 @@ loop: case <-s.loopStopping: break loop case <-checkpointTimer.C: - s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) - dirtySeriesCount = 0 + err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) + if err != nil { + log.Errorln("Error while checkpointing:", err) + } else { + dirtySeriesCount = 0 + } checkpointTimer.Reset(s.checkpointInterval) case fp := <-memoryFingerprints: if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {