wal: synchronize background operations

This adds an actor channel and thereby serializes all background
operations through the run() method.
Fixes an existing race.
pull/5805/head
Fabian Reinartz 2017-10-27 13:47:07 +02:00
parent 5d28c849c7
commit 8aedb7671e
1 changed files with 33 additions and 14 deletions

47
wal.go
View File

@ -190,6 +190,7 @@ type SegmentWAL struct {
stopc chan struct{} stopc chan struct{}
donec chan struct{} donec chan struct{}
actorc chan func() error // sequantilized background operations
buffers sync.Pool buffers sync.Pool
} }
@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
flushInterval: flushInterval, flushInterval: flushInterval,
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
actorc: make(chan func() error, 1),
segmentSize: walSegmentSizeBytes, segmentSize: walSegmentSizeBytes,
crc32: newCRC32(), crc32: newCRC32(),
} }
@ -569,18 +571,21 @@ func (w *SegmentWAL) cut() error {
// Finish last segment asynchronously to not block the WAL moving along // Finish last segment asynchronously to not block the WAL moving along
// in the new segment. // in the new segment.
go func() { go func() {
off, err := hf.Seek(0, os.SEEK_CUR) w.actorc <- func() error {
if err != nil { off, err := hf.Seek(0, os.SEEK_CUR)
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) if err != nil {
} return errors.Wrapf(err, "finish old segment %s", hf.Name())
if err := hf.Truncate(off); err != nil { }
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) if err := hf.Truncate(off); err != nil {
} return errors.Wrapf(err, "finish old segment %s", hf.Name())
if err := hf.Sync(); err != nil { }
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) if err := hf.Sync(); err != nil {
} return errors.Wrapf(err, "finish old segment %s", hf.Name())
if err := hf.Close(); err != nil { }
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) if err := hf.Close(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
return nil
} }
}() }()
} }
@ -595,8 +600,8 @@ func (w *SegmentWAL) cut() error {
} }
go func() { go func() {
if err = w.dirFile.Sync(); err != nil { w.actorc <- func() error {
level.Error(w.logger).Log("msg", "sync WAL directory", "err", err) return errors.Wrap(w.dirFile.Sync(), "sync WAL directory")
} }
}() }()
@ -675,9 +680,23 @@ func (w *SegmentWAL) run(interval time.Duration) {
defer close(w.donec) defer close(w.donec)
for { for {
// Processing all enqueued operations has precedence over shutdown and
// background syncs.
select {
case f := <-w.actorc:
if err := f(); err != nil {
level.Error(w.logger).Log("msg", "operation failed", "err", err)
}
continue
default:
}
select { select {
case <-w.stopc: case <-w.stopc:
return return
case f := <-w.actorc:
if err := f(); err != nil {
level.Error(w.logger).Log("msg", "operation failed", "err", err)
}
case <-tick: case <-tick:
if err := w.Sync(); err != nil { if err := w.Sync(); err != nil {
level.Error(w.logger).Log("msg", "sync failed", "err", err) level.Error(w.logger).Log("msg", "sync failed", "err", err)