mirror of https://github.com/prometheus/prometheus
Remote storage reads based on oldest timestamp in primary storage (#3129)
Currently all read queries are simply pushed to remote read clients. This is fine, except for remote storage for wich it unefficient and make query slower even if remote read is unnecessary. So we need instead to compare the oldest timestamp in primary/local storage with the query range lower boundary. If the oldest timestamp is older than the mint parameter, then there is no need for remote read. This is an optionnal behavior per remote read client. Signed-off-by: Thibault Chataigner <t.chataigner@criteo.com>pull/3311/merge
parent
5ab8834bef
commit
bf4a279a91
|
@ -225,7 +225,7 @@ func main() {
|
|||
|
||||
var (
|
||||
localStorage = &tsdb.ReadyStorage{}
|
||||
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"))
|
||||
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime)
|
||||
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
|
||||
)
|
||||
|
||||
|
@ -326,7 +326,8 @@ func main() {
|
|||
}
|
||||
level.Info(logger).Log("msg", "TSDB started")
|
||||
|
||||
localStorage.Set(db)
|
||||
startTimeMargin := int64(time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
|
||||
localStorage.Set(db, startTimeMargin)
|
||||
}()
|
||||
|
||||
prometheus.MustRegister(configSuccess)
|
||||
|
|
|
@ -197,6 +197,7 @@ var (
|
|||
// DefaultRemoteReadConfig is the default remote read configuration.
|
||||
DefaultRemoteReadConfig = RemoteReadConfig{
|
||||
RemoteTimeout: model.Duration(1 * time.Minute),
|
||||
ReadRecent: true,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -1490,7 +1491,7 @@ type QueueConfig struct {
|
|||
type RemoteReadConfig struct {
|
||||
URL *URL `yaml:"url"`
|
||||
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
||||
|
||||
ReadRecent bool `yaml:"read_recent,omitempty"`
|
||||
// We cannot do proper Go type embedding below as the parser will then parse
|
||||
// values arbitrarily into the overflow maps of further-down types.
|
||||
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
|
||||
|
|
|
@ -75,6 +75,19 @@ var expectedConf = &Config{
|
|||
},
|
||||
},
|
||||
|
||||
RemoteReadConfigs: []*RemoteReadConfig{
|
||||
{
|
||||
URL: mustParseURL("http://remote1/read"),
|
||||
RemoteTimeout: model.Duration(1 * time.Minute),
|
||||
ReadRecent: true,
|
||||
},
|
||||
{
|
||||
URL: mustParseURL("http://remote3/read"),
|
||||
RemoteTimeout: model.Duration(1 * time.Minute),
|
||||
ReadRecent: false,
|
||||
},
|
||||
},
|
||||
|
||||
ScrapeConfigs: []*ScrapeConfig{
|
||||
{
|
||||
JobName: "prometheus",
|
||||
|
|
|
@ -20,6 +20,12 @@ remote_write:
|
|||
action: drop
|
||||
- url: http://remote2/push
|
||||
|
||||
remote_read:
|
||||
- url: http://remote1/read
|
||||
read_recent: true
|
||||
- url: http://remote3/read
|
||||
read_recent: false
|
||||
|
||||
scrape_configs:
|
||||
- job_name: prometheus
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
)
|
||||
|
||||
|
@ -40,6 +41,27 @@ func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Stora
|
|||
}
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (f *fanout) StartTime() (int64, error) {
|
||||
// StartTime of a fanout should be the earliest StartTime of all its storages,
|
||||
// both primary and secondaries.
|
||||
firstTime, err := f.primary.StartTime()
|
||||
if err != nil {
|
||||
return int64(model.Latest), err
|
||||
}
|
||||
|
||||
for _, storage := range f.secondaries {
|
||||
t, err := storage.StartTime()
|
||||
if err != nil {
|
||||
return int64(model.Latest), err
|
||||
}
|
||||
if t < firstTime {
|
||||
firstTime = t
|
||||
}
|
||||
}
|
||||
return firstTime, nil
|
||||
}
|
||||
|
||||
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
||||
queriers := mergeQuerier{
|
||||
queriers: make([]Querier, 0, 1+len(f.secondaries)),
|
||||
|
|
|
@ -31,6 +31,9 @@ var (
|
|||
// Storage ingests and manages samples, along with various indexes. All methods
|
||||
// are goroutine-safe. Storage implements storage.SampleAppender.
|
||||
type Storage interface {
|
||||
// StartTime returns the oldest timestamp stored in the storage.
|
||||
StartTime() (int64, error)
|
||||
|
||||
// Querier returns a new Querier on the storage.
|
||||
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||
|
||||
|
|
|
@ -37,15 +37,17 @@ const maxErrMsgLen = 256
|
|||
|
||||
// Client allows reading and writing from/to a remote HTTP endpoint.
|
||||
type Client struct {
|
||||
index int // Used to differentiate metrics.
|
||||
url *config.URL
|
||||
client *http.Client
|
||||
timeout time.Duration
|
||||
index int // Used to differentiate metrics.
|
||||
url *config.URL
|
||||
client *http.Client
|
||||
timeout time.Duration
|
||||
readRecent bool
|
||||
}
|
||||
|
||||
type clientConfig struct {
|
||||
url *config.URL
|
||||
timeout model.Duration
|
||||
readRecent bool
|
||||
httpClientConfig config.HTTPClientConfig
|
||||
}
|
||||
|
||||
|
@ -57,10 +59,11 @@ func NewClient(index int, conf *clientConfig) (*Client, error) {
|
|||
}
|
||||
|
||||
return &Client{
|
||||
index: index,
|
||||
url: conf.url,
|
||||
client: httpClient,
|
||||
timeout: time.Duration(conf.timeout),
|
||||
index: index,
|
||||
url: conf.url,
|
||||
client: httpClient,
|
||||
timeout: time.Duration(conf.timeout),
|
||||
readRecent: conf.readRecent,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -30,17 +30,35 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier,
|
|||
defer r.mtx.Unlock()
|
||||
|
||||
queriers := make([]storage.Querier, 0, len(r.clients))
|
||||
localStartTime, err := r.localStartTimeCallback()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, c := range r.clients {
|
||||
cmaxt := maxt
|
||||
if !c.readRecent {
|
||||
// Avoid queries whose timerange is later than the first timestamp in local DB.
|
||||
if mint > localStartTime {
|
||||
continue
|
||||
}
|
||||
// Query only samples older than the first timestamp in local DB.
|
||||
if maxt > localStartTime {
|
||||
cmaxt = localStartTime
|
||||
}
|
||||
}
|
||||
queriers = append(queriers, &querier{
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
maxt: cmaxt,
|
||||
client: c,
|
||||
externalLabels: r.externalLabels,
|
||||
})
|
||||
}
|
||||
return storage.NewMergeQuerier(queriers), nil
|
||||
return newMergeQueriers(queriers), nil
|
||||
}
|
||||
|
||||
// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed.
|
||||
var newMergeQueriers = storage.NewMergeQuerier
|
||||
|
||||
// Querier is an adapter to make a Client usable as a storage.Querier.
|
||||
type querier struct {
|
||||
mint, maxt int64
|
||||
|
|
|
@ -17,8 +17,10 @@ import (
|
|||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -123,3 +125,59 @@ func TestConcreteSeriesSet(t *testing.T) {
|
|||
t.Fatalf("Expected Next() to be false.")
|
||||
}
|
||||
}
|
||||
|
||||
type mockMergeQuerier struct{ queriersCount int }
|
||||
|
||||
func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil }
|
||||
func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
|
||||
func (*mockMergeQuerier) Close() error { return nil }
|
||||
|
||||
func TestRemoteStorageQuerier(t *testing.T) {
|
||||
tests := []struct {
|
||||
localStartTime int64
|
||||
readRecentClients []bool
|
||||
mint int64
|
||||
maxt int64
|
||||
expectedQueriersCount int
|
||||
}{
|
||||
{
|
||||
localStartTime: int64(20),
|
||||
readRecentClients: []bool{true, true, false},
|
||||
mint: int64(0),
|
||||
maxt: int64(50),
|
||||
expectedQueriersCount: 3,
|
||||
},
|
||||
{
|
||||
localStartTime: int64(20),
|
||||
readRecentClients: []bool{true, true, false},
|
||||
mint: int64(30),
|
||||
maxt: int64(50),
|
||||
expectedQueriersCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil })
|
||||
s.clients = []*Client{}
|
||||
for _, readRecent := range test.readRecentClients {
|
||||
c, _ := NewClient(0, &clientConfig{
|
||||
url: nil,
|
||||
timeout: model.Duration(30 * time.Second),
|
||||
httpClientConfig: config.HTTPClientConfig{},
|
||||
readRecent: readRecent,
|
||||
})
|
||||
s.clients = append(s.clients, c)
|
||||
}
|
||||
// overrides mergeQuerier to mockMergeQuerier so we can reflect its type
|
||||
newMergeQueriers = func(queriers []storage.Querier) storage.Querier {
|
||||
return &mockMergeQuerier{queriersCount: len(queriers)}
|
||||
}
|
||||
|
||||
querier, _ := s.Querier(nil, test.mint, test.maxt)
|
||||
actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount
|
||||
|
||||
if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) {
|
||||
t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,9 @@ import (
|
|||
"github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
// Callback func that return the oldest timestamp stored in a storage.
|
||||
type startTimeCallback func() (int64, error)
|
||||
|
||||
// Storage represents all the remote read and write endpoints. It implements
|
||||
// storage.Storage.
|
||||
type Storage struct {
|
||||
|
@ -31,15 +34,17 @@ type Storage struct {
|
|||
queues []*QueueManager
|
||||
|
||||
// For reads
|
||||
clients []*Client
|
||||
externalLabels model.LabelSet
|
||||
clients []*Client
|
||||
localStartTimeCallback startTimeCallback
|
||||
externalLabels model.LabelSet
|
||||
}
|
||||
|
||||
func NewStorage(l log.Logger) *Storage {
|
||||
// NewStorage returns a remote.Storage.
|
||||
func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage {
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
return &Storage{logger: l}
|
||||
return &Storage{logger: l, localStartTimeCallback: stCallback}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
|
@ -87,6 +92,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
|||
url: rrConf.URL,
|
||||
timeout: rrConf.RemoteTimeout,
|
||||
httpClientConfig: rrConf.HTTPClientConfig,
|
||||
readRecent: rrConf.ReadRecent,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -100,6 +106,11 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (s *Storage) StartTime() (int64, error) {
|
||||
return int64(model.Latest), nil
|
||||
}
|
||||
|
||||
// Close the background processing of the storage queues.
|
||||
func (s *Storage) Close() error {
|
||||
s.mtx.Lock()
|
||||
|
|
|
@ -40,11 +40,11 @@ type ReadyStorage struct {
|
|||
}
|
||||
|
||||
// Set the storage.
|
||||
func (s *ReadyStorage) Set(db *tsdb.DB) {
|
||||
func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
s.a = &adapter{db: db}
|
||||
s.a = &adapter{db: db, startTimeMargin: startTimeMargin}
|
||||
}
|
||||
|
||||
// Get the storage.
|
||||
|
@ -62,6 +62,14 @@ func (s *ReadyStorage) get() *adapter {
|
|||
return x
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (s *ReadyStorage) StartTime() (int64, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.StartTime()
|
||||
}
|
||||
return int64(model.Latest), ErrNotReady
|
||||
}
|
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
if x := s.get(); x != nil {
|
||||
|
@ -86,13 +94,15 @@ func (s *ReadyStorage) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func Adapter(db *tsdb.DB) storage.Storage {
|
||||
return &adapter{db: db}
|
||||
// Adapter return an adapter as storage.Storage.
|
||||
func Adapter(db *tsdb.DB, startTimeMargin int64) storage.Storage {
|
||||
return &adapter{db: db, startTimeMargin: startTimeMargin}
|
||||
}
|
||||
|
||||
// adapter implements a storage.Storage around TSDB.
|
||||
type adapter struct {
|
||||
db *tsdb.DB
|
||||
db *tsdb.DB
|
||||
startTimeMargin int64
|
||||
}
|
||||
|
||||
// Options of the DB storage.
|
||||
|
@ -139,6 +149,57 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
|
|||
return db, nil
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (a adapter) StartTime() (int64, error) {
|
||||
startTime := int64(model.Latest)
|
||||
|
||||
var indexr tsdb.IndexReader
|
||||
if len(a.db.Blocks()) > 0 {
|
||||
var err error
|
||||
indexr, err = a.db.Blocks()[0].Index()
|
||||
if err != nil {
|
||||
return startTime, err
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
indexr, err = a.db.Head().Index()
|
||||
if err != nil {
|
||||
return startTime, err
|
||||
}
|
||||
}
|
||||
|
||||
joblabel := "job"
|
||||
tpls, err := indexr.LabelValues(joblabel)
|
||||
if err != nil {
|
||||
return startTime, err
|
||||
}
|
||||
|
||||
for i := 0; i < tpls.Len(); i++ {
|
||||
vals, err := tpls.At(i)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, v := range vals {
|
||||
p, err := indexr.Postings(joblabel, v)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if p.Next() {
|
||||
var lset tsdbLabels.Labels
|
||||
var chks []tsdb.ChunkMeta
|
||||
indexr.Series(p.At(), &lset, &chks)
|
||||
if startTime > chks[0].MinTime {
|
||||
startTime = chks[0].MinTime
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add a safety margin as it may take a few minutes for everything to spin up.
|
||||
return startTime + a.startTimeMargin, nil
|
||||
}
|
||||
|
||||
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
q, err := a.db.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
|
|
|
@ -40,7 +40,7 @@ func NewStorage(t T) storage.Storage {
|
|||
if err != nil {
|
||||
t.Fatalf("Opening test storage failed: %s", err)
|
||||
}
|
||||
return testStorage{Storage: tsdb.Adapter(db), dir: dir}
|
||||
return testStorage{Storage: tsdb.Adapter(db, int64(0)), dir: dir}
|
||||
}
|
||||
|
||||
type testStorage struct {
|
||||
|
|
Loading…
Reference in New Issue