mirror of https://github.com/prometheus/prometheus
Merge pull request #4282 from prometheus/4263-remote
remote read: nil pointer deference when using remove readpull/4283/head
commit
47809cae01
|
@ -96,9 +96,12 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams
|
|||
return nil, err
|
||||
}
|
||||
|
||||
rp := &prompb.ReadHints{
|
||||
StepMs: p.Step,
|
||||
Func: p.Func,
|
||||
var rp *prompb.ReadHints
|
||||
if p != nil {
|
||||
rp = &prompb.ReadHints{
|
||||
StepMs: p.Step,
|
||||
Func: p.Func,
|
||||
}
|
||||
}
|
||||
|
||||
return &prompb.Query{
|
||||
|
|
|
@ -31,7 +31,9 @@ import (
|
|||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promlog"
|
||||
"github.com/prometheus/common/route"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
|
@ -128,30 +130,125 @@ func TestEndpoints(t *testing.T) {
|
|||
|
||||
now := time.Now()
|
||||
|
||||
var tr testTargetRetriever
|
||||
t.Run("local", func(t *testing.T) {
|
||||
api := &API{
|
||||
Queryable: suite.Storage(),
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
targetRetriever: testTargetRetriever{},
|
||||
alertmanagerRetriever: testAlertmanagerRetriever{},
|
||||
now: func() time.Time { return now },
|
||||
config: func() config.Config { return samplePrometheusCfg },
|
||||
flagsMap: sampleFlagMap,
|
||||
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
|
||||
}
|
||||
|
||||
var ar testAlertmanagerRetriever
|
||||
testEndpoints(t, api, true)
|
||||
})
|
||||
|
||||
api := &API{
|
||||
Queryable: suite.Storage(),
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
targetRetriever: tr,
|
||||
alertmanagerRetriever: ar,
|
||||
now: func() time.Time { return now },
|
||||
config: func() config.Config { return samplePrometheusCfg },
|
||||
flagsMap: sampleFlagMap,
|
||||
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
|
||||
}
|
||||
// Run all the API tests against a API that is wired to forward queries via
|
||||
// the remote read client to a test server, which in turn sends them to the
|
||||
// data from the test suite.
|
||||
t.Run("remote", func(t *testing.T) {
|
||||
server := setupRemote(suite.Storage())
|
||||
defer server.Close()
|
||||
|
||||
u, err := url.Parse(server.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
al := promlog.AllowedLevel{}
|
||||
al.Set("debug")
|
||||
remote := remote.NewStorage(promlog.New(al), func() (int64, error) {
|
||||
return 0, nil
|
||||
}, 1*time.Second)
|
||||
|
||||
err = remote.ApplyConfig(&config.Config{
|
||||
RemoteReadConfigs: []*config.RemoteReadConfig{
|
||||
{
|
||||
URL: &config_util.URL{URL: u},
|
||||
RemoteTimeout: model.Duration(1 * time.Second),
|
||||
ReadRecent: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
api := &API{
|
||||
Queryable: remote,
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
targetRetriever: testTargetRetriever{},
|
||||
alertmanagerRetriever: testAlertmanagerRetriever{},
|
||||
now: func() time.Time { return now },
|
||||
config: func() config.Config { return samplePrometheusCfg },
|
||||
flagsMap: sampleFlagMap,
|
||||
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
|
||||
}
|
||||
|
||||
testEndpoints(t, api, false)
|
||||
})
|
||||
}
|
||||
|
||||
func setupRemote(s storage.Storage) *httptest.Server {
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
req, err := remote.DecodeReadRequest(r)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
resp := prompb.ReadResponse{
|
||||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
from, through, matchers, err := remote.FromQuery(query)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
querier, err := s.Querier(r.Context(), from, through)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer querier.Close()
|
||||
|
||||
set, err := querier.Select(nil, matchers...)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
resp.Results[i], err = remote.ToQueryResult(set)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
return httptest.NewServer(handler)
|
||||
}
|
||||
|
||||
func testEndpoints(t *testing.T, api *API, testLabelAPI bool) {
|
||||
|
||||
start := time.Unix(0, 0)
|
||||
|
||||
var tests = []struct {
|
||||
type test struct {
|
||||
endpoint apiFunc
|
||||
params map[string]string
|
||||
query url.Values
|
||||
response interface{}
|
||||
errType errorType
|
||||
}{
|
||||
}
|
||||
|
||||
var tests = []test{
|
||||
{
|
||||
endpoint: api.query,
|
||||
query: url.Values{
|
||||
|
@ -203,7 +300,7 @@ func TestEndpoints(t *testing.T) {
|
|||
ResultType: promql.ValueTypeScalar,
|
||||
Result: promql.Scalar{
|
||||
V: 0.333,
|
||||
T: timestamp.FromTime(now),
|
||||
T: timestamp.FromTime(api.now()),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -309,34 +406,6 @@ func TestEndpoints(t *testing.T) {
|
|||
},
|
||||
errType: errorBadData,
|
||||
},
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "__name__",
|
||||
},
|
||||
response: []string{
|
||||
"test_metric1",
|
||||
"test_metric2",
|
||||
},
|
||||
},
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "foo",
|
||||
},
|
||||
response: []string{
|
||||
"bar",
|
||||
"boo",
|
||||
},
|
||||
},
|
||||
// Bad name parameter.
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "not!!!allowed",
|
||||
},
|
||||
errType: errorBadData,
|
||||
},
|
||||
{
|
||||
endpoint: api.series,
|
||||
query: url.Values{
|
||||
|
@ -500,6 +569,39 @@ func TestEndpoints(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
if testLabelAPI {
|
||||
tests = append(tests, []test{
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "__name__",
|
||||
},
|
||||
response: []string{
|
||||
"test_metric1",
|
||||
"test_metric2",
|
||||
},
|
||||
},
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "foo",
|
||||
},
|
||||
response: []string{
|
||||
"bar",
|
||||
"boo",
|
||||
},
|
||||
},
|
||||
// Bad name parameter.
|
||||
{
|
||||
endpoint: api.labelValues,
|
||||
params: map[string]string{
|
||||
"name": "not!!!allowed",
|
||||
},
|
||||
errType: errorBadData,
|
||||
},
|
||||
}...)
|
||||
}
|
||||
|
||||
methods := func(f apiFunc) []string {
|
||||
fp := reflect.ValueOf(f).Pointer()
|
||||
if fp == reflect.ValueOf(api.query).Pointer() || fp == reflect.ValueOf(api.queryRange).Pointer() {
|
||||
|
@ -517,14 +619,14 @@ func TestEndpoints(t *testing.T) {
|
|||
return http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil)
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
for i, test := range tests {
|
||||
for _, method := range methods(test.endpoint) {
|
||||
// Build a context with the correct request params.
|
||||
ctx := context.Background()
|
||||
for p, v := range test.params {
|
||||
ctx = route.WithParam(ctx, p, v)
|
||||
}
|
||||
t.Logf("run %s\t%q", method, test.query.Encode())
|
||||
t.Logf("run %d\t%s\t%q", i, method, test.query.Encode())
|
||||
|
||||
req, err := request(method, test.query)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue