From 1fe3d3b06bffb60068449ca350e93484f1a16ca4 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 11 Jun 2013 17:54:58 +0200 Subject: [PATCH 1/2] Remove obsolete argument from target handling code. --- retrieval/targetmanager.go | 16 ++++++++-------- retrieval/targetmanager_test.go | 6 +++--- web/api/targets.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 8ccb40d78..f2979c6a7 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -24,8 +24,8 @@ import ( type TargetManager interface { acquire() release() - AddTarget(job config.JobConfig, t Target, defaultScrapeInterval time.Duration) - ReplaceTargets(job config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) + AddTarget(job config.JobConfig, t Target) + ReplaceTargets(job config.JobConfig, newTargets []Target) Remove(t Target) AddTargetsFromConfig(config config.Config) Pools() map[string]*TargetPool @@ -53,7 +53,7 @@ func (m *targetManager) release() { <-m.requestAllowance } -func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) *TargetPool { +func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { targetPool, ok := m.poolsByJob[job.GetName()] if !ok { @@ -69,14 +69,14 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInte return targetPool } -func (m *targetManager) AddTarget(job config.JobConfig, t Target, defaultScrapeInterval time.Duration) { - targetPool := m.TargetPoolForJob(job, defaultScrapeInterval) +func (m *targetManager) AddTarget(job config.JobConfig, t Target) { + targetPool := m.TargetPoolForJob(job) targetPool.AddTarget(t) m.poolsByJob[job.GetName()] = targetPool } -func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) { - targetPool := m.TargetPoolForJob(job, defaultScrapeInterval) +func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) { + targetPool := m.TargetPoolForJob(job) targetPool.replaceTargets(newTargets) } @@ -98,7 +98,7 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) { for _, endpoint := range targetGroup.Target { target := NewTarget(endpoint, time.Second*5, baseLabels) - m.AddTarget(job, target, config.ScrapeInterval()) + m.AddTarget(job, target) } } } diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 0437ef30d..59845e830 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -95,15 +95,15 @@ func testTargetManager(t test.Tester) { interval: time.Minute, } - targetManager.AddTarget(testJob1, target1GroupA, 0) - targetManager.AddTarget(testJob1, target2GroupA, 0) + targetManager.AddTarget(testJob1, target1GroupA) + targetManager.AddTarget(testJob1, target2GroupA) target1GroupB := &fakeTarget{ schedules: []time.Time{time.Now()}, interval: time.Minute * 2, } - targetManager.AddTarget(testJob2, target1GroupB, 0) + targetManager.AddTarget(testJob2, target1GroupB) } func TestTargetManager(t *testing.T) { diff --git a/web/api/targets.go b/web/api/targets.go index f83a76caf..7e4df43be 100644 --- a/web/api/targets.go +++ b/web/api/targets.go @@ -52,5 +52,5 @@ func (serv MetricsService) SetTargets(targetGroups []TargetGroup, jobName string // BUG(julius): Validate that this ScrapeInterval is in fact the proper one // for the job. - serv.TargetManager.ReplaceTargets(*job, newTargets, serv.Config.ScrapeInterval()) + serv.TargetManager.ReplaceTargets(*job, newTargets) } From d9b4f98b44826c30817f793a9111f7aca710ced5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 11 Jun 2013 22:59:27 +0200 Subject: [PATCH 2/2] Integrate DNS-SD support for discovering job targets. --- config/Makefile | 2 +- config/config.go | 6 ++ config/config.proto | 14 ++- config/config_test.go | 7 ++ .../mixing_sd_and_manual_targets.conf.input | 7 ++ config/fixtures/sd_targets.conf.input | 4 + config/generated/config.pb.go | 35 +++++++- retrieval/target_provider.go | 85 +++++++++++++++++++ retrieval/targetmanager.go | 12 ++- retrieval/targetpool.go | 14 ++- 10 files changed, 177 insertions(+), 9 deletions(-) create mode 100644 config/fixtures/mixing_sd_and_manual_targets.conf.input create mode 100644 config/fixtures/sd_targets.conf.input create mode 100644 retrieval/target_provider.go diff --git a/config/Makefile b/config/Makefile index 7eca91f58..bcc2c168a 100644 --- a/config/Makefile +++ b/config/Makefile @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -all: generated +all: generated/config.pb.go SUFFIXES: diff --git a/config/config.go b/config/config.go index 747c1f2db..75b946620 100644 --- a/config/config.go +++ b/config/config.go @@ -72,11 +72,17 @@ func (c Config) Validate() error { if _, err := utility.StringToDuration(job.GetScrapeInterval()); err != nil { return fmt.Errorf("Invalid scrape interval for job '%s': %s", job.GetName(), err) } + if _, err := utility.StringToDuration(job.GetSdRefreshInterval()); err != nil { + return fmt.Errorf("Invalid SD refresh interval for job '%s': %s", job.GetName(), err) + } for _, targetGroup := range job.TargetGroup { if err := c.validateLabels(targetGroup.Labels); err != nil { return fmt.Errorf("Invalid labels for job '%s': %s", job.GetName(), err) } } + if job.SdName != nil && len(job.TargetGroup) > 0 { + return fmt.Errorf("Specified both DNS-SD name and target group for job: %s", job.GetName()) + } } return nil diff --git a/config/config.proto b/config/config.proto index b49695ab1..e282801a8 100644 --- a/config/config.proto +++ b/config/config.proto @@ -55,8 +55,18 @@ message JobConfig { // How frequently to scrape targets from this job. Overrides the global // default. optional string scrape_interval = 2; - // List of labeled target groups for this job. - repeated TargetGroup target_group = 3; + // The DNS-SD service name pointing to SRV records containing endpoint + // information for a job. When this field is provided, no target_group + // elements may be set. + optional string sd_name = 3; + // Discovery refresh period when using DNS-SD to discover targets. Must be a + // valid Prometheus duration string in the form "[0-9]+[smhdwy]". + optional string sd_refresh_interval = 4 [default = "30s"]; + // List of labeled target groups for this job. Only legal when DNS-SD isn't + // used for a job. + repeated TargetGroup target_group = 5; + // The HTTP resource path to fetch metrics from on targets. + optional string metrics_path = 6 [default = "/metrics.json"]; } // The top-level Prometheus configuration. diff --git a/config/config_test.go b/config/config_test.go index 3c8422a36..79407f550 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -32,6 +32,8 @@ var configTests = []struct { inputFile: "sample.conf.input", }, { inputFile: "empty.conf.input", + }, { + inputFile: "sd_targets.conf.input", }, { inputFile: "invalid_proto_format.conf.input", @@ -53,6 +55,11 @@ var configTests = []struct { shouldFail: true, errContains: "Invalid label name", }, + { + inputFile: "mixing_sd_and_manual_targets.conf.input", + shouldFail: true, + errContains: "Specified both DNS-SD name and target group", + }, } func TestConfigs(t *testing.T) { diff --git a/config/fixtures/mixing_sd_and_manual_targets.conf.input b/config/fixtures/mixing_sd_and_manual_targets.conf.input new file mode 100644 index 000000000..919beb0c5 --- /dev/null +++ b/config/fixtures/mixing_sd_and_manual_targets.conf.input @@ -0,0 +1,7 @@ +job: < + name: "testjob" + sd_name: "sd_name" + target_group: < + target: "http://sampletarget:8080/metrics.json" + > +> diff --git a/config/fixtures/sd_targets.conf.input b/config/fixtures/sd_targets.conf.input new file mode 100644 index 000000000..ffded895f --- /dev/null +++ b/config/fixtures/sd_targets.conf.input @@ -0,0 +1,4 @@ +job: < + name: "testjob" + sd_name: "sd_name" +> diff --git a/config/generated/config.pb.go b/config/generated/config.pb.go index 9fb91bcaa..4e957d54e 100644 --- a/config/generated/config.pb.go +++ b/config/generated/config.pb.go @@ -121,16 +121,22 @@ func (m *TargetGroup) GetLabels() *LabelPairs { } type JobConfig struct { - Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"` - ScrapeInterval *string `protobuf:"bytes,2,opt,name=scrape_interval" json:"scrape_interval,omitempty"` - TargetGroup []*TargetGroup `protobuf:"bytes,3,rep,name=target_group" json:"target_group,omitempty"` - XXX_unrecognized []byte `json:"-"` + Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"` + ScrapeInterval *string `protobuf:"bytes,2,opt,name=scrape_interval" json:"scrape_interval,omitempty"` + SdName *string `protobuf:"bytes,3,opt,name=sd_name" json:"sd_name,omitempty"` + SdRefreshInterval *string `protobuf:"bytes,4,opt,name=sd_refresh_interval,def=30s" json:"sd_refresh_interval,omitempty"` + TargetGroup []*TargetGroup `protobuf:"bytes,5,rep,name=target_group" json:"target_group,omitempty"` + MetricsPath *string `protobuf:"bytes,6,opt,name=metrics_path,def=/metrics.json" json:"metrics_path,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *JobConfig) Reset() { *m = JobConfig{} } func (m *JobConfig) String() string { return proto.CompactTextString(m) } func (*JobConfig) ProtoMessage() {} +const Default_JobConfig_SdRefreshInterval string = "30s" +const Default_JobConfig_MetricsPath string = "/metrics.json" + func (m *JobConfig) GetName() string { if m != nil && m.Name != nil { return *m.Name @@ -145,6 +151,20 @@ func (m *JobConfig) GetScrapeInterval() string { return "" } +func (m *JobConfig) GetSdName() string { + if m != nil && m.SdName != nil { + return *m.SdName + } + return "" +} + +func (m *JobConfig) GetSdRefreshInterval() string { + if m != nil && m.SdRefreshInterval != nil { + return *m.SdRefreshInterval + } + return Default_JobConfig_SdRefreshInterval +} + func (m *JobConfig) GetTargetGroup() []*TargetGroup { if m != nil { return m.TargetGroup @@ -152,6 +172,13 @@ func (m *JobConfig) GetTargetGroup() []*TargetGroup { return nil } +func (m *JobConfig) GetMetricsPath() string { + if m != nil && m.MetricsPath != nil { + return *m.MetricsPath + } + return Default_JobConfig_MetricsPath +} + type PrometheusConfig struct { Global *GlobalConfig `protobuf:"bytes,1,opt,name=global" json:"global,omitempty"` Job []*JobConfig `protobuf:"bytes,2,rep,name=job" json:"job,omitempty"` diff --git a/retrieval/target_provider.go b/retrieval/target_provider.go new file mode 100644 index 000000000..7e23c090a --- /dev/null +++ b/retrieval/target_provider.go @@ -0,0 +1,85 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "fmt" + "net" + "net/url" + "time" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/utility" +) + +// TargetProvider encapsulates retrieving all targets for a job. +type TargetProvider interface { + // Retrieves the current list of targets for this provider. + Targets() ([]Target, error) +} + +type sdTargetProvider struct { + job config.JobConfig + + targets []Target + + lastRefresh time.Time + refreshInterval time.Duration +} + +// Constructs a new sdTargetProvider for a job. +func NewSdTargetProvider(job config.JobConfig) *sdTargetProvider { + i, err := utility.StringToDuration(job.GetSdRefreshInterval()) + if err != nil { + panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err)) + } + return &sdTargetProvider{ + job: job, + refreshInterval: i, + } +} + +func (p *sdTargetProvider) Targets() ([]Target, error) { + if time.Since(p.lastRefresh) > p.refreshInterval { + return p.targets, nil + } + + _, addrs, err := net.LookupSRV("", "", p.job.GetSdName()) + if err != nil { + return nil, err + } + + baseLabels := model.LabelSet{ + model.JobLabel: model.LabelValue(p.job.GetName()), + } + + targets := make([]Target, 0, len(addrs)) + endpoint := &url.URL{ + Scheme: "http", + Path: p.job.GetMetricsPath(), + } + for _, addr := range addrs { + // Remove the final dot from rooted DNS names to make them look more usual. + if addr.Target[len(addr.Target)-1] == '.' { + addr.Target = addr.Target[:len(addr.Target)-1] + } + endpoint.Host = fmt.Sprintf("%s:%d", addr.Target, addr.Port) + t := NewTarget(endpoint.String(), time.Second*5, baseLabels) + targets = append(targets, t) + } + + p.targets = targets + return targets, nil +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index f2979c6a7..e5d4c708d 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -57,7 +57,12 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { targetPool, ok := m.poolsByJob[job.GetName()] if !ok { - targetPool = NewTargetPool(m) + var provider TargetProvider = nil + if job.SdName != nil { + provider = NewSdTargetProvider(job) + } + + targetPool = NewTargetPool(m, provider) log.Printf("Pool for job %s does not exist; creating and starting...", job.GetName()) interval := job.ScrapeInterval() @@ -86,6 +91,11 @@ func (m targetManager) Remove(t Target) { func (m *targetManager) AddTargetsFromConfig(config config.Config) { for _, job := range config.Jobs() { + if job.SdName != nil { + m.TargetPoolForJob(job) + continue + } + for _, targetGroup := range job.TargetGroup { baseLabels := model.LabelSet{ model.JobLabel: model.LabelValue(job.GetName()), diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 6a3b39d4d..0c909790a 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -37,13 +37,16 @@ type TargetPool struct { targets targets addTargetQueue chan Target replaceTargetsQueue chan targets + + targetProvider TargetProvider } -func NewTargetPool(m TargetManager) *TargetPool { +func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool { return &TargetPool{ manager: m, addTargetQueue: make(chan Target, targetAddQueueSize), replaceTargetsQueue: make(chan targets, targetReplaceQueueSize), + targetProvider: p, } } @@ -121,6 +124,15 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t } func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) { + if p.targetProvider != nil { + targets, err := p.targetProvider.Targets() + if err != nil { + log.Printf("Error looking up targets: %s", err) + return + } + p.ReplaceTargets(targets) + } + p.RLock() defer p.RUnlock()