
Instrument WAL fsync

pull/5805/head
Bas Harenslak 7 years ago
commit 5e1c258a98
  1. db.go (2 changes)
  2. wal.go (36 changes)
  3. wal_test.go (18 changes)
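For orientation before the per-file diffs: the commit times every WAL fsync and records the duration in a Prometheus summary. Below is a minimal, self-contained sketch of that pattern; the syncFile helper, the temp file, and the use of File.Sync in place of fileutil.Fdatasync are illustrative stand-ins, only the metric name and the time/Observe wrapping come from the diff.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// fsyncDuration mirrors the summary that wal.go registers in this commit.
var fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
	Name: "tsdb_wal_fsync_duration_seconds",
	Help: "Duration of WAL fsync.",
})

// syncFile is an illustrative stand-in for SegmentWAL.Sync: time the sync
// call and record the elapsed seconds in the summary.
func syncFile(f *os.File) error {
	start := time.Now()
	err := f.Sync() // the WAL uses fileutil.Fdatasync here
	fsyncDuration.Observe(time.Since(start).Seconds())
	return err
}

func main() {
	prometheus.MustRegister(fsyncDuration)

	f, err := os.CreateTemp("", "wal-fsync-sketch")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := syncFile(f); err != nil {
		panic(err)
	}
	fmt.Println("observed one fsync duration")
}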

db.go (2 changes)

@@ -204,7 +204,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		return nil, errors.Wrap(err, "create leveled compactor")
 	}
 
-	wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval)
+	wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
 	if err != nil {
 		return nil, err
 	}
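db.go simply threads the Registerer r it already received into the new OpenSegmentWAL parameter. A hedged sketch of what equivalent wiring could look like for an external caller of this package; the directory, logger, and flush interval below are illustrative:

package main

import (
	"log"
	"path/filepath"
	"time"

	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/tsdb"
)

func main() {
	// A dedicated registry stands in for the Registerer that Open receives;
	// passing nil instead (as the tests do) simply skips registration.
	reg := prometheus.NewRegistry()

	wal, err := tsdb.OpenSegmentWAL(filepath.Join("data", "wal"), kitlog.NewNopLogger(), 5*time.Second, reg)
	if err != nil {
		log.Fatal(err)
	}
	defer wal.Close()
}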

wal.go (36 changes)

@@ -32,6 +32,7 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/labels"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // WALEntryType indicates what data a WAL entry contains.
@@ -65,6 +66,26 @@ type SeriesCB func([]RefSeries) error
 // is only valid until the call returns.
 type DeletesCB func([]Stone) error
 
+type walMetrics struct {
+	fsyncDuration prometheus.Summary
+}
+
+func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
+	m := &walMetrics{}
+
+	m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+		Name: "tsdb_wal_fsync_duration_seconds",
+		Help: "Duration of WAL fsync.",
+	})
+
+	if r != nil {
+		r.MustRegister(
+			m.fsyncDuration,
+		)
+	}
+
+	return m
+}
+
 // WAL is a write ahead log that can log new series labels and samples.
 // It must be completely read before new entries are logged.
 type WAL interface {
@@ -150,6 +171,7 @@ func newCRC32() hash.Hash32 {
 // SegmentWAL is a write ahead log for series data.
 type SegmentWAL struct {
 	mtx     sync.Mutex
+	metrics *walMetrics
 
 	dirFile *os.File
 	files   []*segmentFile
@@ -169,7 +191,7 @@ type SegmentWAL struct {
 
 // OpenSegmentWAL opens or creates a write ahead log in the given directory.
 // The WAL must be read completely before new data is written.
-func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
+func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) {
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, err
 	}
@@ -190,6 +212,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
 		segmentSize:   walSegmentSizeBytes,
 		crc32:         newCRC32(),
 	}
+	w.metrics = newWalMetrics(w, r)
 
 	fns, err := sequenceFiles(w.dirFile.Name())
 	if err != nil {
@@ -592,7 +615,10 @@ func (w *SegmentWAL) Sync() error {
 	}
 	if head != nil {
 		// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
-		return fileutil.Fdatasync(head.File)
+		start := time.Now()
+		err := fileutil.Fdatasync(head.File)
+		w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
+		return err
 	}
 	return nil
 }
@@ -604,7 +630,11 @@ func (w *SegmentWAL) sync() error {
 	if w.head() == nil {
 		return nil
 	}
-	return fileutil.Fdatasync(w.head().File)
+	start := time.Now()
+	err := fileutil.Fdatasync(w.head().File)
+	w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
+
+	return err
 }
 
 func (w *SegmentWAL) flush() error {
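Not part of this diff, but worth noting: the new summary only becomes visible once the Registerer handed to OpenSegmentWAL is exposed somewhere, at which point it appears as tsdb_wal_fsync_duration_seconds together with the _sum and _count series every summary exports. A minimal sketch using a standalone registry and promhttp; the port and registry name are illustrative:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// reg stands in for the Registerer passed to OpenSegmentWAL.
	reg := prometheus.NewRegistry()

	// Serving the registry makes the fsync summary visible on /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}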

wal_test.go (18 changes)

@@ -44,7 +44,7 @@ func TestSegmentWAL_Open(t *testing.T) {
 	}
 
 	// Initialize 5 correct segment files.
-	w, err := OpenSegmentWAL(tmpdir, nil, 0)
+	w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
 	require.NoError(t, err)
 
 	require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
@@ -74,7 +74,7 @@ func TestSegmentWAL_Open(t *testing.T) {
 	_, err = f.WriteAt([]byte{0}, 4)
 	require.NoError(t, err)
 
-	w, err = OpenSegmentWAL(tmpdir, nil, 0)
+	w, err = OpenSegmentWAL(tmpdir, nil, 0, nil)
 	require.Error(t, err, "open with corrupted segments")
 }
@@ -84,7 +84,7 @@ func TestSegmentWAL_cut(t *testing.T) {
 	defer os.RemoveAll(tmpdir)
 
 	// This calls cut() implicitly the first time without a previous tail.
-	w, err := OpenSegmentWAL(tmpdir, nil, 0)
+	w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
 	require.NoError(t, err)
 
 	require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
@@ -131,7 +131,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
 	require.NoError(t, err)
 	// defer os.RemoveAll(dir)
 
-	w, err := OpenSegmentWAL(dir, nil, 0)
+	w, err := OpenSegmentWAL(dir, nil, 0, nil)
 	require.NoError(t, err)
 	w.segmentSize = 10000
@@ -181,7 +181,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
 	require.NoError(t, w.Close())
 
 	// The same again with a new WAL.
-	w, err = OpenSegmentWAL(dir, nil, 0)
+	w, err = OpenSegmentWAL(dir, nil, 0, nil)
 	require.NoError(t, err)
 
 	var readSeries []RefSeries
@@ -221,7 +221,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
 	// Open WAL a bunch of times, validate all previous data can be read,
 	// write more data to it, close it.
 	for k := 0; k < numMetrics; k += numMetrics / iterations {
-		w, err := OpenSegmentWAL(dir, nil, 0)
+		w, err := OpenSegmentWAL(dir, nil, 0, nil)
 		require.NoError(t, err)
 
 		// Set smaller segment size so we can actually write several files.
@@ -390,7 +390,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 			require.NoError(t, err)
 			defer os.RemoveAll(dir)
 
-			w, err := OpenSegmentWAL(dir, nil, 0)
+			w, err := OpenSegmentWAL(dir, nil, 0, nil)
 			require.NoError(t, err)
 
 			require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
@@ -415,7 +415,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 			logger := log.NewLogfmtLogger(os.Stderr)
 
-			w2, err := OpenSegmentWAL(dir, logger, 0)
+			w2, err := OpenSegmentWAL(dir, logger, 0, nil)
 			require.NoError(t, err)
 
 			r := w2.Reader()
@@ -446,7 +446,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
 			// We should see the first valid entry and the new one, everything after
 			// is truncated.
-			w3, err := OpenSegmentWAL(dir, logger, 0)
+			w3, err := OpenSegmentWAL(dir, logger, 0, nil)
 			require.NoError(t, err)
 
 			r = w3.Reader()
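The test changes only add the new nil argument; nothing asserts on the metric itself. A hypothetical follow-up test (not part of this commit) could do so through a dedicated registry. It assumes it sits in wal_test.go next to the tests above, so only the github.com/prometheus/client_golang/prometheus import would need to be added:

func TestSegmentWAL_FsyncDurationMetric(t *testing.T) {
	dir, err := ioutil.TempDir("", "wal_fsync_metric")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Register the WAL metrics on a fresh registry so we can inspect them.
	reg := prometheus.NewRegistry()
	w, err := OpenSegmentWAL(dir, nil, 0, reg)
	require.NoError(t, err)
	defer w.Close()

	// Force at least one fsync: write something, then Sync.
	require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
	require.NoError(t, w.Sync())

	mfs, err := reg.Gather()
	require.NoError(t, err)
	require.Equal(t, 1, len(mfs))
	require.Equal(t, "tsdb_wal_fsync_duration_seconds", mfs[0].GetName())
	require.True(t, mfs[0].GetMetric()[0].GetSummary().GetSampleCount() >= 1)
}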
