mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
522 lines
14 KiB
522 lines
14 KiB
// Copyright 2015 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package consul |
|
|
|
import ( |
|
"context" |
|
"net/http" |
|
"net/http/httptest" |
|
"net/url" |
|
"testing" |
|
"time" |
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
"github.com/prometheus/common/config" |
|
"github.com/prometheus/common/model" |
|
"github.com/prometheus/common/promslog" |
|
"github.com/stretchr/testify/require" |
|
"go.uber.org/goleak" |
|
"gopkg.in/yaml.v2" |
|
|
|
"github.com/prometheus/prometheus/discovery" |
|
"github.com/prometheus/prometheus/discovery/targetgroup" |
|
) |
|
|
|
func TestMain(m *testing.M) { |
|
goleak.VerifyTestMain(m) |
|
} |
|
|
|
// TODO: Add ability to unregister metrics? |
|
func NewTestMetrics(t *testing.T, conf discovery.Config, reg prometheus.Registerer) discovery.DiscovererMetrics { |
|
refreshMetrics := discovery.NewRefreshMetrics(reg) |
|
require.NoError(t, refreshMetrics.Register()) |
|
|
|
metrics := conf.NewDiscovererMetrics(prometheus.NewRegistry(), refreshMetrics) |
|
require.NoError(t, metrics.Register()) |
|
|
|
return metrics |
|
} |
|
|
|
func TestConfiguredService(t *testing.T) { |
|
conf := &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
} |
|
|
|
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry()) |
|
|
|
consulDiscovery, err := NewDiscovery(conf, nil, metrics) |
|
require.NoError(t, err, "when initializing discovery") |
|
require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}), |
|
"Expected service %s to be watched", "configuredServiceName") |
|
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}), |
|
"Expected service %s to not be watched", "nonConfiguredServiceName") |
|
} |
|
|
|
func TestConfiguredServiceWithTag(t *testing.T) { |
|
conf := &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http"}, |
|
} |
|
|
|
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry()) |
|
|
|
consulDiscovery, err := NewDiscovery(conf, nil, metrics) |
|
require.NoError(t, err, "when initializing discovery") |
|
require.False(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}), |
|
"Expected service %s to not be watched without tag", "configuredServiceName") |
|
|
|
require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}), |
|
"Expected service %s to be watched with tag %s", "configuredServiceName", "http") |
|
|
|
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}), |
|
"Expected service %s to not be watched without tag", "nonConfiguredServiceName") |
|
|
|
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}), |
|
"Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http") |
|
} |
|
|
|
func TestConfiguredServiceWithTags(t *testing.T) { |
|
type testcase struct { |
|
// What we've configured to watch. |
|
conf *SDConfig |
|
// The service we're checking if we should watch or not. |
|
serviceName string |
|
serviceTags []string |
|
shouldWatch bool |
|
} |
|
|
|
cases := []testcase{ |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "configuredServiceName", |
|
serviceTags: []string{""}, |
|
shouldWatch: false, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "configuredServiceName", |
|
serviceTags: []string{"http", "v1"}, |
|
shouldWatch: true, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "nonConfiguredServiceName", |
|
serviceTags: []string{""}, |
|
shouldWatch: false, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "nonConfiguredServiceName", |
|
serviceTags: []string{"http, v1"}, |
|
shouldWatch: false, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "configuredServiceName", |
|
serviceTags: []string{"http", "v1", "foo"}, |
|
shouldWatch: true, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1", "foo"}, |
|
}, |
|
serviceName: "configuredServiceName", |
|
serviceTags: []string{"http", "v1", "foo"}, |
|
shouldWatch: true, |
|
}, |
|
{ |
|
conf: &SDConfig{ |
|
Services: []string{"configuredServiceName"}, |
|
ServiceTags: []string{"http", "v1"}, |
|
}, |
|
serviceName: "configuredServiceName", |
|
serviceTags: []string{"http", "v1", "v1"}, |
|
shouldWatch: true, |
|
}, |
|
} |
|
|
|
for _, tc := range cases { |
|
metrics := NewTestMetrics(t, tc.conf, prometheus.NewRegistry()) |
|
|
|
consulDiscovery, err := NewDiscovery(tc.conf, nil, metrics) |
|
require.NoError(t, err, "when initializing discovery") |
|
ret := consulDiscovery.shouldWatch(tc.serviceName, tc.serviceTags) |
|
require.Equal(t, tc.shouldWatch, ret, "Watched service and tags: %s %+v, input was %s %+v", |
|
tc.conf.Services, tc.conf.ServiceTags, tc.serviceName, tc.serviceTags) |
|
} |
|
} |
|
|
|
func TestNonConfiguredService(t *testing.T) { |
|
conf := &SDConfig{} |
|
|
|
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry()) |
|
|
|
consulDiscovery, err := NewDiscovery(conf, nil, metrics) |
|
require.NoError(t, err, "when initializing discovery") |
|
require.True(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}), "Expected service %s to be watched", "nonConfiguredServiceName") |
|
} |
|
|
|
const ( |
|
AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}` |
|
ServiceTestAnswer = ` |
|
[{ |
|
"Node": { |
|
"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e", |
|
"Node": "node1", |
|
"Address": "1.1.1.1", |
|
"Datacenter": "test-dc", |
|
"TaggedAddresses": { |
|
"lan": "192.168.10.10", |
|
"wan": "10.0.10.10" |
|
}, |
|
"Meta": {"rack_name": "2304"}, |
|
"CreateIndex": 1, |
|
"ModifyIndex": 1 |
|
}, |
|
"Service": { |
|
"ID": "test", |
|
"Service": "test", |
|
"Tags": ["tag1"], |
|
"Address": "", |
|
"Meta": {"version":"1.0.0","environment":"staging"}, |
|
"Port": 3341, |
|
"Weights": { |
|
"Passing": 1, |
|
"Warning": 1 |
|
}, |
|
"EnableTagOverride": false, |
|
"ProxyDestination": "", |
|
"Proxy": {}, |
|
"Connect": {}, |
|
"CreateIndex": 1, |
|
"ModifyIndex": 1 |
|
}, |
|
"Checks": [{ |
|
"Node": "node1", |
|
"CheckID": "serfHealth", |
|
"Name": "Serf Health Status", |
|
"Status": "passing" |
|
}] |
|
}]` |
|
|
|
ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}` |
|
) |
|
|
|
func newServer(t *testing.T) (*httptest.Server, *SDConfig) { |
|
// github.com/hashicorp/consul/testutil/ would be nice but it needs a local consul binary. |
|
stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|
response := "" |
|
switch r.URL.String() { |
|
case "/v1/agent/self": |
|
response = AgentAnswer |
|
case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=120000ms": |
|
response = ServiceTestAnswer |
|
case "/v1/health/service/test?wait=120000ms": |
|
response = ServiceTestAnswer |
|
case "/v1/health/service/other?wait=120000ms": |
|
response = `[]` |
|
case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms": |
|
response = ServicesTestAnswer |
|
case "/v1/catalog/services?wait=120000ms": |
|
response = ServicesTestAnswer |
|
case "/v1/catalog/services?index=1&node-meta=rack_name%3A2304&stale=&wait=120000ms": |
|
time.Sleep(5 * time.Second) |
|
response = ServicesTestAnswer |
|
case "/v1/catalog/services?index=1&wait=120000ms": |
|
time.Sleep(5 * time.Second) |
|
response = ServicesTestAnswer |
|
case "/v1/catalog/services?filter=NodeMeta.rack_name+%3D%3D+%222304%22&index=1&wait=120000ms": |
|
response = ServicesTestAnswer |
|
default: |
|
t.Errorf("Unhandled consul call: %s", r.URL) |
|
} |
|
w.Header().Add("X-Consul-Index", "1") |
|
w.Write([]byte(response)) |
|
})) |
|
stuburl, err := url.Parse(stub.URL) |
|
require.NoError(t, err) |
|
|
|
config := &SDConfig{ |
|
Server: stuburl.Host, |
|
Token: "fake-token", |
|
RefreshInterval: model.Duration(1 * time.Second), |
|
} |
|
return stub, config |
|
} |
|
|
|
func newDiscovery(t *testing.T, config *SDConfig) *Discovery { |
|
logger := promslog.NewNopLogger() |
|
|
|
metrics := NewTestMetrics(t, config, prometheus.NewRegistry()) |
|
|
|
d, err := NewDiscovery(config, logger, metrics) |
|
require.NoError(t, err) |
|
return d |
|
} |
|
|
|
func checkOneTarget(t *testing.T, tg []*targetgroup.Group) { |
|
require.Len(t, tg, 1) |
|
target := tg[0] |
|
require.Equal(t, "test-dc", string(target.Labels["__meta_consul_dc"])) |
|
require.Equal(t, target.Source, string(target.Labels["__meta_consul_service"])) |
|
if target.Source == "test" { |
|
// test service should have one node. |
|
require.NotEmpty(t, target.Targets, "Test service should have one node") |
|
} |
|
} |
|
|
|
// Watch all the services in the catalog. |
|
func TestAllServices(t *testing.T) { |
|
stub, config := newServer(t) |
|
defer stub.Close() |
|
|
|
d := newDiscovery(t, config) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
ch := make(chan []*targetgroup.Group) |
|
go func() { |
|
d.Run(ctx, ch) |
|
close(ch) |
|
}() |
|
checkOneTarget(t, <-ch) |
|
checkOneTarget(t, <-ch) |
|
cancel() |
|
<-ch |
|
} |
|
|
|
// targetgroup with no targets is emitted if no services were discovered. |
|
func TestNoTargets(t *testing.T) { |
|
stub, config := newServer(t) |
|
defer stub.Close() |
|
config.ServiceTags = []string{"missing"} |
|
|
|
d := newDiscovery(t, config) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
ch := make(chan []*targetgroup.Group) |
|
go func() { |
|
d.Run(ctx, ch) |
|
close(ch) |
|
}() |
|
|
|
targets := (<-ch)[0].Targets |
|
require.Empty(t, targets) |
|
cancel() |
|
<-ch |
|
} |
|
|
|
// Watch only the test service. |
|
func TestOneService(t *testing.T) { |
|
stub, config := newServer(t) |
|
defer stub.Close() |
|
|
|
config.Services = []string{"test"} |
|
d := newDiscovery(t, config) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
ch := make(chan []*targetgroup.Group) |
|
go d.Run(ctx, ch) |
|
checkOneTarget(t, <-ch) |
|
cancel() |
|
} |
|
|
|
// Watch the test service with a specific tag and node-meta. |
|
func TestAllOptions(t *testing.T) { |
|
stub, config := newServer(t) |
|
defer stub.Close() |
|
|
|
config.Services = []string{"test"} |
|
config.NodeMeta = map[string]string{"rack_name": "2304"} |
|
config.ServiceTags = []string{"tag1"} |
|
config.AllowStale = true |
|
config.Token = "fake-token" |
|
|
|
d := newDiscovery(t, config) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
ch := make(chan []*targetgroup.Group) |
|
go func() { |
|
d.Run(ctx, ch) |
|
close(ch) |
|
}() |
|
checkOneTarget(t, <-ch) |
|
cancel() |
|
<-ch |
|
} |
|
|
|
// Watch the test service with a specific tag and node-meta via Filter parameter. |
|
func TestFilterOption(t *testing.T) { |
|
stub, config := newServer(t) |
|
defer stub.Close() |
|
|
|
config.Services = []string{"test"} |
|
config.Filter = `NodeMeta.rack_name == "2304"` |
|
config.Token = "fake-token" |
|
|
|
d := newDiscovery(t, config) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
ch := make(chan []*targetgroup.Group) |
|
go func() { |
|
d.Run(ctx, ch) |
|
close(ch) |
|
}() |
|
checkOneTarget(t, <-ch) |
|
cancel() |
|
} |
|
|
|
func TestGetDatacenterShouldReturnError(t *testing.T) { |
|
for _, tc := range []struct { |
|
handler func(http.ResponseWriter, *http.Request) |
|
errMessage string |
|
}{ |
|
{ |
|
// Define a handler that will return status 500. |
|
handler: func(w http.ResponseWriter, r *http.Request) { |
|
w.WriteHeader(http.StatusInternalServerError) |
|
}, |
|
errMessage: "Unexpected response code: 500 ()", |
|
}, |
|
{ |
|
// Define a handler that will return incorrect response. |
|
handler: func(w http.ResponseWriter, r *http.Request) { |
|
w.Write([]byte(`{"Config": {"Not-Datacenter": "test-dc"}}`)) |
|
}, |
|
errMessage: "invalid value '<nil>' for Config.Datacenter", |
|
}, |
|
} { |
|
stub := httptest.NewServer(http.HandlerFunc(tc.handler)) |
|
stuburl, err := url.Parse(stub.URL) |
|
require.NoError(t, err) |
|
|
|
config := &SDConfig{ |
|
Server: stuburl.Host, |
|
Token: "fake-token", |
|
RefreshInterval: model.Duration(1 * time.Second), |
|
} |
|
defer stub.Close() |
|
d := newDiscovery(t, config) |
|
|
|
// Should be empty if not initialized. |
|
require.Equal(t, "", d.clientDatacenter) |
|
|
|
err = d.getDatacenter() |
|
|
|
// An error should be returned. |
|
require.EqualError(t, err, tc.errMessage) |
|
// Should still be empty. |
|
require.Equal(t, "", d.clientDatacenter) |
|
} |
|
} |
|
|
|
func TestUnmarshalConfig(t *testing.T) { |
|
unmarshal := func(d []byte) func(interface{}) error { |
|
return func(o interface{}) error { |
|
return yaml.Unmarshal(d, o) |
|
} |
|
} |
|
|
|
goodConfig := DefaultSDConfig |
|
goodConfig.Username = "123" |
|
goodConfig.Password = "1234" |
|
goodConfig.HTTPClientConfig = config.HTTPClientConfig{ |
|
BasicAuth: &config.BasicAuth{ |
|
Username: "123", |
|
Password: "1234", |
|
}, |
|
FollowRedirects: true, |
|
EnableHTTP2: true, |
|
} |
|
|
|
cases := []struct { |
|
name string |
|
config string |
|
expected SDConfig |
|
errMessage string |
|
}{ |
|
{ |
|
name: "good", |
|
config: ` |
|
server: localhost:8500 |
|
username: 123 |
|
password: 1234 |
|
`, |
|
expected: goodConfig, |
|
}, |
|
{ |
|
name: "username and password and basic auth configured", |
|
config: ` |
|
server: localhost:8500 |
|
username: 123 |
|
password: 1234 |
|
basic_auth: |
|
username: 12345 |
|
password: 123456 |
|
`, |
|
errMessage: "at most one of consul SD configuration username and password and basic auth can be configured", |
|
}, |
|
{ |
|
name: "token and authorization configured", |
|
config: ` |
|
server: localhost:8500 |
|
token: 1234567 |
|
authorization: |
|
credentials: 12345678 |
|
`, |
|
errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured", |
|
}, |
|
{ |
|
name: "token and oauth2 configured", |
|
config: ` |
|
server: localhost:8500 |
|
token: 1234567 |
|
oauth2: |
|
client_id: 10 |
|
client_secret: 11 |
|
token_url: http://example.com |
|
`, |
|
errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured", |
|
}, |
|
} |
|
|
|
for _, test := range cases { |
|
t.Run(test.name, func(t *testing.T) { |
|
var config SDConfig |
|
err := config.UnmarshalYAML(unmarshal([]byte(test.config))) |
|
if err != nil { |
|
require.EqualError(t, err, test.errMessage) |
|
return |
|
} |
|
require.Empty(t, test.errMessage, "Expected error.") |
|
|
|
require.Equal(t, test.expected, config) |
|
}) |
|
} |
|
}
|
|
|