From 6892fc6dcb7c5e6cf44764e3f893f55236774aa1 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 10:12:28 +0200 Subject: [PATCH] Finish old WAL segment async, default to no fsync We were still fsyncing while holding the write lock when we cut a new segment. Given we cannot do anything but logging errors, we might just as well complete segments asynchronously. There's not realistic use case where one would fsync after every WAL entry, thus make the default of a flush interval of 0 to never fsync which is a much more likely use case. --- cmd/tsdb/main.go | 3 ++- head.go | 7 ++++--- wal.go | 49 +++++++++++++++++++++++------------------------- wal_test.go | 3 +-- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 491c2ac35..960c9c05e 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 2000) + total, err = b.ingestScrapes(metrics, 15000) if err != nil { exitWithError(err) } @@ -147,6 +147,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) + select {} measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { diff --git a/head.go b/head.go index 4e1e85877..b2d6e9842 100644 --- a/head.go +++ b/head.go @@ -403,11 +403,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - if err := s.appendable(t, v); err != nil { - return err - } + err := s.appendable(t, v) s.Unlock() + if err != nil { + return err + } if t < a.mint { return ErrOutOfBounds } diff --git a/wal.go b/wal.go index 747510fd6..9af9a1853 100644 --- a/wal.go +++ b/wal.go @@ -417,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { tf.minSeries = s.Ref } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -447,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { tf.maxTime = s.T } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -479,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { } } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -537,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.sync(); err != nil { - return err - } - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := hf.Truncate(off); err != nil { - return err - } - if err := hf.Close(); err != nil { + if err := w.flush(); err != nil { return err } + // Finish last segment asynchronously to not block the WAL moving along + // in the new segment. + go func() { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Truncate(off); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Sync(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Close(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + }() } p, _, err := nextSequenceFile(w.dirFile.Name()) @@ -561,9 +556,11 @@ func (w *SegmentWAL) cut() error { return err } - if err = w.dirFile.Sync(); err != nil { - return err - } + go func() { + if err = w.dirFile.Sync(); err != nil { + w.logger.Log("msg", "sync WAL directory", "err", err) + } + }() w.files = append(w.files, newSegmentFile(f)) diff --git a/wal_test.go b/wal_test.go index 45279b1b5..180623dd0 100644 --- a/wal_test.go +++ b/wal_test.go @@ -91,9 +91,8 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.cut(), "cut failed") - // Cutting creates a new file and close the previous tail file. + // Cutting creates a new file. require.Equal(t, 2, len(w.files)) - require.Error(t, w.files[0].Close()) require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))