From dbd2b21d2e2c6badbe5d705b1bde4b1838e5b20b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 2 Jan 2017 14:41:13 +0100 Subject: [PATCH] Make persistence atomic --- compact.go | 133 ++++++++++++++++++++++++++++++++++++++++++----------- db.go | 22 +++++---- head.go | 49 ++++++-------------- wal.go | 1 + writer.go | 25 +--------- 5 files changed, 134 insertions(+), 96 deletions(-) diff --git a/compact.go b/compact.go index 7149edbf4..fce8108ce 100644 --- a/compact.go +++ b/compact.go @@ -3,8 +3,10 @@ package tsdb import ( "fmt" "os" + "path/filepath" "time" + "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/timestamp" @@ -44,39 +46,30 @@ func (c *compactor) run() { } dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) - if err := c.compact(dir, c.shard.persisted[0], c.shard.persisted[1]); err != nil { + p, err := newPersister(dir) + if err != nil { + c.logger.Log("msg", "creating persister failed", "err", err) + continue + } + + if err := c.compact(p, c.shard.persisted[0], c.shard.persisted[1]); err != nil { + c.logger.Log("msg", "compaction failed", "err", err) + continue + } + if err := p.Close(); err != nil { c.logger.Log("msg", "compaction failed", "err", err) } } close(c.donec) } -func (c *compactor) close() error { +func (c *compactor) Close() error { close(c.triggerc) <-c.donec return nil } -func (c *compactor) compact(dir string, a, b block) error { - if err := os.MkdirAll(dir, 0777); err != nil { - return err - } - - cf, err := os.Create(chunksFileName(dir)) - if err != nil { - return err - } - xf, err := os.Create(indexFileName(dir)) - if err != nil { - return err - } - - index := newIndexWriter(xf) - series := newSeriesWriter(cf, index) - - defer index.Close() - defer series.Close() - +func (c *compactor) compact(p *persister, a, b block) error { aall, err := a.index().Postings("", "") if err != nil { return err @@ -117,7 +110,7 @@ func (c *compactor) compact(dir string, a, b block) error { for set.Next() { lset, chunks := set.At() - if err := series.WriteSeries(i, lset, chunks); err != nil { + if err := p.chunkw.WriteSeries(i, lset, chunks); err != nil { return err } @@ -140,7 +133,7 @@ func (c *compactor) compact(dir string, a, b block) error { return set.Err() } - if err := index.WriteStats(stats); err != nil { + if err := p.indexw.WriteStats(stats); err != nil { return err } @@ -151,13 +144,13 @@ func (c *compactor) compact(dir string, a, b block) error { for x := range v { s = append(s, x) } - if err := index.WriteLabelIndex([]string{n}, s); err != nil { + if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil { return err } } for t := range postings.m { - if err := index.WritePostings(t.name, t.value, postings.get(t)); err != nil { + if err := p.indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { return err } } @@ -166,7 +159,7 @@ func (c *compactor) compact(dir string, a, b block) error { for i := range all { all[i] = uint32(i) } - if err := index.WritePostings("", "", newListPostings(all)); err != nil { + if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil { return err } @@ -297,3 +290,89 @@ func (c *compactionMerger) Err() error { func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) { return c.l, c.c } + +type persister struct { + dir, tmpdir string + + chunkf, indexf *fileutil.LockedFile + + chunkw SeriesWriter + indexw IndexWriter +} + +func newPersister(dir string) (*persister, error) { + p := &persister{ + dir: dir, + tmpdir: dir + ".tmp", + } + var err error + + // Write to temporary directory to make persistence appear atomic. + if fileutil.Exist(p.tmpdir) { + if err := os.RemoveAll(p.tmpdir); err != nil { + return nil, err + } + } + if err := fileutil.CreateDirAll(p.tmpdir); err != nil { + return nil, err + } + + p.chunkf, err = fileutil.LockFile(chunksFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + p.indexf, err = fileutil.LockFile(indexFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + + p.indexw = newIndexWriter(p.indexf) + p.chunkw = newSeriesWriter(p.chunkf, p.indexw) + + return p, nil +} + +func (p *persister) Close() error { + if err := p.chunkw.Close(); err != nil { + return err + } + if err := p.indexw.Close(); err != nil { + return err + } + if err := fileutil.Fsync(p.chunkf.File); err != nil { + return err + } + if err := fileutil.Fsync(p.indexf.File); err != nil { + return err + } + if err := p.chunkf.Close(); err != nil { + return err + } + if err := p.indexf.Close(); err != nil { + return err + } + + return renameDir(p.tmpdir, p.dir) +} + +func renameDir(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := fileutil.OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + if err = fileutil.Fsync(pdir); err != nil { + return err + } + if err = pdir.Close(); err != nil { + return err + } + return nil +} diff --git a/db.go b/db.go index 2e9283804..7c6ac60fb 100644 --- a/db.go +++ b/db.go @@ -277,7 +277,7 @@ func (s *Shard) Close() error { var e MultiError - e.Add(s.compactor.close()) + e.Add(s.compactor.Close()) for _, pb := range s.persisted { e.Add(pb.Close()) @@ -311,7 +311,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error { defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }() if err := s.persist(); err != nil { - s.logger.Log("msg", "persistance error", "err", err) + s.logger.Log("msg", "persistence error", "err", err) } s.metrics.persistences.Inc() }() @@ -383,21 +383,23 @@ func (s *Shard) persist() error { // TODO(fabxc): add grace period where we can still append to old head shard // before actually persisting it. - p := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) + dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) - if err := os.MkdirAll(p, 0777); err != nil { - return err - } - - n, err := head.persist(p) + p, err := newPersister(dir) if err != nil { return err } - sz := fmt.Sprintf("%.2fMiB", float64(n)/1024/1024) + if err := head.persist(p); err != nil { + return err + } + if err := p.Close(); err != nil { + return err + } + sz := fmt.Sprintf("%.2fMB", float64(p.chunkw.Size()+p.indexw.Size())/1e6) s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") // Reopen block as persisted block for querying. - pb, err := newPersistedBlock(p) + pb, err := newPersistedBlock(dir) if err != nil { return err } diff --git a/head.go b/head.go index 171c55290..78a968dbb 100644 --- a/head.go +++ b/head.go @@ -2,7 +2,6 @@ package tsdb import ( "errors" - "os" "sort" "sync" @@ -261,36 +260,25 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { return nil } -func (h *HeadBlock) persist(p string) (int64, error) { - sf, err := os.Create(chunksFileName(p)) - if err != nil { - return 0, err +func (h *HeadBlock) persist(p *persister) error { + if err := h.wal.Close(); err != nil { + return err } - xf, err := os.Create(indexFileName(p)) - if err != nil { - return 0, err - } - - iw := newIndexWriter(xf) - sw := newSeriesWriter(sf, iw) - - defer sw.Close() - defer iw.Close() for ref, cd := range h.descs { - if err := sw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{ + if err := p.chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{ { MinTime: cd.firsTimestamp, MaxTime: cd.lastTimestamp, Chunk: cd.chunk, }, }); err != nil { - return 0, err + return err } } - if err := iw.WriteStats(h.stats); err != nil { - return 0, err + if err := p.indexw.WriteStats(h.stats); err != nil { + return err } for n, v := range h.values { s := make([]string, 0, len(v)) @@ -298,14 +286,14 @@ func (h *HeadBlock) persist(p string) (int64, error) { s = append(s, x) } - if err := iw.WriteLabelIndex([]string{n}, s); err != nil { - return 0, err + if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil { + return err } } for t := range h.postings.m { - if err := iw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil { - return 0, err + if err := p.indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil { + return err } } // Write a postings list containing all series. @@ -313,17 +301,8 @@ func (h *HeadBlock) persist(p string) (int64, error) { for i := range all { all[i] = uint32(i) } - if err := iw.WritePostings("", "", newListPostings(all)); err != nil { - return 0, err + if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil { + return err } - - // Everything written successfully, we can remove the WAL. - if err := h.wal.Close(); err != nil { - return 0, err - } - if err := os.Remove(h.wal.f.Name()); err != nil { - return 0, err - } - - return iw.Size() + sw.Size(), err + return nil } diff --git a/wal.go b/wal.go index 20fb70b03..be54a467f 100644 --- a/wal.go +++ b/wal.go @@ -74,6 +74,7 @@ type walHandler struct { series func(labels.Labels) } +// ReadAll consumes all entries in the WAL and triggers the registered handlers. func (w *WAL) ReadAll(h *walHandler) error { dec := &walDecoder{ r: w.f, diff --git a/writer.go b/writer.go index 5b74b4c82..8e88770cd 100644 --- a/writer.go +++ b/writer.go @@ -5,7 +5,6 @@ import ( "fmt" "hash/crc32" "io" - "os" "sort" "strings" @@ -129,15 +128,6 @@ func (w *seriesWriter) Size() int64 { } func (w *seriesWriter) Close() error { - if f, ok := w.w.(*os.File); ok { - if err := f.Sync(); err != nil { - return err - } - } - - if c, ok := w.w.(io.Closer); ok { - return c.Close() - } return nil } @@ -499,18 +489,5 @@ func (w *indexWriter) finalize() error { } func (w *indexWriter) Close() error { - if err := w.finalize(); err != nil { - return err - } - - if f, ok := w.w.(*os.File); ok { - if err := f.Sync(); err != nil { - return err - } - } - - if c, ok := w.w.(io.Closer); ok { - return c.Close() - } - return nil + return w.finalize() }