diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 6160741ef..2b7038138 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -387,9 +387,9 @@ func main() { ) cfg.web.Context = ctxWeb - cfg.web.TSDB = localStorage.Get cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes + cfg.web.LocalStorage = localStorage cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine cfg.web.ScrapeManager = scrapeManager @@ -921,14 +921,7 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.startTimeMargin = startTimeMargin } -// Get the storage. -func (s *readyStorage) Get() *tsdb.DB { - if x := s.get(); x != nil { - return x - } - return nil -} - +// get is internal, you should use readyStorage as the front implementation layer. func (s *readyStorage) get() *tsdb.DB { s.mtx.RLock() x := s.db @@ -983,12 +976,44 @@ func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } // Close implements the Storage interface. func (s *readyStorage) Close() error { - if x := s.Get(); x != nil { + if x := s.get(); x != nil { return x.Close() } return nil } +// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (s *readyStorage) CleanTombstones() error { + if x := s.get(); x != nil { + return x.CleanTombstones() + } + return tsdb.ErrNotReady +} + +// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { + if x := s.get(); x != nil { + return x.Delete(mint, maxt, ms...) + } + return tsdb.ErrNotReady +} + +// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (s *readyStorage) Snapshot(dir string, withHead bool) error { + if x := s.get(); x != nil { + return x.Snapshot(dir, withHead) + } + return tsdb.ErrNotReady +} + +// Stats implements the api_v1.TSDBAdminStats interface. 
+func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) { + if x := s.get(); x != nil { + return x.Head().Stats(statsByLabelName), nil + } + return nil, tsdb.ErrNotReady +} + // tsdbOptions is tsdb.Option version with defined units. // This is required as tsdb.Option fields are unit agnostic (time). type tsdbOptions struct { diff --git a/promql/test.go b/promql/test.go index 72dd9b9b8..e3d99e83f 100644 --- a/promql/test.go +++ b/promql/test.go @@ -25,10 +25,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" ) @@ -54,7 +54,7 @@ type Test struct { cmds []testCommand - storage storage.Storage + storage *teststorage.TestStorage queryEngine *Engine context context.Context @@ -101,6 +101,11 @@ func (t *Test) Storage() storage.Storage { return t.storage } +// TSDB returns test's TSDB. +func (t *Test) TSDB() *tsdb.DB { + return t.storage.DB +} + func raise(line int, format string, v ...interface{}) error { return &parser.ParseErr{ LineOffset: line, diff --git a/tsdb/db.go b/tsdb/db.go index 9ab1124d6..e97499c04 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" + // Load the package into main to make sure minium Go version is met. 
_ "github.com/prometheus/prometheus/tsdb/goversion" "github.com/prometheus/prometheus/tsdb/wal" diff --git a/tsdb/head.go b/tsdb/head.go index 8da6fc2be..4b374fd5d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -751,6 +751,23 @@ func (h *Head) initTime(t int64) (initialized bool) { return true } +type Stats struct { + NumSeries uint64 + MinTime, MaxTime int64 + IndexPostingStats *index.PostingsStats +} + +// Stats returns important current HEAD statistics. Note that it is expensive to +// calculate these. +func (h *Head) Stats(statsByLabelName string) *Stats { + return &Stats{ + NumSeries: h.NumSeries(), + MaxTime: h.MaxTime(), + MinTime: h.MinTime(), + IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName), + } +} + type RangeHead struct { head *Head mint, maxt int64 diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index c6ac4b89a..6fa90781e 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -18,14 +18,13 @@ import ( "os" "time" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/testutil" ) -// New returns a new storage for testing purposes +// New returns a new TestStorage for testing purposes // that removes all associated files on closing. 
-func New(t testutil.T) storage.Storage { +func New(t testutil.T) *TestStorage { dir, err := ioutil.TempDir("", "test_storage") if err != nil { t.Fatalf("Opening test dir failed: %s", err) @@ -40,16 +39,16 @@ func New(t testutil.T) storage.Storage { if err != nil { t.Fatalf("Opening test storage failed: %s", err) } - return testStorage{Storage: db, dir: dir} + return &TestStorage{DB: db, dir: dir} } -type testStorage struct { - storage.Storage +type TestStorage struct { + *tsdb.DB dir string } -func (s testStorage) Close() error { - if err := s.Storage.Close(); err != nil { +func (s TestStorage) Close() error { + if err := s.DB.Close(); err != nil { return err } return os.RemoveAll(s.dir) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index b94c91944..a0ae68527 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -37,7 +37,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" @@ -159,13 +158,13 @@ type apiFuncResult struct { type apiFunc func(r *http.Request) apiFuncResult -// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations. -type TSDBAdmin interface { +// TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics. 
+type TSDBAdminStats interface { CleanTombstones() error Delete(mint, maxt int64, ms ...*labels.Matcher) error - Dir() string Snapshot(dir string, withHead bool) error - Head() *tsdb.Head + + Stats(statsByLabelName string) (*tsdb.Stats, error) } // API can register a set of endpoints in a router and handle @@ -183,7 +182,8 @@ type API struct { ready func(http.HandlerFunc) http.HandlerFunc globalURLOptions GlobalURLOptions - db func() TSDBAdmin + db TSDBAdminStats + dbDir string enableAdmin bool logger log.Logger remoteReadSampleLimit int @@ -209,7 +209,8 @@ func NewAPI( flagsMap map[string]string, globalURLOptions GlobalURLOptions, readyFunc func(http.HandlerFunc) http.HandlerFunc, - db func() TSDBAdmin, + db TSDBAdminStats, + dbDir string, enableAdmin bool, logger log.Logger, rr rulesRetriever, @@ -232,6 +233,7 @@ func NewAPI( ready: readyFunc, globalURLOptions: globalURLOptions, db: db, + dbDir: dbDir, enableAdmin: enableAdmin, rulesRetriever: rr, remoteReadSampleLimit: remoteReadSampleLimit, @@ -244,22 +246,32 @@ func NewAPI( } } +func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult { + if r.err != nil && errors.Cause(r.err.err) == tsdb.ErrNotReady { + r.err.typ = errorUnavailable + } + return r +} + // Register the API's endpoints in the given router. 
func (api *API) Register(r *route.Router) { wrap := func(f apiFunc) http.HandlerFunc { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { httputil.SetCORS(w, api.CORSOrigin, r) - result := f(r) + result := setUnavailStatusOnTSDBNotReady(f(r)) if result.finalizer != nil { defer result.finalizer() } if result.err != nil { api.respondError(w, result.err, result.data) - } else if result.data != nil { + return + } + + if result.data != nil { api.respond(w, result.data, result.warnings) - } else { - w.WriteHeader(http.StatusNoContent) + return } + w.WriteHeader(http.StatusNoContent) }) return api.ready(httputil.CompressionHandler{ Handler: hf, @@ -1124,29 +1136,27 @@ type tsdbStatus struct { SeriesCountByLabelValuePair []stat `json:"seriesCountByLabelValuePair"` } -func (api *API) serveTSDBStatus(r *http.Request) apiFuncResult { - db := api.db() - if db == nil { - return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} - } - convert := func(stats []index.Stat) []stat { - result := make([]stat, 0, len(stats)) - for _, item := range stats { - item := stat{Name: item.Name, Value: item.Count} - result = append(result, item) - } - return result +func convertStats(stats []index.Stat) []stat { + result := make([]stat, 0, len(stats)) + for _, item := range stats { + item := stat{Name: item.Name, Value: item.Count} + result = append(result, item) } + return result +} - posting := db.Head().PostingsCardinalityStats(model.MetricNameLabel) - response := tsdbStatus{ - SeriesCountByMetricName: convert(posting.CardinalityMetricsStats), - LabelValueCountByLabelName: convert(posting.CardinalityLabelStats), - MemoryInBytesByLabelName: convert(posting.LabelValueStats), - SeriesCountByLabelValuePair: convert(posting.LabelValuePairsStats), +func (api *API) serveTSDBStatus(*http.Request) apiFuncResult { + s, err := api.db.Stats("__name__") + if err != nil { + return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } - return 
apiFuncResult{response, nil, nil, nil} + return apiFuncResult{tsdbStatus{ + SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats), + LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats), + MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats), + SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats), + }, nil, nil, nil} } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { @@ -1322,11 +1332,6 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} } - db := api.db() - if db == nil { - return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} - } - if err := r.ParseForm(); err != nil { return apiFuncResult{nil, &apiError{errorBadData, errors.Wrap(err, "error parsing form values")}, nil, nil} } @@ -1348,8 +1353,7 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - - if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil { + if err := api.db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil { return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } } @@ -1372,13 +1376,8 @@ func (api *API) snapshot(r *http.Request) apiFuncResult { } } - db := api.db() - if db == nil { - return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} - } - var ( - snapdir = filepath.Join(db.Dir(), "snapshots") + snapdir = filepath.Join(api.dbDir, "snapshots") name = fmt.Sprintf("%s-%x", time.Now().UTC().Format("20060102T150405Z0700"), rand.Int()) @@ -1387,7 +1386,7 @@ func (api *API) snapshot(r *http.Request) apiFuncResult { if err := os.MkdirAll(dir, 0777); err != nil { return 
apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot directory")}, nil, nil} } - if err := db.Snapshot(dir, !skipHead); err != nil { + if err := api.db.Snapshot(dir, !skipHead); err != nil { return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot")}, nil, nil} } @@ -1400,12 +1399,7 @@ func (api *API) cleanTombstones(r *http.Request) apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} } - db := api.db() - if db == nil { - return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} - } - - if err := db.CleanTombstones(); err != nil { + if err := api.db.CleanTombstones(); err != nil { return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 552fda467..e0c7c74f9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1892,32 +1892,24 @@ func TestStreamReadEndpoint(t *testing.T) { } type fakeDB struct { - err error - closer func() + err error } func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err } -func (f *fakeDB) Dir() string { - dir, _ := ioutil.TempDir("", "fakeDB") - f.closer = func() { - os.RemoveAll(dir) - } - return dir -} -func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } -func (f *fakeDB) Head() *tsdb.Head { +func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } +func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) { h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) - return h + return h.Stats(statsByLabelName), nil } func TestAdminEndpoints(t *testing.T) { - tsdb, tsdbWithError := &fakeDB{}, &fakeDB{err: errors.New("some error")} + tsdb, tsdbWithError, tsdbNotReady := &fakeDB{}, &fakeDB{err: errors.New("some error")}, 
&fakeDB{err: errors.Wrap(tsdb.ErrNotReady, "wrap")} snapshotAPI := func(api *API) apiFunc { return api.snapshot } cleanAPI := func(api *API) apiFunc { return api.cleanTombstones } deleteAPI := func(api *API) apiFunc { return api.deleteSeries } - for i, tc := range []struct { + for _, tc := range []struct { db *fakeDB enableAdmin bool endpoint func(api *API) apiFunc @@ -1965,7 +1957,7 @@ func TestAdminEndpoints(t *testing.T) { errType: errorInternal, }, { - db: nil, + db: tsdbNotReady, enableAdmin: true, endpoint: snapshotAPI, @@ -1994,7 +1986,7 @@ func TestAdminEndpoints(t *testing.T) { errType: errorInternal, }, { - db: nil, + db: tsdbNotReady, enableAdmin: true, endpoint: cleanAPI, @@ -2064,37 +2056,31 @@ func TestAdminEndpoints(t *testing.T) { errType: errorInternal, }, { - db: nil, + db: tsdbNotReady, enableAdmin: true, endpoint: deleteAPI, + values: map[string][]string{"match[]": {"up"}}, errType: errorUnavailable, }, } { tc := tc - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Run("", func(t *testing.T) { + dir, _ := ioutil.TempDir("", "fakeDB") + defer testutil.Ok(t, os.RemoveAll(dir)) + api := &API{ - db: func() TSDBAdmin { - if tc.db != nil { - return tc.db - } - return nil - }, + db: tc.db, + dbDir: dir, ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, enableAdmin: tc.enableAdmin, } - defer func() { - if tc.db != nil && tc.db.closer != nil { - tc.db.closer() - } - }() endpoint := tc.endpoint(api) req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil) - if err != nil { - t.Fatalf("Error when creating test request: %s", err) - } - res := endpoint(req) + testutil.Ok(t, err) + + res := setUnavailStatusOnTSDBNotReady(endpoint(req)) assertAPIError(t, res.err, tc.errType) }) } @@ -2504,14 +2490,7 @@ func TestTSDBStatus(t *testing.T) { } { tc := tc t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - api := &API{ - db: func() TSDBAdmin { - if tc.db != nil { - return tc.db - } - return nil - }, - } + api := &API{db: 
tc.db} endpoint := tc.endpoint(api) req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil) if err != nil { diff --git a/web/api/v2/api.go b/web/api/v2/api.go index b7c752baa..63cdacd79 100644 --- a/web/api/v2/api.go +++ b/web/api/v2/api.go @@ -26,7 +26,6 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/pkg/errors" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -40,16 +39,19 @@ import ( // API encapsulates all API services. type API struct { enableAdmin bool - db func() *tsdb.DB + db TSDBAdmin + dbDir string } // New returns a new API object. func New( - db func() *tsdb.DB, + db TSDBAdmin, + dbDir string, enableAdmin bool, ) *API { return &API{ db: db, + dbDir: dbDir, enableAdmin: enableAdmin, } } @@ -57,7 +59,7 @@ func New( // RegisterGRPC registers all API services with the given server. func (api *API) RegisterGRPC(srv *grpc.Server) { if api.enableAdmin { - pb.RegisterAdminServer(srv, NewAdmin(api.db)) + pb.RegisterAdminServer(srv, NewAdmin(api.db, api.dbDir)) } else { pb.RegisterAdminServer(srv, &AdminDisabled{}) } @@ -133,13 +135,21 @@ func (s *AdminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques return nil, errAdminDisabled } +// TSDBAdmin defines the tsdb interfaces used by the v2 API for admin operations. +type TSDBAdmin interface { + CleanTombstones() error + Delete(mint, maxt int64, ms ...*labels.Matcher) error + Snapshot(dir string, withHead bool) error +} + // Admin provides an administration interface to Prometheus. type Admin struct { - db func() *tsdb.DB + db TSDBAdmin + dbDir string } // NewAdmin returns a Admin server. -func NewAdmin(db func() *tsdb.DB) *Admin { +func NewAdmin(db TSDBAdmin, dbDir string) *Admin { return &Admin{ db: db, } @@ -147,12 +157,8 @@ func NewAdmin(db func() *tsdb.DB) *Admin { // TSDBSnapshot implements pb.AdminServer. 
func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) { - db := s.db() - if db == nil { - return nil, errTSDBNotReady - } var ( - snapdir = filepath.Join(db.Dir(), "snapshots") + snapdir = filepath.Join(s.dbDir, "snapshots") name = fmt.Sprintf("%s-%x", time.Now().UTC().Format("20060102T150405Z0700"), rand.Int()) @@ -161,7 +167,11 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p if err := os.MkdirAll(dir, 0777); err != nil { return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err) } - if err := db.Snapshot(dir, !req.SkipHead); err != nil { + if err := s.db.Snapshot(dir, !req.SkipHead); err != nil { + if errors.Cause(err) == tsdb.ErrNotReady { + return nil, errTSDBNotReady + } + return nil, status.Errorf(codes.Internal, "create snapshot: %s", err) } return &pb.TSDBSnapshotResponse{Name: name}, nil @@ -169,12 +179,10 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p // TSDBCleanTombstones implements pb.AdminServer. func (s *Admin) TSDBCleanTombstones(_ context.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) { - db := s.db() - if db == nil { - return nil, errTSDBNotReady - } - - if err := db.CleanTombstones(); err != nil { + if err := s.db.CleanTombstones(); err != nil { + if errors.Cause(err) == tsdb.ErrNotReady { + return nil, errTSDBNotReady + } return nil, status.Errorf(codes.Internal, "clean tombstones: %s", err) } @@ -212,11 +220,10 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb. 
matchers = append(matchers, lm) } - db := s.db() - if db == nil { - return nil, errTSDBNotReady - } - if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil { + if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil { + if errors.Cause(err) == tsdb.ErrNotReady { + return nil, errTSDBNotReady + } return nil, status.Error(codes.Internal, err.Error()) } return &pb.SeriesDeleteResponse{}, nil diff --git a/web/federate.go b/web/federate.go index ebed25dc4..4815a5a16 100644 --- a/web/federate.go +++ b/web/federate.go @@ -20,10 +20,12 @@ import ( "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" @@ -78,6 +80,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { q, err := h.localStorage.Querier(req.Context(), mint, maxt) if err != nil { federationErrors.Inc() + if errors.Cause(err) == tsdb.ErrNotReady { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/web/federate_test.go b/web/federate_test.go index ffb721f93..8f9e86ab4 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -15,16 +15,22 @@ package web import ( "bytes" + "context" + "net/http" "net/http/httptest" "sort" "strings" "testing" "time" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + 
"github.com/prometheus/prometheus/util/testutil" ) var scenarios = map[string]struct { @@ -199,7 +205,7 @@ func TestFederation(t *testing.T) { } h := &Handler{ - localStorage: suite.Storage(), + localStorage: &dbAdapter{suite.TSDB()}, lookbackDelta: 5 * time.Minute, now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. config: &config.Config{ @@ -208,16 +214,59 @@ func TestFederation(t *testing.T) { } for name, scenario := range scenarios { - h.config.GlobalConfig.ExternalLabels = scenario.externalLabels - req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil) - res := httptest.NewRecorder() - h.federation(res, req) - if got, want := res.Code, scenario.code; got != want { - t.Errorf("Scenario %q: got code %d, want %d", name, got, want) - } - if got, want := normalizeBody(res.Body), scenario.body; got != want { - t.Errorf("Scenario %q: got body\n%s\n, want\n%s\n", name, got, want) - } + t.Run(name, func(t *testing.T) { + h.config.GlobalConfig.ExternalLabels = scenario.externalLabels + req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil) + res := httptest.NewRecorder() + + h.federation(res, req) + testutil.Equals(t, scenario.code, res.Code) + testutil.Equals(t, scenario.body, normalizeBody(res.Body)) + }) + } +} + +type notReadyReadStorage struct { + LocalStorage +} + +func (notReadyReadStorage) Querier(context.Context, int64, int64) (storage.Querier, error) { + return nil, errors.Wrap(tsdb.ErrNotReady, "wrap") +} + +func (notReadyReadStorage) StartTime() (int64, error) { + return 0, errors.Wrap(tsdb.ErrNotReady, "wrap") +} + +func (notReadyReadStorage) Stats(string) (*tsdb.Stats, error) { + return nil, errors.Wrap(tsdb.ErrNotReady, "wrap") +} + +// Regression test for https://github.com/prometheus/prometheus/issues/7181. 
+func TestFederation_NotReady(t *testing.T) { + h := &Handler{ + localStorage: notReadyReadStorage{}, + lookbackDelta: 5 * time.Minute, + now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. + config: &config.Config{ + GlobalConfig: config.GlobalConfig{}, + }, + } + + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + h.config.GlobalConfig.ExternalLabels = scenario.externalLabels + req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil) + res := httptest.NewRecorder() + + h.federation(res, req) + if scenario.code == http.StatusBadRequest { + // Requests are expected to be validated before DB readiness is checked. + testutil.Equals(t, http.StatusBadRequest, res.Code) + return + } + testutil.Equals(t, http.StatusServiceUnavailable, res.Code) + }) } } diff --git a/web/web.go b/web/web.go index 7e377b49a..204bfaf27 100644 --- a/web/web.go +++ b/web/web.go @@ -166,6 +166,11 @@ func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc // PrometheusVersion contains build information about Prometheus. type PrometheusVersion = api_v1.PrometheusVersion +type LocalStorage interface { + storage.Storage + api_v1.TSDBAdminStats +} + // Handler serves various HTTP endpoints of the Prometheus server type Handler struct { logger log.Logger @@ -178,9 +183,8 @@ type Handler struct { queryEngine *promql.Engine lookbackDelta time.Duration context context.Context - tsdb func() *tsdb.DB storage storage.Storage - localStorage storage.Storage + localStorage LocalStorage notifier *notifier.Manager apiV1 *api_v1.API @@ -214,9 +218,10 @@ func (h *Handler) ApplyConfig(conf *config.Config) error { // Options for the web Handler. 
type Options struct { Context context.Context - TSDB func() *tsdb.DB TSDBRetentionDuration model.Duration + TSDBDir string TSDBMaxBytes units.Base2Bytes + LocalStorage LocalStorage Storage storage.Storage QueryEngine *promql.Engine LookbackDelta time.Duration @@ -283,9 +288,8 @@ func New(logger log.Logger, o *Options) *Handler { ruleManager: o.RuleManager, queryEngine: o.QueryEngine, lookbackDelta: o.LookbackDelta, - tsdb: o.TSDB, storage: o.Storage, - localStorage: o.TSDB(), + localStorage: o.LocalStorage, notifier: o.Notifier, now: model.Now, @@ -309,9 +313,8 @@ func New(logger log.Logger, o *Options) *Handler { Scheme: o.ExternalURL.Scheme, }, h.testReady, - func() api_v1.TSDBAdmin { - return h.options.TSDB() - }, + h.options.LocalStorage, + h.options.TSDBDir, h.options.EnableAdminAPI, logger, h.ruleManager, @@ -538,7 +541,8 @@ func (h *Handler) Run(ctx context.Context) error { grpcSrv = grpc.NewServer() ) av2 := api_v2.New( - h.options.TSDB, + h.options.LocalStorage, + h.options.TSDBDir, h.options.EnableAdminAPI, ) av2.RegisterGRPC(grpcSrv) @@ -789,13 +793,22 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) { status.LastConfigTime = time.Unix(int64(toFloat64(mF)), 0).UTC() } } - db := h.tsdb() + startTime := time.Now().UnixNano() - status.Stats = db.Head().PostingsCardinalityStats("__name__") + s, err := h.localStorage.Stats("__name__") + if err != nil { + if errors.Cause(err) == tsdb.ErrNotReady { + http.Error(w, tsdb.ErrNotReady.Error(), http.StatusServiceUnavailable) + return + } + http.Error(w, fmt.Sprintf("error gathering local storage statistics: %s", err), http.StatusInternalServerError) + return + } status.Duration = fmt.Sprintf("%.3f", float64(time.Now().UnixNano()-startTime)/float64(1e9)) - status.NumSeries = db.Head().NumSeries() - status.MaxTime = db.Head().MaxTime() - status.MinTime = db.Head().MaxTime() + status.Stats = s.IndexPostingStats + status.NumSeries = s.NumSeries + status.MaxTime = s.MaxTime + status.MinTime = s.MinTime 
h.executeTemplate(w, "status.html", status) } diff --git a/web/web_test.go b/web/web_test.go index a34e17ca6..23fbf3e49 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -41,6 +41,7 @@ func TestMain(m *testing.M) { os.Setenv("no_proxy", "localhost,127.0.0.1,0.0.0.0,:") os.Exit(m.Run()) } + func TestGlobalURL(t *testing.T) { opts := &Options{ ListenAddress: ":9090", @@ -89,15 +90,21 @@ func TestGlobalURL(t *testing.T) { } } +type dbAdapter struct { + *tsdb.DB +} + +func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) { + return a.Head().Stats(statsByLabelName), nil +} + func TestReadyAndHealthy(t *testing.T) { t.Parallel() dbDir, err := ioutil.TempDir("", "tsdb-ready") - testutil.Ok(t, err) + defer testutil.Ok(t, os.RemoveAll(dbDir)) - defer os.RemoveAll(dbDir) db, err := tsdb.Open(dbDir, nil, nil, nil) - testutil.Ok(t, err) opts := &Options{ @@ -106,13 +113,14 @@ func TestReadyAndHealthy(t *testing.T) { MaxConnections: 512, Context: nil, Storage: nil, + LocalStorage: &dbAdapter{db}, + TSDBDir: dbDir, QueryEngine: nil, ScrapeManager: &scrape.Manager{}, RuleManager: &rules.Manager{}, Notifier: nil, RoutePrefix: "/", EnableAdminAPI: true, - TSDB: func() *tsdb.DB { return db }, ExternalURL: &url.URL{ Scheme: "http", Host: "localhost:9090", @@ -136,6 +144,9 @@ func TestReadyAndHealthy(t *testing.T) { } }() + // TODO(bwplotka): These tests create tons of new connections and memory that are never cleaned up. + // Close and exhaust all response bodies. + // Give some time for the web goroutine to run since we need the server // to be up before starting tests. 
time.Sleep(5 * time.Second) @@ -282,13 +293,10 @@ func TestReadyAndHealthy(t *testing.T) { func TestRoutePrefix(t *testing.T) { t.Parallel() dbDir, err := ioutil.TempDir("", "tsdb-ready") - testutil.Ok(t, err) - - defer os.RemoveAll(dbDir) + defer testutil.Ok(t, os.RemoveAll(dbDir)) db, err := tsdb.Open(dbDir, nil, nil, nil) - testutil.Ok(t, err) opts := &Options{ @@ -296,6 +304,8 @@ func TestRoutePrefix(t *testing.T) { ReadTimeout: 30 * time.Second, MaxConnections: 512, Context: nil, + TSDBDir: dbDir, + LocalStorage: &dbAdapter{db}, Storage: nil, QueryEngine: nil, ScrapeManager: nil, @@ -307,7 +317,6 @@ func TestRoutePrefix(t *testing.T) { Host: "localhost.localdomain:9090", Scheme: "http", }, - TSDB: func() *tsdb.DB { return db }, } opts.Flags = map[string]string{} @@ -399,7 +408,6 @@ func TestDebugHandler(t *testing.T) { Host: "localhost.localdomain:9090", Scheme: "http", }, - TSDB: func() *tsdb.DB { return nil }, } handler := New(nil, opts) handler.Ready() @@ -426,7 +434,6 @@ func TestHTTPMetrics(t *testing.T) { Host: "localhost.localdomain:9090", Scheme: "http", }, - TSDB: func() *tsdb.DB { return nil }, }) getReady := func() int { t.Helper()