Browse Source

Fixed wrongly handled not ready TSDB on web and API. (#7182)

* fix federate endpoint panic

Signed-off-by: yeya24 <yb532204897@gmail.com>

* Fixed all cases of not ready TSDB being wrongly handled.

* Fixed issue for federation.
* Ensured this will never happen again thanks to interfaces
* Fixes same issue for stats.
* Added tests for readiness.
* Fixed bug in stats. It was:
   status.MaxTime = db.Head().MaxTime()
   status.MinTime = db.Head().MaxTime()


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Brian's comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Brian's comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7186/head
Ben Ye 5 years ago committed by GitHub
parent
commit
1e4e37144d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      cmd/prometheus/main.go
  2. 9
      promql/test.go
  3. 1
      tsdb/db.go
  4. 17
      tsdb/head.go
  5. 15
      util/teststorage/storage.go
  6. 94
      web/api/v1/api.go
  7. 61
      web/api/v1/api_test.go
  8. 53
      web/api/v2/api.go
  9. 6
      web/federate.go
  10. 71
      web/federate_test.go
  11. 41
      web/web.go
  12. 29
      web/web_test.go

45
cmd/prometheus/main.go

@ -387,9 +387,9 @@ func main() {
)
cfg.web.Context = ctxWeb
cfg.web.TSDB = localStorage.Get
cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration
cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes
cfg.web.LocalStorage = localStorage
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
cfg.web.ScrapeManager = scrapeManager
@ -921,14 +921,7 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.startTimeMargin = startTimeMargin
}
// Get the storage.
func (s *readyStorage) Get() *tsdb.DB {
if x := s.get(); x != nil {
return x
}
return nil
}
// get is internal, you should use readyStorage as the front implementation layer.
func (s *readyStorage) get() *tsdb.DB {
s.mtx.RLock()
x := s.db
@ -983,12 +976,44 @@ func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
// Close implements the Storage interface.
func (s *readyStorage) Close() error {
if x := s.Get(); x != nil {
if x := s.get(); x != nil {
return x.Close()
}
return nil
}
// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) CleanTombstones() error {
if x := s.get(); x != nil {
return x.CleanTombstones()
}
return tsdb.ErrNotReady
}
// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
if x := s.get(); x != nil {
return x.Delete(mint, maxt, ms...)
}
return tsdb.ErrNotReady
}
// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Snapshot(dir string, withHead bool) error {
if x := s.get(); x != nil {
return x.Snapshot(dir, withHead)
}
return tsdb.ErrNotReady
}
// Stats implements the api_v1.TSDBAdminStats interface.
func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
if x := s.get(); x != nil {
return x.Head().Stats(statsByLabelName), nil
}
return nil, tsdb.ErrNotReady
}
// tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {

9
promql/test.go

