diff --git a/config/config.go b/config/config.go index 05559f62c..2560653a3 100644 --- a/config/config.go +++ b/config/config.go @@ -246,6 +246,11 @@ func resolveFilepaths(baseDir string, cfg *Config) { mcfg.TLSConfig.CertFile = join(mcfg.TLSConfig.CertFile) mcfg.TLSConfig.KeyFile = join(mcfg.TLSConfig.KeyFile) } + for _, consulcfg := range cfg.ConsulSDConfigs { + consulcfg.TLSConfig.CAFile = join(consulcfg.TLSConfig.CAFile) + consulcfg.TLSConfig.CertFile = join(consulcfg.TLSConfig.CertFile) + consulcfg.TLSConfig.KeyFile = join(consulcfg.TLSConfig.KeyFile) + } } for _, cfg := range cfg.ScrapeConfigs { @@ -584,7 +589,7 @@ func (c *AlertingConfig) UnmarshalYAML(unmarshal func(interface{}) error) error return nil } -// AlertmanagersConfig configures how Alertmanagers can be discovered and communicated with. +// AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with. type AlertmanagerConfig struct { // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. @@ -824,6 +829,7 @@ type ConsulSDConfig struct { // Defaults to all services if empty. Services []string `yaml:"services"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } @@ -949,8 +955,10 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro return nil } +// KubernetesRole is role of the service in Kubernetes. type KubernetesRole string +// The valid options for KubernetesRole. const ( KubernetesRoleNode = "node" KubernetesRolePod = "pod" @@ -958,6 +966,7 @@ const ( KubernetesRoleEndpoint = "endpoints" ) +// UnmarshalYAML implements the yaml.Unmarshaler interface. 
func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := unmarshal((*string)(c)); err != nil { return err @@ -1226,6 +1235,17 @@ func (c *RelabelConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if c.Action == RelabelHashMod && !model.LabelName(c.TargetLabel).IsValid() { return fmt.Errorf("%q is invalid 'target_label' for %s action", c.TargetLabel, c.Action) } + + if c.Action == RelabelLabelDrop || c.Action == RelabelLabelKeep { + if c.SourceLabels != nil || + c.TargetLabel != DefaultRelabelConfig.TargetLabel || + c.Modulus != DefaultRelabelConfig.Modulus || + c.Separator != DefaultRelabelConfig.Separator || + c.Replacement != DefaultRelabelConfig.Replacement { + return fmt.Errorf("%s action requires only 'regex', and no other fields", c.Action) + } + } + return nil } diff --git a/config/config_test.go b/config/config_test.go index 4be6989b7..66972cbbf 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -247,7 +247,13 @@ var expectedConf = &Config{ Server: "localhost:1234", Services: []string{"nginx", "cache", "mysql"}, TagSeparator: DefaultConsulSDConfig.TagSeparator, - Scheme: DefaultConsulSDConfig.Scheme, + Scheme: "https", + TLSConfig: TLSConfig{ + CertFile: "testdata/valid_cert_file", + KeyFile: "testdata/valid_key_file", + CAFile: "testdata/valid_ca_file", + InsecureSkipVerify: false, + }, }, }, }, @@ -538,6 +544,36 @@ var expectedErrors = []struct { }, { filename: "modulus_missing.bad.yml", errMsg: "relabel configuration for hashmod requires non-zero modulus", + }, { + filename: "labelkeep.bad.yml", + errMsg: "labelkeep action requires only 'regex', and no other fields", + }, { + filename: "labelkeep2.bad.yml", + errMsg: "labelkeep action requires only 'regex', and no other fields", + }, { + filename: "labelkeep3.bad.yml", + errMsg: "labelkeep action requires only 'regex', and no other fields", + }, { + filename: "labelkeep4.bad.yml", + errMsg: "labelkeep action requires only 'regex', and no 
other fields", + }, { + filename: "labelkeep5.bad.yml", + errMsg: "labelkeep action requires only 'regex', and no other fields", + }, { + filename: "labeldrop.bad.yml", + errMsg: "labeldrop action requires only 'regex', and no other fields", + }, { + filename: "labeldrop2.bad.yml", + errMsg: "labeldrop action requires only 'regex', and no other fields", + }, { + filename: "labeldrop3.bad.yml", + errMsg: "labeldrop action requires only 'regex', and no other fields", + }, { + filename: "labeldrop4.bad.yml", + errMsg: "labeldrop action requires only 'regex', and no other fields", + }, { + filename: "labeldrop5.bad.yml", + errMsg: "labeldrop action requires only 'regex', and no other fields", }, { filename: "rules.bad.yml", errMsg: "invalid rule file path", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 7fc616113..05015da9f 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -114,6 +114,12 @@ scrape_configs: consul_sd_configs: - server: 'localhost:1234' services: ['nginx', 'cache', 'mysql'] + scheme: https + tls_config: + ca_file: valid_ca_file + cert_file: valid_cert_file + key_file: valid_key_file + insecure_skip_verify: false relabel_configs: - source_labels: [__meta_sd_consul_tags] diff --git a/config/testdata/labeldrop.bad.yml b/config/testdata/labeldrop.bad.yml new file mode 100644 index 000000000..b71c1e8a8 --- /dev/null +++ b/config/testdata/labeldrop.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - source_labels: [abcdef] + action: labeldrop diff --git a/config/testdata/labeldrop2.bad.yml b/config/testdata/labeldrop2.bad.yml new file mode 100644 index 000000000..f70316975 --- /dev/null +++ b/config/testdata/labeldrop2.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - modulus: 8 + action: labeldrop diff --git a/config/testdata/labeldrop3.bad.yml b/config/testdata/labeldrop3.bad.yml new file mode 100644 index 
000000000..5bed5d0af --- /dev/null +++ b/config/testdata/labeldrop3.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - separator: ',' + action: labeldrop diff --git a/config/testdata/labeldrop4.bad.yml b/config/testdata/labeldrop4.bad.yml new file mode 100644 index 000000000..52877d2b4 --- /dev/null +++ b/config/testdata/labeldrop4.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - replacement: yolo-{1} + action: labeldrop diff --git a/config/testdata/labeldrop5.bad.yml b/config/testdata/labeldrop5.bad.yml new file mode 100644 index 000000000..36f282751 --- /dev/null +++ b/config/testdata/labeldrop5.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - target_label: yolo + action: labeldrop diff --git a/config/testdata/labelkeep.bad.yml b/config/testdata/labelkeep.bad.yml new file mode 100644 index 000000000..709da0595 --- /dev/null +++ b/config/testdata/labelkeep.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - source_labels: [abcdef] + action: labelkeep diff --git a/config/testdata/labelkeep2.bad.yml b/config/testdata/labelkeep2.bad.yml new file mode 100644 index 000000000..734e537cf --- /dev/null +++ b/config/testdata/labelkeep2.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - modulus: 8 + action: labelkeep diff --git a/config/testdata/labelkeep3.bad.yml b/config/testdata/labelkeep3.bad.yml new file mode 100644 index 000000000..407a0f7c1 --- /dev/null +++ b/config/testdata/labelkeep3.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - separator: ',' + action: labelkeep diff --git a/config/testdata/labelkeep4.bad.yml b/config/testdata/labelkeep4.bad.yml new file mode 100644 index 000000000..4e7799415 --- /dev/null +++ b/config/testdata/labelkeep4.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - 
replacement: yolo-{1} + action: labelkeep diff --git a/config/testdata/labelkeep5.bad.yml b/config/testdata/labelkeep5.bad.yml new file mode 100644 index 000000000..689399fc7 --- /dev/null +++ b/config/testdata/labelkeep5.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: prometheus + relabel_configs: + - target_label: yolo + action: labelkeep diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 2f487dfdd..562a2700d 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -16,6 +16,7 @@ package consul import ( "fmt" "net" + "net/http" "strconv" "strings" "time" @@ -24,9 +25,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" - "golang.org/x/net/context" - "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/httputil" + "golang.org/x/net/context" ) const ( @@ -92,6 +93,13 @@ type Discovery struct { // NewDiscovery returns a new Discovery for the given config. 
func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { + tls, err := httputil.NewTLSConfig(conf.TLSConfig) + if err != nil { + return nil, err + } + transport := &http.Transport{TLSClientConfig: tls} + wrapper := &http.Client{Transport: transport} + clientConf := &consul.Config{ Address: conf.Server, Scheme: conf.Scheme, @@ -101,6 +109,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { Username: conf.Username, Password: conf.Password, }, + HttpClient: wrapper, } client, err := consul.NewClient(clientConf) if err != nil { diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go index 75db582bf..5d715d2b7 100644 --- a/discovery/marathon/marathon.go +++ b/discovery/marathon/marathon.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" + "github.com/prometheus/prometheus/util/strutil" ) const ( @@ -45,6 +46,11 @@ const ( // taskLabel contains the mesos task name of the app instance. taskLabel model.LabelName = metaLabelPrefix + "task" + // portMappingLabelPrefix is the prefix for the application portMappings labels. + portMappingLabelPrefix = metaLabelPrefix + "port_mapping_label_" + // portDefinitionLabelPrefix is the prefix for the application portDefinitions labels. + portDefinitionLabelPrefix = metaLabelPrefix + "port_definition_label_" + // Constants for instrumentation. namespace = "prometheus" ) @@ -188,9 +194,15 @@ type Task struct { Ports []uint32 `json:"ports"` } +// PortMappings describes in which port the process are binding inside the docker container. +type PortMappings struct { + Labels map[string]string `json:"labels"` +} + // DockerContainer describes a container which uses the docker runtime. type DockerContainer struct { - Image string `json:"image"` + Image string `json:"image"` + PortMappings []PortMappings `json:"portMappings"` } // Container describes the runtime an app in running in. 
@@ -198,13 +210,19 @@ type Container struct { Docker DockerContainer `json:"docker"` } +// PortDefinitions describes which load balancer port you should access to access the service. +type PortDefinitions struct { + Labels map[string]string `json:"labels"` +} + // App describes a service running on Marathon. type App struct { - ID string `json:"id"` - Tasks []Task `json:"tasks"` - RunningTasks int `json:"tasksRunning"` - Labels map[string]string `json:"labels"` - Container Container `json:"container"` + ID string `json:"id"` + Tasks []Task `json:"tasks"` + RunningTasks int `json:"tasksRunning"` + Labels map[string]string `json:"labels"` + Container Container `json:"container"` + PortDefinitions []PortDefinitions `json:"portDefinitions"` } // AppList is a list of Marathon apps. @@ -285,7 +303,7 @@ func createTargetGroup(app *App) *config.TargetGroup { } for ln, lv := range app.Labels { - ln = appLabelPrefix + ln + ln = appLabelPrefix + strutil.SanitizeLabelName(ln) tg.Labels[model.LabelName(ln)] = model.LabelValue(lv) } @@ -298,15 +316,30 @@ func targetsForApp(app *App) []model.LabelSet { if len(t.Ports) == 0 { continue } - target := targetForTask(&t) - targets = append(targets, model.LabelSet{ - model.AddressLabel: model.LabelValue(target), - taskLabel: model.LabelValue(t.ID), - }) + for i := 0; i < len(t.Ports); i++ { + targetAddress := targetForTask(&t, i) + target := model.LabelSet{ + model.AddressLabel: model.LabelValue(targetAddress), + taskLabel: model.LabelValue(t.ID), + } + if i < len(app.PortDefinitions) { + for ln, lv := range app.PortDefinitions[i].Labels { + ln = portDefinitionLabelPrefix + strutil.SanitizeLabelName(ln) + target[model.LabelName(ln)] = model.LabelValue(lv) + } + } + if i < len(app.Container.Docker.PortMappings) { + for ln, lv := range app.Container.Docker.PortMappings[i].Labels { + ln = portMappingLabelPrefix + strutil.SanitizeLabelName(ln) + target[model.LabelName(ln)] = model.LabelValue(lv) + } + } + targets = append(targets, target) + 
} } return targets } -func targetForTask(task *Task) string { - return net.JoinHostPort(task.Host, fmt.Sprintf("%d", task.Ports[0])) +func targetForTask(task *Task, index int) string { + return net.JoinHostPort(task.Host, fmt.Sprintf("%d", task.Ports[index])) } diff --git a/discovery/marathon/marathon_test.go b/discovery/marathon/marathon_test.go index b545da172..913380ad6 100644 --- a/discovery/marathon/marathon_test.go +++ b/discovery/marathon/marathon_test.go @@ -80,7 +80,12 @@ func marathonTestAppList(labels map[string]string, runningTasks int) *AppList { Host: "mesos-slave1", Ports: []uint32{31000}, } - docker = DockerContainer{Image: "repo/image:tag"} + docker = DockerContainer{ + Image: "repo/image:tag", + PortMappings: []PortMappings{ + {Labels: labels}, + }, + } container = Container{Docker: docker} app = App{ ID: "test-service", @@ -88,6 +93,9 @@ func marathonTestAppList(labels map[string]string, runningTasks int) *AppList { RunningTasks: runningTasks, Labels: labels, Container: container, + PortDefinitions: []PortDefinitions{ + {Labels: make(map[string]string)}, + }, } ) return &AppList{ @@ -119,6 +127,12 @@ func TestMarathonSDSendGroup(t *testing.T) { if tgt[model.AddressLabel] != "mesos-slave1:31000" { t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } default: t.Fatal("Did not get a target group.") } @@ -189,6 +203,83 @@ func TestMarathonSDRunAndStop(t *testing.T) { } } +func marathonTestAppListWithMutiplePorts(labels map[string]string, runningTasks int) *AppList { + var ( + task = Task{ + ID: "test-task-1", + Host: "mesos-slave1", + Ports: []uint32{31000, 32000}, + } + docker = DockerContainer{ + 
Image: "repo/image:tag", + PortMappings: []PortMappings{ + {Labels: labels}, + {Labels: make(map[string]string)}, + }, + } + container = Container{Docker: docker} + app = App{ + ID: "test-service", + Tasks: []Task{task}, + RunningTasks: runningTasks, + Labels: labels, + Container: container, + PortDefinitions: []PortDefinitions{ + {Labels: make(map[string]string)}, + {Labels: labels}, + }, + } + ) + return &AppList{ + Apps: []App{app}, + } +} + +func TestMarathonSDSendGroupWithMutiplePort(t *testing.T) { + var ( + ch = make(chan []*config.TargetGroup, 1) + client = func(client *http.Client, url, token string) (*AppList, error) { + return marathonTestAppListWithMutiplePorts(marathonValidLabel, 1), nil + } + ) + if err := testUpdateServices(client, ch); err != nil { + t.Fatalf("Got error: %s", err) + } + select { + case tgs := <-ch: + tg := tgs[0] + + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", 
tgt[model.AddressLabel]) + } + default: + t.Fatal("Did not get a target group.") + } +} + func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList { var ( task = Task{ @@ -235,3 +326,149 @@ func TestMarathonZeroTaskPorts(t *testing.T) { t.Fatal("Did not get a target group.") } } + +func marathonTestAppListWithoutPortMappings(labels map[string]string, runningTasks int) *AppList { + var ( + task = Task{ + ID: "test-task-1", + Host: "mesos-slave1", + Ports: []uint32{31000, 32000}, + } + docker = DockerContainer{ + Image: "repo/image:tag", + } + container = Container{Docker: docker} + app = App{ + ID: "test-service", + Tasks: []Task{task}, + RunningTasks: runningTasks, + Labels: labels, + Container: container, + PortDefinitions: []PortDefinitions{ + {Labels: make(map[string]string)}, + {Labels: labels}, + }, + } + ) + return &AppList{ + Apps: []App{app}, + } +} + +func TestMarathonSDSendGroupWithoutPortMappings(t *testing.T) { + var ( + ch = make(chan []*config.TargetGroup, 1) + client = func(client *http.Client, url, token string) (*AppList, error) { + return marathonTestAppListWithoutPortMappings(marathonValidLabel, 1), nil + } + ) + if err := testUpdateServices(client, ch); err != nil { + t.Fatalf("Got error: %s", err) + } + select { + case tgs := <-ch: + tg := tgs[0] + + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portMappings label from the first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = 
tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) + } + default: + t.Fatal("Did not get a target group.") + } +} + +func marathonTestAppListWithoutPortDefinitions(labels map[string]string, runningTasks int) *AppList { + var ( + task = Task{ + ID: "test-task-1", + Host: "mesos-slave1", + Ports: []uint32{31000, 32000}, + } + docker = DockerContainer{ + Image: "repo/image:tag", + PortMappings: []PortMappings{ + {Labels: labels}, + {Labels: make(map[string]string)}, + }, + } + container = Container{Docker: docker} + app = App{ + ID: "test-service", + Tasks: []Task{task}, + RunningTasks: runningTasks, + Labels: labels, + Container: container, + } + ) + return &AppList{ + Apps: []App{app}, + } +} + +func TestMarathonSDSendGroupWithoutPortDefinitions(t *testing.T) { + var ( + ch = make(chan []*config.TargetGroup, 1) + client = func(client *http.Client, url, token string) (*AppList, error) { + return marathonTestAppListWithoutPortDefinitions(marathonValidLabel, 1), nil + } + ) + if err := testUpdateServices(client, ch); err != nil { + t.Fatalf("Got error: %s", err) + } + select { + case tgs := <-ch: + tg := tgs[0] + + if tg.Source != "test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 2 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[model.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "yes" { + t.Fatalf("Wrong first portMappings label from the 
first port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong first portDefinitions label from the first port: %s", tgt[model.AddressLabel]) + } + tgt = tg.Targets[1] + if tgt[model.AddressLabel] != "mesos-slave1:32000" { + t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portMappingLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portMappings label from the second port: %s", tgt[model.AddressLabel]) + } + if tgt[model.LabelName(portDefinitionLabelPrefix+"prometheus")] != "" { + t.Fatalf("Wrong portDefinitions label from the second port: %s", tgt[model.AddressLabel]) + } + default: + t.Fatal("Did not get a target group.") + } +} diff --git a/relabel/relabel.go b/relabel/relabel.go index be9fa6d82..49e134c13 100644 --- a/relabel/relabel.go +++ b/relabel/relabel.go @@ -87,13 +87,13 @@ func relabel(labels model.LabelSet, cfg *config.RelabelConfig) model.LabelSet { } labels = out case config.RelabelLabelDrop: - for ln, _ := range labels { + for ln := range labels { if cfg.Regex.MatchString(string(ln)) { delete(labels, ln) } } case config.RelabelLabelKeep: - for ln, _ := range labels { + for ln := range labels { if !cfg.Regex.MatchString(string(ln)) { delete(labels, ln) } diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go new file mode 100644 index 000000000..d974bc3bb --- /dev/null +++ b/storage/remote/ewma.go @@ -0,0 +1,66 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "sync" + "sync/atomic" + "time" +) + +// ewmaRate tracks an exponentially weighted moving average of a per-second rate. +type ewmaRate struct { + newEvents int64 + alpha float64 + interval time.Duration + lastRate float64 + init bool + mutex sync.Mutex +} + +func newEWMARate(alpha float64, interval time.Duration) ewmaRate { + return ewmaRate{ + alpha: alpha, + interval: interval, + } +} + +// rate returns the per-second rate. +func (r *ewmaRate) rate() float64 { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.lastRate +} + +// tick must be called every r.interval. +func (r *ewmaRate) tick() { + newEvents := atomic.LoadInt64(&r.newEvents) + atomic.AddInt64(&r.newEvents, -newEvents) + instantRate := float64(newEvents) / r.interval.Seconds() + + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.init { + r.lastRate += r.alpha * (instantRate - r.lastRate) + } else { + r.init = true + r.lastRate = instantRate + } +} + +// incr counts the given number of events. +func (r *ewmaRate) incr(incr int64) { + atomic.AddInt64(&r.newEvents, incr) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a9c6eb460..3718561aa 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -14,6 +14,7 @@ package remote import ( + "math" "sync" "time" @@ -32,13 +33,28 @@ const ( subsystem = "remote_storage" queue = "queue" - defaultShards = 10 + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + defaultMaxShards = 1000 defaultMaxSamplesPerSend = 100 - // The queue capacity is per shard. - defaultQueueCapacity = 100 * 1024 / defaultShards + + // defaultQueueCapacity is per shard - at 1000 shards, this will buffer + // 100M samples. 
It is configured to buffer 1000 batches, which at 100ms + // per batch is 1:40mins. + defaultQueueCapacity = defaultMaxSamplesPerSend * 1000 defaultBatchSendDeadline = 5 * time.Second - logRateLimit = 0.1 // Limit to 1 log event every 10s - logBurst = 10 + + // We track samples in/out and how long pushes take using an Exponentially + // Weighted Moving Average. + ewmaWeight = 0.2 + shardUpdateDuration = 10 * time.Second + + // Allow 30% too many shards before scaling down. + shardToleranceFraction = 0.3 + + // Limit to 1 log event every 10s + logRateLimit = 0.1 + logBurst = 10 ) var ( @@ -97,6 +113,15 @@ var ( }, []string{queue}, ) + numShards = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards", + Help: "The number of shards used for parallel sending to the remote storage.", + }, + []string{queue}, + ) ) func init() { @@ -106,6 +131,7 @@ func init() { prometheus.MustRegister(sentBatchDuration) prometheus.MustRegister(queueLength) prometheus.MustRegister(queueCapacity) + prometheus.MustRegister(numShards) } // StorageClient defines an interface for sending a batch of samples to an @@ -120,7 +146,7 @@ type StorageClient interface { // QueueManagerConfig configures a storage queue. type QueueManagerConfig struct { QueueCapacity int // Number of samples to buffer per shard before we start dropping them. - Shards int // Number of shards, i.e. amount of concurrency. + MaxShards int // Max number of shards, i.e. amount of concurrency. MaxSamplesPerSend int // Maximum number of samples per send. BatchSendDeadline time.Duration // Maximum time sample will wait in buffer. ExternalLabels model.LabelSet @@ -132,11 +158,18 @@ type QueueManagerConfig struct { // indicated by the provided StorageClient. 
type QueueManager struct { cfg QueueManagerConfig - shards []chan *model.Sample - wg sync.WaitGroup - done chan struct{} queueName string logLimiter *rate.Limiter + + shardsMtx sync.Mutex + shards *shards + numShards int + reshardChan chan int + quit chan struct{} + wg sync.WaitGroup + + samplesIn, samplesOut, samplesOutDuration ewmaRate + integralAccumulator float64 } // NewQueueManager builds a new QueueManager. @@ -144,8 +177,8 @@ func NewQueueManager(cfg QueueManagerConfig) *QueueManager { if cfg.QueueCapacity == 0 { cfg.QueueCapacity = defaultQueueCapacity } - if cfg.Shards == 0 { - cfg.Shards = defaultShards + if cfg.MaxShards == 0 { + cfg.MaxShards = defaultMaxShards } if cfg.MaxSamplesPerSend == 0 { cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend @@ -154,21 +187,22 @@ func NewQueueManager(cfg QueueManagerConfig) *QueueManager { cfg.BatchSendDeadline = defaultBatchSendDeadline } - shards := make([]chan *model.Sample, cfg.Shards) - for i := 0; i < cfg.Shards; i++ { - shards[i] = make(chan *model.Sample, cfg.QueueCapacity) - } - t := &QueueManager{ - cfg: cfg, - shards: shards, - done: make(chan struct{}), - queueName: cfg.Client.Name(), - logLimiter: rate.NewLimiter(logRateLimit, logBurst), - } + cfg: cfg, + queueName: cfg.Client.Name(), + logLimiter: rate.NewLimiter(logRateLimit, logBurst), + numShards: 1, + reshardChan: make(chan int), + quit: make(chan struct{}), + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + } + t.shards = t.newShards(t.numShards) + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) - t.wg.Add(cfg.Shards) + return t } @@ -193,13 +227,13 @@ func (t *QueueManager) Append(s *model.Sample) error { return nil } - fp := snew.Metric.FastFingerprint() - shard := uint64(fp) % uint64(t.cfg.Shards) + t.shardsMtx.Lock() 
+ enqueued := t.shards.enqueue(&snew) + t.shardsMtx.Unlock() - select { - case t.shards[shard] <- &snew: + if enqueued { queueLength.WithLabelValues(t.queueName).Inc() - default: + } else { droppedSamplesTotal.WithLabelValues(t.queueName).Inc() if t.logLimiter.Allow() { log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.") @@ -218,25 +252,186 @@ func (*QueueManager) NeedsThrottling() bool { // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - for i := 0; i < t.cfg.Shards; i++ { - go t.runShard(i) - } + t.wg.Add(2) + go t.updateShardsLoop() + go t.reshardLoop() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.start() } // Stop stops sending samples to the remote storage and waits for pending // sends to complete. func (t *QueueManager) Stop() { log.Infof("Stopping remote storage...") - for _, shard := range t.shards { - close(shard) - } + close(t.quit) t.wg.Wait() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.stop() log.Info("Remote storage stopped.") } -func (t *QueueManager) runShard(i int) { +func (t *QueueManager) updateShardsLoop() { defer t.wg.Done() - shard := t.shards[i] + + ticker := time.Tick(shardUpdateDuration) + for { + select { + case <-ticker: + t.calculateDesiredShards() + case <-t.quit: + return + } + } +} + +func (t *QueueManager) calculateDesiredShards() { + t.samplesIn.tick() + t.samplesOut.tick() + t.samplesOutDuration.tick() + + // We use the number of incoming samples as a prediction of how much work we + // will need to do next iteration. We add to this any pending samples + // (received - send) so we can catch up with any backlog. We use the average + // outgoing batch latency to work out how many shards we need. 
+ var ( + samplesIn = t.samplesIn.rate() + samplesOut = t.samplesOut.rate() + samplesPending = samplesIn - samplesOut + samplesOutDuration = t.samplesOutDuration.rate() + ) + + // We use an integral accumulator, like in a PID, to help dampen oscillation. + t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1) + + if samplesOut <= 0 { + return + } + + var ( + timePerSample = samplesOutDuration / samplesOut + desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second) + ) + log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f", + samplesIn, samplesOut, samplesPending, desiredShards) + + // Changes in the number of shards must be greater than shardToleranceFraction. + var ( + lowerBound = float64(t.numShards) * (1. - shardToleranceFraction) + upperBound = float64(t.numShards) * (1. + shardToleranceFraction) + ) + log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound) + if lowerBound <= desiredShards && desiredShards <= upperBound { + return + } + + numShards := int(math.Ceil(desiredShards)) + if numShards > t.cfg.MaxShards { + numShards = t.cfg.MaxShards + } + if numShards == t.numShards { + return + } + + // Resharding can take some time, and we want this loop + // to stay close to shardUpdateDuration. 
+ select { + case t.reshardChan <- numShards: + log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards) + t.numShards = numShards + default: + log.Infof("Currently resharding, skipping.") + } +} + +func (t *QueueManager) reshardLoop() { + defer t.wg.Done() + + for { + select { + case numShards := <-t.reshardChan: + t.reshard(numShards) + case <-t.quit: + return + } + } +} + +func (t *QueueManager) reshard(n int) { + numShards.WithLabelValues(t.queueName).Set(float64(n)) + + t.shardsMtx.Lock() + newShards := t.newShards(n) + oldShards := t.shards + t.shards = newShards + t.shardsMtx.Unlock() + + oldShards.stop() + + // We start the newShards after we have stopped (and therefore completely + // flushed) the oldShards, to guarantee we only ever deliver samples in + // order. + newShards.start() +} + +type shards struct { + qm *QueueManager + queues []chan *model.Sample + done chan struct{} + wg sync.WaitGroup +} + +func (t *QueueManager) newShards(numShards int) *shards { + queues := make([]chan *model.Sample, numShards) + for i := 0; i < numShards; i++ { + queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + } + s := &shards{ + qm: t, + queues: queues, + done: make(chan struct{}), + } + s.wg.Add(numShards) + return s +} + +func (s *shards) len() int { + return len(s.queues) +} + +func (s *shards) start() { + for i := 0; i < len(s.queues); i++ { + go s.runShard(i) + } +} + +func (s *shards) stop() { + for _, shard := range s.queues { + close(shard) + } + s.wg.Wait() +} + +func (s *shards) enqueue(sample *model.Sample) bool { + s.qm.samplesIn.incr(1) + + fp := sample.Metric.FastFingerprint() + shard := uint64(fp) % uint64(len(s.queues)) + + select { + case s.queues[shard] <- sample: + return true + default: + return false + } +} + +func (s *shards) runShard(i int) { + defer s.wg.Done() + queue := s.queues[i] // Send batches of at most MaxSamplesPerSend samples to the remote storage. 
// If we have fewer samples than that, flush them out after a deadline @@ -245,45 +440,48 @@ func (t *QueueManager) runShard(i int) { for { select { - case s, ok := <-shard: + case sample, ok := <-queue: if !ok { if len(pendingSamples) > 0 { - log.Infof("Flushing %d samples to remote storage...", len(pendingSamples)) - t.sendSamples(pendingSamples) - log.Infof("Done flushing.") + log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples)) + s.sendSamples(pendingSamples) + log.Debugf("Done flushing.") } return } - queueLength.WithLabelValues(t.queueName).Dec() - pendingSamples = append(pendingSamples, s) + queueLength.WithLabelValues(s.qm.queueName).Dec() + pendingSamples = append(pendingSamples, sample) - for len(pendingSamples) >= t.cfg.MaxSamplesPerSend { - t.sendSamples(pendingSamples[:t.cfg.MaxSamplesPerSend]) - pendingSamples = pendingSamples[t.cfg.MaxSamplesPerSend:] + for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) + pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] } - case <-time.After(t.cfg.BatchSendDeadline): + case <-time.After(s.qm.cfg.BatchSendDeadline): if len(pendingSamples) > 0 { - t.sendSamples(pendingSamples) + s.sendSamples(pendingSamples) pendingSamples = pendingSamples[:0] } } } } -func (t *QueueManager) sendSamples(s model.Samples) { +func (s *shards) sendSamples(samples model.Samples) { // Samples are sent to the remote storage on a best-effort basis. If a // sample isn't sent correctly the first time, it's simply dropped on the // floor. 
begin := time.Now() - err := t.cfg.Client.Store(s) - duration := time.Since(begin).Seconds() + err := s.qm.cfg.Client.Store(samples) + duration := time.Since(begin) if err != nil { - log.Warnf("error sending %d samples to remote storage: %s", len(s), err) - failedSamplesTotal.WithLabelValues(t.queueName).Add(float64(len(s))) + log.Warnf("error sending %d samples to remote storage: %s", len(samples), err) + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) } else { - sentSamplesTotal.WithLabelValues(t.queueName).Add(float64(len(s))) + sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) } - sentBatchDuration.WithLabelValues(t.queueName).Observe(duration) + sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds()) + + s.qm.samplesOut.incr(int64(len(samples))) + s.qm.samplesOutDuration.incr(int64(duration)) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c843707ea..bfe4b69e1 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -98,8 +98,8 @@ func TestSampleDelivery(t *testing.T) { c.expectSamples(samples[:len(samples)/2]) m := NewQueueManager(QueueManagerConfig{ - Client: c, - Shards: 1, + Client: c, + MaxShards: 1, }) // These should be received by the client. @@ -185,8 +185,10 @@ func (c *TestBlockingStorageClient) Name() string { } func (t *QueueManager) queueLen() int { + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() queueLength := 0 - for _, shard := range t.shards { + for _, shard := range t.shards.queues { queueLength += len(shard) } return queueLength @@ -197,7 +199,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. 
- n := defaultMaxSamplesPerSend*defaultShards + defaultMaxSamplesPerSend + n := defaultMaxSamplesPerSend*1 + defaultMaxSamplesPerSend samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -214,6 +216,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { m := NewQueueManager(QueueManagerConfig{ Client: c, QueueCapacity: n, + MaxShards: 1, }) m.Start() @@ -250,7 +253,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } numCalls := c.NumCalls() - if numCalls != uint64(defaultShards) { - t.Errorf("Saw %d concurrent sends, expected %d", numCalls, defaultShards) + if numCalls != uint64(1) { + t.Errorf("Saw %d concurrent sends, expected 1", numCalls) } } diff --git a/web/ui/bindata.go b/web/ui/bindata.go index f08b85bd0..697e005db 100644 --- a/web/ui/bindata.go +++ b/web/ui/bindata.go @@ -261,7 +261,7 @@ func webUiTemplatesTargetsHtml() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "web/ui/templates/targets.html", size: 2275, mode: os.FileMode(420), modTime: time.Unix(1489763290, 0)} + info := bindataFileInfo{name: "web/ui/templates/targets.html", size: 2275, mode: os.FileMode(420), modTime: time.Unix(1490015258, 0)} a := &asset{bytes: bytes, info: info} return a, nil } diff --git a/web/web.go b/web/web.go index 4b0828c0e..e54e33f81 100644 --- a/web/web.go +++ b/web/web.go @@ -425,8 +425,8 @@ func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap { }, "consolesPath": func() string { return consolesPath }, "pathPrefix": func() string { return opts.ExternalURL.Path }, - "buildVersion": func() string { return opts.Version.Revision[:7] }, - "stripLabels": func(lset model.LabelSet, labels ...model.LabelName) model.LabelSet { + "buildVersion": func() string { return opts.Version.Revision }, + "stripLabels": func(lset map[string]string, labels ...string) map[string]string { for _, ln := range labels { delete(lset, ln) }