mirror of https://github.com/prometheus/prometheus
462 lines
13 KiB
Go
462 lines
13 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package retrieval
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/prometheus/client_golang/extraction"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model"
|
|
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/utility"
|
|
)
|
|
|
|
const (
|
|
// InstanceLabel is the label value used for the instance label.
|
|
InstanceLabel clientmodel.LabelName = "instance"
|
|
// ScrapeHealthMetricName is the metric name for the synthetic health
|
|
// variable.
|
|
scrapeHealthMetricName clientmodel.LabelValue = "up"
|
|
// ScrapeTimeMetricName is the metric name for the synthetic scrape duration
|
|
// variable.
|
|
scrapeDurationMetricName clientmodel.LabelValue = "scrape_duration_seconds"
|
|
// Capacity of the channel to buffer samples during ingestion.
|
|
ingestedSamplesCap = 256
|
|
|
|
// Constants for instrumentation.
|
|
namespace = "prometheus"
|
|
interval = "interval"
|
|
)
|
|
|
|
var (
|
|
errIngestChannelFull = errors.New("ingestion channel full")
|
|
|
|
localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"}
|
|
|
|
targetIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Name: "target_interval_length_seconds",
|
|
Help: "Actual intervals between scrapes.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{interval},
|
|
)
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(targetIntervalLength)
|
|
}
|
|
|
|
// TargetState describes the state of a Target.
|
|
type TargetState int
|
|
|
|
func (t TargetState) String() string {
|
|
switch t {
|
|
case Unknown:
|
|
return "UNKNOWN"
|
|
case Healthy:
|
|
return "HEALTHY"
|
|
case Unhealthy:
|
|
return "UNHEALTHY"
|
|
}
|
|
|
|
panic("unknown state")
|
|
}
|
|
|
|
const (
|
|
// Unknown is the state of a Target before it is first scraped.
|
|
Unknown TargetState = iota
|
|
// Healthy is the state of a Target that has been successfully scraped.
|
|
Healthy
|
|
// Unhealthy is the state of a Target that was scraped unsuccessfully.
|
|
Unhealthy
|
|
)
|
|
|
|
// A Target represents an endpoint that should be interrogated for metrics.
|
|
//
|
|
// The protocol described by this type will likely change in future iterations,
|
|
// as it offers no good support for aggregated targets and fan out. Thusly,
|
|
// it is likely that the current Target and target uses will be
|
|
// wrapped with some resolver type.
|
|
//
|
|
// For the future, the Target protocol will abstract away the exact means that
|
|
// metrics are retrieved and deserialized from the given instance to which it
|
|
// refers.
|
|
//
|
|
// Target implements extraction.Ingester.
|
|
type Target interface {
|
|
// Return the last encountered scrape error, if any.
|
|
LastError() error
|
|
// Return the health of the target.
|
|
State() TargetState
|
|
// Return the last time a scrape was attempted.
|
|
LastScrape() time.Time
|
|
// The URL to which the Target corresponds. Out of all of the available
|
|
// points in this interface, this one is the best candidate to change given
|
|
// the ways to express the endpoint.
|
|
URL() string
|
|
// Used to populate the `instance` label in metrics.
|
|
InstanceIdentifier() string
|
|
// The URL as seen from other hosts. References to localhost are resolved
|
|
// to the address of the prometheus server.
|
|
GlobalURL() string
|
|
// Return the target's base labels.
|
|
BaseLabels() clientmodel.LabelSet
|
|
// Return the target's base labels without job and instance label. That's
|
|
// useful for display purposes.
|
|
BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet
|
|
// SetBaseLabelsFrom queues a replacement of the current base labels by
|
|
// the labels of the given target. The method returns immediately after
|
|
// queuing. The actual replacement of the base labels happens
|
|
// asynchronously (but most likely before the next scrape for the target
|
|
// begins).
|
|
SetBaseLabelsFrom(Target)
|
|
// Scrape target at the specified interval.
|
|
RunScraper(storage.SampleAppender, time.Duration)
|
|
// Stop scraping, synchronous.
|
|
StopScraper()
|
|
// Ingest implements extraction.Ingester.
|
|
Ingest(clientmodel.Samples) error
|
|
}
|
|
|
|
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
|
|
type target struct {
|
|
// The current health state of the target.
|
|
state TargetState
|
|
// The last encountered scrape error, if any.
|
|
lastError error
|
|
// The last time a scrape was attempted.
|
|
lastScrape time.Time
|
|
// Closing scraperStopping signals that scraping should stop.
|
|
scraperStopping chan struct{}
|
|
// Closing scraperStopped signals that scraping has been stopped.
|
|
scraperStopped chan struct{}
|
|
// Channel to queue base labels to be replaced.
|
|
newBaseLabels chan clientmodel.LabelSet
|
|
// Channel to buffer ingested samples.
|
|
ingestedSamples chan clientmodel.Samples
|
|
|
|
url string
|
|
// What is the deadline for the HTTP or HTTPS against this endpoint.
|
|
deadline time.Duration
|
|
// Any base labels that are added to this target and its metrics.
|
|
baseLabels clientmodel.LabelSet
|
|
// The HTTP client used to scrape the target's endpoint.
|
|
httpClient *http.Client
|
|
|
|
// Mutex protects lastError, lastScrape, state, and baseLabels. Writing
|
|
// the above must only happen in the goroutine running the RunScraper
|
|
// loop, and it must happen under the lock. In that way, no mutex lock
|
|
// is required for reading the above in the goroutine running the
|
|
// RunScraper loop, but only for reading in other goroutines.
|
|
sync.Mutex
|
|
}
|
|
|
|
// NewTarget creates a reasonably configured target for querying.
|
|
func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
|
|
t := &target{
|
|
url: url,
|
|
deadline: deadline,
|
|
httpClient: utility.NewDeadlineClient(deadline),
|
|
scraperStopping: make(chan struct{}),
|
|
scraperStopped: make(chan struct{}),
|
|
newBaseLabels: make(chan clientmodel.LabelSet, 1),
|
|
}
|
|
t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}
|
|
for baseLabel, baseValue := range baseLabels {
|
|
t.baseLabels[baseLabel] = baseValue
|
|
}
|
|
return t
|
|
}
|
|
|
|
// Ingest implements Target and extraction.Ingester.
|
|
func (t *target) Ingest(s clientmodel.Samples) error {
|
|
// Since the regular case is that ingestedSamples is ready to receive,
|
|
// first try without setting a timeout so that we don't need to allocate
|
|
// a timer most of the time.
|
|
select {
|
|
case t.ingestedSamples <- s:
|
|
return nil
|
|
default:
|
|
select {
|
|
case t.ingestedSamples <- s:
|
|
return nil
|
|
case <-time.After(t.deadline / 10):
|
|
return errIngestChannelFull
|
|
}
|
|
}
|
|
}
|
|
|
|
// RunScraper implements Target.
|
|
func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
|
|
defer func() {
|
|
// Need to drain t.newBaseLabels to not make senders block during shutdown.
|
|
for {
|
|
select {
|
|
case <-t.newBaseLabels:
|
|
// Do nothing.
|
|
default:
|
|
close(t.scraperStopped)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
|
|
select {
|
|
case <-jitterTimer.C:
|
|
case <-t.scraperStopping:
|
|
jitterTimer.Stop()
|
|
return
|
|
}
|
|
jitterTimer.Stop()
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
t.Lock() // Writing t.lastScrape requires the lock.
|
|
t.lastScrape = time.Now()
|
|
t.Unlock()
|
|
t.scrape(sampleAppender)
|
|
|
|
// Explanation of the contraption below:
|
|
//
|
|
// In case t.newBaseLabels or t.scraperStopping have something to receive,
|
|
// we want to read from those channels rather than starting a new scrape
|
|
// (which might take very long). That's why the outer select has no
|
|
// ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have
|
|
// anything to receive, we go into the inner select, where ticker.C is
|
|
// in the mix.
|
|
for {
|
|
select {
|
|
case newBaseLabels := <-t.newBaseLabels:
|
|
t.Lock() // Writing t.baseLabels requires the lock.
|
|
t.baseLabels = newBaseLabels
|
|
t.Unlock()
|
|
case <-t.scraperStopping:
|
|
return
|
|
default:
|
|
select {
|
|
case newBaseLabels := <-t.newBaseLabels:
|
|
t.Lock() // Writing t.baseLabels requires the lock.
|
|
t.baseLabels = newBaseLabels
|
|
t.Unlock()
|
|
case <-t.scraperStopping:
|
|
return
|
|
case <-ticker.C:
|
|
took := time.Since(t.lastScrape)
|
|
t.Lock() // Write t.lastScrape requires locking.
|
|
t.lastScrape = time.Now()
|
|
t.Unlock()
|
|
targetIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
float64(took) / float64(time.Second), // Sub-second precision.
|
|
)
|
|
t.scrape(sampleAppender)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// StopScraper implements Target.
|
|
func (t *target) StopScraper() {
|
|
close(t.scraperStopping)
|
|
<-t.scraperStopped
|
|
}
|
|
|
|
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
|
|
|
|
func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
|
timestamp := clientmodel.Now()
|
|
defer func(start time.Time) {
|
|
t.Lock() // Writing t.state and t.lastError requires the lock.
|
|
if err == nil {
|
|
t.state = Healthy
|
|
} else {
|
|
t.state = Unhealthy
|
|
}
|
|
t.lastError = err
|
|
t.Unlock()
|
|
t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start))
|
|
}(time.Now())
|
|
|
|
req, err := http.NewRequest("GET", t.URL(), nil)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
req.Header.Add("Accept", acceptHeader)
|
|
|
|
resp, err := t.httpClient.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("server returned HTTP status %s", resp.Status)
|
|
}
|
|
|
|
processor, err := extraction.ProcessorForRequestHeader(resp.Header)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.ingestedSamples = make(chan clientmodel.Samples, ingestedSamplesCap)
|
|
|
|
processOptions := &extraction.ProcessOptions{
|
|
Timestamp: timestamp,
|
|
}
|
|
go func() {
|
|
err = processor.ProcessSingle(resp.Body, t, processOptions)
|
|
close(t.ingestedSamples)
|
|
}()
|
|
|
|
for samples := range t.ingestedSamples {
|
|
for _, s := range samples {
|
|
s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix)
|
|
sampleAppender.Append(s)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// LastError implements Target.
|
|
func (t *target) LastError() error {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
return t.lastError
|
|
}
|
|
|
|
// State implements Target.
|
|
func (t *target) State() TargetState {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
return t.state
|
|
}
|
|
|
|
// LastScrape implements Target.
|
|
func (t *target) LastScrape() time.Time {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
return t.lastScrape
|
|
}
|
|
|
|
// URL implements Target.
|
|
func (t *target) URL() string {
|
|
return t.url
|
|
}
|
|
|
|
// InstanceIdentifier implements Target.
|
|
func (t *target) InstanceIdentifier() string {
|
|
u, err := url.Parse(t.url)
|
|
if err != nil {
|
|
glog.Warningf("Could not parse instance URL when generating identifier, using raw URL: %s", err)
|
|
return t.url
|
|
}
|
|
// If we are given a port in the host port, use that.
|
|
if strings.Contains(u.Host, ":") {
|
|
return u.Host
|
|
}
|
|
// Otherwise, deduce port based on protocol.
|
|
if u.Scheme == "http" {
|
|
return fmt.Sprintf("%s:80", u.Host)
|
|
} else if u.Scheme == "https" {
|
|
return fmt.Sprintf("%s:443", u.Host)
|
|
}
|
|
|
|
glog.Warningf("Unknown scheme %s when generating identifier, using raw URL.", u.Scheme)
|
|
return t.url
|
|
}
|
|
|
|
// GlobalURL implements Target.
|
|
func (t *target) GlobalURL() string {
|
|
url := t.url
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err)
|
|
return url
|
|
}
|
|
for _, localhostRepresentation := range localhostRepresentations {
|
|
url = strings.Replace(url, localhostRepresentation, fmt.Sprintf("http://%s", hostname), -1)
|
|
}
|
|
return url
|
|
}
|
|
|
|
// BaseLabels implements Target.
|
|
func (t *target) BaseLabels() clientmodel.LabelSet {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
return t.baseLabels
|
|
}
|
|
|
|
// BaseLabelsWithoutJobAndInstance implements Target.
|
|
func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet {
|
|
ls := clientmodel.LabelSet{}
|
|
for ln, lv := range t.BaseLabels() {
|
|
if ln != clientmodel.JobLabel && ln != InstanceLabel {
|
|
ls[ln] = lv
|
|
}
|
|
}
|
|
return ls
|
|
}
|
|
|
|
// SetBaseLabelsFrom implements Target.
|
|
func (t *target) SetBaseLabelsFrom(newTarget Target) {
|
|
if t.URL() != newTarget.URL() {
|
|
panic("targets don't refer to the same endpoint")
|
|
}
|
|
t.newBaseLabels <- newTarget.BaseLabels()
|
|
}
|
|
|
|
func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
|
|
healthMetric := clientmodel.Metric{}
|
|
durationMetric := clientmodel.Metric{}
|
|
for label, value := range t.baseLabels {
|
|
healthMetric[label] = value
|
|
durationMetric[label] = value
|
|
}
|
|
healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
|
|
durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
|
|
|
|
healthValue := clientmodel.SampleValue(0)
|
|
if healthy {
|
|
healthValue = clientmodel.SampleValue(1)
|
|
}
|
|
|
|
healthSample := &clientmodel.Sample{
|
|
Metric: healthMetric,
|
|
Timestamp: timestamp,
|
|
Value: healthValue,
|
|
}
|
|
durationSample := &clientmodel.Sample{
|
|
Metric: durationMetric,
|
|
Timestamp: timestamp,
|
|
Value: clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)),
|
|
}
|
|
|
|
sampleAppender.Append(healthSample)
|
|
sampleAppender.Append(durationSample)
|
|
}
|