@ -25,10 +25,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
)
@ -54,7 +54,7 @@ type Test struct {
cmds []testCommand
storage storage.Storage
storage *teststorage.TestStorage
queryEngine *Engine
context context.Context
@ -101,6 +101,11 @@ func (t *Test) Storage() storage.Storage {
return t.storage
}
// TSDB returns test's TSDB.
func (t *Test) TSDB() *tsdb.DB {
return t.storage.DB
}
func raise(line int, format string, v ...interface{}) error {
return &parser.ParseErr{
LineOffset: line,

1
tsdb/db.go

@ -39,6 +39,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
// Load the package into main to make sure minium Go version is met.
_ "github.com/prometheus/prometheus/tsdb/goversion"
"github.com/prometheus/prometheus/tsdb/wal"

17
tsdb/head.go

@ -751,6 +751,23 @@ func (h *Head) initTime(t int64) (initialized bool) {
return true
}
type Stats struct {
NumSeries uint64
MinTime, MaxTime int64
IndexPostingStats *index.PostingsStats
}
// Stats returns important current HEAD statistics. Note that it is expensive to
// calculate these.
func (h *Head) Stats(statsByLabelName string) *Stats {
return &Stats{
NumSeries: h.NumSeries(),
MaxTime: h.MaxTime(),
MinTime: h.MinTime(),
IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName),
}
}
type RangeHead struct {
head *Head
mint, maxt int64

15
util/teststorage/storage.go

@ -18,14 +18,13 @@ import (
"os"
"time"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/testutil"
)
// New returns a new storage for testing purposes
// New returns a new TestStorage for testing purposes
// that removes all associated files on closing.
func New(t testutil.T) storage.Storage {
func New(t testutil.T) *TestStorage {
dir, err := ioutil.TempDir("", "test_storage")
if err != nil {
t.Fatalf("Opening test dir failed: %s", err)
@ -40,16 +39,16 @@ func New(t testutil.T) storage.Storage {
if err != nil {
t.Fatalf("Opening test storage failed: %s", err)
}
return testStorage{Storage: db, dir: dir}
return &TestStorage{DB: db, dir: dir}
}
type testStorage struct {
storage.Storage
type TestStorage struct {
*tsdb.DB
dir string
}
func (s testStorage) Close() error {
if err := s.Storage.Close(); err != nil {
func (s TestStorage) Close() error {
if err := s.DB.Close(); err != nil {
return err
}
return os.RemoveAll(s.dir)

94
web/api/v1/api.go

@ -37,7 +37,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
@ -159,13 +158,13 @@ type apiFuncResult struct {
type apiFunc func(r *http.Request) apiFuncResult
// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations.
type TSDBAdmin interface {
// TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics.
type TSDBAdminStats interface {
CleanTombstones() error
Delete(mint, maxt int64, ms ...*labels.Matcher) error
Dir() string
Snapshot(dir string, withHead bool) error
Head() *tsdb.Head
Stats(statsByLabelName string) (*tsdb.Stats, error)
}
// API can register a set of endpoints in a router and handle
@ -183,7 +182,8 @@ type API struct {
ready func(http.HandlerFunc) http.HandlerFunc
globalURLOptions GlobalURLOptions
db func() TSDBAdmin
db TSDBAdminStats
dbDir string
enableAdmin bool
logger log.Logger
remoteReadSampleLimit int
@ -209,7 +209,8 @@ func NewAPI(
flagsMap map[string]string,
globalURLOptions GlobalURLOptions,
readyFunc func(http.HandlerFunc) http.HandlerFunc,
db func() TSDBAdmin,
db TSDBAdminStats,
dbDir string,
enableAdmin bool,
logger log.Logger,
rr rulesRetriever,
@ -232,6 +233,7 @@ func NewAPI(
ready: readyFunc,
globalURLOptions: globalURLOptions,
db: db,
dbDir: dbDir,
enableAdmin: enableAdmin,
rulesRetriever: rr,
remoteReadSampleLimit: remoteReadSampleLimit,
@ -244,22 +246,32 @@ func NewAPI(
}
}
func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult {
if r.err != nil && errors.Cause(r.err.err) == tsdb.ErrNotReady {
r.err.typ = errorUnavailable
}
return r
}
// Register the API's endpoints in the given router.
func (api *API) Register(r *route.Router) {
wrap := func(f apiFunc) http.HandlerFunc {
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httputil.SetCORS(w, api.CORSOrigin, r)
result := f(r)
result := setUnavailStatusOnTSDBNotReady(f(r))
if result.finalizer != nil {
defer result.finalizer()
}
if result.err != nil {
api.respondError(w, result.err, result.data)
} else if result.data != nil {
return
}
if result.data != nil {
api.respond(w, result.data, result.warnings)
} else {
w.WriteHeader(http.StatusNoContent)
return
}
w.WriteHeader(http.StatusNoContent)
})
return api.ready(httputil.CompressionHandler{
Handler: hf,
@ -1124,29 +1136,27 @@ type tsdbStatus struct {
SeriesCountByLabelValuePair []stat `json:"seriesCountByLabelValuePair"`
}
func (api *API) serveTSDBStatus(r *http.Request) apiFuncResult {
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
convert := func(stats []index.Stat) []stat {
result := make([]stat, 0, len(stats))
for _, item := range stats {
item := stat{Name: item.Name, Value: item.Count}
result = append(result, item)
}
return result
func convertStats(stats []index.Stat) []stat {
result := make([]stat, 0, len(stats))
for _, item := range stats {
item := stat{Name: item.Name, Value: item.Count}
result = append(result, item)
}
return result
}
posting := db.Head().PostingsCardinalityStats(model.MetricNameLabel)
response := tsdbStatus{
SeriesCountByMetricName: convert(posting.CardinalityMetricsStats),
LabelValueCountByLabelName: convert(posting.CardinalityLabelStats),
MemoryInBytesByLabelName: convert(posting.LabelValueStats),
SeriesCountByLabelValuePair: convert(posting.LabelValuePairsStats),
func (api *API) serveTSDBStatus(*http.Request) apiFuncResult {
s, err := api.db.Stats("__name__")
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}
return apiFuncResult{response, nil, nil, nil}
return apiFuncResult{tsdbStatus{
SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
}, nil, nil, nil}
}
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
@ -1322,11 +1332,6 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
}
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := r.ParseForm(); err != nil {
return apiFuncResult{nil, &apiError{errorBadData, errors.Wrap(err, "error parsing form values")}, nil, nil}
}
@ -1348,8 +1353,7 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil {
if err := api.db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}
}
@ -1372,13 +1376,8 @@ func (api *API) snapshot(r *http.Request) apiFuncResult {
}
}
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
var (
snapdir = filepath.Join(db.Dir(), "snapshots")
snapdir = filepath.Join(api.dbDir, "snapshots")
name = fmt.Sprintf("%s-%x",
time.Now().UTC().Format("20060102T150405Z0700"),
rand.Int())
@ -1387,7 +1386,7 @@ func (api *API) snapshot(r *http.Request) apiFuncResult {
if err := os.MkdirAll(dir, 0777); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot directory")}, nil, nil}
}
if err := db.Snapshot(dir, !skipHead); err != nil {
if err := api.db.Snapshot(dir, !skipHead); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot")}, nil, nil}
}
@ -1400,12 +1399,7 @@ func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
}
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := db.CleanTombstones(); err != nil {
if err := api.db.CleanTombstones(); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}

61
web/api/v1/api_test.go

@ -1892,32 +1892,24 @@ func TestStreamReadEndpoint(t *testing.T) {
}
type fakeDB struct {
err error
closer func()
err error
}
func (f *fakeDB) CleanTombstones() error { return f.err }
func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err }
func (f *fakeDB) Dir() string {
dir, _ := ioutil.TempDir("", "fakeDB")
f.closer = func() {
os.RemoveAll(dir)
}
return dir
}
func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
func (f *fakeDB) Head() *tsdb.Head {
func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) {
h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize)
return h
return h.Stats(statsByLabelName), nil
}
func TestAdminEndpoints(t *testing.T) {
tsdb, tsdbWithError := &fakeDB{}, &fakeDB{err: errors.New("some error")}
tsdb, tsdbWithError, tsdbNotReady := &fakeDB{}, &fakeDB{err: errors.New("some error")}, &fakeDB{err: errors.Wrap(tsdb.ErrNotReady, "wrap")}
snapshotAPI := func(api *API) apiFunc { return api.snapshot }
cleanAPI := func(api *API) apiFunc { return api.cleanTombstones }
deleteAPI := func(api *API) apiFunc { return api.deleteSeries }
for i, tc := range []struct {
for _, tc := range []struct {
db *fakeDB
enableAdmin bool
endpoint func(api *API) apiFunc
@ -1965,7 +1957,7 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal,
},
{
db: nil,
db: tsdbNotReady,
enableAdmin: true,
endpoint: snapshotAPI,
@ -1994,7 +1986,7 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal,
},
{
db: nil,
db: tsdbNotReady,
enableAdmin: true,
endpoint: cleanAPI,
@ -2064,37 +2056,31 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal,
},
{
db: nil,
db: tsdbNotReady,
enableAdmin: true,
endpoint: deleteAPI,
values: map[string][]string{"match[]": {"up"}},
errType: errorUnavailable,
},
} {
tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Run("", func(t *testing.T) {
dir, _ := ioutil.TempDir("", "fakeDB")
defer testutil.Ok(t, os.RemoveAll(dir))
api := &API{
db: func() TSDBAdmin {
if tc.db != nil {
return tc.db
}
return nil
},
db: tc.db,
dbDir: dir,
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
enableAdmin: tc.enableAdmin,
}
defer func() {
if tc.db != nil && tc.db.closer != nil {
tc.db.closer()
}
}()
endpoint := tc.endpoint(api)
req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil)
if err != nil {
t.Fatalf("Error when creating test request: %s", err)
}
res := endpoint(req)
testutil.Ok(t, err)
res := setUnavailStatusOnTSDBNotReady(endpoint(req))
assertAPIError(t, res.err, tc.errType)
})
}
@ -2504,14 +2490,7 @@ func TestTSDBStatus(t *testing.T) {
} {
tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := &API{
db: func() TSDBAdmin {
if tc.db != nil {
return tc.db
}
return nil
},
}
api := &API{db: tc.db}
endpoint := tc.endpoint(api)
req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil)
if err != nil {

53
web/api/v2/api.go

@ -26,7 +26,6 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -40,16 +39,19 @@ import (
// API encapsulates all API services.
type API struct {
enableAdmin bool
db func() *tsdb.DB
db TSDBAdmin
dbDir string
}
// New returns a new API object.
func New(
db func() *tsdb.DB,
db TSDBAdmin,
dbDir string,
enableAdmin bool,
) *API {
return &API{
db: db,
dbDir: dbDir,
enableAdmin: enableAdmin,
}
}
@ -57,7 +59,7 @@ func New(
// RegisterGRPC registers all API services with the given server.
func (api *API) RegisterGRPC(srv *grpc.Server) {
if api.enableAdmin {
pb.RegisterAdminServer(srv, NewAdmin(api.db))
pb.RegisterAdminServer(srv, NewAdmin(api.db, api.dbDir))
} else {
pb.RegisterAdminServer(srv, &AdminDisabled{})
}
@ -133,13 +135,21 @@ func (s *AdminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques
return nil, errAdminDisabled
}
// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations as well as statistics.
type TSDBAdmin interface {
CleanTombstones() error
Delete(mint, maxt int64, ms ...*labels.Matcher) error
Snapshot(dir string, withHead bool) error
}
// Admin provides an administration interface to Prometheus.
type Admin struct {
db func() *tsdb.DB
db TSDBAdmin
dbDir string
}
// NewAdmin returns a Admin server.
func NewAdmin(db func() *tsdb.DB) *Admin {
func NewAdmin(db TSDBAdmin, dbDir string) *Admin {
return &Admin{
db: db,
}
@ -147,12 +157,8 @@ func NewAdmin(db func() *tsdb.DB) *Admin {
// TSDBSnapshot implements pb.AdminServer.
func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
db := s.db()
if db == nil {
return nil, errTSDBNotReady
}
var (
snapdir = filepath.Join(db.Dir(), "snapshots")
snapdir = filepath.Join(s.dbDir, "snapshots")
name = fmt.Sprintf("%s-%x",
time.Now().UTC().Format("20060102T150405Z0700"),
rand.Int())
@ -161,7 +167,11 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
}
if err := db.Snapshot(dir, !req.SkipHead); err != nil {
if err := s.db.Snapshot(dir, !req.SkipHead); err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady
}
return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
}
return &pb.TSDBSnapshotResponse{Name: name}, nil
@ -169,12 +179,10 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p
// TSDBCleanTombstones implements pb.AdminServer.
func (s *Admin) TSDBCleanTombstones(_ context.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) {
db := s.db()
if db == nil {
return nil, errTSDBNotReady
}
if err := db.CleanTombstones(); err != nil {
if err := s.db.CleanTombstones(); err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady
}
return nil, status.Errorf(codes.Internal, "clean tombstones: %s", err)
}
@ -212,11 +220,10 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb.
matchers = append(matchers, lm)
}
db := s.db()
if db == nil {
return nil, errTSDBNotReady
}
if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady
}
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.SeriesDeleteResponse{}, nil

6
web/federate.go

@ -20,10 +20,12 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
@ -78,6 +80,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
q, err := h.localStorage.Querier(req.Context(), mint, maxt)
if err != nil {
federationErrors.Inc()
if errors.Cause(err) == tsdb.ErrNotReady {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

71
web/federate_test.go

@ -15,16 +15,22 @@ package web
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"sort"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/testutil"
)
var scenarios = map[string]struct {
@ -199,7 +205,7 @@ func TestFederation(t *testing.T) {
}
h := &Handler{
localStorage: suite.Storage(),
localStorage: &dbAdapter{suite.TSDB()},
lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{
@ -208,16 +214,59 @@ func TestFederation(t *testing.T) {
}
for name, scenario := range scenarios {
h.config.GlobalConfig.ExternalLabels = scenario.externalLabels
req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
res := httptest.NewRecorder()
h.federation(res, req)
if got, want := res.Code, scenario.code; got != want {
t.Errorf("Scenario %q: got code %d, want %d", name, got, want)
}
if got, want := normalizeBody(res.Body), scenario.body; got != want {
t.Errorf("Scenario %q: got body\n%s\n, want\n%s\n", name, got, want)
}
t.Run(name, func(t *testing.T) {
h.config.GlobalConfig.ExternalLabels = scenario.externalLabels
req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
res := httptest.NewRecorder()
h.federation(res, req)
testutil.Equals(t, scenario.code, res.Code)
testutil.Equals(t, scenario.body, normalizeBody(res.Body))
})
}
}
type notReadyReadStorage struct {
LocalStorage
}
func (notReadyReadStorage) Querier(context.Context, int64, int64) (storage.Querier, error) {
return nil, errors.Wrap(tsdb.ErrNotReady, "wrap")
}
func (notReadyReadStorage) StartTime() (int64, error) {
return 0, errors.Wrap(tsdb.ErrNotReady, "wrap")
}
func (notReadyReadStorage) Stats(string) (*tsdb.Stats, error) {
return nil, errors.Wrap(tsdb.ErrNotReady, "wrap")
}
// Regression test for https://github.com/prometheus/prometheus/issues/7181.
func TestFederation_NotReady(t *testing.T) {
h := &Handler{
localStorage: notReadyReadStorage{},
lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{
GlobalConfig: config.GlobalConfig{},
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
h.config.GlobalConfig.ExternalLabels = scenario.externalLabels
req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
res := httptest.NewRecorder()
h.federation(res, req)
if scenario.code == http.StatusBadRequest {
// Request are expected to be checked before DB readiness.
testutil.Equals(t, http.StatusBadRequest, res.Code)
return
}
testutil.Equals(t, http.StatusServiceUnavailable, res.Code)
})
}
}

41
web/web.go

@ -166,6 +166,11 @@ func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc
// PrometheusVersion contains build information about Prometheus.
type PrometheusVersion = api_v1.PrometheusVersion
type LocalStorage interface {
storage.Storage
api_v1.TSDBAdminStats
}
// Handler serves various HTTP endpoints of the Prometheus server
type Handler struct {
logger log.Logger
@ -178,9 +183,8 @@ type Handler struct {
queryEngine *promql.Engine
lookbackDelta time.Duration
context context.Context
tsdb func() *tsdb.DB
storage storage.Storage
localStorage storage.Storage
localStorage LocalStorage
notifier *notifier.Manager
apiV1 *api_v1.API
@ -214,9 +218,10 @@ func (h *Handler) ApplyConfig(conf *config.Config) error {
// Options for the web Handler.
type Options struct {
Context context.Context
TSDB func() *tsdb.DB
TSDBRetentionDuration model.Duration
TSDBDir string
TSDBMaxBytes units.Base2Bytes
LocalStorage LocalStorage
Storage storage.Storage
QueryEngine *promql.Engine
LookbackDelta time.Duration
@ -283,9 +288,8 @@ func New(logger log.Logger, o *Options) *Handler {
ruleManager: o.RuleManager,
queryEngine: o.QueryEngine,
lookbackDelta: o.LookbackDelta,
tsdb: o.TSDB,
storage: o.Storage,
localStorage: o.TSDB(),
localStorage: o.LocalStorage,
notifier: o.Notifier,
now: model.Now,
@ -309,9 +313,8 @@ func New(logger log.Logger, o *Options) *Handler {
Scheme: o.ExternalURL.Scheme,
},
h.testReady,
func() api_v1.TSDBAdmin {
return h.options.TSDB()
},
h.options.LocalStorage,
h.options.TSDBDir,
h.options.EnableAdminAPI,
logger,
h.ruleManager,
@ -538,7 +541,8 @@ func (h *Handler) Run(ctx context.Context) error {
grpcSrv = grpc.NewServer()
)
av2 := api_v2.New(
h.options.TSDB,
h.options.LocalStorage,
h.options.TSDBDir,
h.options.EnableAdminAPI,
)
av2.RegisterGRPC(grpcSrv)
@ -789,13 +793,22 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) {
status.LastConfigTime = time.Unix(int64(toFloat64(mF)), 0).UTC()
}
}
db := h.tsdb()
startTime := time.Now().UnixNano()
status.Stats = db.Head().PostingsCardinalityStats("__name__")
s, err := h.localStorage.Stats("__name__")
if err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
http.Error(w, tsdb.ErrNotReady.Error(), http.StatusServiceUnavailable)
return
}
http.Error(w, fmt.Sprintf("error gathering local storage statistics: %s", err), http.StatusInternalServerError)
return
}
status.Duration = fmt.Sprintf("%.3f", float64(time.Now().UnixNano()-startTime)/float64(1e9))
status.NumSeries = db.Head().NumSeries()
status.MaxTime = db.Head().MaxTime()
status.MinTime = db.Head().MaxTime()
status.Stats = s.IndexPostingStats
status.NumSeries = s.NumSeries
status.MaxTime = s.MaxTime
status.MinTime = s.MinTime
h.executeTemplate(w, "status.html", status)
}

29
web/web_test.go

@ -41,6 +41,7 @@ func TestMain(m *testing.M) {
os.Setenv("no_proxy", "localhost,127.0.0.1,0.0.0.0,:")
os.Exit(m.Run())
}
func TestGlobalURL(t *testing.T) {
opts := &Options{
ListenAddress: ":9090",
@ -89,15 +90,21 @@ func TestGlobalURL(t *testing.T) {
}
}
type dbAdapter struct {
*tsdb.DB
}
func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) {
return a.Head().Stats(statsByLabelName), nil
}
func TestReadyAndHealthy(t *testing.T) {
t.Parallel()
dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err)
defer testutil.Ok(t, os.RemoveAll(dbDir))
defer os.RemoveAll(dbDir)
db, err := tsdb.Open(dbDir, nil, nil, nil)
testutil.Ok(t, err)
opts := &Options{
@ -106,13 +113,14 @@ func TestReadyAndHealthy(t *testing.T) {
MaxConnections: 512,
Context: nil,
Storage: nil,
LocalStorage: &dbAdapter{db},
TSDBDir: dbDir,
QueryEngine: nil,
ScrapeManager: &scrape.Manager{},
RuleManager: &rules.Manager{},
Notifier: nil,
RoutePrefix: "/",
EnableAdminAPI: true,
TSDB: func() *tsdb.DB { return db },
ExternalURL: &url.URL{
Scheme: "http",
Host: "localhost:9090",
@ -136,6 +144,9 @@ func TestReadyAndHealthy(t *testing.T) {
}
}()
// TODO(bwplotka): Those tests create tons of new connection and memory that is never cleaned.
// Close and exhaust all response bodies.
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
@ -282,13 +293,10 @@ func TestReadyAndHealthy(t *testing.T) {
func TestRoutePrefix(t *testing.T) {
t.Parallel()
dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err)
defer os.RemoveAll(dbDir)
defer testutil.Ok(t, os.RemoveAll(dbDir))
db, err := tsdb.Open(dbDir, nil, nil, nil)
testutil.Ok(t, err)
opts := &Options{
@ -296,6 +304,8 @@ func TestRoutePrefix(t *testing.T) {
ReadTimeout: 30 * time.Second,
MaxConnections: 512,
Context: nil,
TSDBDir: dbDir,
LocalStorage: &dbAdapter{db},
Storage: nil,
QueryEngine: nil,
ScrapeManager: nil,
@ -307,7 +317,6 @@ func TestRoutePrefix(t *testing.T) {
Host: "localhost.localdomain:9090",
Scheme: "http",
},
TSDB: func() *tsdb.DB { return db },
}
opts.Flags = map[string]string{}
@ -399,7 +408,6 @@ func TestDebugHandler(t *testing.T) {
Host: "localhost.localdomain:9090",
Scheme: "http",
},
TSDB: func() *tsdb.DB { return nil },
}
handler := New(nil, opts)
handler.Ready()
@ -426,7 +434,6 @@ func TestHTTPMetrics(t *testing.T) {
Host: "localhost.localdomain:9090",
Scheme: "http",
},
TSDB: func() *tsdb.DB { return nil },
})
getReady := func() int {
t.Helper()

Loading…
Cancel
Save