mirror of https://github.com/prometheus/prometheus
Add support for remote write relabelling.
Switch back to a single remote writer, as we were only ever meant to have one and the relabel semantics are clearer that way.pull/2047/head
parent
32d29f8dbc
commit
77605649a9
|
@ -192,7 +192,7 @@ type Config struct {
|
||||||
RuleFiles []string `yaml:"rule_files,omitempty"`
|
RuleFiles []string `yaml:"rule_files,omitempty"`
|
||||||
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
|
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
|
||||||
|
|
||||||
RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"`
|
RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"`
|
||||||
|
|
||||||
// Catches all undefined fields and must be empty after parsing.
|
// Catches all undefined fields and must be empty after parsing.
|
||||||
XXX map[string]interface{} `yaml:",inline"`
|
XXX map[string]interface{} `yaml:",inline"`
|
||||||
|
@ -1075,11 +1075,12 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
|
||||||
|
|
||||||
// RemoteWriteConfig is the configuration for remote storage.
|
// RemoteWriteConfig is the configuration for remote storage.
|
||||||
type RemoteWriteConfig struct {
|
type RemoteWriteConfig struct {
|
||||||
URL URL `yaml:"url,omitempty"`
|
URL *URL `yaml:"url,omitempty"`
|
||||||
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
|
||||||
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
|
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
|
||||||
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
ProxyURL URL `yaml:"proxy_url,omitempty"`
|
ProxyURL URL `yaml:"proxy_url,omitempty"`
|
||||||
|
WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"`
|
||||||
|
|
||||||
// Catches all undefined fields and must be empty after parsing.
|
// Catches all undefined fields and must be empty after parsing.
|
||||||
XXX map[string]interface{} `yaml:",inline"`
|
XXX map[string]interface{} `yaml:",inline"`
|
||||||
|
|
|
@ -44,6 +44,19 @@ var expectedConf = &Config{
|
||||||
"testdata/my/*.rules",
|
"testdata/my/*.rules",
|
||||||
},
|
},
|
||||||
|
|
||||||
|
RemoteWriteConfig: RemoteWriteConfig{
|
||||||
|
RemoteTimeout: model.Duration(30 * time.Second),
|
||||||
|
WriteRelabelConfigs: []*RelabelConfig{
|
||||||
|
{
|
||||||
|
SourceLabels: model.LabelNames{"__name__"},
|
||||||
|
Separator: ";",
|
||||||
|
Regex: MustNewRegexp("expensive.*"),
|
||||||
|
Replacement: "$1",
|
||||||
|
Action: RelabelDrop,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
ScrapeConfigs: []*ScrapeConfig{
|
ScrapeConfigs: []*ScrapeConfig{
|
||||||
{
|
{
|
||||||
JobName: "prometheus",
|
JobName: "prometheus",
|
||||||
|
|
|
@ -13,6 +13,12 @@ rule_files:
|
||||||
- "/absolute/second.rules"
|
- "/absolute/second.rules"
|
||||||
- "my/*.rules"
|
- "my/*.rules"
|
||||||
|
|
||||||
|
remote_write:
|
||||||
|
write_relabel_configs:
|
||||||
|
- source_labels: [__name__]
|
||||||
|
regex: expensive.*
|
||||||
|
action: drop
|
||||||
|
|
||||||
scrape_configs:
|
scrape_configs:
|
||||||
- job_name: prometheus
|
- job_name: prometheus
|
||||||
|
|
||||||
|
|
|
@ -31,14 +31,13 @@ import (
|
||||||
|
|
||||||
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
|
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
index int // Used to differentiate metrics
|
|
||||||
url config.URL
|
url config.URL
|
||||||
client *http.Client
|
client *http.Client
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates a new Client.
|
// NewClient creates a new Client.
|
||||||
func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
|
func NewClient(conf config.RemoteWriteConfig) (*Client, error) {
|
||||||
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
|
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -56,8 +55,7 @@ func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
index: index,
|
url: *conf.URL,
|
||||||
url: conf.URL,
|
|
||||||
client: httputil.NewClient(rt),
|
client: httputil.NewClient(rt),
|
||||||
timeout: time.Duration(conf.RemoteTimeout),
|
timeout: time.Duration(conf.RemoteTimeout),
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -117,10 +115,6 @@ func (c *Client) Store(samples model.Samples) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name identifies the client as a generic client.
|
// Name identifies the client as a generic client.
|
||||||
//
|
|
||||||
// TODO: This client is going to be the only one soon - then this method
|
|
||||||
// will simply be removed in the restructuring and the whole "generic" naming
|
|
||||||
// will be gone for good.
|
|
||||||
func (c Client) Name() string {
|
func (c Client) Name() string {
|
||||||
return fmt.Sprintf("generic:%d:%s", c.index, c.url)
|
return "generic"
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
influx "github.com/influxdb/influxdb/client"
|
influx "github.com/influxdb/influxdb/client"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/relabel"
|
||||||
"github.com/prometheus/prometheus/storage/remote/graphite"
|
"github.com/prometheus/prometheus/storage/remote/graphite"
|
||||||
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
||||||
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
||||||
|
@ -33,6 +34,7 @@ import (
|
||||||
type Storage struct {
|
type Storage struct {
|
||||||
queues []*StorageQueueManager
|
queues []*StorageQueueManager
|
||||||
externalLabels model.LabelSet
|
externalLabels model.LabelSet
|
||||||
|
relabelConfigs []*config.RelabelConfig
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +44,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||||
defer s.mtx.Unlock()
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||||
|
s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,8 +119,14 @@ func (s *Storage) Append(smpl *model.Sample) error {
|
||||||
snew.Metric[ln] = lv
|
snew.Metric[ln] = lv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
snew.Metric = model.Metric(
|
||||||
|
relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...))
|
||||||
s.mtx.RUnlock()
|
s.mtx.RUnlock()
|
||||||
|
|
||||||
|
if snew.Metric == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, q := range s.queues {
|
for _, q := range s.queues {
|
||||||
q.Append(&snew)
|
q.Append(&snew)
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,15 +19,16 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/relabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Storage collects multiple remote storage queues.
|
// Storage collects multiple remote storage queues.
|
||||||
type ReloadableStorage struct {
|
type ReloadableStorage struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
externalLabels model.LabelSet
|
externalLabels model.LabelSet
|
||||||
conf []config.RemoteWriteConfig
|
conf config.RemoteWriteConfig
|
||||||
|
|
||||||
queues []*StorageQueueManager
|
queue *StorageQueueManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new remote Storage.
|
// New returns a new remote Storage.
|
||||||
|
@ -42,31 +43,32 @@ func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {
|
||||||
|
|
||||||
// TODO: we should only stop & recreate queues which have changes,
|
// TODO: we should only stop & recreate queues which have changes,
|
||||||
// as this can be quite disruptive.
|
// as this can be quite disruptive.
|
||||||
newQueues := []*StorageQueueManager{}
|
var newQueue *StorageQueueManager
|
||||||
for i, c := range conf.RemoteWriteConfig {
|
|
||||||
c, err := NewClient(i, c)
|
if conf.RemoteWriteConfig.URL != nil {
|
||||||
|
c, err := NewClient(conf.RemoteWriteConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newQueues = append(newQueues, NewStorageQueueManager(c, nil))
|
newQueue = NewStorageQueueManager(c, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, q := range s.queues {
|
if s.queue != nil {
|
||||||
q.Stop()
|
s.queue.Stop()
|
||||||
}
|
}
|
||||||
s.queues = newQueues
|
s.queue = newQueue
|
||||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
|
||||||
s.conf = conf.RemoteWriteConfig
|
s.conf = conf.RemoteWriteConfig
|
||||||
for _, q := range s.queues {
|
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||||
q.Start()
|
if s.queue != nil {
|
||||||
|
s.queue.Start()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the background processing of the storage queues.
|
// Stop the background processing of the storage queues.
|
||||||
func (s *ReloadableStorage) Stop() {
|
func (s *ReloadableStorage) Stop() {
|
||||||
for _, q := range s.queues {
|
if s.queue != nil {
|
||||||
q.Stop()
|
s.queue.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,9 +86,14 @@ func (s *ReloadableStorage) Append(smpl *model.Sample) error {
|
||||||
snew.Metric[ln] = lv
|
snew.Metric[ln] = lv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
snew.Metric = model.Metric(
|
||||||
|
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
|
||||||
|
|
||||||
for _, q := range s.queues {
|
if snew.Metric == nil {
|
||||||
q.Append(&snew)
|
return nil
|
||||||
|
}
|
||||||
|
if s.queue != nil {
|
||||||
|
s.queue.Append(&snew)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue