diff --git a/retrieval/scrape.go b/retrieval/scrape.go index a810edc72..3697a2216 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -82,9 +82,9 @@ type scrapePool struct { config *config.ScrapeConfig client *http.Client // Targets and loops must always be synchronized to have the same - // set of fingerprints. - targets map[model.Fingerprint]*Target - loops map[model.Fingerprint]loop + // set of hashes. + targets map[uint64]*Target + loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop @@ -100,8 +100,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape appender: app, config: cfg, client: client, - targets: map[model.Fingerprint]*Target{}, - loops: map[model.Fingerprint]loop{}, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, newLoop: newScrapeLoop, } } @@ -178,21 +178,21 @@ func (sp *scrapePool) sync(targets []*Target) { defer sp.mtx.Unlock() var ( - fingerprints = map[model.Fingerprint]struct{}{} - interval = time.Duration(sp.config.ScrapeInterval) - timeout = time.Duration(sp.config.ScrapeTimeout) + uniqueTargets = map[uint64]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) ) for _, t := range targets { - fp := t.fingerprint() - fingerprints[fp] = struct{}{} + hash := t.hash() + uniqueTargets[hash] = struct{}{} - if _, ok := sp.targets[fp]; !ok { + if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client} l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) - sp.targets[fp] = t - sp.loops[fp] = l + sp.targets[hash] = t + sp.loops[hash] = l go l.run(interval, timeout, nil) } @@ -201,16 +201,16 @@ func (sp *scrapePool) sync(targets []*Target) { var wg sync.WaitGroup // Stop and remove old targets and scraper loops. - for fp := range sp.targets { - if _, ok := fingerprints[fp]; !ok { + for hash := range sp.targets { + if _, ok := uniqueTargets[hash]; !ok { wg.Add(1) go func(l loop) { l.stop() wg.Done() - }(sp.loops[fp]) + }(sp.loops[hash]) - delete(sp.loops, fp) - delete(sp.targets, fp) + delete(sp.loops, hash) + delete(sp.targets, hash) } } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index d36919214..55e9c35ad 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -60,11 +60,11 @@ func (l *testLoop) stop() { func TestScrapePoolStop(t *testing.T) { sp := &scrapePool{ - targets: map[model.Fingerprint]*Target{}, - loops: map[model.Fingerprint]loop{}, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, } var mtx sync.Mutex - stopped := map[model.Fingerprint]bool{} + stopped := map[uint64]bool{} numTargets := 20 // Stopping the scrape pool must call stop() on all scrape loops, @@ -82,12 +82,12 @@ func TestScrapePoolStop(t *testing.T) { time.Sleep(time.Duration(i*20) * time.Millisecond) mtx.Lock() - stopped[t.fingerprint()] = true + stopped[t.hash()] = true mtx.Unlock() } - sp.targets[t.fingerprint()] = t - sp.loops[t.fingerprint()] = l + sp.targets[t.hash()] = t + sp.loops[t.hash()] = l } done := make(chan struct{}) @@ -126,7 +126,7 @@ func TestScrapePoolReload(t *testing.T) { var mtx sync.Mutex numTargets := 20 - stopped := map[model.Fingerprint]bool{} + stopped := map[uint64]bool{} reloadCfg := &config.ScrapeConfig{ ScrapeInterval: model.Duration(3 * time.Second), @@ -144,7 +144,7 @@ func TestScrapePoolReload(t *testing.T) { t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout) } mtx.Lock() - if !stopped[s.(*targetScraper).fingerprint()] { + if !stopped[s.(*targetScraper).hash()] { t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper)) } mtx.Unlock() @@ -152,8 +152,8 @@ func TestScrapePoolReload(t *testing.T) { return l } sp := &scrapePool{ - targets: map[model.Fingerprint]*Target{}, - loops: map[model.Fingerprint]loop{}, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, newLoop: newLoop, } @@ -172,18 +172,18 @@ func TestScrapePoolReload(t *testing.T) { time.Sleep(time.Duration(i*20) * time.Millisecond) mtx.Lock() - stopped[t.fingerprint()] = true + stopped[t.hash()] = true mtx.Unlock() } - sp.targets[t.fingerprint()] = t - sp.loops[t.fingerprint()] = l + sp.targets[t.hash()] = t + sp.loops[t.hash()] = l } done := make(chan struct{}) - beforeTargets := map[model.Fingerprint]*Target{} - for fp, t := range sp.targets { - beforeTargets[fp] = t + beforeTargets := map[uint64]*Target{} + for h, t := range sp.targets { + beforeTargets[h] = t } reloadTime := time.Now() diff --git a/retrieval/target.go b/retrieval/target.go index e48e28449..8aff3f8d8 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -15,6 +15,7 @@ package retrieval import ( "fmt" + "hash/fnv" "io/ioutil" "net/http" "net/url" @@ -117,24 +118,21 @@ type Target struct { // The status object for the target. It is only set once on initialization. status *TargetStatus - scrapeConfig *config.ScrapeConfig - - // Mutex protects the members below. - sync.RWMutex - // Labels before any processing. metaLabels model.LabelSet // Any labels that are added to this target and its metrics. labels model.LabelSet + // Additional URL parmeters that are part of the target URL. + params url.Values } // NewTarget creates a reasonably configured target for querying. -func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) *Target { +func NewTarget(labels, metaLabels model.LabelSet, params url.Values) *Target { return &Target{ - status: &TargetStatus{}, - scrapeConfig: cfg, - labels: labels, - metaLabels: metaLabels, + status: &TargetStatus{}, + labels: labels, + metaLabels: metaLabels, + params: params, } } @@ -188,15 +186,16 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { } func (t *Target) String() string { - return t.host() + return t.URL().String() } -// fingerprint returns an identifying hash for the target. -func (t *Target) fingerprint() model.Fingerprint { - t.RLock() - defer t.RUnlock() +// hash returns an identifying hash for the target. +func (t *Target) hash() uint64 { + h := fnv.New64a() + h.Write([]byte(t.labels.Fingerprint().String())) + h.Write([]byte(t.URL().String())) - return t.labels.Fingerprint() + return h.Sum64() } // offset returns the time until the next scrape cycle for the target. @@ -205,7 +204,7 @@ func (t *Target) offset(interval time.Duration) time.Duration { var ( base = now % int64(interval) - offset = uint64(t.fingerprint()) % uint64(interval) + offset = t.hash() % uint64(interval) next = base + int64(offset) ) @@ -215,35 +214,27 @@ func (t *Target) offset(interval time.Duration) time.Duration { return time.Duration(next) } -func (t *Target) scheme() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.SchemeLabel]) -} - -func (t *Target) host() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.AddressLabel]) +// Labels returns a copy of the set of all public labels of the target. +func (t *Target) Labels() model.LabelSet { + lset := make(model.LabelSet, len(t.labels)) + for ln, lv := range t.labels { + if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { + lset[ln] = lv + } + } + return lset } -func (t *Target) path() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.MetricsPathLabel]) +// MetaLabels returns a copy of the target's labels before any processing. +func (t *Target) MetaLabels() model.LabelSet { + return t.metaLabels.Clone() } // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { - t.RLock() - defer t.RUnlock() - params := url.Values{} - for k, v := range t.scrapeConfig.Params { + for k, v := range t.params { params[k] = make([]string, len(v)) copy(params[k], v) } @@ -329,36 +320,3 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } - -// Labels returns a copy of the set of all public labels of the target. -func (t *Target) Labels() model.LabelSet { - t.RLock() - defer t.RUnlock() - - return t.unlockedLabels() -} - -// unlockedLabels does the same as Labels but does not lock the mutex (useful -// for internal usage when the mutex is already locked). -func (t *Target) unlockedLabels() model.LabelSet { - lset := make(model.LabelSet, len(t.labels)) - for ln, lv := range t.labels { - if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { - lset[ln] = lv - } - } - - if _, ok := lset[model.InstanceLabel]; !ok { - lset[model.InstanceLabel] = t.labels[model.AddressLabel] - } - - return lset -} - -// MetaLabels returns a copy of the target's labels before any processing. -func (t *Target) MetaLabels() model.LabelSet { - t.RLock() - defer t.RUnlock() - - return t.metaLabels.Clone() -} diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 8d6233f05..e679a568d 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -34,9 +34,8 @@ import ( func TestTargetLabels(t *testing.T) { target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"}) want := model.LabelSet{ - model.JobLabel: "some_job", - model.InstanceLabel: "example.com:80", - "foo": "bar", + model.JobLabel: "some_job", + "foo": "bar", } got := target.Labels() if !reflect.DeepEqual(want, got) { @@ -142,10 +141,6 @@ func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelS labels[model.MetricsPathLabel] = "/metrics" return &Target{ - scrapeConfig: &config.ScrapeConfig{ - ScrapeInterval: model.Duration(time.Millisecond), - ScrapeTimeout: model.Duration(deadline), - }, labels: labels, status: &TargetStatus{}, } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index e30a85ff6..5e590e44a 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -176,7 +176,7 @@ type targetSet struct { mtx sync.RWMutex // Sets of targets by a source string that is unique across target providers. - tgroups map[string]map[model.Fingerprint]*Target + tgroups map[string]map[uint64]*Target providers map[string]TargetProvider scrapePool *scrapePool @@ -189,7 +189,7 @@ type targetSet struct { func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { ts := &targetSet{ - tgroups: map[string]map[model.Fingerprint]*Target{}, + tgroups: map[string]map[uint64]*Target{}, scrapePool: newScrapePool(cfg, app), syncCh: make(chan struct{}, 1), config: cfg, @@ -394,8 +394,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { } // targetsFromGroup builds targets based on the given TargetGroup and config. -func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[model.Fingerprint]*Target, error) { - targets := make(map[model.Fingerprint]*Target, len(tg.Targets)) +func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[uint64]*Target, error) { + targets := make(map[uint64]*Target, len(tg.Targets)) for i, labels := range tg.Targets { for k, v := range cfg.Params { @@ -460,8 +460,12 @@ func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[mod } } - tr := NewTarget(cfg, labels, preRelabelLabels) - targets[tr.fingerprint()] = tr + if _, ok := labels[model.InstanceLabel]; !ok { + labels[model.InstanceLabel] = labels[model.AddressLabel] + } + + tr := NewTarget(labels, preRelabelLabels, cfg.Params) + targets[tr.hash()] = tr } return targets, nil