From a20bebf7eb45cafd0b196cc3ec33487f3aa591ff Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 17 Feb 2020 11:41:04 +0000 Subject: [PATCH] Moved readyStorage to main. Signed-off-by: Bartlomiej Plotka --- cmd/prometheus/main.go | 89 +++++++++++++++++++++++++++++++++++++++++- tsdb/db.go | 85 ---------------------------------------- web/web_test.go | 5 +-- 3 files changed, 90 insertions(+), 89 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 888e2a977..d3b553f39 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -17,6 +17,7 @@ package main import ( "context" "fmt" + "math" "net" "net/http" _ "net/http/pprof" // Comment this line to disable pprof endpoint. @@ -49,6 +50,7 @@ import ( "github.com/prometheus/prometheus/discovery" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/logging" "github.com/prometheus/prometheus/pkg/relabel" prom_runtime "github.com/prometheus/prometheus/pkg/runtime" @@ -335,7 +337,7 @@ func main() { level.Info(logger).Log("vm_limits", prom_runtime.VmLimits()) var ( - localStorage = &tsdb.ReadyStorage{} + localStorage = &readyStorage{} remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline)) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) @@ -890,3 +892,88 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc { } } } + +// readyStorage implements the Storage interface while allowing to set the actual +// storage at a later point in time. +type readyStorage struct { + mtx sync.RWMutex + db *tsdb.DB + startTimeMargin int64 +} + +// Set the storage. +func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.db = db + s.startTimeMargin = startTimeMargin +} + +// Get the storage. +func (s *readyStorage) Get() *tsdb.DB { + if x := s.get(); x != nil { + return x + } + return nil +} + +func (s *readyStorage) get() *tsdb.DB { + s.mtx.RLock() + x := s.db + s.mtx.RUnlock() + return x +} + +// StartTime implements the Storage interface. +func (s *readyStorage) StartTime() (int64, error) { + if x := s.get(); x != nil { + var startTime int64 + + if len(x.Blocks()) > 0 { + startTime = x.Blocks()[0].Meta().MinTime + } else { + startTime = time.Now().Unix() * 1000 + } + // Add a safety margin as it may take a few minutes for everything to spin up. + return startTime + s.startTimeMargin, nil + } + + return math.MaxInt64, tsdb.ErrNotReady +} + +// Querier implements the Storage interface. +func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + if x := s.get(); x != nil { + return x.Querier(ctx, mint, maxt) + } + return nil, tsdb.ErrNotReady +} + +// Appender implements the Storage interface. +func (s *readyStorage) Appender() storage.Appender { + if x := s.get(); x != nil { + return x.Appender() + } + return notReadyAppender{} +} + +type notReadyAppender struct{} + +func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { + return 0, tsdb.ErrNotReady +} + +func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return tsdb.ErrNotReady } + +func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } + +func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } + +// Close implements the Storage interface. +func (s *readyStorage) Close() error { + if x := s.Get(); x != nil { + return x.Close() + } + return nil +} diff --git a/tsdb/db.go b/tsdb/db.go index bb24e1446..173f6db77 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1455,88 +1455,3 @@ func exponential(d, min, max time.Duration) time.Duration { } return d } - -// ReadyStorage implements the Storage interface while allowing to set the actual -// storage at a later point in time. -type ReadyStorage struct { - mtx sync.RWMutex - db *DB - startTimeMargin int64 -} - -// Set the storage. -func (s *ReadyStorage) Set(db *DB, startTimeMargin int64) { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.db = db - s.startTimeMargin = startTimeMargin -} - -// Get the storage. -func (s *ReadyStorage) Get() *DB { - if x := s.get(); x != nil { - return x - } - return nil -} - -func (s *ReadyStorage) get() *DB { - s.mtx.RLock() - x := s.db - s.mtx.RUnlock() - return x -} - -// StartTime implements the Storage interface. -func (s *ReadyStorage) StartTime() (int64, error) { - if x := s.get(); x != nil { - var startTime int64 - - if len(x.Blocks()) > 0 { - startTime = x.Blocks()[0].Meta().MinTime - } else { - startTime = time.Now().Unix() * 1000 - } - // Add a safety margin as it may take a few minutes for everything to spin up. - return startTime + s.startTimeMargin, nil - } - - return math.MaxInt64, ErrNotReady -} - -// Querier implements the Storage interface. -func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - if x := s.get(); x != nil { - return x.Querier(ctx, mint, maxt) - } - return nil, ErrNotReady -} - -// Appender implements the Storage interface. -func (s *ReadyStorage) Appender() storage.Appender { - if x := s.get(); x != nil { - return x.Appender() - } - return notReadyAppender{} -} - -type notReadyAppender struct{} - -func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { - return 0, ErrNotReady -} - -func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return ErrNotReady } - -func (n notReadyAppender) Commit() error { return ErrNotReady } - -func (n notReadyAppender) Rollback() error { return ErrNotReady } - -// Close implements the Storage interface. -func (s *ReadyStorage) Close() error { - if x := s.Get(); x != nil { - return x.Close() - } - return nil -} diff --git a/web/web_test.go b/web/web_test.go index 718923cab..6dbde4e59 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -28,7 +28,6 @@ import ( "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/rules" @@ -106,7 +105,7 @@ func TestReadyAndHealthy(t *testing.T) { ReadTimeout: 30 * time.Second, MaxConnections: 512, Context: nil, - Storage: &tsdb.ReadyStorage{}, + Storage: nil, QueryEngine: nil, ScrapeManager: &scrape.Manager{}, RuleManager: &rules.Manager{}, @@ -297,7 +296,7 @@ func TestRoutePrefix(t *testing.T) { ReadTimeout: 30 * time.Second, MaxConnections: 512, Context: nil, - Storage: &tsdb.ReadyStorage{}, + Storage: nil, QueryEngine: nil, ScrapeManager: nil, RuleManager: nil,