Browse Source

fix bugs on platform windows to pass all test case. (#192)

* fix bugs on platform windows to pass all test case.

* fix bugs on platform windows to pass all test case

* clean up codes
pull/5805/head
ranbochen 7 years ago committed by Fabian Reinartz
parent
commit
a27cf34a36
  1. 8
      chunks.go
  2. 12
      compact.go
  3. 3
      db_windows.go
  4. 2
      index.go
  5. 10
      tombstones.go
  6. 20
      wal.go
  7. 3
      wal_test.go

8
chunks.go

@ -170,6 +170,7 @@ func (w *chunkWriter) finalizeTail() error {
if err := tf.Truncate(off); err != nil { if err := tf.Truncate(off); err != nil {
return err return err
} }
return tf.Close() return tf.Close()
} }
@ -276,7 +277,12 @@ func (w *chunkWriter) seq() int {
} }
func (w *chunkWriter) Close() error { func (w *chunkWriter) Close() error {
return w.finalizeTail() if err := w.finalizeTail(); err != nil {
return err
}
// close dir file (if not windows platform will fail on rename)
return w.dirFile.Close()
} }
// ChunkReader provides reading access of serialized time series data. // ChunkReader provides reading access of serialized time series data.

12
compact.go

@ -426,12 +426,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil { if err != nil {
return errors.Wrap(err, "open temporary block dir") return errors.Wrap(err, "open temporary block dir")
} }
defer df.Close() defer func() {
if df != nil {
df.Close()
}
}()
if err := fileutil.Fsync(df); err != nil { if err := fileutil.Fsync(df); err != nil {
return errors.Wrap(err, "sync temporary dir file") return errors.Wrap(err, "sync temporary dir file")
} }
// close temp dir before rename block dir(for windows platform)
if err = df.Close(); err != nil {
return errors.Wrap(err, "close temporary dir")
}
df = nil
// Block successfully written, make visible and remove old ones. // Block successfully written, make visible and remove old ones.
if err := renameFile(tmp, dir); err != nil { if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir") return errors.Wrap(err, "rename block dir")

3
db_windows.go

@ -21,8 +21,7 @@ import (
func mmap(f *os.File, sz int) ([]byte, error) { func mmap(f *os.File, sz int) ([]byte, error) {
low, high := uint32(sz), uint32(sz>>32) low, high := uint32(sz), uint32(sz>>32)
h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil)
h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, low, high, nil)
if h == 0 { if h == 0 {
return nil, os.NewSyscallError("CreateFileMapping", errno) return nil, os.NewSyscallError("CreateFileMapping", errno)
} }

2
index.go

@ -153,6 +153,8 @@ func newIndexWriter(dir string) (*indexWriter, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer df.Close() // close for flatform windows
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil { if err != nil {
return nil, err return nil, err

10
tombstones.go

@ -49,7 +49,11 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
if err != nil { if err != nil {
return err return err
} }
defer f.Close() defer func() {
if f != nil {
f.Close()
}
}()
buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)}
buf.reset() buf.reset()
@ -82,6 +86,10 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
return err return err
} }
if err = f.Close(); err != nil {
return err
}
f = nil
return renameFile(tmp, path) return renameFile(tmp, path)
} }

20
wal.go

@ -384,7 +384,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
w.putBuffer(buf) w.putBuffer(buf)
if err != nil { if err != nil {
return err return errors.Wrap(err, "write to compaction segment")
} }
} }
if r.Err() != nil { if r.Err() != nil {
@ -401,14 +401,15 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
csf.Sync() csf.Sync()
csf.Close() csf.Close()
candidates[0].Close() // need close before remove on platform windows
if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { if err := renameFile(csf.Name(), candidates[0].Name()); err != nil {
return err return errors.Wrap(err, "rename compaction segment")
} }
for _, f := range candidates[1:] { for _, f := range candidates[1:] {
f.Close() // need close before remove on platform windows
if err := os.RemoveAll(f.Name()); err != nil { if err := os.RemoveAll(f.Name()); err != nil {
return errors.Wrap(err, "delete WAL segment file") return errors.Wrap(err, "delete WAL segment file")
} }
f.Close()
} }
if err := w.dirFile.Sync(); err != nil { if err := w.dirFile.Sync(); err != nil {
return err return err
@ -522,6 +523,15 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) {
} }
metab := make([]byte, 8) metab := make([]byte, 8)
// If there is an error, we need close f for platform windows before gc.
// Otherwise, file op may fail.
hasError := true
defer func() {
if hasError {
f.Close()
}
}()
if n, err := f.Read(metab); err != nil { if n, err := f.Read(metab); err != nil {
return nil, errors.Wrapf(err, "validate meta %q", f.Name()) return nil, errors.Wrapf(err, "validate meta %q", f.Name())
} else if n != 8 { } else if n != 8 {
@ -534,6 +544,7 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) {
if metab[4] != WALFormatDefault { if metab[4] != WALFormatDefault {
return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name())
} }
hasError = false
return f, nil return f, nil
} }
@ -702,7 +713,8 @@ func (w *SegmentWAL) Close() error {
if hf := w.head(); hf != nil { if hf := w.head(); hf != nil {
return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name())
} }
return nil
return w.dirFile.Close()
} }
const ( const (

3
wal_test.go

@ -271,8 +271,9 @@ func TestWALRestoreCorrupted_invalidSegment(t *testing.T) {
Ok(t, err) Ok(t, err)
f, err := wal.createSegmentFile(dir + "/000001") f, err := wal.createSegmentFile(dir + "/000001")
Ok(t, err) Ok(t, err)
_, err = wal.createSegmentFile(dir + "/000002") f2, err := wal.createSegmentFile(dir + "/000002")
Ok(t, err) Ok(t, err)
Ok(t, f2.Close())
// Make header of second segment invalid. // Make header of second segment invalid.
_, err = f.WriteAt([]byte{1, 2, 3, 4}, 0) _, err = f.WriteAt([]byte{1, 2, 3, 4}, 0)

Loading…
Cancel
Save