mirror of https://github.com/prometheus/prometheus
Make persistence atomic
parent
62b8ded0a5
commit
dbd2b21d2e
133
compact.go
133
compact.go
|
@ -3,8 +3,10 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
|
@ -44,39 +46,30 @@ func (c *compactor) run() {
|
||||||
}
|
}
|
||||||
dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
|
dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
|
||||||
|
|
||||||
if err := c.compact(dir, c.shard.persisted[0], c.shard.persisted[1]); err != nil {
|
p, err := newPersister(dir)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Log("msg", "creating persister failed", "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.compact(p, c.shard.persisted[0], c.shard.persisted[1]); err != nil {
|
||||||
|
c.logger.Log("msg", "compaction failed", "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := p.Close(); err != nil {
|
||||||
c.logger.Log("msg", "compaction failed", "err", err)
|
c.logger.Log("msg", "compaction failed", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(c.donec)
|
close(c.donec)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) close() error {
|
func (c *compactor) Close() error {
|
||||||
close(c.triggerc)
|
close(c.triggerc)
|
||||||
<-c.donec
|
<-c.donec
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) compact(dir string, a, b block) error {
|
func (c *compactor) compact(p *persister, a, b block) error {
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cf, err := os.Create(chunksFileName(dir))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
xf, err := os.Create(indexFileName(dir))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
index := newIndexWriter(xf)
|
|
||||||
series := newSeriesWriter(cf, index)
|
|
||||||
|
|
||||||
defer index.Close()
|
|
||||||
defer series.Close()
|
|
||||||
|
|
||||||
aall, err := a.index().Postings("", "")
|
aall, err := a.index().Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -117,7 +110,7 @@ func (c *compactor) compact(dir string, a, b block) error {
|
||||||
|
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
lset, chunks := set.At()
|
lset, chunks := set.At()
|
||||||
if err := series.WriteSeries(i, lset, chunks); err != nil {
|
if err := p.chunkw.WriteSeries(i, lset, chunks); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +133,7 @@ func (c *compactor) compact(dir string, a, b block) error {
|
||||||
return set.Err()
|
return set.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := index.WriteStats(stats); err != nil {
|
if err := p.indexw.WriteStats(stats); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,13 +144,13 @@ func (c *compactor) compact(dir string, a, b block) error {
|
||||||
for x := range v {
|
for x := range v {
|
||||||
s = append(s, x)
|
s = append(s, x)
|
||||||
}
|
}
|
||||||
if err := index.WriteLabelIndex([]string{n}, s); err != nil {
|
if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for t := range postings.m {
|
for t := range postings.m {
|
||||||
if err := index.WritePostings(t.name, t.value, postings.get(t)); err != nil {
|
if err := p.indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -166,7 +159,7 @@ func (c *compactor) compact(dir string, a, b block) error {
|
||||||
for i := range all {
|
for i := range all {
|
||||||
all[i] = uint32(i)
|
all[i] = uint32(i)
|
||||||
}
|
}
|
||||||
if err := index.WritePostings("", "", newListPostings(all)); err != nil {
|
if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,3 +290,89 @@ func (c *compactionMerger) Err() error {
|
||||||
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
|
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
|
||||||
return c.l, c.c
|
return c.l, c.c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type persister struct {
|
||||||
|
dir, tmpdir string
|
||||||
|
|
||||||
|
chunkf, indexf *fileutil.LockedFile
|
||||||
|
|
||||||
|
chunkw SeriesWriter
|
||||||
|
indexw IndexWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPersister(dir string) (*persister, error) {
|
||||||
|
p := &persister{
|
||||||
|
dir: dir,
|
||||||
|
tmpdir: dir + ".tmp",
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Write to temporary directory to make persistence appear atomic.
|
||||||
|
if fileutil.Exist(p.tmpdir) {
|
||||||
|
if err := os.RemoveAll(p.tmpdir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := fileutil.CreateDirAll(p.tmpdir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.chunkf, err = fileutil.LockFile(chunksFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
p.indexf, err = fileutil.LockFile(indexFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.indexw = newIndexWriter(p.indexf)
|
||||||
|
p.chunkw = newSeriesWriter(p.chunkf, p.indexw)
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *persister) Close() error {
|
||||||
|
if err := p.chunkw.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.indexw.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fileutil.Fsync(p.chunkf.File); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fileutil.Fsync(p.indexf.File); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.chunkf.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.indexf.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return renameDir(p.tmpdir, p.dir)
|
||||||
|
}
|
||||||
|
|
||||||
|
func renameDir(from, to string) error {
|
||||||
|
if err := os.RemoveAll(to); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Rename(from, to); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Directory was renamed; sync parent dir to persist rename.
|
||||||
|
pdir, err := fileutil.OpenDir(filepath.Dir(to))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = fileutil.Fsync(pdir); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = pdir.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
22
db.go
22
db.go
|
@ -277,7 +277,7 @@ func (s *Shard) Close() error {
|
||||||
|
|
||||||
var e MultiError
|
var e MultiError
|
||||||
|
|
||||||
e.Add(s.compactor.close())
|
e.Add(s.compactor.Close())
|
||||||
|
|
||||||
for _, pb := range s.persisted {
|
for _, pb := range s.persisted {
|
||||||
e.Add(pb.Close())
|
e.Add(pb.Close())
|
||||||
|
@ -311,7 +311,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
|
||||||
defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }()
|
defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }()
|
||||||
|
|
||||||
if err := s.persist(); err != nil {
|
if err := s.persist(); err != nil {
|
||||||
s.logger.Log("msg", "persistance error", "err", err)
|
s.logger.Log("msg", "persistence error", "err", err)
|
||||||
}
|
}
|
||||||
s.metrics.persistences.Inc()
|
s.metrics.persistences.Inc()
|
||||||
}()
|
}()
|
||||||
|
@ -383,21 +383,23 @@ func (s *Shard) persist() error {
|
||||||
|
|
||||||
// TODO(fabxc): add grace period where we can still append to old head shard
|
// TODO(fabxc): add grace period where we can still append to old head shard
|
||||||
// before actually persisting it.
|
// before actually persisting it.
|
||||||
p := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
|
dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
|
||||||
|
|
||||||
if err := os.MkdirAll(p, 0777); err != nil {
|
p, err := newPersister(dir)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := head.persist(p)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sz := fmt.Sprintf("%.2fMiB", float64(n)/1024/1024)
|
if err := head.persist(p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sz := fmt.Sprintf("%.2fMB", float64(p.chunkw.Size()+p.indexw.Size())/1e6)
|
||||||
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
||||||
|
|
||||||
// Reopen block as persisted block for querying.
|
// Reopen block as persisted block for querying.
|
||||||
pb, err := newPersistedBlock(p)
|
pb, err := newPersistedBlock(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
49
head.go
49
head.go
|
@ -2,7 +2,6 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -261,36 +260,25 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HeadBlock) persist(p string) (int64, error) {
|
func (h *HeadBlock) persist(p *persister) error {
|
||||||
sf, err := os.Create(chunksFileName(p))
|
if err := h.wal.Close(); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
xf, err := os.Create(indexFileName(p))
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
iw := newIndexWriter(xf)
|
|
||||||
sw := newSeriesWriter(sf, iw)
|
|
||||||
|
|
||||||
defer sw.Close()
|
|
||||||
defer iw.Close()
|
|
||||||
|
|
||||||
for ref, cd := range h.descs {
|
for ref, cd := range h.descs {
|
||||||
if err := sw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
|
if err := p.chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
|
||||||
{
|
{
|
||||||
MinTime: cd.firsTimestamp,
|
MinTime: cd.firsTimestamp,
|
||||||
MaxTime: cd.lastTimestamp,
|
MaxTime: cd.lastTimestamp,
|
||||||
Chunk: cd.chunk,
|
Chunk: cd.chunk,
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := iw.WriteStats(h.stats); err != nil {
|
if err := p.indexw.WriteStats(h.stats); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
for n, v := range h.values {
|
for n, v := range h.values {
|
||||||
s := make([]string, 0, len(v))
|
s := make([]string, 0, len(v))
|
||||||
|
@ -298,14 +286,14 @@ func (h *HeadBlock) persist(p string) (int64, error) {
|
||||||
s = append(s, x)
|
s = append(s, x)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for t := range h.postings.m {
|
for t := range h.postings.m {
|
||||||
if err := iw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil {
|
if err := p.indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Write a postings list containing all series.
|
// Write a postings list containing all series.
|
||||||
|
@ -313,17 +301,8 @@ func (h *HeadBlock) persist(p string) (int64, error) {
|
||||||
for i := range all {
|
for i := range all {
|
||||||
all[i] = uint32(i)
|
all[i] = uint32(i)
|
||||||
}
|
}
|
||||||
if err := iw.WritePostings("", "", newListPostings(all)); err != nil {
|
if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
// Everything written successfully, we can remove the WAL.
|
|
||||||
if err := h.wal.Close(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
if err := os.Remove(h.wal.f.Name()); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return iw.Size() + sw.Size(), err
|
|
||||||
}
|
}
|
||||||
|
|
1
wal.go
1
wal.go
|
@ -74,6 +74,7 @@ type walHandler struct {
|
||||||
series func(labels.Labels)
|
series func(labels.Labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadAll consumes all entries in the WAL and triggers the registered handlers.
|
||||||
func (w *WAL) ReadAll(h *walHandler) error {
|
func (w *WAL) ReadAll(h *walHandler) error {
|
||||||
dec := &walDecoder{
|
dec := &walDecoder{
|
||||||
r: w.f,
|
r: w.f,
|
||||||
|
|
25
writer.go
25
writer.go
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -129,15 +128,6 @@ func (w *seriesWriter) Size() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *seriesWriter) Close() error {
|
func (w *seriesWriter) Close() error {
|
||||||
if f, ok := w.w.(*os.File); ok {
|
|
||||||
if err := f.Sync(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if c, ok := w.w.(io.Closer); ok {
|
|
||||||
return c.Close()
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,18 +489,5 @@ func (w *indexWriter) finalize() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) Close() error {
|
func (w *indexWriter) Close() error {
|
||||||
if err := w.finalize(); err != nil {
|
return w.finalize()
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if f, ok := w.w.(*os.File); ok {
|
|
||||||
if err := f.Sync(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if c, ok := w.w.(io.Closer); ok {
|
|
||||||
return c.Close()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue