diff --git a/wal.go b/wal.go index 225851de1..8efa3cd28 100644 --- a/wal.go +++ b/wal.go @@ -190,6 +190,7 @@ type SegmentWAL struct { stopc chan struct{} donec chan struct{} + actorc chan func() error // sequantilized background operations buffers sync.Pool } @@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), + actorc: make(chan func() error, 1), segmentSize: walSegmentSizeBytes, crc32: newCRC32(), } @@ -569,18 +571,21 @@ func (w *SegmentWAL) cut() error { // Finish last segment asynchronously to not block the WAL moving along // in the new segment. go func() { - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Truncate(off); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Sync(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Close(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + w.actorc <- func() error { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Truncate(off); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Sync(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + return nil } }() } @@ -595,8 +600,8 @@ func (w *SegmentWAL) cut() error { } go func() { - if err = w.dirFile.Sync(); err != nil { - level.Error(w.logger).Log("msg", "sync WAL directory", "err", err) + w.actorc <- func() error { + return errors.Wrap(w.dirFile.Sync(), "sync WAL directory") } }() @@ -675,9 +680,23 @@ func (w *SegmentWAL) run(interval time.Duration) { defer close(w.donec) for { + // Processing all enqueued operations has precedence over shutdown and + // background syncs. + select { + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } + continue + default: + } select { case <-w.stopc: return + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } case <-tick: if err := w.Sync(); err != nil { level.Error(w.logger).Log("msg", "sync failed", "err", err)