
Use log.Logger interface for all discovery services

pull/2667/head
Chris Goller 8 years ago
Parent commit: 42de0ae013
17 files changed:

  1. cmd/prometheus/main.go (4 changed lines)
  2. discovery/azure/azure.go (22 changed lines)
  3. discovery/consul/consul.go (12 changed lines)
  4. discovery/consul/consul_test.go (5 changed lines)
  5. discovery/discovery.go (34 changed lines)
  6. discovery/discovery_test.go (5 changed lines)
  7. discovery/dns/dns.go (14 changed lines)
  8. discovery/ec2/ec2.go (8 changed lines)
  9. discovery/file/file.go (20 changed lines)
  10. discovery/file/file_test.go (3 changed lines)
  11. discovery/gce/gce.go (8 changed lines)
  12. discovery/marathon/marathon.go (8 changed lines)
  13. discovery/marathon/marathon_test.go (7 changed lines)
  14. discovery/zookeeper/zookeeper.go (14 changed lines)
  15. notifier/notifier.go (28 changed lines)
  16. notifier/notifier_test.go (15 changed lines)
  17. retrieval/targetmanager.go (12 changed lines)
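
Every file in the list follows the same shape: calls to the package-level functions of github.com/prometheus/common/log are replaced by a log.Logger that is passed into the constructor and stored on the struct. A minimal sketch of the pattern, assuming a hypothetical Discovery type (none of the names below are taken from the diff):

package main

import "github.com/prometheus/common/log"

// Discovery is a hypothetical provider following the pattern introduced in
// this commit: it carries an injected log.Logger instead of calling the
// package-level log functions.
type Discovery struct {
	logger log.Logger
}

// NewDiscovery takes the logger as an explicit dependency, mirroring the new
// constructor signatures below.
func NewDiscovery(logger log.Logger) *Discovery {
	return &Discovery{logger: logger}
}

func (d *Discovery) refresh() {
	// Before: log.Debugf("refreshing targets")
	d.logger.Debugf("refreshing targets")
}

func main() {
	// log.Base() is the default logger, as used by cmd/prometheus/main.go.
	NewDiscovery(log.Base()).refresh()
}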

cmd/prometheus/main.go (4 changed lines)

@ -117,8 +117,8 @@ func Main() int {
}
var (
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
notifier = notifier.New(&cfg.notifier, log.Base())
targetManager = retrieval.NewTargetManager(sampleAppender, log.Base())
queryEngine = promql.NewEngine(queryable, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)

discovery/azure/azure.go (22 changed lines)

@ -66,14 +66,16 @@ type Discovery struct {
cfg *config.AzureSDConfig
interval time.Duration
port int
logger log.Logger
}
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
func NewDiscovery(cfg *config.AzureSDConfig) *Discovery {
func NewDiscovery(cfg *config.AzureSDConfig, logger log.Logger) *Discovery {
return &Discovery{
cfg: cfg,
interval: time.Duration(cfg.RefreshInterval),
port: cfg.Port,
logger: logger,
}
}
@ -91,7 +93,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
tg, err := d.refresh()
if err != nil {
log.Errorf("unable to refresh during Azure discovery: %s", err)
d.logger.Errorf("unable to refresh during Azure discovery: %s", err)
} else {
select {
case <-ctx.Done():
@ -141,13 +143,13 @@ type azureResource struct {
}
// Create a new azureResource object from an ID string.
func newAzureResourceFromID(id string) (azureResource, error) {
func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error) {
// Resource IDs have the following format.
// /subscriptions/SUBSCRIPTION_ID/resourceGroups/RESOURCE_GROUP/providers/PROVIDER/TYPE/NAME
s := strings.Split(id, "/")
if len(s) != 9 {
err := fmt.Errorf("invalid ID '%s'. Refusing to create azureResource", id)
log.Error(err)
logger.Error(err)
return azureResource{}, err
}
return azureResource{
@ -185,7 +187,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
}
machines = append(machines, *result.Value...)
}
log.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
d.logger.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
// We have the slice of machines. Now turn them into targets.
// Doing them in go routines because the network interface calls are slow.
@ -197,7 +199,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
ch := make(chan target, len(machines))
for i, vm := range machines {
go func(i int, vm compute.VirtualMachine) {
r, err := newAzureResourceFromID(*vm.ID)
r, err := newAzureResourceFromID(*vm.ID, d.logger)
if err != nil {
ch <- target{labelSet: nil, err: err}
return
@ -219,14 +221,14 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
// Get the IP address information via separate call to the network provider.
for _, nic := range *vm.Properties.NetworkProfile.NetworkInterfaces {
r, err := newAzureResourceFromID(*nic.ID)
r, err := newAzureResourceFromID(*nic.ID, d.logger)
if err != nil {
ch <- target{labelSet: nil, err: err}
return
}
networkInterface, err := client.nic.Get(r.ResourceGroup, r.Name, "")
if err != nil {
log.Errorf("Unable to get network interface %s: %s", r.Name, err)
d.logger.Errorf("Unable to get network interface %s: %s", r.Name, err)
ch <- target{labelSet: nil, err: err}
// Get out of this routine because we cannot continue without a network interface.
return
@ -237,7 +239,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
// yet support this. On deallocated machines, this value happens to be nil so it
// is a cheap and easy way to determine if a machine is allocated or not.
if networkInterface.Properties.Primary == nil {
log.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
d.logger.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
ch <- target{}
return
}
@ -272,6 +274,6 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
}
}
log.Debugf("Azure discovery completed.")
d.logger.Debugf("Azure discovery completed.")
return tg, nil
}

discovery/consul/consul.go (12 changed lines)

@ -89,10 +89,11 @@ type Discovery struct {
clientDatacenter string
tagSeparator string
watchedServices []string // Set of services which will be discovered.
logger log.Logger
}
// NewDiscovery returns a new Discovery for the given config.
func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
func NewDiscovery(conf *config.ConsulSDConfig, logger log.Logger) (*Discovery, error) {
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@ -121,6 +122,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
tagSeparator: conf.TagSeparator,
watchedServices: conf.Services,
clientDatacenter: clientConf.Datacenter,
logger: logger,
}
return cd, nil
}
@ -163,7 +165,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
d.logger.Errorf("Error refreshing service list: %s", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue
@ -179,7 +181,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
if d.clientDatacenter == "" {
info, err := d.client.Agent().Self()
if err != nil {
log.Errorf("Error retrieving datacenter name: %s", err)
d.logger.Errorf("Error retrieving datacenter name: %s", err)
time.Sleep(retryInterval)
continue
}
@ -203,6 +205,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
datacenterLabel: model.LabelValue(d.clientDatacenter),
},
tagSeparator: d.tagSeparator,
logger: d.logger,
}
wctx, cancel := context.WithCancel(ctx)
@ -235,6 +238,7 @@ type consulService struct {
labels model.LabelSet
client *consul.Client
tagSeparator string
logger log.Logger
}
func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) {
@ -258,7 +262,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG
}
if err != nil {
log.Errorf("Error refreshing service %s: %s", srv.name, err)
srv.logger.Errorf("Error refreshing service %s: %s", srv.name, err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue
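
The logger is also copied into each consulService, so the per-service watch goroutines log through the same injected instance instead of through package state. A small sketch of that shape, with illustrative names only:

package main

import "github.com/prometheus/common/log"

// serviceWatcher mirrors the role of consulService above: it receives a copy
// of the parent Discovery's logger. Type and field names here are invented.
type serviceWatcher struct {
	name   string
	logger log.Logger
}

func (s *serviceWatcher) watch() {
	s.logger.Debugf("starting watch for service %q", s.name)
}

func main() {
	parentLogger := log.Base() // stands in for what Discovery holds in d.logger
	(&serviceWatcher{name: "web", logger: parentLogger}).watch()
}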

discovery/consul/consul_test.go (5 changed lines)

@ -16,13 +16,14 @@ package consul
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
)
func TestConfiguredService(t *testing.T) {
conf := &config.ConsulSDConfig{
Services: []string{"configuredServiceName"}}
consulDiscovery, err := NewDiscovery(conf)
consulDiscovery, err := NewDiscovery(conf, log.Base())
if err != nil {
t.Errorf("Unexpected error when initialising discovery %v", err)
@ -37,7 +38,7 @@ func TestConfiguredService(t *testing.T) {
func TestNonConfiguredService(t *testing.T) {
conf := &config.ConsulSDConfig{}
consulDiscovery, err := NewDiscovery(conf)
consulDiscovery, err := NewDiscovery(conf, log.Base())
if err != nil {
t.Errorf("Unexpected error when initialising discovery %v", err)

discovery/discovery.go (34 changed lines)

@ -50,7 +50,7 @@ type TargetProvider interface {
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetProvider {
func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
providers := map[string]TargetProvider{}
app := func(mech string, i int, tp TargetProvider) {
@ -58,59 +58,59 @@ func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetPro
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c))
app("dns", i, dns.NewDiscovery(c, logger))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c))
app("file", i, file.NewDiscovery(c, logger))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c)
k, err := consul.NewDiscovery(c, logger)
if err != nil {
log.Errorf("Cannot create Consul discovery: %s", err)
logger.Errorf("Cannot create Consul discovery: %s", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := marathon.NewDiscovery(c)
m, err := marathon.NewDiscovery(c, logger)
if err != nil {
log.Errorf("Cannot create Marathon discovery: %s", err)
logger.Errorf("Cannot create Marathon discovery: %s", err)
continue
}
app("marathon", i, m)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.Base(), c)
k, err := kubernetes.New(logger, c)
if err != nil {
log.Errorf("Cannot create Kubernetes discovery: %s", err)
logger.Errorf("Cannot create Kubernetes discovery: %s", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c))
app("serverset", i, zookeeper.NewServersetDiscovery(c, logger))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c))
app("nerve", i, zookeeper.NewNerveDiscovery(c, logger))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c))
app("ec2", i, ec2.NewDiscovery(c, logger))
}
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c)
gced, err := gce.NewDiscovery(c, logger)
if err != nil {
log.Errorf("Cannot initialize GCE discovery: %s", err)
logger.Errorf("Cannot initialize GCE discovery: %s", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c))
app("azure", i, azure.NewDiscovery(c, logger))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With("sd", "triton"), c)
t, err := triton.New(logger.With("sd", "triton"), c)
if err != nil {
log.Errorf("Cannot create Triton discovery: %s", err)
logger.Errorf("Cannot create Triton discovery: %s", err)
continue
}
app("triton", i, t)

discovery/discovery_test.go (5 changed lines)

@ -16,6 +16,7 @@ package discovery
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
"golang.org/x/net/context"
yaml "gopkg.in/yaml.v2"
@ -53,7 +54,7 @@ static_configs:
go ts.Run(ctx)
ts.UpdateProviders(ProvidersFromConfig(*cfg))
ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)
@ -67,7 +68,7 @@ static_configs:
t.Fatalf("Unable to load YAML config sTwo: %s", err)
}
ts.UpdateProviders(ProvidersFromConfig(*cfg))
ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)

discovery/dns/dns.go (14 changed lines)

@ -66,10 +66,11 @@ type Discovery struct {
interval time.Duration
port int
qtype uint16
logger log.Logger
}
// NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
func NewDiscovery(conf *config.DNSSDConfig, logger log.Logger) *Discovery {
qtype := dns.TypeSRV
switch strings.ToUpper(conf.Type) {
case "A":
@ -84,6 +85,7 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
interval: time.Duration(conf.RefreshInterval),
qtype: qtype,
port: conf.Port,
logger: logger,
}
}
@ -112,7 +114,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
for _, name := range d.names {
go func(n string) {
if err := d.refresh(ctx, n, ch); err != nil {
log.Errorf("Error refreshing DNS targets: %s", err)
d.logger.Errorf("Error refreshing DNS targets: %s", err)
}
wg.Done()
}(name)
@ -122,7 +124,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
}
func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error {
response, err := lookupAll(name, d.qtype)
response, err := lookupAll(name, d.qtype, d.logger)
dnsSDLookupsCount.Inc()
if err != nil {
dnsSDLookupFailuresCount.Inc()
@ -147,7 +149,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
case *dns.AAAA:
target = hostPort(addr.AAAA.String(), d.port)
default:
log.Warnf("%q is not a valid SRV record", record)
d.logger.Warnf("%q is not a valid SRV record", record)
continue
}
@ -167,7 +169,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
return nil
}
func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) {
conf, err := dns.ClientConfigFromFile(resolvConf)
if err != nil {
return nil, fmt.Errorf("could not load resolv.conf: %s", err)
@ -181,7 +183,7 @@ func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
for _, lname := range conf.NameList(name) {
response, err = lookup(lname, qtype, client, servAddr, false)
if err != nil {
log.
logger.
With("server", server).
With("name", name).
With("reason", err).

discovery/ec2/ec2.go (8 changed lines)

@ -72,10 +72,11 @@ type Discovery struct {
interval time.Duration
profile string
port int
logger log.Logger
}
// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.EC2SDConfig) *Discovery {
func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery {
creds := credentials.NewStaticCredentials(conf.AccessKey, string(conf.SecretKey), "")
if conf.AccessKey == "" && conf.SecretKey == "" {
creds = nil
@ -88,6 +89,7 @@ func NewDiscovery(conf *config.EC2SDConfig) *Discovery {
profile: conf.Profile,
interval: time.Duration(conf.RefreshInterval),
port: conf.Port,
logger: logger,
}
}
@ -99,7 +101,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away.
tg, err := d.refresh()
if err != nil {
log.Error(err)
d.logger.Error(err)
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -113,7 +115,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-ticker.C:
tg, err := d.refresh()
if err != nil {
log.Error(err)
d.logger.Error(err)
continue
}

discovery/file/file.go (20 changed lines)

@ -63,13 +63,15 @@ type Discovery struct {
// and how many target groups they contained.
// This is used to detect deleted target groups.
lastRefresh map[string]int
logger log.Logger
}
// NewDiscovery returns a new file discovery for the given paths.
func NewDiscovery(conf *config.FileSDConfig) *Discovery {
func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery {
return &Discovery{
paths: conf.Files,
interval: time.Duration(conf.RefreshInterval),
logger: logger,
}
}
@ -79,7 +81,7 @@ func (d *Discovery) listFiles() []string {
for _, p := range d.paths {
files, err := filepath.Glob(p)
if err != nil {
log.Errorf("Error expanding glob %q: %s", p, err)
d.logger.Errorf("Error expanding glob %q: %s", p, err)
continue
}
paths = append(paths, files...)
@ -100,7 +102,7 @@ func (d *Discovery) watchFiles() {
p = "./"
}
if err := d.watcher.Add(p); err != nil {
log.Errorf("Error adding file watch for %q: %s", p, err)
d.logger.Errorf("Error adding file watch for %q: %s", p, err)
}
}
}
@ -111,7 +113,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Errorf("Error creating file watcher: %s", err)
d.logger.Errorf("Error creating file watcher: %s", err)
return
}
d.watcher = watcher
@ -149,7 +151,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case err := <-d.watcher.Errors:
if err != nil {
log.Errorf("Error on file watch: %s", err)
d.logger.Errorf("Error on file watch: %s", err)
}
}
}
@ -157,7 +159,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// stop shuts down the file watcher.
func (d *Discovery) stop() {
log.Debugf("Stopping file discovery for %s...", d.paths)
d.logger.Debugf("Stopping file discovery for %s...", d.paths)
done := make(chan struct{})
defer close(done)
@ -175,10 +177,10 @@ func (d *Discovery) stop() {
}
}()
if err := d.watcher.Close(); err != nil {
log.Errorf("Error closing file watcher for %s: %s", d.paths, err)
d.logger.Errorf("Error closing file watcher for %s: %s", d.paths, err)
}
log.Debugf("File discovery for %s stopped.", d.paths)
d.logger.Debugf("File discovery for %s stopped.", d.paths)
}
// refresh reads all files matching the discovery's patterns and sends the respective
@ -194,7 +196,7 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
tgroups, err := readFile(p)
if err != nil {
fileSDReadErrorsCount.Inc()
log.Errorf("Error reading file %q: %s", p, err)
d.logger.Errorf("Error reading file %q: %s", p, err)
// Prevent deletion down below.
ref[p] = d.lastRefresh[p]
continue

discovery/file/file_test.go (3 changed lines)

@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -41,7 +42,7 @@ func testFileSD(t *testing.T, ext string) {
conf.RefreshInterval = model.Duration(1 * time.Hour)
var (
fsd = NewDiscovery(&conf)
fsd = NewDiscovery(&conf, log.Base())
ch = make(chan []*config.TargetGroup)
ctx, cancel = context.WithCancel(context.Background())
)

discovery/gce/gce.go (8 changed lines)

@ -76,10 +76,11 @@ type Discovery struct {
interval time.Duration
port int
tagSeparator string
logger log.Logger
}
// NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
func NewDiscovery(conf *config.GCESDConfig, logger log.Logger) (*Discovery, error) {
gd := &Discovery{
project: conf.Project,
zone: conf.Zone,
@ -87,6 +88,7 @@ func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
interval: time.Duration(conf.RefreshInterval),
port: conf.Port,
tagSeparator: conf.TagSeparator,
logger: logger,
}
var err error
gd.client, err = google.DefaultClient(oauth2.NoContext, compute.ComputeReadonlyScope)
@ -106,7 +108,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away.
tg, err := d.refresh()
if err != nil {
log.Error(err)
d.logger.Error(err)
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -122,7 +124,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-ticker.C:
tg, err := d.refresh()
if err != nil {
log.Error(err)
d.logger.Error(err)
continue
}
select {

discovery/marathon/marathon.go (8 changed lines)

@ -85,10 +85,11 @@ type Discovery struct {
lastRefresh map[string]*config.TargetGroup
appsClient AppListClient
token string
logger log.Logger
}
// NewDiscovery returns a new Marathon Discovery.
func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
func NewDiscovery(conf *config.MarathonSDConfig, logger log.Logger) (*Discovery, error) {
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@ -116,6 +117,7 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
refreshInterval: time.Duration(conf.RefreshInterval),
appsClient: fetchApps,
token: token,
logger: logger,
}, nil
}
@ -128,7 +130,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-time.After(d.refreshInterval):
err := d.updateServices(ctx, ch)
if err != nil {
log.Errorf("Error while updating services: %s", err)
d.logger.Errorf("Error while updating services: %s", err)
}
}
}
@ -167,7 +169,7 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Targ
case <-ctx.Done():
return ctx.Err()
case ch <- []*config.TargetGroup{{Source: source}}:
log.Debugf("Removing group for %s", source)
d.logger.Debugf("Removing group for %s", source)
}
}
}

discovery/marathon/marathon_test.go (7 changed lines)

@ -19,6 +19,7 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -32,7 +33,7 @@ var (
)
func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
md, err := NewDiscovery(&conf)
md, err := NewDiscovery(&conf, log.Base())
if err != nil {
return err
}
@ -140,7 +141,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
func TestMarathonSDRemoveApp(t *testing.T) {
var ch = make(chan []*config.TargetGroup, 1)
md, err := NewDiscovery(&conf)
md, err := NewDiscovery(&conf, log.Base())
if err != nil {
t.Fatalf("%s", err)
}
@ -176,7 +177,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
ch = make(chan []*config.TargetGroup)
doneCh = make(chan error)
)
md, err := NewDiscovery(&conf)
md, err := NewDiscovery(&conf, log.Base())
if err != nil {
t.Fatalf("%s", err)
}

discovery/zookeeper/zookeeper.go (14 changed lines)

@ -24,6 +24,7 @@ import (
"github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/util/treecache"
@ -39,17 +40,18 @@ type Discovery struct {
updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache
parse func(data []byte, path string) (model.LabelSet, error)
parse func(data []byte, path string) (model.LabelSet, error)
logger log.Logger
}
// NewNerveDiscovery returns a new Discovery for the given Nerve config.
func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
func NewNerveDiscovery(conf *config.NerveSDConfig, logger log.Logger) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
}
// NewServersetDiscovery returns a new Discovery for the given serverset config.
func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
func NewServersetDiscovery(conf *config.ServersetSDConfig, logger log.Logger) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
}
// NewDiscovery returns a new discovery along Zookeeper parses with
@ -58,6 +60,7 @@ func NewDiscovery(
srvs []string,
timeout time.Duration,
paths []string,
logger log.Logger,
pf func(data []byte, path string) (model.LabelSet, error),
) *Discovery {
conn, _, err := zk.Connect(srvs, timeout)
@ -71,6 +74,7 @@ func NewDiscovery(
updates: updates,
sources: map[string]*config.TargetGroup{},
parse: pf,
logger: logger,
}
for _, path := range paths {
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))

notifier/notifier.go (28 changed lines)

@ -65,6 +65,7 @@ type Notifier struct {
alertmanagers []*alertmanagerSet
cancelDiscovery func()
logger log.Logger
}
// Options are the configurable parameters of a Handler.
@ -156,7 +157,7 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag
}
// New constructs a new Notifier.
func New(o *Options) *Notifier {
func New(o *Options, logger log.Logger) *Notifier {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
@ -169,6 +170,7 @@ func New(o *Options) *Notifier {
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
logger: logger,
}
queueLenFunc := func() float64 { return float64(n.queueLen()) }
@ -189,7 +191,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
ctx, cancel := context.WithCancel(n.ctx)
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg)
ams, err := newAlertmanagerSet(cfg, n.logger)
if err != nil {
return err
}
@ -203,7 +205,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
// old ones.
for _, ams := range amSets {
go ams.ts.Run(ctx)
ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig))
ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
}
if n.cancelDiscovery != nil {
n.cancelDiscovery()
@ -283,7 +285,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]
log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
n.logger.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
n.metrics.dropped.Add(float64(d))
}
@ -292,7 +294,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
log.Warnf("Alert notification queue full, dropping %d alerts", d)
n.logger.Warnf("Alert notification queue full, dropping %d alerts", d)
n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
@ -349,7 +351,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
b, err := json.Marshal(alerts)
if err != nil {
log.Errorf("Encoding alerts failed: %s", err)
n.logger.Errorf("Encoding alerts failed: %s", err)
return false
}
@ -374,7 +376,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
u := am.url().String()
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
n.logger.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
n.metrics.errors.WithLabelValues(u).Inc()
} else {
atomic.AddUint64(&numSuccess, 1)
@ -413,7 +415,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []
// Stop shuts down the notification handler.
func (n *Notifier) Stop() {
log.Info("Stopping notification handler...")
n.logger.Info("Stopping notification handler...")
n.cancel()
}
@ -443,11 +445,12 @@ type alertmanagerSet struct {
metrics *alertMetrics
mtx sync.RWMutex
ams []alertmanager
mtx sync.RWMutex
ams []alertmanager
logger log.Logger
}
func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*alertmanagerSet, error) {
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
return nil, err
@ -455,6 +458,7 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error
s := &alertmanagerSet{
client: client,
cfg: cfg,
logger: logger,
}
s.ts = discovery.NewTargetSet(s)
@ -469,7 +473,7 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
for _, tg := range tgs {
ams, err := alertmanagerFromGroup(tg, s.cfg)
if err != nil {
log.With("err", err).Error("generating discovered Alertmanagers failed")
s.logger.With("err", err).Error("generating discovered Alertmanagers failed")
continue
}
all = append(all, ams...)
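
notifier.New now takes the logger alongside its Options; cmd/prometheus/main.go simply hands it log.Base(). A caller could just as well pass a logger with a fixed field so that every message from the handler is tagged. This is only an illustration of the injected interface, not something the commit itself does:

package main

import (
	"github.com/prometheus/common/log"
	"github.com/prometheus/prometheus/notifier"
)

func main() {
	// Scoped logger: every message carries component="notifier".
	logger := log.Base().With("component", "notifier")

	n := notifier.New(&notifier.Options{QueueCapacity: 100}, logger)
	defer n.Stop()
}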

notifier/notifier_test.go (15 changed lines)

@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
@ -63,7 +64,7 @@ func TestPostPath(t *testing.T) {
}
func TestHandlerNextBatch(t *testing.T) {
h := New(&Options{})
h := New(&Options{}, log.Base())
for i := range make([]struct{}, 2*maxBatchSize+1) {
h.queue = append(h.queue, &model.Alert{
@ -150,7 +151,7 @@ func TestHandlerSendAll(t *testing.T) {
defer server1.Close()
defer server2.Close()
h := New(&Options{})
h := New(&Options{}, log.Base())
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
@ -217,7 +218,7 @@ func TestCustomDo(t *testing.T) {
Body: ioutil.NopCloser(nil),
}, nil
},
})
}, log.Base())
h.sendOne(context.Background(), nil, testURL, []byte(testBody))
@ -239,7 +240,7 @@ func TestExternalLabels(t *testing.T) {
Replacement: "c",
},
},
})
}, log.Base())
// This alert should get the external label attached.
h.Send(&model.Alert{
@ -293,7 +294,7 @@ func TestHandlerRelabel(t *testing.T) {
Replacement: "renamed",
},
},
})
}, log.Base())
// This alert should be dropped due to the configuration
h.Send(&model.Alert{
@ -347,7 +348,9 @@ func TestHandlerQueueing(t *testing.T) {
h := New(&Options{
QueueCapacity: 3 * maxBatchSize,
})
},
log.Base(),
)
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{

retrieval/targetmanager.go (12 changed lines)

@ -38,6 +38,7 @@ type TargetManager struct {
// Set of unqiue targets by scrape configuration.
targetSets map[string]*targetSet
logger log.Logger
}
type targetSet struct {
@ -49,16 +50,17 @@ type targetSet struct {
}
// NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.SampleAppender) *TargetManager {
func NewTargetManager(app storage.SampleAppender, logger log.Logger) *TargetManager {
return &TargetManager{
appender: app,
targetSets: map[string]*targetSet{},
logger: logger,
}
}
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
log.Info("Starting target manager...")
tm.logger.Info("Starting target manager...")
tm.mtx.Lock()
@ -72,7 +74,7 @@ func (tm *TargetManager) Run() {
// Stop all background processing.
func (tm *TargetManager) Stop() {
log.Infoln("Stopping target manager...")
tm.logger.Infoln("Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context, this will cause all target providers to shut down
@ -84,7 +86,7 @@ func (tm *TargetManager) Stop() {
// Wait for all scrape inserts to complete.
tm.wg.Wait()
log.Debugln("Target manager stopped")
tm.logger.Debugln("Target manager stopped")
}
func (tm *TargetManager) reload() {
@ -118,7 +120,7 @@ func (tm *TargetManager) reload() {
} else {
ts.sp.reload(scfg)
}
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig))
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
}
// Remove old target sets. Waiting for scrape pools to complete pending
