diff --git a/chunks.go b/chunks.go index 8152677f7..f6e329b79 100644 --- a/chunks.go +++ b/chunks.go @@ -170,6 +170,7 @@ func (w *chunkWriter) finalizeTail() error { if err := tf.Truncate(off); err != nil { return err } + return tf.Close() } @@ -276,7 +277,12 @@ func (w *chunkWriter) seq() int { } func (w *chunkWriter) Close() error { - return w.finalizeTail() + if err := w.finalizeTail(); err != nil { + return err + } + + // close dir file (if not windows platform will fail on rename) + return w.dirFile.Close() } // ChunkReader provides reading access of serialized time series data. diff --git a/compact.go b/compact.go index a70918a28..955ba3caf 100644 --- a/compact.go +++ b/compact.go @@ -426,12 +426,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open temporary block dir") } - defer df.Close() + defer func() { + if df != nil { + df.Close() + } + }() if err := fileutil.Fsync(df); err != nil { return errors.Wrap(err, "sync temporary dir file") } + // close temp dir before rename block dir(for windows platform) + if err = df.Close(); err != nil { + return errors.Wrap(err, "close temporary dir") + } + df = nil + // Block successfully written, make visible and remove old ones. if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") diff --git a/db_windows.go b/db_windows.go index 700518e7a..444bf4103 100644 --- a/db_windows.go +++ b/db_windows.go @@ -21,8 +21,7 @@ import ( func mmap(f *os.File, sz int) ([]byte, error) { low, high := uint32(sz), uint32(sz>>32) - - h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, low, high, nil) + h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil) if h == 0 { return nil, os.NewSyscallError("CreateFileMapping", errno) } diff --git a/index.go b/index.go index c0680aa33..f976de695 100644 --- a/index.go +++ b/index.go @@ -153,6 +153,8 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err != nil { return nil, err } + defer df.Close() // close for flatform windows + f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err diff --git a/tombstones.go b/tombstones.go index 19b224634..d43cd0bd0 100644 --- a/tombstones.go +++ b/tombstones.go @@ -49,7 +49,11 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { if err != nil { return err } - defer f.Close() + defer func() { + if f != nil { + f.Close() + } + }() buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} buf.reset() @@ -82,6 +86,10 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { return err } + if err = f.Close(); err != nil { + return err + } + f = nil return renameFile(tmp, path) } diff --git a/wal.go b/wal.go index 225851de1..ec3799857 100644 --- a/wal.go +++ b/wal.go @@ -384,7 +384,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { w.putBuffer(buf) if err != nil { - return err + return errors.Wrap(err, "write to compaction segment") } } if r.Err() != nil { @@ -401,14 +401,15 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { csf.Sync() csf.Close() + candidates[0].Close() // need close before remove on platform windows if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { - return err + return errors.Wrap(err, "rename compaction segment") } for _, f := range candidates[1:] { + f.Close() // need close before remove on platform windows if err := os.RemoveAll(f.Name()); err != nil { return errors.Wrap(err, "delete WAL segment file") } - f.Close() } if err := w.dirFile.Sync(); err != nil { return err @@ -522,6 +523,15 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { } metab := make([]byte, 8) + // If there is an error, we need close f for platform windows before gc. + // Otherwise, file op may fail. + hasError := true + defer func() { + if hasError { + f.Close() + } + }() + if n, err := f.Read(metab); err != nil { return nil, errors.Wrapf(err, "validate meta %q", f.Name()) } else if n != 8 { @@ -534,6 +544,7 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { if metab[4] != WALFormatDefault { return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) } + hasError = false return f, nil } @@ -702,7 +713,8 @@ func (w *SegmentWAL) Close() error { if hf := w.head(); hf != nil { return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) } - return nil + + return w.dirFile.Close() } const ( diff --git a/wal_test.go b/wal_test.go index b4fccb981..2e402d19f 100644 --- a/wal_test.go +++ b/wal_test.go @@ -271,8 +271,9 @@ func TestWALRestoreCorrupted_invalidSegment(t *testing.T) { Ok(t, err) f, err := wal.createSegmentFile(dir + "/000001") Ok(t, err) - _, err = wal.createSegmentFile(dir + "/000002") + f2, err := wal.createSegmentFile(dir + "/000002") Ok(t, err) + Ok(t, f2.Close()) // Make header of second segment invalid. _, err = f.WriteAt([]byte{1, 2, 3, 4}, 0)