Merge pull request #450 from prometheus/beorn7/double-start-protection

Add double-start protection.
pull/452/head
Björn Rabenstein 10 years ago
commit e6a1ac8f12

@ -0,0 +1,33 @@
// Package flock provides portable file locking. It is essentially ripped out
// from the code of github.com/syndtr/goleveldb. Strange enough that the
// standard library does not provide this functionality. Once this package has
// proven to work as expected, we should probably turn it into a separate
// general purpose package for humanity.
package flock
import (
"os"
"path/filepath"
)
// Releaser provides the Release method to release a file lock.
type Releaser interface {
Release() error
}
// New locks the file with the provided name. If the file does not exist, it is
// created. The returned Releaser is used to release the lock. existed is true
// if the file to lock already existed. A non-nil error is returned if the
// locking has failed. Neither this function nor the returned Releaser is
// goroutine-safe.
func New(fileName string) (r Releaser, existed bool, err error) {
if err = os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
return
}
_, err = os.Stat(fileName)
existed = err == nil
r, err = newLock(fileName)
return
}

@ -0,0 +1,19 @@
package flock
import "os"
type plan9Lock struct {
f *os.File
}
func (l *plan9Lock) Release() error {
return l.f.Close()
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, os.ModeExclusive|0644)
if err != nil {
return nil, err
}
return &plan9Lock{f}, nil
}

@ -0,0 +1,46 @@
// +build solaris
package flock
import (
"os"
"syscall"
)
type unixLock struct {
f *os.File
}
func (l *unixLock) Release() error {
if err := l.set(false); err != nil {
return err
}
return l.f.Close()
}
func (l *unixLock) set(lock bool) error {
flock := syscall.Flock_t{
Type: syscall.F_UNLCK,
Start: 0,
Len: 0,
Whence: 1,
}
if lock {
flock.Type = syscall.F_WRLCK
}
return syscall.FcntlFlock(l.f.Fd(), syscall.F_SETLK, &flock)
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
l := &unixLock{f}
err = l.set(true)
if err != nil {
f.Close()
return nil, err
}
return l, nil
}

@ -0,0 +1,67 @@
package flock
import (
"os"
"path/filepath"
"testing"
"github.com/prometheus/prometheus/utility/test"
)
func TestLocking(t *testing.T) {
dir := test.NewTemporaryDirectory("test_flock", t)
defer dir.Close()
fileName := filepath.Join(dir.Path(), "LOCK")
if _, err := os.Stat(fileName); err == nil {
t.Fatalf("File %q unexpectedly exists.", fileName)
}
lock, existed, err := New(fileName)
if err != nil {
t.Fatalf("Error locking file %q: %s", fileName, err)
}
if existed {
t.Errorf("File %q reported as existing during locking.", fileName)
}
// File must now exist.
if _, err := os.Stat(fileName); err != nil {
t.Errorf("Could not stat file %q expected to exist: %s", fileName, err)
}
// Try to lock again.
lockedAgain, existed, err := New(fileName)
if err == nil {
t.Fatalf("File %q locked twice.", fileName)
}
if lockedAgain != nil {
t.Error("Unsuccessful locking did not return nil.")
}
if !existed {
t.Errorf("Existing file %q not recognized.", fileName)
}
if err := lock.Release(); err != nil {
t.Errorf("Error releasing lock for file %q: %s", fileName, err)
}
// File must still exist.
if _, err := os.Stat(fileName); err != nil {
t.Errorf("Could not stat file %q expected to exist: %s", fileName, err)
}
// Lock existing file.
lock, existed, err = New(fileName)
if err != nil {
t.Fatalf("Error locking file %q: %s", fileName, err)
}
if !existed {
t.Errorf("Existing file %q not recognized.", fileName)
}
if err := lock.Release(); err != nil {
t.Errorf("Error releasing lock for file %q: %s", fileName, err)
}
}

@ -0,0 +1,41 @@
// +build darwin dragonfly freebsd linux netbsd openbsd
package flock
import (
"os"
"syscall"
)
type unixLock struct {
f *os.File
}
func (l *unixLock) Release() error {
if err := l.set(false); err != nil {
return err
}
return l.f.Close()
}
func (l *unixLock) set(lock bool) error {
how := syscall.LOCK_UN
if lock {
how = syscall.LOCK_EX
}
return syscall.Flock(int(l.f.Fd()), how|syscall.LOCK_NB)
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
l := &unixLock{f}
err = l.set(true)
if err != nil {
f.Close()
return nil, err
}
return l, nil
}

