diff --git a/README.md b/README.md index 8531a6b56..a7c3be644 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ You can also clone the repository yourself and build using `make`: $ cd $GOPATH/src/github.com/prometheus $ git clone https://github.com/prometheus/prometheus.git $ cd prometheus - $ make + $ make build $ ./prometheus -config.file=your_config.yml The Makefile provides several targets: diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 19d51c989..3ca28985e 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -107,7 +107,7 @@ func init() { ) cfg.fs.IntVar( &cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024, - "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.", + "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.", ) cfg.fs.DurationVar( &cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour, @@ -115,7 +115,7 @@ func init() { ) cfg.fs.IntVar( &cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024, - "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.", + "How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.", ) cfg.fs.DurationVar( &cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute, diff --git a/config/config.go b/config/config.go index 81882b063..19bec5423 100644 --- a/config/config.go +++ b/config/config.go @@ -25,8 +25,6 @@ import ( "github.com/prometheus/common/model" "gopkg.in/yaml.v2" - - "github.com/prometheus/prometheus/util/strutil" ) var ( @@ -75,9 +73,9 @@ var ( // DefaultGlobalConfig is the default global configuration. DefaultGlobalConfig = GlobalConfig{ - ScrapeInterval: Duration(1 * time.Minute), - ScrapeTimeout: Duration(10 * time.Second), - EvaluationInterval: Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), + ScrapeTimeout: model.Duration(10 * time.Second), + EvaluationInterval: model.Duration(1 * time.Minute), } // DefaultScrapeConfig is the default scrape configuration. @@ -99,13 +97,13 @@ var ( // DefaultDNSSDConfig is the default DNS SD configuration. DefaultDNSSDConfig = DNSSDConfig{ - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), Type: "SRV", } // DefaultFileSDConfig is the default file SD configuration. DefaultFileSDConfig = FileSDConfig{ - RefreshInterval: Duration(5 * time.Minute), + RefreshInterval: model.Duration(5 * time.Minute), } // DefaultConsulSDConfig is the default Consul SD configuration. @@ -116,30 +114,30 @@ var ( // DefaultServersetSDConfig is the default Serverset SD configuration. DefaultServersetSDConfig = ServersetSDConfig{ - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), } // DefaultNerveSDConfig is the default Nerve SD configuration. 
DefaultNerveSDConfig = NerveSDConfig{ - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), } // DefaultMarathonSDConfig is the default Marathon SD configuration. DefaultMarathonSDConfig = MarathonSDConfig{ - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), } // DefaultKubernetesSDConfig is the default Kubernetes SD configuration DefaultKubernetesSDConfig = KubernetesSDConfig{ KubeletPort: 10255, - RequestTimeout: Duration(10 * time.Second), - RetryInterval: Duration(1 * time.Second), + RequestTimeout: model.Duration(10 * time.Second), + RetryInterval: model.Duration(1 * time.Second), } // DefaultEC2SDConfig is the default EC2 SD configuration. DefaultEC2SDConfig = EC2SDConfig{ Port: 80, - RefreshInterval: Duration(60 * time.Second), + RefreshInterval: model.Duration(60 * time.Second), } ) @@ -281,11 +279,11 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { // objects. type GlobalConfig struct { // How frequently to scrape targets by default. - ScrapeInterval Duration `yaml:"scrape_interval,omitempty"` + ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"` // The default timeout when scraping targets. - ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"` + ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // How frequently to evaluate rules by default. - EvaluationInterval Duration `yaml:"evaluation_interval,omitempty"` + EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"` // The labels to add to any timeseries that this Prometheus instance scrapes. ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"` @@ -344,9 +342,9 @@ type ScrapeConfig struct { // A set of query parameters with which the target is scraped. Params url.Values `yaml:"params,omitempty"` // How frequently to scrape the targets of this scrape config. - ScrapeInterval Duration `yaml:"scrape_interval,omitempty"` + ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"` // The timeout for scraping targets of this config. - ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"` + ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // The HTTP resource path on which to fetch metrics from targets. MetricsPath string `yaml:"metrics_path,omitempty"` // The URL scheme with which to fetch metrics from targets. @@ -532,10 +530,10 @@ func (tg *TargetGroup) UnmarshalJSON(b []byte) error { // DNSSDConfig is the configuration for DNS based service discovery. type DNSSDConfig struct { - Names []string `yaml:"names"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` - Type string `yaml:"type"` - Port int `yaml:"port"` // Ignored for SRV records + Names []string `yaml:"names"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + Type string `yaml:"type"` + Port int `yaml:"port"` // Ignored for SRV records // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } @@ -565,8 +563,8 @@ func (c *DNSSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // FileSDConfig is the configuration for file based discovery. type FileSDConfig struct { - Names []string `yaml:"names"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` + Names []string `yaml:"names"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` // Catches all undefined fields and must be empty after parsing. 
XXX map[string]interface{} `yaml:",inline"` @@ -624,9 +622,9 @@ func (c *ConsulSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error // ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery. type ServersetSDConfig struct { - Servers []string `yaml:"servers"` - Paths []string `yaml:"paths"` - Timeout Duration `yaml:"timeout,omitempty"` + Servers []string `yaml:"servers"` + Paths []string `yaml:"paths"` + Timeout model.Duration `yaml:"timeout,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -656,9 +654,9 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err // NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery. type NerveSDConfig struct { - Servers []string `yaml:"servers"` - Paths []string `yaml:"paths"` - Timeout Duration `yaml:"timeout,omitempty"` + Servers []string `yaml:"servers"` + Paths []string `yaml:"paths"` + Timeout model.Duration `yaml:"timeout,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -688,8 +686,8 @@ func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // MarathonSDConfig is the configuration for services running on Marathon. type MarathonSDConfig struct { - Servers []string `yaml:"servers,omitempty"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` + Servers []string `yaml:"servers,omitempty"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -712,15 +710,15 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro // KubernetesSDConfig is the configuration for Kubernetes service discovery. type KubernetesSDConfig struct { - APIServers []URL `yaml:"api_servers"` - KubeletPort int `yaml:"kubelet_port,omitempty"` - InCluster bool `yaml:"in_cluster,omitempty"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - BearerToken string `yaml:"bearer_token,omitempty"` - BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - RetryInterval Duration `yaml:"retry_interval,omitempty"` - RequestTimeout Duration `yaml:"request_timeout,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + APIServers []URL `yaml:"api_servers"` + KubeletPort int `yaml:"kubelet_port,omitempty"` + InCluster bool `yaml:"in_cluster,omitempty"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + BearerToken string `yaml:"bearer_token,omitempty"` + BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + RetryInterval model.Duration `yaml:"retry_interval,omitempty"` + RequestTimeout model.Duration `yaml:"request_timeout,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -749,11 +747,11 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er // EC2SDConfig is the configuration for EC2 based service discovery. 
type EC2SDConfig struct { - Region string `yaml:"region"` - AccessKey string `yaml:"access_key,omitempty"` - SecretKey string `yaml:"secret_key,omitempty"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` - Port int `yaml:"port"` + Region string `yaml:"region"` + AccessKey string `yaml:"access_key,omitempty"` + SecretKey string `yaml:"secret_key,omitempty"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + Port int `yaml:"port"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } @@ -883,28 +881,3 @@ func (re Regexp) MarshalYAML() (interface{}, error) { } return nil, nil } - -// Duration encapsulates a time.Duration and makes it YAML marshallable. -// -// TODO(fabxc): Since we have custom types for most things, including timestamps, -// we might want to move this into our model as well, eventually. -type Duration time.Duration - -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - dur, err := strutil.StringToDuration(s) - if err != nil { - return err - } - *d = Duration(dur) - return nil -} - -// MarshalYAML implements the yaml.Marshaler interface. -func (d Duration) MarshalYAML() (interface{}, error) { - return strutil.DurationToString(time.Duration(d)), nil -} diff --git a/config/config_test.go b/config/config_test.go index 8c27f1b54..477bee706 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -28,9 +28,9 @@ import ( var expectedConf = &Config{ GlobalConfig: GlobalConfig{ - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, - EvaluationInterval: Duration(30 * time.Second), + EvaluationInterval: model.Duration(30 * time.Second), ExternalLabels: model.LabelSet{ "monitor": "codelab", @@ -49,7 +49,7 @@ var expectedConf = &Config{ JobName: "prometheus", HonorLabels: true, - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -73,11 +73,11 @@ var expectedConf = &Config{ FileSDConfigs: []*FileSDConfig{ { Names: []string{"foo/*.slow.json", "foo/*.slow.yml", "single/file.yml"}, - RefreshInterval: Duration(10 * time.Minute), + RefreshInterval: model.Duration(10 * time.Minute), }, { Names: []string{"bar/*.yaml"}, - RefreshInterval: Duration(5 * time.Minute), + RefreshInterval: model.Duration(5 * time.Minute), }, }, @@ -108,8 +108,8 @@ var expectedConf = &Config{ { JobName: "service-x", - ScrapeInterval: Duration(50 * time.Second), - ScrapeTimeout: Duration(5 * time.Second), + ScrapeInterval: model.Duration(50 * time.Second), + ScrapeTimeout: model.Duration(5 * time.Second), BasicAuth: &BasicAuth{ Username: "admin_name", @@ -124,14 +124,14 @@ var expectedConf = &Config{ "first.dns.address.domain.com", "second.dns.address.domain.com", }, - RefreshInterval: Duration(15 * time.Second), + RefreshInterval: model.Duration(15 * time.Second), Type: "SRV", }, { Names: []string{ "first.dns.address.domain.com", }, - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), Type: "SRV", }, }, @@ -180,7 +180,7 @@ var expectedConf = &Config{ { JobName: "service-y", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: 
DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -198,8 +198,8 @@ var expectedConf = &Config{ { JobName: "service-z", - ScrapeInterval: Duration(15 * time.Second), - ScrapeTimeout: Duration(10 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), + ScrapeTimeout: model.Duration(10 * time.Second), MetricsPath: "/metrics", Scheme: "http", @@ -214,7 +214,7 @@ var expectedConf = &Config{ { JobName: "service-kubernetes", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -228,15 +228,15 @@ var expectedConf = &Config{ Password: "mypassword", }, KubeletPort: 10255, - RequestTimeout: Duration(10 * time.Second), - RetryInterval: Duration(1 * time.Second), + RequestTimeout: model.Duration(10 * time.Second), + RetryInterval: model.Duration(1 * time.Second), }, }, }, { JobName: "service-marathon", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -247,14 +247,14 @@ var expectedConf = &Config{ Servers: []string{ "http://marathon.example.com:8080", }, - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), }, }, }, { JobName: "service-ec2", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -265,7 +265,7 @@ var expectedConf = &Config{ Region: "us-east-1", AccessKey: "access", SecretKey: "secret", - RefreshInterval: Duration(60 * time.Second), + RefreshInterval: model.Duration(60 * time.Second), Port: 80, }, }, @@ -273,7 +273,7 @@ var expectedConf = &Config{ { JobName: "service-nerve", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -283,7 +283,7 @@ var expectedConf = &Config{ { Servers: []string{"localhost"}, Paths: []string{"/monitoring"}, - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), }, }, }, diff --git a/notification/notification.go b/notification/notification.go index 3279bd2c5..7bf63019a 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "sync" "time" @@ -239,7 +240,7 @@ func (n *Handler) setMore() { } func (n *Handler) postURL() string { - return n.opts.AlertmanagerURL + alertPushEndpoint + return strings.TrimRight(n.opts.AlertmanagerURL, "/") + alertPushEndpoint } func (n *Handler) send(alerts ...*model.Alert) error { diff --git a/notification/notification_test.go b/notification/notification_test.go index 29d057b65..7c6de0a7a 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -25,6 +25,43 @@ import ( "github.com/prometheus/common/model" ) +func TestHandlerPostURL(t *testing.T) { + var cases = []struct { + in, out string + }{ + { + in: "http://localhost:9093", + out: "http://localhost:9093/api/v1/alerts", + }, + { + in: "http://localhost:9093/", + out: "http://localhost:9093/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix", + out: "http://localhost:9093/prefix/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix//", + out: 
"http://localhost:9093/prefix/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix//", + out: "http://localhost:9093/prefix/api/v1/alerts", + }, + } + h := &Handler{ + opts: &HandlerOptions{}, + } + + for _, c := range cases { + h.opts.AlertmanagerURL = c.in + if res := h.postURL(); res != c.out { + t.Errorf("Expected post URL %q for %q but got %q", c.out, c.in, res) + } + } +} + func TestHandlerNextBatch(t *testing.T) { h := New(&HandlerOptions{}) diff --git a/promql/parse.go b/promql/parse.go index 198aa4380..a23986973 100644 --- a/promql/parse.go +++ b/promql/parse.go @@ -1140,12 +1140,12 @@ func (p *parser) unquoteString(s string) string { } func parseDuration(ds string) (time.Duration, error) { - dur, err := strutil.StringToDuration(ds) + dur, err := model.ParseDuration(ds) if err != nil { return 0, err } if dur == 0 { return 0, fmt.Errorf("duration must be greater than 0") } - return dur, nil + return time.Duration(dur), nil } diff --git a/promql/printer.go b/promql/printer.go index 35b40b00e..7be0de04e 100644 --- a/promql/printer.go +++ b/promql/printer.go @@ -22,7 +22,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/util/strutil" ) // Tree returns a string of the tree structure of the given node. @@ -104,7 +103,7 @@ func (node *AlertStmt) String() string { s := fmt.Sprintf("ALERT %s", node.Name) s += fmt.Sprintf("\n\tIF %s", node.Expr) if node.Duration > 0 { - s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(node.Duration)) + s += fmt.Sprintf("\n\tFOR %s", model.Duration(node.Duration)) } if len(node.Labels) > 0 { s += fmt.Sprintf("\n\tLABELS %s", node.Labels) @@ -178,9 +177,9 @@ func (node *MatrixSelector) String() string { } offset := "" if node.Offset != time.Duration(0) { - offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset)) + offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset)) } - return fmt.Sprintf("%s[%s]%s", vecSelector.String(), strutil.DurationToString(node.Range), offset) + return fmt.Sprintf("%s[%s]%s", vecSelector.String(), model.Duration(node.Range), offset) } func (node *NumberLiteral) String() string { @@ -210,7 +209,7 @@ func (node *VectorSelector) String() string { } offset := "" if node.Offset != time.Duration(0) { - offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset)) + offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset)) } if len(labelStrings) == 0 { diff --git a/promql/test.go b/promql/test.go index 04ad8b8be..024725df2 100644 --- a/promql/test.go +++ b/promql/test.go @@ -26,7 +26,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" - "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -98,11 +97,11 @@ func (t *Test) parseLoad(lines []string, i int) (int, *loadCmd, error) { } parts := patLoad.FindStringSubmatch(lines[i]) - gap, err := strutil.StringToDuration(parts[1]) + gap, err := model.ParseDuration(parts[1]) if err != nil { return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err) } - cmd := newLoadCmd(gap) + cmd := newLoadCmd(time.Duration(gap)) for i+1 < len(lines) { i++ defLine := lines[i] @@ -141,11 +140,11 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { return i, nil, err } - offset, err := strutil.StringToDuration(at) + offset, err := model.ParseDuration(at) if err != nil { return i, nil, raise(i, "invalid step definition %q: %s", parts[1], 
err) } - ts := testStartTime.Add(offset) + ts := testStartTime.Add(time.Duration(offset)) cmd := newEvalCmd(expr, ts, ts, 0) switch mod { diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 74270ad02..4c1407666 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" ) @@ -22,7 +24,7 @@ func testFileSD(t *testing.T, ext string) { // whether file watches work as expected. var conf config.FileSDConfig conf.Names = []string{"fixtures/_*" + ext} - conf.RefreshInterval = config.Duration(1 * time.Hour) + conf.RefreshInterval = model.Duration(1 * time.Hour) var ( fsd = NewFileDiscovery(&conf) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 880e6230c..d19f6b41f 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -14,8 +14,6 @@ package retrieval import ( - "time" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -23,26 +21,31 @@ import ( type nopAppender struct{} -func (a nopAppender) Append(*model.Sample) { +func (a nopAppender) Append(*model.Sample) error { + return nil } -type slowAppender struct{} - -func (a slowAppender) Append(*model.Sample) { - time.Sleep(time.Millisecond) +func (a nopAppender) NeedsThrottling() bool { + return false } type collectResultAppender struct { - result model.Samples + result model.Samples + throttled bool } -func (a *collectResultAppender) Append(s *model.Sample) { +func (a *collectResultAppender) Append(s *model.Sample) error { for ln, lv := range s.Metric { if len(lv) == 0 { delete(s.Metric, ln) } } a.result = append(a.result, s) + return nil +} + +func (a *collectResultAppender) NeedsThrottling() bool { + return a.throttled } // fakeTargetProvider implements a TargetProvider and allows manual injection diff --git a/retrieval/target.go b/retrieval/target.go index 6139c3932..bf9adb0c6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) @@ -48,7 +49,7 @@ const ( ) var ( - errIngestChannelFull = errors.New("ingestion channel full") + errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -59,10 +60,19 @@ var ( }, []string{interval}, ) + targetSkippedScrapes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "target_skipped_scrapes_total", + Help: "Total number of scrapes that were skipped because the metric storage was throttled.", + }, + []string{interval}, + ) ) func init() { prometheus.MustRegister(targetIntervalLength) + prometheus.MustRegister(targetSkippedScrapes) } // TargetHealth describes the health state of a target. @@ -151,8 +161,6 @@ type Target struct { scraperStopping chan struct{} // Closing scraperStopped signals that scraping has been stopped. scraperStopped chan struct{} - // Channel to buffer ingested samples. - ingestedSamples chan model.Vector // Mutex protects the members below. sync.RWMutex @@ -166,8 +174,6 @@ type Target struct { baseLabels model.LabelSet // Internal labels, such as scheme. internalLabels model.LabelSet - // What is the deadline for the HTTP or HTTPS against this endpoint. 
- deadline time.Duration // The time between two scrapes. scrapeInterval time.Duration // Whether the target's labels have precedence over the base labels @@ -237,7 +243,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L t.url.RawQuery = params.Encode() t.scrapeInterval = time.Duration(cfg.ScrapeInterval) - t.deadline = time.Duration(cfg.ScrapeTimeout) t.honorLabels = cfg.HonorLabels t.metaLabels = metaLabels @@ -361,6 +366,11 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { targetIntervalLength.WithLabelValues(intervalStr).Observe( float64(took) / float64(time.Second), // Sub-second precision. ) + if sampleAppender.NeedsThrottling() { + targetSkippedScrapes.WithLabelValues(intervalStr).Inc() + t.status.setLastError(errSkippedScrape) + continue + } t.scrape(sampleAppender) } } @@ -377,26 +387,6 @@ func (t *Target) StopScraper() { log.Debugf("Scraper for target %v stopped.", t) } -func (t *Target) ingest(s model.Vector) error { - t.RLock() - deadline := t.deadline - t.RUnlock() - // Since the regular case is that ingestedSamples is ready to receive, - // first try without setting a timeout so that we don't need to allocate - // a timer most of the time. - select { - case t.ingestedSamples <- s: - return nil - default: - select { - case t.ingestedSamples <- s: - return nil - case <-time.After(deadline / 10): - return errIngestChannelFull - } - } -} - const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` func (t *Target) scrape(appender storage.SampleAppender) (err error) { @@ -414,20 +404,20 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { // so the relabeling rules are applied to the correct label set. if len(t.metricRelabelConfigs) > 0 { appender = relabelAppender{ - app: appender, - relabelings: t.metricRelabelConfigs, + SampleAppender: appender, + relabelings: t.metricRelabelConfigs, } } if t.honorLabels { appender = honorLabelsAppender{ - app: appender, - labels: baseLabels, + SampleAppender: appender, + labels: baseLabels, } } else { appender = ruleLabelsAppender{ - app: appender, - labels: baseLabels, + SampleAppender: appender, + labels: baseLabels, } } @@ -460,31 +450,30 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap) - - go func() { - for { - // TODO(fabxc): Change the SampleAppender interface to return an error - // so we can proceed based on the status and don't leak goroutines trying - // to append a single sample after dropping all the other ones. - // - // This will also allow use to reuse this vector and save allocations. 
- var samples model.Vector - if err = sdec.Decode(&samples); err != nil { - break - } - if err = t.ingest(samples); err != nil { - break - } + var ( + samples model.Vector + numOutOfOrder int + logger = log.With("target", t.InstanceIdentifier()) + ) + for { + if err = sdec.Decode(&samples); err != nil { + break } - close(t.ingestedSamples) - }() - - for samples := range t.ingestedSamples { for _, s := range samples { - appender.Append(s) + err := appender.Append(s) + if err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + logger.With("sample", s).Warnf("Error inserting sample: %s", err) + } + } + } } + if numOutOfOrder > 0 { + logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + } if err == io.EOF { return nil @@ -495,11 +484,11 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { // Merges the ingested sample's metric with the label set. On a collision the // value of the ingested label is stored in a label prefixed with 'exported_'. type ruleLabelsAppender struct { - app storage.SampleAppender + storage.SampleAppender labels model.LabelSet } -func (app ruleLabelsAppender) Append(s *model.Sample) { +func (app ruleLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if v, ok := s.Metric[ln]; ok && v != "" { s.Metric[model.ExportedLabelPrefix+ln] = v @@ -507,47 +496,46 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - app.app.Append(s) + return app.SampleAppender.Append(s) } type honorLabelsAppender struct { - app storage.SampleAppender + storage.SampleAppender labels model.LabelSet } // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) { +func (app honorLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if _, ok := s.Metric[ln]; !ok { s.Metric[ln] = lv } } - app.app.Append(s) + return app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric // before actually appending it. type relabelAppender struct { - app storage.SampleAppender + storage.SampleAppender relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) { +func (app relabelAppender) Append(s *model.Sample) error { labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) if err != nil { - log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) - return + return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err) } // Check if the timeseries was dropped. if labels == nil { - return + return nil } s.Metric = model.Metric(labels) - app.app.Append(s) + return app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. 
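The retrieval/target.go changes above drop the buffered ingestion channel: RunScraper now consults the appender's NeedsThrottling() before scraping, and Append() returns an error so out-of-order samples can be counted rather than silently dropped. Below is a minimal, self-contained sketch of that control flow under simplified assumptions — sample, sampleAppender, and memAppender are illustrative stand-ins, not the real model and storage types:

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for model.Sample and storage.SampleAppender;
// the shapes are simplified and not the real Prometheus types.
type sample struct {
	name  string
	value float64
}

var errOutOfOrder = errors.New("sample timestamp out of order")

type sampleAppender interface {
	Append(sample) error   // may fail per sample, e.g. with errOutOfOrder
	NeedsThrottling() bool // true if the storage wants no more samples for now
}

// scrapeOnce mirrors the new control flow: the whole batch is skipped when the
// appender is throttled; otherwise every sample is appended, and out-of-order
// errors are only counted instead of being logged one by one.
func scrapeOnce(app sampleAppender, scraped []sample) error {
	if app.NeedsThrottling() {
		return errors.New("scrape skipped due to throttled ingestion")
	}
	numOutOfOrder := 0
	for _, s := range scraped {
		if err := app.Append(s); err != nil {
			if err == errOutOfOrder {
				numOutOfOrder++
				continue
			}
			fmt.Printf("error inserting sample %v: %v\n", s, err)
		}
	}
	if numOutOfOrder > 0 {
		fmt.Printf("dropped %d out-of-order samples\n", numOutOfOrder)
	}
	return nil
}

// memAppender is a toy in-memory appender used only to exercise scrapeOnce.
type memAppender struct {
	stored    []sample
	throttled bool
}

func (a *memAppender) Append(s sample) error { a.stored = append(a.stored, s); return nil }
func (a *memAppender) NeedsThrottling() bool { return a.throttled }

func main() {
	app := &memAppender{}
	_ = scrapeOnce(app, []sample{{"up", 1}, {"http_requests_total", 42}})
	fmt.Println("stored samples:", len(app.stored))

	app.throttled = true
	fmt.Println(scrapeOnce(app, []sample{{"up", 1}})) // the whole batch is skipped
}

The design point mirrored here is that throttling is decided once per batch: a scrape either runs in full or is skipped entirely, so a target's samples are never half-ingested.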
diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 51cb331f2..7fdc56efc 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -139,12 +139,12 @@ func TestTargetScrapeUpdatesState(t *testing.T) { } } -func TestTargetScrapeWithFullChannel(t *testing.T) { +func TestTargetScrapeWithThrottledStorage(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for i := 0; i < 2*ingestedSamplesCap; i++ { + for i := 0; i < 10; i++ { w.Write([]byte( fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i), )) @@ -155,15 +155,21 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { defer server.Close() testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - // Affects full channel but not HTTP fetch - testTarget.deadline = 0 - testTarget.scrape(slowAppender{}) + go testTarget.RunScraper(&collectResultAppender{throttled: true}) + + // Enough time for a scrape to happen. + time.Sleep(20 * time.Millisecond) + + testTarget.StopScraper() + // Wait for it to take effect. + time.Sleep(20 * time.Millisecond) + if testTarget.status.Health() != HealthBad { t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) } - if testTarget.status.LastError() != errIngestChannelFull { - t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError()) + if testTarget.status.LastError() != errSkippedScrape { + t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError()) } } @@ -420,8 +426,8 @@ func TestURLParams(t *testing.T) { target := NewTarget( &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeInterval: model.Duration(1 * time.Minute), + ScrapeTimeout: model.Duration(1 * time.Second), Scheme: serverURL.Scheme, Params: url.Values{ "foo": []string{"bar", "baz"}, @@ -441,7 +447,7 @@ func TestURLParams(t *testing.T) { func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.LabelSet) *Target { cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(deadline), + ScrapeTimeout: model.Duration(deadline), } c, _ := newHTTPClient(cfg) t := &Target{ @@ -450,7 +456,6 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.La Host: strings.TrimLeft(targetURL, "http://"), Path: "/metrics", }, - deadline: deadline, status: &TargetStatus{}, scrapeInterval: 1 * time.Millisecond, httpClient: c, @@ -481,7 +486,7 @@ func TestNewHTTPBearerToken(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BearerToken: "1234", } c, err := newHTTPClient(cfg) @@ -509,7 +514,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BearerTokenFile: "testdata/bearertoken.txt", } c, err := newHTTPClient(cfg) @@ -536,7 +541,7 @@ func TestNewHTTPBasicAuth(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BasicAuth: &config.BasicAuth{ Username: "user", Password: "password123", @@ -566,7 +571,7 @@ func TestNewHTTPCACert(t *testing.T) { defer 
server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), TLSConfig: config.TLSConfig{ CAFile: "testdata/ca.cer", }, @@ -599,7 +604,7 @@ func TestNewHTTPClientCert(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), TLSConfig: config.TLSConfig{ CAFile: "testdata/ca.cer", CertFile: "testdata/client.cer", diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 8a453e70a..0da126f8e 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -165,6 +165,7 @@ func (tm *TargetManager) Run() { }) tm.running = true + log.Info("Target manager started.") } // handleUpdates receives target group updates and handles them in the diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 85abdd31e..b192a10c0 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -75,7 +75,7 @@ func TestPrefixedTargetProvider(t *testing.T) { func TestTargetManagerChan(t *testing.T) { testJob1 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{{ Targets: []model.LabelSet{ {model.AddressLabel: "example.org:80"}, @@ -204,7 +204,7 @@ func TestTargetManagerChan(t *testing.T) { func TestTargetManagerConfigUpdate(t *testing.T) { testJob1 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), Params: url.Values{ "testParam": []string{"paramValue", "secondValue"}, }, @@ -234,7 +234,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { } testJob2 := &config.ScrapeConfig{ JobName: "test_job2", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{ { Targets: []model.LabelSet{ @@ -288,7 +288,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { // Test that targets without host:port addresses are dropped. testJob3 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{{ Targets: []model.LabelSet{ {model.AddressLabel: "example.net:80"}, diff --git a/rules/alerting.go b/rules/alerting.go index 127c922a1..fdd3d00a8 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -39,7 +39,7 @@ const ( type AlertState int const ( - // StateInactive is the state of an alert that is either firing nor pending. + // StateInactive is the state of an alert that is neither firing nor pending. StateInactive AlertState = iota // StatePending is the state of an alert that has been active for less than // the configured threshold duration. @@ -58,7 +58,7 @@ func (s AlertState) String() string { case StateFiring: return "firing" } - panic(fmt.Errorf("unknown alert state: %v", s)) + panic(fmt.Errorf("unknown alert state: %v", s.String())) } // Alert is the user-level representation of a single instance of an alerting rule. 
@@ -159,7 +159,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine) (model.Vector, fp := smpl.Metric.Fingerprint() resultFPs[fp] = struct{}{} - if alert, ok := r.active[fp]; ok { + if alert, ok := r.active[fp]; ok && alert.State != StateInactive { alert.Value = smpl.Value continue } @@ -255,7 +255,7 @@ func (rule *AlertingRule) String() string { s := fmt.Sprintf("ALERT %s", rule.name) s += fmt.Sprintf("\n\tIF %s", rule.vector) if rule.holdDuration > 0 { - s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(rule.holdDuration)) + s += fmt.Sprintf("\n\tFOR %s", model.Duration(rule.holdDuration)) } if len(rule.labels) > 0 { s += fmt.Sprintf("\n\tLABELS %s", rule.labels) @@ -277,7 +277,7 @@ func (rule *AlertingRule) HTMLSnippet(pathPrefix string) template.HTML { s := fmt.Sprintf("ALERT %s", pathPrefix+strutil.GraphLinkForExpression(alertMetric.String()), rule.name) s += fmt.Sprintf("\n IF %s", pathPrefix+strutil.GraphLinkForExpression(rule.vector.String()), rule.vector) if rule.holdDuration > 0 { - s += fmt.Sprintf("\n FOR %s", strutil.DurationToString(rule.holdDuration)) + s += fmt.Sprintf("\n FOR %s", model.Duration(rule.holdDuration)) } if len(rule.labels) > 0 { s += fmt.Sprintf("\n LABELS %s", rule.labels) diff --git a/rules/manager.go b/rules/manager.go index 2fea605a9..e9e7096e7 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -66,9 +66,19 @@ var ( iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Name: "evaluator_duration_seconds", - Help: "The duration for all evaluations to execute.", + Help: "The duration of rule group evaluations.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }) + iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "evaluator_iterations_skipped_total", + Help: "The total number of rule group evaluations skipped due to throttled metric storage.", + }) + iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "evaluator_iterations_total", + Help: "The total number of scheduled rule group evaluations, whether skipped or executed.", + }) ) func init() { @@ -78,6 +88,7 @@ func init() { evalFailures.WithLabelValues(string(ruleTypeRecording)) prometheus.MustRegister(iterationDuration) + prometheus.MustRegister(iterationsSkipped) prometheus.MustRegister(evalFailures) prometheus.MustRegister(evalDuration) } @@ -133,6 +144,11 @@ func (g *Group) run() { } iter := func() { + iterationsScheduled.Inc() + if g.opts.SampleAppender.NeedsThrottling() { + iterationsSkipped.Inc() + return + } start := time.Now() g.eval() diff --git a/rules/manager_test.go b/rules/manager_test.go index 463388e15..40e57203a 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -27,14 +27,8 @@ import ( func TestAlertingRule(t *testing.T) { suite, err := promql.NewTest(t, ` load 5m - http_requests{job="api-server", instance="0", group="production"} 0+10x10 - http_requests{job="api-server", instance="1", group="production"} 0+20x10 - http_requests{job="api-server", instance="0", group="canary"} 0+30x10 - http_requests{job="api-server", instance="1", group="canary"} 0+40x10 - http_requests{job="app-server", instance="0", group="production"} 0+50x10 - http_requests{job="app-server", instance="1", group="production"} 0+60x10 - http_requests{job="app-server", instance="0", group="canary"} 0+70x10 - http_requests{job="app-server", instance="1", group="canary"} 0+80x10 + 
http_requests{job="app-server", instance="0", group="canary"} 75 85 95 105 105 95 85 + http_requests{job="app-server", instance="1", group="canary"} 80 90 100 110 120 130 140 `) if err != nil { t.Fatal(err) @@ -79,17 +73,32 @@ func TestAlertingRule(t *testing.T) { }, { time: 10 * time.Minute, result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 0 @[%v]`, + }, + }, + { + time: 15 * time.Minute, + result: []string{ `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`, }, }, { - time: 15 * time.Minute, - result: nil, + time: 20 * time.Minute, + result: []string{}, }, { - time: 20 * time.Minute, - result: nil, + time: 25 * time.Minute, + result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, + }, + }, + { + time: 30 * time.Minute, + result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`, + `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, + }, }, } diff --git a/storage/local/interface.go b/storage/local/interface.go index 6c7df5ca2..454c2d9d5 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -33,7 +33,10 @@ type Storage interface { // processing.) The implementation might remove labels with empty value // from the provided Sample as those labels are considered equivalent to // a label not present at all. - Append(*model.Sample) + Append(*model.Sample) error + // NeedsThrottling returns true if the Storage has too many chunks in memory + // already or has too many chunks waiting for persistence. + NeedsThrottling() bool // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader diff --git a/storage/local/storage.go b/storage/local/storage.go index 3d5aeed56..62453d89d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -47,9 +47,9 @@ const ( persintenceUrgencyScoreForLeavingRushedMode = 0.7 // This factor times -storage.local.memory-chunks is the number of - // memory chunks we tolerate before suspending ingestion (TODO!). It is - // also a basis for calculating the persistenceUrgencyScore. - toleranceFactorForMemChunks = 1.1 + // memory chunks we tolerate before throttling the storage. It is also a + // basis for calculating the persistenceUrgencyScore. + toleranceFactorMemChunks = 1.1 // This factor times -storage.local.max-chunks-to-persist is the minimum // required number of chunks waiting for persistence before the number // of chunks in memory may influence the persistenceUrgencyScore. (In @@ -121,9 +121,10 @@ type syncStrategy func() bool type memorySeriesStorage struct { // numChunksToPersist has to be aligned for atomic operations. - numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall. - rushed bool // Whether the storage is in rushed mode. 
+ numChunksToPersist int64 // The number of chunks waiting for persistence. + maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + rushed bool // Whether the storage is in rushed mode. + throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -180,6 +181,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { loopStopping: make(chan struct{}), loopStopped: make(chan struct{}), + throttled: make(chan struct{}, 1), maxMemoryChunks: o.MemoryChunks, dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, @@ -306,6 +308,7 @@ func (s *memorySeriesStorage) Start() (err error) { } go s.handleEvictList() + go s.logThrottling() go s.loop() return nil @@ -564,23 +567,15 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) { +func (s *memorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) } } - if s.getNumChunksToPersist() >= s.maxChunksToPersist { - log.Warnf( - "%d chunks waiting for persistence, sample ingestion suspended.", - s.getNumChunksToPersist(), - ) - for s.getNumChunksToPersist() >= s.maxChunksToPersist { - time.Sleep(time.Second) - } - log.Warn("Sample ingestion resumed.") - } rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) fp, err := s.mapper.mapFP(rawFP, sample.Metric) @@ -596,16 +591,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { series := s.getOrCreateSeries(fp, sample.Metric) if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) // Don't log and track equal timestamps, as they are a common occurrence // when using client-side timestamps (e.g. Pushgateway or federation). // It would be even better to also compare the sample values here, but // we don't have efficient access to a series's last value. if sample.Timestamp != series.lastTime { - log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample } - s.fpLocker.Unlock(fp) - return + return nil } completedChunksCount := series.add(&model.SamplePair{ Value: sample.Value, @@ -614,6 +609,59 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.fpLocker.Unlock(fp) s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) + + return nil +} + +// NeedsThrottling implements Storage. +func (s *memorySeriesStorage) NeedsThrottling() bool { + if s.getNumChunksToPersist() > s.maxChunksToPersist || + float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { + select { + case s.throttled <- struct{}{}: + default: // Do nothing, signal already pending. + } + return true + } + return false +} + +// logThrottling handles logging of throttled events and has to be started as a +// goroutine. It stops once s.loopStopping is closed. +// +// Logging strategy: Whenever NeedsThrottling() is called and returns true, a signal +// is sent to s.throttled. If that happens for the first time, an Error is +// logged that the storage is now throttled. 
As long as signals continue to be + // sent via s.throttled at least once per minute, nothing else is logged. Once + // no signal has arrived for a minute, an Info is logged that the storage is not + // throttled anymore. This resets things to the initial state, i.e. once a + // signal arrives again, the Error will be logged again. +func (s *memorySeriesStorage) logThrottling() { + timer := time.NewTimer(time.Minute) + timer.Stop() + + for { + select { + case <-s.throttled: + if !timer.Reset(time.Minute) { + log. + With("chunksToPersist", s.getNumChunksToPersist()). + With("maxChunksToPersist", s.maxChunksToPersist). + With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). + Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.") + } + case <-timer.C: + log. + With("chunksToPersist", s.getNumChunksToPersist()). + With("maxChunksToPersist", s.maxChunksToPersist). + With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). + Info("Storage does not need throttling anymore.") + case <-s.loopStopping: + return + } + } } func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { @@ -1210,7 +1258,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist { score = math.Max( score, - (memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1), + (memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1), ) } if score > 1 { @@ -1230,11 +1278,11 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { s.rushedMode.Set(0) log. With("urgencyScore", score). - With("chunksToPersist", chunksToPersist). - With("maxChunksToPersist", maxChunksToPersist). - With("memoryChunks", memChunks). - With("maxMemoryChunks", maxMemChunks). - Warn("Storage has left rushed mode.") + With("chunksToPersist", int(chunksToPersist)). + With("maxChunksToPersist", int(maxChunksToPersist)). + With("memoryChunks", int(memChunks)). + With("maxMemoryChunks", int(maxMemChunks)). + Info("Storage has left rushed mode.") return score } if score > persintenceUrgencyScoreForEnteringRushedMode { @@ -1243,10 +1291,10 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { s.rushedMode.Set(1) log. With("urgencyScore", score). - With("chunksToPersist", chunksToPersist). - With("maxChunksToPersist", maxChunksToPersist). - With("memoryChunks", memChunks). - With("maxMemoryChunks", maxMemChunks). + With("chunksToPersist", int(chunksToPersist)). + With("maxChunksToPersist", int(maxChunksToPersist)). + With("memoryChunks", int(memChunks)). + With("maxMemoryChunks", int(maxMemChunks)). Warn("Storage has entered rushed mode.") return 1 } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 768f2f3d5..4ed739707 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -132,15 +132,16 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue } // Append queues a sample to be sent to the remote storage. It drops the -// sample on the floor if the queue is full. It implements -// storage.SampleAppender. -func (t *StorageQueueManager) Append(s *model.Sample) { +// sample on the floor if the queue is full. +// Always returns nil. 
+func (t *StorageQueueManager) Append(s *model.Sample) error { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") } + return nil } // Stop stops sending samples to the remote storage and waits for pending diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 91d057a63..6c0ddba9d 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -104,8 +104,8 @@ func (s *Storage) Stop() { } } -// Append implements storage.SampleAppender. -func (s *Storage) Append(smpl *model.Sample) { +// Append implements storage.SampleAppender. Always returns nil. +func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() var snew model.Sample @@ -122,6 +122,14 @@ func (s *Storage) Append(smpl *model.Sample) { for _, q := range s.queues { q.Append(&snew) } + return nil +} + +// NeedsThrottling implements storage.SampleAppender. It will always return +// false as a remote storage drops samples on the floor if backlogging instead +// of asking for throttling. +func (s *Storage) NeedsThrottling() bool { + return false } // Describe implements prometheus.Collector. diff --git a/storage/storage.go b/storage/storage.go index 9f509e2dc..5acae673e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -18,9 +18,31 @@ import ( ) // SampleAppender is the interface to append samples to both, local and remote -// storage. +// storage. All methods are goroutine-safe. type SampleAppender interface { - Append(*model.Sample) + // Append appends a sample to the underlying storage. Depending on the + // storage implementation, there are different guarantees for the fate + // of the sample after Append has returned. Remote storage + // implementation will simply drop samples if they cannot keep up with + // sending samples. Local storage implementations will only drop metrics + // upon unrecoverable errors. + Append(*model.Sample) error + // NeedsThrottling returns true if the underlying storage wishes to not + // receive any more samples. Append will still work but might lead to + // undue resource usage. It is recommended to call NeedsThrottling once + // before an upcoming batch of Append calls (e.g. a full scrape of a + // target or the evaluation of a rule group) and only proceed with the + // batch if NeedsThrottling returns false. In that way, the result of a + // scrape or of an evaluation of a rule group will always be appended + // completely or not at all, and the work of scraping or evaluation will + // not be performed in vain. Also, a call of NeedsThrottling is + // potentially expensive, so limiting the number of calls is reasonable. + // + // Only SampleAppenders for which it is considered critical to receive + // each and every sample should ever return true. SampleAppenders that + // tolerate not receiving all samples should always return false and + // instead drop samples as they see fit to avoid overload. + NeedsThrottling() bool } // Fanout is a SampleAppender that appends every sample to each SampleAppender @@ -30,8 +52,25 @@ type Fanout []SampleAppender // Append implements SampleAppender. It appends the provided sample to all // SampleAppenders in the Fanout slice and waits for each append to complete // before proceeding with the next. -func (f Fanout) Append(s *model.Sample) { +// If any of the SampleAppenders returns an error, the first one is returned +// at the end. 
+func (f Fanout) Append(s *model.Sample) error { + var err error for _, a := range f { - a.Append(s) + if e := a.Append(s); e != nil && err == nil { + err = e + } } + return err +} + +// NeedsThrottling returns true if at least one of the SampleAppenders in the +// Fanout slice is throttled. +func (f Fanout) NeedsThrottling() bool { + for _, a := range f { + if a.NeedsThrottling() { + return true + } + } + return false } diff --git a/util/strutil/strconv.go b/util/strutil/strconv.go index 1b7edf66b..6bb25afdc 100644 --- a/util/strutil/strconv.go +++ b/util/strutil/strconv.go @@ -17,75 +17,13 @@ import ( "fmt" "net/url" "regexp" - "strconv" "strings" - "time" ) var ( - durationRE = regexp.MustCompile("^([0-9]+)([ywdhms]+)$") invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) ) -// DurationToString formats a time.Duration as a string with the assumption that -// a year always has 365 days and a day always has 24h. (The former doesn't work -// in leap years, the latter is broken by DST switches, not to speak about leap -// seconds, but those are not even treated properly by the duration strings in -// the standard library.) -func DurationToString(duration time.Duration) string { - seconds := int64(duration / time.Second) - factors := map[string]int64{ - "y": 60 * 60 * 24 * 365, - "d": 60 * 60 * 24, - "h": 60 * 60, - "m": 60, - "s": 1, - } - unit := "s" - switch int64(0) { - case seconds % factors["y"]: - unit = "y" - case seconds % factors["d"]: - unit = "d" - case seconds % factors["h"]: - unit = "h" - case seconds % factors["m"]: - unit = "m" - } - return fmt.Sprintf("%v%v", seconds/factors[unit], unit) -} - -// StringToDuration parses a string into a time.Duration, assuming that a year -// always has 365d, a week 7d, a day 24h. See DurationToString for problems with -// that. -func StringToDuration(durationStr string) (duration time.Duration, err error) { - matches := durationRE.FindStringSubmatch(durationStr) - if len(matches) != 3 { - err = fmt.Errorf("not a valid duration string: %q", durationStr) - return - } - durationSeconds, _ := strconv.Atoi(matches[1]) - duration = time.Duration(durationSeconds) * time.Second - unit := matches[2] - switch unit { - case "y": - duration *= 60 * 60 * 24 * 365 - case "w": - duration *= 60 * 60 * 24 * 7 - case "d": - duration *= 60 * 60 * 24 - case "h": - duration *= 60 * 60 - case "m": - duration *= 60 - case "s": - duration *= 1 - default: - return 0, fmt.Errorf("invalid time unit in duration string: %q", unit) - } - return -} - // TableLinkForExpression creates an escaped relative link to the table view of // the provided expression. func TableLinkForExpression(expr string) string { diff --git a/vendor/github.com/prometheus/common/model/time.go b/vendor/github.com/prometheus/common/model/time.go index e2e9ff574..548968aeb 100644 --- a/vendor/github.com/prometheus/common/model/time.go +++ b/vendor/github.com/prometheus/common/model/time.go @@ -163,10 +163,10 @@ func (t *Time) UnmarshalJSON(b []byte) error { // This type should not propagate beyond the scope of input/output processing. type Duration time.Duration -var durationRE = regexp.MustCompile("^([0-9]+)(d|h|m|s|ms)$") +var durationRE = regexp.MustCompile("^([0-9]+)(y|w|d|h|m|s|ms)$") // StringToDuration parses a string into a time.Duration, assuming that a year -// a day always has 24h. +// always has 365d, a week always has 7d, and a day always has 24h. 
func ParseDuration(durationStr string) (Duration, error) { matches := durationRE.FindStringSubmatch(durationStr) if len(matches) != 3 { @@ -177,6 +177,10 @@ func ParseDuration(durationStr string) (Duration, error) { dur = time.Duration(n) * time.Millisecond ) switch unit := matches[2]; unit { + case "y": + dur *= 1000 * 60 * 60 * 24 * 365 + case "w": + dur *= 1000 * 60 * 60 * 24 * 7 case "d": dur *= 1000 * 60 * 60 * 24 case "h": @@ -199,6 +203,8 @@ func (d Duration) String() string { unit = "ms" ) factors := map[string]int64{ + "y": 1000 * 60 * 60 * 24 * 365, + "w": 1000 * 60 * 60 * 24 * 7, "d": 1000 * 60 * 60 * 24, "h": 1000 * 60 * 60, "m": 1000 * 60, @@ -207,6 +213,10 @@ func (d Duration) String() string { } switch int64(0) { + case ms % factors["y"]: + unit = "y" + case ms % factors["w"]: + unit = "w" case ms % factors["d"]: unit = "d" case ms % factors["h"]: diff --git a/vendor/vendor.json b/vendor/vendor.json index a5c1ee115..f2405ebab 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -174,8 +174,8 @@ }, { "path": "github.com/prometheus/common/model", - "revision": "b0d797186bfbaf6d785031c6c2d32f75c720007d", - "revisionTime": "2016-01-22T12:15:42+01:00" + "revision": "0e53cc19aa67dd2e8587a26e28643cb152f5403d", + "revisionTime": "2016-01-29T15:16:16+01:00" }, { "path": "github.com/prometheus/common/route", diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 8acc9c7c9..823665f3b 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -18,7 +18,6 @@ import ( "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/httputil" - "github.com/prometheus/prometheus/util/strutil" ) type status string @@ -324,8 +323,8 @@ func parseDuration(s string) (time.Duration, error) { if d, err := strconv.ParseFloat(s, 64); err == nil { return time.Duration(d * float64(time.Second)), nil } - if d, err := strutil.StringToDuration(s); err == nil { - return d, nil + if d, err := model.ParseDuration(s); err == nil { + return time.Duration(d), nil } return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } diff --git a/web/ui/templates/_base.html b/web/ui/templates/_base.html index 61ddc0454..c9d7f4d06 100644 --- a/web/ui/templates/_base.html +++ b/web/ui/templates/_base.html @@ -41,7 +41,7 @@
[web/ui/templates/_base.html navigation hunk: the "Graph" and "Status" menu entries are unchanged context lines; the "Help" link line is the one removed and re-added by this diff. The surrounding HTML markup and link URLs were lost in extraction.]
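For context on the vendored common/model change above: ParseDuration now also accepts years and weeks (using the fixed conventions 1y = 365d and 1w = 7d), and Duration.String() keeps picking the largest unit that divides the value evenly. The following is a rough, self-contained re-implementation written only to illustrate that behaviour; the authoritative code is in vendor/github.com/prometheus/common/model/time.go:

package main

import (
	"fmt"
	"regexp"
	"strconv"
	"time"
)

// Simplified re-implementation of the extended duration grammar, for
// illustration only; the real code lives in common/model/time.go.
var durationRE = regexp.MustCompile("^([0-9]+)(y|w|d|h|m|s|ms)$")

// unitMillis uses the same fixed conventions as the vendored code:
// a year is always 365d, a week 7d, a day 24h (DST and leap seconds ignored).
var unitMillis = map[string]int64{
	"y":  1000 * 60 * 60 * 24 * 365,
	"w":  1000 * 60 * 60 * 24 * 7,
	"d":  1000 * 60 * 60 * 24,
	"h":  1000 * 60 * 60,
	"m":  1000 * 60,
	"s":  1000,
	"ms": 1,
}

func parseDuration(s string) (time.Duration, error) {
	m := durationRE.FindStringSubmatch(s)
	if len(m) != 3 {
		return 0, fmt.Errorf("not a valid duration string: %q", s)
	}
	n, _ := strconv.ParseInt(m[1], 10, 64)
	return time.Duration(n*unitMillis[m[2]]) * time.Millisecond, nil
}

// formatDuration picks the largest unit that divides the value evenly,
// mirroring the convention of model.Duration.String().
func formatDuration(d time.Duration) string {
	ms := int64(d / time.Millisecond)
	for _, u := range []string{"y", "w", "d", "h", "m", "s", "ms"} {
		if ms%unitMillis[u] == 0 {
			return fmt.Sprintf("%d%s", ms/unitMillis[u], u)
		}
	}
	return fmt.Sprintf("%dms", ms)
}

func main() {
	d, _ := parseDuration("2w")
	fmt.Println(d)                                   // 336h0m0s
	fmt.Println(formatDuration(90 * time.Second))    // 90s
	fmt.Println(formatDuration(14 * 24 * time.Hour)) // 2w
}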