Browse Source

Finish old WAL segment async, default to no fsync

We were still fsyncing while holding the write lock when we cut a new
segment. Given we cannot do anything but logging errors, we might just
as well complete segments asynchronously.

There's not realistic use case where one would fsync after every WAL
entry, thus make the default of a flush interval of 0 to never fsync
which is a much more likely use case.
pull/5805/head
Fabian Reinartz 7 years ago
parent
commit
6892fc6dcb
  1. 3
      cmd/tsdb/main.go
  2. 7
      head.go
  3. 49
      wal.go
  4. 3
      wal_test.go

3
cmd/tsdb/main.go

@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dur := measureTime("ingestScrapes", func() {
b.startProfiling()
total, err = b.ingestScrapes(metrics, 2000)
total, err = b.ingestScrapes(metrics, 15000)
if err != nil {
exitWithError(err)
}
@ -147,6 +147,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
fmt.Println(" > total samples:", total)
fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
select {}
measureTime("stopStorage", func() {
if err := b.storage.Close(); err != nil {

7
head.go

@ -403,11 +403,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
return errors.Wrap(ErrNotFound, "unknown series")
}
s.Lock()
if err := s.appendable(t, v); err != nil {
return err
}
err := s.appendable(t, v)
s.Unlock()
if err != nil {
return err
}
if t < a.mint {
return ErrOutOfBounds
}

49
wal.go

@ -417,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
tf.minSeries = s.Ref
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -447,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
tf.maxTime = s.T
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -479,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
}
}
}
if w.flushInterval <= 0 {
return errors.Wrap(w.Sync(), "sync")
}
return nil
}
@ -537,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
func (w *SegmentWAL) cut() error {
// Sync current head to disk and close.
if hf := w.head(); hf != nil {
if err := w.sync(); err != nil {
return err
}
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if err := hf.Truncate(off); err != nil {
return err
}
if err := hf.Close(); err != nil {
if err := w.flush(); err != nil {
return err
}
// Finish last segment asynchronously to not block the WAL moving along
// in the new segment.
go func() {
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Truncate(off); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Sync(); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if err := hf.Close(); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
}()
}
p, _, err := nextSequenceFile(w.dirFile.Name())
@ -561,9 +556,11 @@ func (w *SegmentWAL) cut() error {
return err
}
if err = w.dirFile.Sync(); err != nil {
return err
}
go func() {
if err = w.dirFile.Sync(); err != nil {
w.logger.Log("msg", "sync WAL directory", "err", err)
}
}()
w.files = append(w.files, newSegmentFile(f))

3
wal_test.go

@ -91,9 +91,8 @@ func TestSegmentWAL_cut(t *testing.T) {
require.NoError(t, w.cut(), "cut failed")
// Cutting creates a new file and close the previous tail file.
// Cutting creates a new file.
require.Equal(t, 2, len(w.files))
require.Error(t, w.files[0].Close())
require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))

Loading…
Cancel
Save