@ -0,0 +1,23 @@
package flock
import "syscall"
type windowsLock struct {
fd syscall.Handle
}
func (fl *windowsLock) Release() error {
return syscall.Close(fl.fd)
}
func newLock(fileName string) (Releaser, error) {
pathp, err := syscall.UTF16PtrFromString(fileName)
if err != nil {
return nil, err
}
fd, err := syscall.CreateFile(pathp, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.CREATE_ALWAYS, syscall.FILE_ATTRIBUTE_NORMAL, 0)
if err != nil {
return nil, err
}
return &windowsFileLock{fd}, nil
}

@ -21,6 +21,7 @@ import (
"math" "math"
"os" "os"
"path" "path"
"path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -32,6 +33,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/flock"
"github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
@ -106,9 +108,11 @@ type persistence struct {
indexingBatchLatency prometheus.Summary indexingBatchLatency prometheus.Summary
checkpointDuration prometheus.Gauge checkpointDuration prometheus.Gauge
dirtyMtx sync.Mutex // Protects dirty and becameDirty. dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state. dirty bool // true if persistence was started in dirty state.
becameDirty bool // true if an inconsistency came up during runtime. becameDirty bool // true if an inconsistency came up during runtime.
dirtyFileName string // The file used for locking and to mark dirty state.
fLock flock.Releaser // The file lock to protect against concurrent usage.
} }
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
@ -116,6 +120,17 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er
if err := os.MkdirAll(basePath, 0700); err != nil { if err := os.MkdirAll(basePath, 0700); err != nil {
return nil, err return nil, err
} }
dirtyPath := filepath.Join(basePath, dirtyFileName)
fLock, dirtyfileExisted, err := flock.New(dirtyPath)
if err != nil {
glog.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
return nil, err
}
if dirtyfileExisted {
dirty = true
}
archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath) archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
if err != nil { if err != nil {
return nil, err return nil, err
@ -173,14 +188,9 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er
Name: "checkpoint_duration_milliseconds", Name: "checkpoint_duration_milliseconds",
Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.", Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
}), }),
dirty: dirty, dirty: dirty,
} dirtyFileName: dirtyPath,
if dirtyFile, err := os.OpenFile(p.dirtyFileName(), os.O_CREATE|os.O_EXCL, 0666); err == nil { fLock: fLock,
dirtyFile.Close()
} else if os.IsExist(err) {
p.dirty = true
} else {
return nil, err
} }
if p.dirty { if p.dirty {
@ -227,12 +237,6 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
ch <- p.checkpointDuration ch <- p.checkpointDuration
} }
// dirtyFileName returns the name of the (empty) file used to mark the
// persistency layer as dirty.
func (p *persistence) dirtyFileName() string {
return path.Join(p.basePath, dirtyFileName)
}
// isDirty returns the dirty flag in a goroutine-safe way. // isDirty returns the dirty flag in a goroutine-safe way.
func (p *persistence) isDirty() bool { func (p *persistence) isDirty() bool {
p.dirtyMtx.Lock() p.dirtyMtx.Lock()
@ -1344,7 +1348,7 @@ func (p *persistence) close() error {
close(p.indexingQueue) close(p.indexingQueue)
<-p.indexingStopped <-p.indexingStopped
var lastError error var lastError, dirtyFileRemoveError error
if err := p.archivedFingerprintToMetrics.Close(); err != nil { if err := p.archivedFingerprintToMetrics.Close(); err != nil {
lastError = err lastError = err
glog.Error("Error closing archivedFingerprintToMetric index DB: ", err) glog.Error("Error closing archivedFingerprintToMetric index DB: ", err)
@ -1362,7 +1366,16 @@ func (p *persistence) close() error {
glog.Error("Error closing labelNameToLabelValues index DB: ", err) glog.Error("Error closing labelNameToLabelValues index DB: ", err)
} }
if lastError == nil && !p.isDirty() { if lastError == nil && !p.isDirty() {
lastError = os.Remove(p.dirtyFileName()) dirtyFileRemoveError = os.Remove(p.dirtyFileName)
}
if err := p.fLock.Release(); err != nil {
lastError = err
glog.Error("Error releasing file lock: ", err)
}
if dirtyFileRemoveError != nil {
// On Windows, removing the dirty file before unlocking is not
// possible. So remove it here if it failed above.
lastError = os.Remove(p.dirtyFileName)
} }
return lastError return lastError
} }

Loading…
Cancel
Save