diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index c5f1c68e9..2f1cb6001 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -27,10 +27,10 @@ func (a nopAppendable) Appender() (storage.Appender, error) { type nopAppender struct{} -func (a nopAppender) SetSeries(labels.Labels) (uint64, error) { return 0, nil } -func (a nopAppender) Add(uint64, int64, float64) error { return nil } -func (a nopAppender) Commit() error { return nil } -func (a nopAppender) Rollback() error { return nil } +func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil } +func (a nopAppender) AddFast(uint64, int64, float64) error { return nil } +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type collectResultAppender struct { refs map[uint64]labels.Labels diff --git a/template/template_test.go b/template/template_test.go index 647cb5389..178e63c5b 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -210,14 +210,11 @@ func TestTemplateExpansion(t *testing.T) { t.Fatalf("get appender: %s", err) } - aref, err := app.SetSeries(labels.FromStrings(labels.MetricName, "metric", "instance", "a")) + _, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "a"), 0, 11) require.NoError(t, err) - bref, err := app.SetSeries(labels.FromStrings(labels.MetricName, "metric", "instance", "b")) + _, err = app.Add(labels.FromStrings(labels.MetricName, "metric", "instance", "b"), 0, 21) require.NoError(t, err) - app.Add(aref, 0, 11) - app.Add(bref, 0, 21) - if err := app.Commit(); err != nil { t.Fatalf("commit samples: %s", err) } diff --git a/util/testutil/storage.go b/util/testutil/storage.go index d37ac361e..6aa341f5e 100644 --- a/util/testutil/storage.go +++ b/util/testutil/storage.go @@ -3,6 +3,7 @@ package testutil import ( "io/ioutil" "os" + "time" "github.com/prometheus/common/log" "github.com/prometheus/prometheus/storage" @@ -20,8 +21,8 @@ func NewStorage(t T) storage.Storage { log.With("dir", dir).Debugln("opening test storage") db, err := tsdb.Open(dir, &tsdb.Options{ - MinBlockDuration: 2 * 60 * 60 * 1000, - MaxBlockDuration: 24 * 60 * 60 * 1000, + MinBlockDuration: 2 * time.Hour, + MaxBlockDuration: 24 * time.Hour, AppendableBlocks: 10, }) if err != nil { diff --git a/vendor/github.com/fabxc/tsdb/block.go b/vendor/github.com/fabxc/tsdb/block.go index ca7c48f46..db63e14f2 100644 --- a/vendor/github.com/fabxc/tsdb/block.go +++ b/vendor/github.com/fabxc/tsdb/block.go @@ -7,7 +7,6 @@ import ( "path/filepath" "sort" - "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" ) @@ -130,11 +129,11 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, errors.Wrap(err, "open index file") } - sr, err := newSeriesReader(chunksf.b) + sr, err := newSeriesReader([][]byte{chunksf.b}) if err != nil { return nil, errors.Wrap(err, "create series reader") } - ir, err := newIndexReader(sr, indexf.b) + ir, err := newIndexReader(indexf.b) if err != nil { return nil, errors.Wrap(err, "create index reader") } @@ -175,14 +174,12 @@ func indexFileName(path string) string { } type mmapFile struct { - f *fileutil.LockedFile + f *os.File b []byte } func openMmapFile(path string) (*mmapFile, error) { - // We have to open the file in RDWR for the lock to work with fileutil. - // TODO(fabxc): use own flock call that supports multi-reader. - f, err := fileutil.TryLockFile(path, os.O_RDWR, 0666) + f, err := os.Open(path) if err != nil { return nil, errors.Wrap(err, "try lock file") } @@ -191,7 +188,7 @@ func openMmapFile(path string) (*mmapFile, error) { return nil, errors.Wrap(err, "stat") } - b, err := mmap(f.File, int(info.Size())) + b, err := mmap(f, int(info.Size())) if err != nil { return nil, errors.Wrap(err, "mmap") } diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index 412a24788..6a0d67499 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -158,26 +158,25 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { c.metrics.duration.Observe(time.Since(start).Seconds()) }() - if fileutil.Exist(dir) { - if err = os.RemoveAll(dir); err != nil { - return err - } + if err = os.RemoveAll(dir); err != nil { + return err } - if err = os.MkdirAll(dir, 0755); err != nil { + + if err = os.MkdirAll(dir, 0777); err != nil { return err } - chunkf, err := fileutil.LockFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create chunk file") } - indexf, err := fileutil.LockFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create index file") } indexw := newIndexWriter(indexf) - chunkw := newSeriesWriter(chunkf, indexw) + chunkw := newChunkWriter(chunkf) if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -189,10 +188,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err = fileutil.Fsync(chunkf.File); err != nil { + if err = fileutil.Fsync(chunkf); err != nil { return errors.Wrap(err, "fsync chunk file") } - if err = fileutil.Fsync(indexf.File); err != nil { + if err = fileutil.Fsync(indexf); err != nil { return errors.Wrap(err, "fsync index file") } if err = chunkf.Close(); err != nil { @@ -204,7 +203,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return nil } -func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error { +func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error { var set compactionSet for i, b := range blocks { @@ -238,10 +237,12 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw for set.Next() { lset, chunks := set.At() - if err := chunkw.WriteSeries(i, lset, chunks); err != nil { + if err := chunkw.WriteChunks(chunks...); err != nil { return err } + indexw.AddSeries(i, lset, chunks...) + meta.Stats.NumChunks += uint64(len(chunks)) meta.Stats.NumSeries++ diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index 72b82c778..10253ca1d 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" + "github.com/nightlyone/lockfile" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -28,10 +29,11 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestampdb. var DefaultOptions = &Options{ - WALFlushInterval: 5 * time.Second, - MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds - MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds - AppendableBlocks: 2, + WALFlushInterval: 5 * time.Second, + RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds + MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds + MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds + AppendableBlocks: 2, } // Options of the DB storage. @@ -39,6 +41,9 @@ type Options struct { // The interval at which the write ahead log is flushed to disc. WALFlushInterval time.Duration + // Duration of persisted data to keep. + RetentionDuration uint64 + // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. MinBlockDuration uint64 @@ -82,6 +87,7 @@ const sep = '\xff' // a hashed partition of a seriedb. type DB struct { dir string + lockf lockfile.Lockfile logger log.Logger metrics *dbMetrics opts *Options @@ -126,13 +132,24 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { // Open returns a new DB in the given directory. func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { - if !fileutil.Exist(dir) { - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err + } + + absdir, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + lockf, err := lockfile.New(filepath.Join(absdir, "lock")) + if err != nil { + return nil, err + } + if err := lockf.TryLock(); err != nil { + return nil, errors.Wrapf(err, "open DB in %s", dir) } - // var r prometheus.Registerer - r := prometheus.DefaultRegisterer + + var r prometheus.Registerer + // r := prometheus.DefaultRegisterer if opts == nil { opts = DefaultOptions @@ -143,6 +160,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { db = &DB{ dir: dir, + lockf: lockf, logger: logger, metrics: newDBMetrics(r), opts: opts, @@ -284,6 +302,37 @@ func (db *DB) compact(i, j int) error { return errors.Wrap(err, "removing old block") } } + return db.retentionCutoff() +} + +func (db *DB) retentionCutoff() error { + if db.opts.RetentionDuration == 0 { + return nil + } + h := db.heads[len(db.heads)-1] + t := h.meta.MinTime - int64(db.opts.RetentionDuration) + + var ( + blocks = db.blocks() + i int + b Block + ) + for i, b = range blocks { + if b.Meta().MinTime >= t { + break + } + } + if i <= 1 { + return nil + } + db.logger.Log("msg", "retention cutoff", "idx", i-1) + db.removeBlocks(0, i) + + for _, b := range blocks[:i] { + if err := os.RemoveAll(b.Dir()); err != nil { + return errors.Wrap(err, "removing old block") + } + } return nil } @@ -299,7 +348,7 @@ func (db *DB) initBlocks() error { } for _, dir := range dirs { - if fileutil.Exist(filepath.Join(dir, walFileName)) { + if fileutil.Exist(filepath.Join(dir, walDirName)) { h, err := openHeadBlock(dir, db.logger) if err != nil { return err @@ -339,6 +388,8 @@ func (db *DB) Close() error { merr.Add(hb.Close()) } + merr.Add(db.lockf.Unlock()) + return merr.Err() } @@ -566,7 +617,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { func (db *DB) cut(mint int64) (*headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) - dir, seq, err := nextBlockDir(db.dir) + dir, seq, err := nextSequenceFile(db.dir, "b-") if err != nil { return nil, err } @@ -616,7 +667,32 @@ func blockDirs(dir string) ([]string, error) { return dirs, nil } -func nextBlockDir(dir string) (string, int, error) { +func sequenceFiles(dir, prefix string) ([]string, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + var res []string + + for _, fi := range files { + if isSequenceFile(fi, prefix) { + res = append(res, filepath.Join(dir, fi.Name())) + } + } + return res, nil +} + +func isSequenceFile(fi os.FileInfo, prefix string) bool { + if !strings.HasPrefix(fi.Name(), prefix) { + return false + } + if _, err := strconv.ParseUint(fi.Name()[len(prefix):], 10, 32); err != nil { + return false + } + return true +} + +func nextSequenceFile(dir, prefix string) (string, int, error) { names, err := fileutil.ReadDir(dir) if err != nil { return "", 0, err @@ -624,16 +700,16 @@ func nextBlockDir(dir string) (string, int, error) { i := uint64(0) for _, n := range names { - if !strings.HasPrefix(n, "b-") { + if !strings.HasPrefix(n, prefix) { continue } - j, err := strconv.ParseUint(n[2:], 10, 32) + j, err := strconv.ParseUint(n[len(prefix):], 10, 32) if err != nil { continue } i = j } - return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), int(i + 1), nil + return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil } // PartitionedDB is a time series storage. diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index 1c69ac3bb..d9ab2944c 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -43,6 +43,7 @@ type headBlock struct { activeWriters uint64 + symbols map[string]struct{} // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. series []*memSeries @@ -97,25 +98,26 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { meta: *meta, } - // Replay contents of the write ahead log. - if err = wal.ReadAll(&walHandler{ - series: func(lset labels.Labels) error { + r := wal.Reader() + + for r.Next() { + series, samples := r.At() + + for _, lset := range series { h.create(lset.Hash(), lset) h.meta.Stats.NumSeries++ - return nil - }, - sample: func(s refdSample) error { + } + for _, s := range samples { h.series[s.ref].append(s.t, s.v) if !h.inBounds(s.t) { - return ErrOutOfBounds + return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") } - h.meta.Stats.NumSamples++ - return nil - }, - }); err != nil { - return nil, err + } + } + if err := r.Err(); err != nil { + return nil, errors.Wrap(err, "consume WAL") } h.updateMapping() @@ -362,14 +364,17 @@ type headSeriesReader struct { } // Chunk returns the chunk for the reference number. -func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) { +func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) { h.mtx.RLock() defer h.mtx.RUnlock() + si := ref >> 32 + ci := (ref << 32) >> 32 + c := &safeChunk{ - Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk, - s: h.series[ref>>8], - i: int((ref << 24) >> 24), + Chunk: h.series[si].chunks[ci].chunk, + s: h.series[si], + i: int(ci), } return c, nil } @@ -438,7 +443,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) metas = append(metas, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: (ref << 8) | uint32(i), + Ref: (uint64(ref) << 32) | uint64(i), }) } diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index df17ad270..c09bab885 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -491,7 +491,7 @@ type chunkSeries struct { // chunk is a function that retrieves chunks based on a reference // number contained in the chunk meta information. - chunk func(ref uint32) (chunks.Chunk, error) + chunk func(ref uint64) (chunks.Chunk, error) } func (s *chunkSeries) Labels() labels.Labels { diff --git a/vendor/github.com/fabxc/tsdb/reader.go b/vendor/github.com/fabxc/tsdb/reader.go index a49fb794b..630c11a99 100644 --- a/vendor/github.com/fabxc/tsdb/reader.go +++ b/vendor/github.com/fabxc/tsdb/reader.go @@ -13,32 +13,45 @@ import ( // SeriesReader provides reading access of serialized time series data. type SeriesReader interface { // Chunk returns the series data chunk with the given reference. - Chunk(ref uint32) (chunks.Chunk, error) + Chunk(ref uint64) (chunks.Chunk, error) } // seriesReader implements a SeriesReader for a serialized byte stream // of series data. type seriesReader struct { - // The underlying byte slice holding the encoded series data. - b []byte + // The underlying bytes holding the encoded series data. + bs [][]byte } -func newSeriesReader(b []byte) (*seriesReader, error) { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { - return nil, fmt.Errorf("invalid magic number %x", m) +func newSeriesReader(bs [][]byte) (*seriesReader, error) { + s := &seriesReader{bs: bs} + + for i, b := range bs { + if len(b) < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { + return nil, fmt.Errorf("invalid magic number %x", m) + } } - return &seriesReader{b: b}, nil + return s, nil } -func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) { - if int(offset) > len(s.b) { - return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b)) +func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) { + var ( + seq = int(ref >> 32) + off = int((ref << 32) >> 32) + ) + if seq >= len(s.bs) { + return nil, errors.Errorf("reference sequence %d out of range", seq) } - b := s.b[offset:] + b := s.bs[seq] + + if int(off) >= len(b) { + return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + } + b = b[off:] l, n := binary.Uvarint(b) if n < 0 { @@ -78,8 +91,6 @@ type StringTuples interface { } type indexReader struct { - series SeriesReader - // The underlying byte slice holding the encoded series data. b []byte @@ -93,14 +104,11 @@ var ( errInvalidFlag = fmt.Errorf("invalid flag") ) -func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { +func newIndexReader(b []byte) (*indexReader, error) { if len(b) < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - r := &indexReader{ - series: s, - b: b, - } + r := &indexReader{b: b} // Verify magic number. if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { @@ -299,7 +307,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { b = b[n:] chunks = append(chunks, ChunkMeta{ - Ref: uint32(o), + Ref: o, MinTime: firstTime, MaxTime: lastTime, }) diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go index 5731bba14..a923f5b3d 100644 --- a/vendor/github.com/fabxc/tsdb/wal.go +++ b/vendor/github.com/fabxc/tsdb/wal.go @@ -3,6 +3,7 @@ package tsdb import ( "bufio" "encoding/binary" + "hash" "hash/crc32" "io" "math" @@ -20,11 +21,19 @@ import ( // WALEntryType indicates what data a WAL entry contains. type WALEntryType byte -// The valid WAL entry types. const ( - WALEntrySymbols = 1 - WALEntrySeries = 2 - WALEntrySamples = 3 + // WALMagic is a 4 byte number every WAL segment file starts with. + WALMagic = uint32(0x43AF00EF) + + // WALFormatDefault is the version flag for the default outer segment file format. + WALFormatDefault = byte(1) +) + +// Entry types in a segment file. +const ( + WALEntrySymbols WALEntryType = 1 + WALEntrySeries WALEntryType = 2 + WALEntrySamples WALEntryType = 3 ) // WAL is a write ahead log for series data. It can only be written to. @@ -32,102 +41,201 @@ const ( type WAL struct { mtx sync.Mutex - f *fileutil.LockedFile - enc *walEncoder + dirFile *os.File + files []*os.File + logger log.Logger flushInterval time.Duration + segmentSize int64 + + crc32 hash.Hash32 + cur *bufio.Writer + curN int64 stopc chan struct{} donec chan struct{} - - symbols map[string]uint32 } -const walFileName = "wal-000" +const ( + walDirName = "wal" + walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB +) // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) { + dir = filepath.Join(dir, walDirName) + if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } - - p := filepath.Join(dir, walFileName) - - f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - return nil, err - } - if _, err = f.Seek(0, os.SEEK_END); err != nil { - return nil, err - } - } - enc, err := newWALEncoder(f.File) + df, err := fileutil.OpenDir(dir) if err != nil { return nil, err } w := &WAL{ - f: f, + dirFile: df, logger: l, - enc: enc, flushInterval: flushInterval, - symbols: map[string]uint32{}, donec: make(chan struct{}), stopc: make(chan struct{}), + segmentSize: walSegmentSizeBytes, + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } + if err := w.initSegments(); err != nil { + return nil, err + } + go w.run(flushInterval) return w, nil } -type walHandler struct { - sample func(refdSample) error - series func(labels.Labels) error +// Reader returns a new reader over the the write ahead log data. +// It must be completely consumed before writing to the WAL. +func (w *WAL) Reader() *WALReader { + var rs []io.ReadCloser + for _, f := range w.files { + rs = append(rs, f) + } + return NewWALReader(rs...) } -// ReadAll consumes all entries in the WAL and triggers the registered handlers. -func (w *WAL) ReadAll(h *walHandler) error { - dec := &walDecoder{ - r: w.f, - handler: h, +// Log writes a batch of new series labels and samples to the log. +func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { + if err := w.encodeSeries(series); err != nil { + return err + } + if err := w.encodeSamples(samples); err != nil { + return err } + if w.flushInterval <= 0 { + return w.Sync() + } + return nil +} - for { - if err := dec.entry(); err != nil { - if err == io.EOF { - return nil +// initSegments finds all existing segment files and opens them in the +// appropriate file modes. +func (w *WAL) initSegments() error { + fns, err := sequenceFiles(w.dirFile.Name(), "") + if err != nil { + return err + } + if len(fns) == 0 { + return nil + } + if len(fns) > 1 { + for _, fn := range fns[:len(fns)-1] { + f, err := os.Open(fn) + if err != nil { + return err } - return err + w.files = append(w.files, f) + } + } + // The most recent WAL file is the one we have to keep appending to. + f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666) + if err != nil { + return err + } + w.files = append(w.files, f) + + // Consume and validate meta headers. + for _, f := range w.files { + metab := make([]byte, 8) + + if n, err := f.Read(metab); err != nil { + return errors.Wrapf(err, "validate meta %q", f.Name()) + } else if n != 8 { + return errors.Errorf("invalid header size %d in %q", n, f.Name()) + } + + if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic { + return errors.Errorf("invalid magic header %x in %q", m, f.Name()) + } + if metab[4] != WALFormatDefault { + return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) } } + + return nil } -// Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { - if err := w.enc.encodeSeries(series); err != nil { +// cut finishes the currently active segments and open the next one. +// The encoder is reset to point to the new segment. +func (w *WAL) cut() error { + // Sync current tail to disc and close. + if tf := w.tail(); tf != nil { + if err := w.sync(); err != nil { + return err + } + off, err := tf.Seek(0, os.SEEK_CUR) + if err != nil { + return err + } + if err := tf.Truncate(off); err != nil { + return err + } + if err := tf.Close(); err != nil { + return err + } + } + + p, _, err := nextSequenceFile(w.dirFile.Name(), "") + if err != nil { return err } - if err := w.enc.encodeSamples(samples); err != nil { + f, err := os.Create(p) + if err != nil { return err } - if w.flushInterval <= 0 { - return w.sync() + if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { + return err } + if err = w.dirFile.Sync(); err != nil { + return err + } + + // Write header metadata for new file. + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], WALMagic) + metab[4] = WALFormatDefault + + if _, err := f.Write(metab); err != nil { + return err + } + + w.files = append(w.files, f) + w.cur = bufio.NewWriterSize(f, 4*1024*1024) + w.curN = 8 + return nil } +func (w *WAL) tail() *os.File { + if len(w.files) == 0 { + return nil + } + return w.files[len(w.files)-1] +} + +func (w *WAL) Sync() error { + w.mtx.Lock() + defer w.mtx.Unlock() + + return w.sync() +} + func (w *WAL) sync() error { - if err := w.enc.flush(); err != nil { + if w.cur == nil { + return nil + } + if err := w.cur.Flush(); err != nil { return err } - return fileutil.Fdatasync(w.f.File) + return fileutil.Fdatasync(w.tail()) } func (w *WAL) run(interval time.Duration) { @@ -145,7 +253,7 @@ func (w *WAL) run(interval time.Duration) { case <-w.stopc: return case <-tick: - if err := w.sync(); err != nil { + if err := w.Sync(); err != nil { w.logger.Log("msg", "sync failed", "err", err) } } @@ -157,16 +265,18 @@ func (w *WAL) Close() error { close(w.stopc) <-w.donec + w.mtx.Lock() + defer w.mtx.Unlock() + if err := w.sync(); err != nil { return err } - return w.f.Close() -} - -type walEncoder struct { - mtx sync.Mutex - // w *ioutil.PageWriter - w *bufio.Writer + // On opening, a WAL must be fully consumed once. Afterwards + // only the current segment will still be open. + if tf := w.tail(); tf != nil { + return tf.Close() + } + return nil } const ( @@ -178,31 +288,24 @@ const ( walPageBytes = 16 * minSectorSize ) -func newWALEncoder(f *os.File) (*walEncoder, error) { - // offset, err := f.Seek(0, os.SEEK_CUR) - // if err != nil { - // return nil, err - // } - enc := &walEncoder{ - // w: ioutil.NewPageWriter(f, walPageBytes, int(offset)), - w: bufio.NewWriterSize(f, 4*1024*1024), - } - return enc, nil -} - -func (e *walEncoder) flush() error { - e.mtx.Lock() - defer e.mtx.Unlock() +func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { + w.mtx.Lock() + defer w.mtx.Unlock() - return e.w.Flush() -} - -func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error { - e.mtx.Lock() - defer e.mtx.Unlock() + // Cut to the next segment if exceeds the file size unless it would also + // exceed the size of a new segment. + var ( + sz = int64(6 + 4 + len(buf)) + newsz = w.curN + sz + ) + if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { + if err := w.cut(); err != nil { + return err + } + } - h := crc32.NewIEEE() - w := io.MultiWriter(h, e.w) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.cur) b := make([]byte, 6) b[0] = byte(et) @@ -210,16 +313,18 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error { binary.BigEndian.PutUint32(b[2:], uint32(len(buf))) - if _, err := w.Write(b); err != nil { + if _, err := wr.Write(b); err != nil { return err } - if _, err := w.Write(buf); err != nil { + if _, err := wr.Write(buf); err != nil { return err } - if _, err := e.w.Write(h.Sum(nil)); err != nil { + if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil { return err } + w.curN += sz + putWALBuffer(buf) return nil } @@ -244,7 +349,7 @@ func putWALBuffer(b []byte) { walBuffers.Put(b) } -func (e *walEncoder) encodeSeries(series []labels.Labels) error { +func (w *WAL) encodeSeries(series []labels.Labels) error { if len(series) == 0 { return nil } @@ -267,10 +372,10 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error { } } - return e.entry(WALEntrySeries, walSeriesSimple, buf) + return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (e *walEncoder) encodeSamples(samples []refdSample) error { +func (w *WAL) encodeSamples(samples []refdSample) error { if len(samples) == 0 { return nil } @@ -300,25 +405,127 @@ func (e *walEncoder) encodeSamples(samples []refdSample) error { buf = append(buf, b[:8]...) } - return e.entry(WALEntrySamples, walSamplesSimple, buf) + return w.entry(WALEntrySamples, walSamplesSimple, buf) } -type walDecoder struct { - r io.Reader - handler *walHandler +// WALReader decodes and emits write ahead log entries. +type WALReader struct { + rs []io.ReadCloser + cur int + buf []byte + crc32 hash.Hash32 - buf []byte + err error + labels []labels.Labels + samples []refdSample } -func newWALDecoer(r io.Reader, h *walHandler) *walDecoder { - return &walDecoder{ - r: r, - handler: h, - buf: make([]byte, 0, 1024*1024), +// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. +func NewWALReader(rs ...io.ReadCloser) *WALReader { + return &WALReader{ + rs: rs, + buf: make([]byte, 0, 128*4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } } -func (d *walDecoder) decodeSeries(flag byte, b []byte) error { +// At returns the last decoded entry of labels or samples. +func (r *WALReader) At() ([]labels.Labels, []refdSample) { + return r.labels, r.samples +} + +// Err returns the last error the reader encountered. +func (r *WALReader) Err() error { + return r.err +} + +// nextEntry retrieves the next entry. It is also used as a testing hook. +func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { + if r.cur >= len(r.rs) { + return 0, 0, nil, io.EOF + } + cr := r.rs[r.cur] + + et, flag, b, err := r.entry(cr) + if err == io.EOF { + // Current reader completed, close and move to the next one. + if err := cr.Close(); err != nil { + return 0, 0, nil, err + } + r.cur++ + return r.nextEntry() + } + return et, flag, b, err +} + +// Next returns decodes the next entry pair and returns true +// if it was succesful. +func (r *WALReader) Next() bool { + r.labels = r.labels[:0] + r.samples = r.samples[:0] + + et, flag, b, err := r.nextEntry() + if err != nil { + if err != io.EOF { + r.err = err + } + return false + } + + switch et { + case WALEntrySamples: + if err := r.decodeSamples(flag, b); err != nil { + r.err = err + } + case WALEntrySeries: + if err := r.decodeSeries(flag, b); err != nil { + r.err = err + } + default: + r.err = errors.Errorf("unknown WAL entry type %d", et) + } + return r.err == nil +} + +func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { + r.crc32.Reset() + tr := io.TeeReader(cr, r.crc32) + + b := make([]byte, 6) + if _, err := tr.Read(b); err != nil { + return 0, 0, nil, err + } + + var ( + etype = WALEntryType(b[0]) + flag = b[1] + length = int(binary.BigEndian.Uint32(b[2:])) + ) + // Exit if we reached pre-allocated space. + if etype == 0 { + return 0, 0, nil, io.EOF + } + + if length > len(r.buf) { + r.buf = make([]byte, length) + } + buf := r.buf[:length] + + if _, err := tr.Read(buf); err != nil { + return 0, 0, nil, err + } + _, err := cr.Read(b[:4]) + if err != nil { + return 0, 0, nil, err + } + if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { + return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) + } + + return etype, flag, buf, nil +} + +func (r *WALReader) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { @@ -343,14 +550,12 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error { b = b[n+int(vl):] } - if err := d.handler.series(lset); err != nil { - return err - } + r.labels = append(r.labels, lset) } return nil } -func (d *walDecoder) decodeSamples(flag byte, b []byte) error { +func (r *WALReader) decodeSamples(flag byte, b []byte) error { if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -384,45 +589,7 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error { smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] - if err := d.handler.sample(smpl); err != nil { - return err - } + r.samples = append(r.samples, smpl) } return nil } - -func (d *walDecoder) entry() error { - b := make([]byte, 6) - if _, err := d.r.Read(b); err != nil { - return err - } - - var ( - etype = WALEntryType(b[0]) - flag = b[1] - length = int(binary.BigEndian.Uint32(b[2:])) - ) - - if length > len(d.buf) { - d.buf = make([]byte, length) - } - buf := d.buf[:length] - - if _, err := d.r.Read(buf); err != nil { - return err - } - // Read away checksum. - // TODO(fabxc): verify it - if _, err := d.r.Read(b[:4]); err != nil { - return err - } - - switch etype { - case WALEntrySeries: - return d.decodeSeries(flag, buf) - case WALEntrySamples: - return d.decodeSamples(flag, buf) - } - - return errors.Errorf("unknown WAL entry type %q", etype) -} diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go index a73f2bd11..6b4e31deb 100644 --- a/vendor/github.com/fabxc/tsdb/writer.go +++ b/vendor/github.com/fabxc/tsdb/writer.go @@ -3,6 +3,7 @@ package tsdb import ( "bufio" "encoding/binary" + "hash" "hash/crc32" "io" "sort" @@ -24,12 +25,13 @@ const ( const compactionPageBytes = minSectorSize * 64 -// SeriesWriter serializes a time block of chunked series data. -type SeriesWriter interface { - // WriteSeries writes the time series data chunks for a single series. - // The reference is used to resolve the correct series in the written index. - // It only has to be valid for the duration of the write. - WriteSeries(ref uint32, l labels.Labels, chunks []ChunkMeta) error +// ChunkWriter serializes a time block of chunked series data. +type ChunkWriter interface { + // WriteChunks writes several chunks. The data field of the ChunkMetas + // must be populated. + // After returning successfully, the Ref fields in the ChunkMetas + // is set and can be used to retrieve the chunks from the written data. + WriteChunks(chunks ...ChunkMeta) error // Size returns the size of the data written so far. Size() int64 @@ -39,33 +41,32 @@ type SeriesWriter interface { Close() error } -// seriesWriter implements the SeriesWriter interface for the standard +// chunkWriter implements the ChunkWriter interface for the standard // serialization format. -type seriesWriter struct { - ow io.Writer - w *bufio.Writer - n int64 - c int - - index IndexWriter +type chunkWriter struct { + ow io.Writer + w *bufio.Writer + n int64 + c int + crc32 hash.Hash } -func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { - return &seriesWriter{ +func newChunkWriter(w io.Writer) *chunkWriter { + return &chunkWriter{ ow: w, w: bufio.NewWriterSize(w, 1*1024*1024), n: 0, - index: index, + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } } -func (w *seriesWriter) write(wr io.Writer, b []byte) error { +func (w *chunkWriter) write(wr io.Writer, b []byte) error { n, err := wr.Write(b) w.n += int64(n) return err } -func (w *seriesWriter) writeMeta() error { +func (w *chunkWriter) writeMeta() error { b := [8]byte{} binary.BigEndian.PutUint32(b[:4], MagicSeries) @@ -74,7 +75,7 @@ func (w *seriesWriter) writeMeta() error { return w.write(w.w, b[:]) } -func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkMeta) error { +func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { // Initialize with meta data. if w.n == 0 { if err := w.writeMeta(); err != nil { @@ -82,9 +83,8 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM } } - // TODO(fabxc): is crc32 enough for chunks of one series? - h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.w) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.w) // For normal reads we don't need the number of the chunk section but // it allows us to verify checksums without reading the index file. @@ -101,7 +101,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM for i := range chks { chk := &chks[i] - chk.Ref = uint32(w.n) + chk.Ref = uint64(w.n) n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) @@ -117,21 +117,17 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM chk.Chunk = nil } - if err := w.write(w.w, h.Sum(nil)); err != nil { + if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { return err } - - if w.index != nil { - w.index.AddSeries(ref, lset, chks...) - } return nil } -func (w *seriesWriter) Size() int64 { +func (w *chunkWriter) Size() int64 { return w.n } -func (w *seriesWriter) Close() error { +func (w *chunkWriter) Close() error { // Initialize block in case no data was written to it. if w.n == 0 { if err := w.writeMeta(); err != nil { @@ -146,7 +142,7 @@ type ChunkMeta struct { // Ref and Chunk hold either a reference that can be used to retrieve // chunk data or the data itself. // Generally, only one of them is set. - Ref uint32 + Ref uint64 Chunk chunks.Chunk MinTime, MaxTime int64 // time range the data covers @@ -195,6 +191,8 @@ type indexWriter struct { symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets postings []hashEntry // postings lists offsets + + crc32 hash.Hash } func newIndexWriter(w io.Writer) *indexWriter { @@ -204,6 +202,7 @@ func newIndexWriter(w io.Writer) *indexWriter { n: 0, symbols: make(map[string]uint32, 4096), series: make(map[uint32]*indexWriterSeries, 4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } } @@ -215,8 +214,8 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { // section writes a CRC32 checksummed section of length l and guarded by flag. func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { - h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.w) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.w) b := [5]byte{flag, 0, 0, 0, 0} binary.BigEndian.PutUint32(b[1:], l) @@ -228,7 +227,7 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er if err := f(wr); err != nil { return errors.Wrap(err, "contents write func") } - if err := w.write(w.w, h.Sum(nil)); err != nil { + if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { return errors.Wrap(err, "writing checksum") } return nil diff --git a/vendor/github.com/nightlyone/lockfile/LICENSE b/vendor/github.com/nightlyone/lockfile/LICENSE new file mode 100644 index 000000000..eb5b80468 --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2012 Ingo Oeser + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/nightlyone/lockfile/README.md b/vendor/github.com/nightlyone/lockfile/README.md new file mode 100644 index 000000000..54ee19cc4 --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/README.md @@ -0,0 +1,52 @@ +lockfile +========= +Handle locking via pid files. + +[![Build Status Unix][1]][2] +[![Build status Windows][3]][4] + +[1]: https://secure.travis-ci.org/nightlyone/lockfile.png +[2]: https://travis-ci.org/nightlyone/lockfile +[3]: https://ci.appveyor.com/api/projects/status/7mojkmauj81uvp8u/branch/master?svg=true +[4]: https://ci.appveyor.com/project/nightlyone/lockfile/branch/master + + + +install +------- +Install [Go 1][5], either [from source][6] or [with a prepackaged binary][7]. +For Windows suport, Go 1.4 or newer is required. + +Then run + + go get github.com/nightlyone/lockfile + +[5]: http://golang.org +[6]: http://golang.org/doc/install/source +[7]: http://golang.org/doc/install + +LICENSE +------- +BSD + +documentation +------------- +[package documentation at godoc.org](http://godoc.org/github.com/nightlyone/lockfile) + +install +------------------- + go get github.com/nightlyone/lockfile + + +contributing +============ + +Contributions are welcome. Please open an issue or send me a pull request for a dedicated branch. +Make sure the git commit hooks show it works. + +git commit hooks +----------------------- +enable commit hooks via + + cd .git ; rm -rf hooks; ln -s ../git-hooks hooks ; cd .. + diff --git a/vendor/github.com/nightlyone/lockfile/appveyor.yml b/vendor/github.com/nightlyone/lockfile/appveyor.yml new file mode 100644 index 000000000..cf72a58b1 --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/appveyor.yml @@ -0,0 +1,12 @@ +clone_folder: c:\gopath\src\github.com\nightlyone\lockfile + +environment: + GOPATH: c:\gopath + +install: + - go version + - go env + - go get -v -t ./... + +build_script: + - go test -v ./... diff --git a/vendor/github.com/nightlyone/lockfile/lockfile.go b/vendor/github.com/nightlyone/lockfile/lockfile.go new file mode 100644 index 000000000..da00bec78 --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/lockfile.go @@ -0,0 +1,201 @@ +// Package lockfile handles pid file based locking. +// While a sync.Mutex helps against concurrency issues within a single process, +// this package is designed to help against concurrency issues between cooperating processes +// or serializing multiple invocations of the same process. You can also combine sync.Mutex +// with Lockfile in order to serialize an action between different goroutines in a single program +// and also multiple invocations of this program. +package lockfile + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// Lockfile is a pid file which can be locked +type Lockfile string + +// TemporaryError is a type of error where a retry after a random amount of sleep should help to mitigate it. +type TemporaryError string + +func (t TemporaryError) Error() string { return string(t) } + +// Temporary returns always true. +// It exists, so you can detect it via +// if te, ok := err.(interface{ Temporary() bool }); ok { +// fmt.Println("I am a temporay error situation, so wait and retry") +// } +func (t TemporaryError) Temporary() bool { return true } + +// Various errors returned by this package +var ( + ErrBusy = TemporaryError("Locked by other process") // If you get this, retry after a short sleep might help + ErrNotExist = TemporaryError("Lockfile created, but doesn't exist") // If you get this, retry after a short sleep might help + ErrNeedAbsPath = errors.New("Lockfiles must be given as absolute path names") + ErrInvalidPid = errors.New("Lockfile contains invalid pid for system") + ErrDeadOwner = errors.New("Lockfile contains pid of process not existent on this system anymore") + ErrRogueDeletion = errors.New("Lockfile owned by me has been removed unexpectedly") +) + +// New describes a new filename located at the given absolute path. +func New(path string) (Lockfile, error) { + if !filepath.IsAbs(path) { + return Lockfile(""), ErrNeedAbsPath + } + return Lockfile(path), nil +} + +// GetOwner returns who owns the lockfile. +func (l Lockfile) GetOwner() (*os.Process, error) { + name := string(l) + + // Ok, see, if we have a stale lockfile here + content, err := ioutil.ReadFile(name) + if err != nil { + return nil, err + } + + // try hard for pids. If no pid, the lockfile is junk anyway and we delete it. + pid, err := scanPidLine(content) + if err != nil { + return nil, err + } + running, err := isRunning(pid) + if err != nil { + return nil, err + } + + if running { + proc, err := os.FindProcess(pid) + if err != nil { + return nil, err + } + return proc, nil + } + return nil, ErrDeadOwner + +} + +// TryLock tries to own the lock. +// It Returns nil, if successful and and error describing the reason, it didn't work out. +// Please note, that existing lockfiles containing pids of dead processes +// and lockfiles containing no pid at all are simply deleted. +func (l Lockfile) TryLock() error { + name := string(l) + + // This has been checked by New already. If we trigger here, + // the caller didn't use New and re-implemented it's functionality badly. + // So panic, that he might find this easily during testing. + if !filepath.IsAbs(name) { + panic(ErrNeedAbsPath) + } + + tmplock, err := ioutil.TempFile(filepath.Dir(name), "") + if err != nil { + return err + } + + cleanup := func() { + _ = tmplock.Close() + _ = os.Remove(tmplock.Name()) + } + defer cleanup() + + if err := writePidLine(tmplock, os.Getpid()); err != nil { + return err + } + + // return value intentionally ignored, as ignoring it is part of the algorithm + _ = os.Link(tmplock.Name(), name) + + fiTmp, err := os.Lstat(tmplock.Name()) + if err != nil { + return err + } + fiLock, err := os.Lstat(name) + if err != nil { + // tell user that a retry would be a good idea + if os.IsNotExist(err) { + return ErrNotExist + } + return err + } + + // Success + if os.SameFile(fiTmp, fiLock) { + return nil + } + + proc, err := l.GetOwner() + switch err { + default: + // Other errors -> defensively fail and let caller handle this + return err + case nil: + if proc.Pid != os.Getpid() { + return ErrBusy + } + case ErrDeadOwner, ErrInvalidPid: + // cases we can fix below + } + + // clean stale/invalid lockfile + err = os.Remove(name) + if err != nil { + // If it doesn't exist, then it doesn't matter who removed it. + if !os.IsNotExist(err) { + return err + } + } + + // now that the stale lockfile is gone, let's recurse + return l.TryLock() +} + +// Unlock a lock again, if we owned it. Returns any error that happend during release of lock. +func (l Lockfile) Unlock() error { + proc, err := l.GetOwner() + switch err { + case ErrInvalidPid, ErrDeadOwner: + return ErrRogueDeletion + case nil: + if proc.Pid == os.Getpid() { + // we really own it, so let's remove it. + return os.Remove(string(l)) + } + // Not owned by me, so don't delete it. + return ErrRogueDeletion + default: + // This is an application error or system error. + // So give a better error for logging here. + if os.IsNotExist(err) { + return ErrRogueDeletion + } + // Other errors -> defensively fail and let caller handle this + return err + } +} + +func writePidLine(w io.Writer, pid int) error { + _, err := io.WriteString(w, fmt.Sprintf("%d\n", pid)) + return err +} + +func scanPidLine(content []byte) (int, error) { + if len(content) == 0 { + return 0, ErrInvalidPid + } + + var pid int + if _, err := fmt.Sscanln(string(content), &pid); err != nil { + return 0, ErrInvalidPid + } + + if pid <= 0 { + return 0, ErrInvalidPid + } + return pid, nil +} diff --git a/vendor/github.com/nightlyone/lockfile/lockfile_unix.go b/vendor/github.com/nightlyone/lockfile/lockfile_unix.go new file mode 100644 index 000000000..742b041fb --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/lockfile_unix.go @@ -0,0 +1,20 @@ +// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris + +package lockfile + +import ( + "os" + "syscall" +) + +func isRunning(pid int) (bool, error) { + proc, err := os.FindProcess(pid) + if err != nil { + return false, err + } + + if err := proc.Signal(syscall.Signal(0)); err != nil { + return false, nil + } + return true, nil +} diff --git a/vendor/github.com/nightlyone/lockfile/lockfile_windows.go b/vendor/github.com/nightlyone/lockfile/lockfile_windows.go new file mode 100644 index 000000000..482bd91d7 --- /dev/null +++ b/vendor/github.com/nightlyone/lockfile/lockfile_windows.go @@ -0,0 +1,30 @@ +package lockfile + +import ( + "syscall" +) + +//For some reason these consts don't exist in syscall. +const ( + error_invalid_parameter = 87 + code_still_active = 259 +) + +func isRunning(pid int) (bool, error) { + procHnd, err := syscall.OpenProcess(syscall.PROCESS_QUERY_INFORMATION, true, uint32(pid)) + if err != nil { + if scerr, ok := err.(syscall.Errno); ok { + if uintptr(scerr) == error_invalid_parameter { + return false, nil + } + } + } + + var code uint32 + err = syscall.GetExitCodeProcess(procHnd, &code) + if err != nil { + return false, err + } + + return code == code_still_active, nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 4b43ef544..6df803d5e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -368,10 +368,10 @@ "revisionTime": "2016-09-30T00:14:02Z" }, { - "checksumSHA1": "8moNbKzCSmH0RNnoroJikyd95aA=", + "checksumSHA1": "edB8coiX4s6hf6BZuYE5+MPJYX8=", "path": "github.com/fabxc/tsdb", - "revision": "012cf4ef254e34a10befd0b592bcfa5b1794e92b", - "revisionTime": "2017-02-04T10:53:52Z" + "revision": "f734773214e1bcb7962d92155863110d01214db5", + "revisionTime": "2017-02-19T12:01:19Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=", @@ -567,6 +567,12 @@ "revision": "58f52c57ce9df13460ac68200cef30a008b9c468", "revisionTime": "2016-10-18T06:08:08Z" }, + { + "checksumSHA1": "aCtmlyAgau9n0UHs8Pk+3xfIaVk=", + "path": "github.com/nightlyone/lockfile", + "revision": "1d49c987357a327b5b03aa84cbddd582c328615d", + "revisionTime": "2016-09-28T00:14:32Z" + }, { "checksumSHA1": "3YJklSuzSE1Rt8A+2dhiWSmf/fw=", "origin": "k8s.io/client-go/1.5/vendor/github.com/pborman/uuid",