mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
218 lines
6.9 KiB
218 lines
6.9 KiB
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
package xds |
|
|
|
import ( |
|
"fmt" |
|
"net/url" |
|
"time" |
|
|
|
"github.com/go-kit/log" |
|
"github.com/go-kit/log/level" |
|
"github.com/pkg/errors" |
|
"github.com/prometheus/client_golang/prometheus" |
|
"github.com/prometheus/common/config" |
|
"github.com/prometheus/common/model" |
|
"google.golang.org/protobuf/types/known/anypb" |
|
|
|
"github.com/prometheus/prometheus/discovery" |
|
"github.com/prometheus/prometheus/util/osutil" |
|
"github.com/prometheus/prometheus/util/strutil" |
|
) |
|
|
|
var ( |
|
// DefaultKumaSDConfig is the default Kuma MADS SD configuration. |
|
DefaultKumaSDConfig = KumaSDConfig{ |
|
HTTPClientConfig: config.DefaultHTTPClientConfig, |
|
RefreshInterval: model.Duration(15 * time.Second), |
|
FetchTimeout: model.Duration(2 * time.Minute), |
|
} |
|
|
|
kumaFetchFailuresCount = prometheus.NewCounter( |
|
prometheus.CounterOpts{ |
|
Namespace: namespace, |
|
Name: "sd_kuma_fetch_failures_total", |
|
Help: "The number of Kuma MADS fetch call failures.", |
|
}) |
|
kumaFetchSkipUpdateCount = prometheus.NewCounter( |
|
prometheus.CounterOpts{ |
|
Namespace: namespace, |
|
Name: "sd_kuma_fetch_skipped_updates_total", |
|
Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.", |
|
}) |
|
kumaFetchDuration = prometheus.NewSummary( |
|
prometheus.SummaryOpts{ |
|
Namespace: namespace, |
|
Name: "sd_kuma_fetch_duration_seconds", |
|
Help: "The duration of a Kuma MADS fetch call.", |
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, |
|
}, |
|
) |
|
) |
|
|
|
const ( |
|
// kumaMetaLabelPrefix is the meta prefix used for all kuma meta labels. |
|
kumaMetaLabelPrefix = model.MetaLabelPrefix + "kuma_" |
|
|
|
// kumaMeshLabel is the name of the label that holds the mesh name. |
|
kumaMeshLabel = kumaMetaLabelPrefix + "mesh" |
|
// kumaServiceLabel is the name of the label that holds the service name. |
|
kumaServiceLabel = kumaMetaLabelPrefix + "service" |
|
// kumaDataplaneLabel is the name of the label that holds the dataplane name. |
|
kumaDataplaneLabel = kumaMetaLabelPrefix + "dataplane" |
|
// kumaUserLabelPrefix is the name of the label that namespaces all user-defined labels. |
|
kumaUserLabelPrefix = kumaMetaLabelPrefix + "label_" |
|
) |
|
|
|
const (
	// KumaMadsV1ResourceTypeURL is the type URL of a Kuma MADS v1
	// MonitoringAssignment resource. kumaMadsV1ResourceParser rejects any
	// other type URL, and NewKumaHTTPDiscovery passes it to the HTTP resource
	// client as ResourceTypeURL.
	KumaMadsV1ResourceTypeURL = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
	// KumaMadsV1ResourceType is the resource type name that
	// NewKumaHTTPDiscovery passes to the HTTP resource client as ResourceType.
	KumaMadsV1ResourceType = "monitoringassignments"
)
|
|
|
type KumaSDConfig = SDConfig |
|
|
|
// UnmarshalYAML implements the yaml.Unmarshaler interface. |
|
func (c *KumaSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { |
|
*c = DefaultKumaSDConfig |
|
type plainKumaConf KumaSDConfig |
|
err := unmarshal((*plainKumaConf)(c)) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if len(c.Server) == 0 { |
|
return errors.Errorf("kuma SD server must not be empty: %s", c.Server) |
|
} |
|
parsedURL, err := url.Parse(c.Server) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if len(parsedURL.Scheme) == 0 || len(parsedURL.Host) == 0 { |
|
return errors.Errorf("kuma SD server must not be empty and have a scheme: %s", c.Server) |
|
} |
|
|
|
return c.HTTPClientConfig.Validate() |
|
} |
|
|
|
func (c *KumaSDConfig) Name() string { |
|
return "kuma" |
|
} |
|
|
|
// SetDirectory joins any relative file paths with dir. |
|
func (c *KumaSDConfig) SetDirectory(dir string) { |
|
c.HTTPClientConfig.SetDirectory(dir) |
|
} |
|
|
|
func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { |
|
logger := opts.Logger |
|
if logger == nil { |
|
logger = log.NewNopLogger() |
|
} |
|
|
|
return NewKumaHTTPDiscovery(c, logger) |
|
} |
|
|
|
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet { |
|
commonLabels := convertKumaUserLabels(assignment.Labels) |
|
|
|
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh) |
|
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service) |
|
|
|
var targets []model.LabelSet |
|
|
|
for _, madsTarget := range assignment.Targets { |
|
targetLabels := convertKumaUserLabels(madsTarget.Labels).Merge(commonLabels) |
|
|
|
targetLabels[kumaDataplaneLabel] = model.LabelValue(madsTarget.Name) |
|
targetLabels[model.AddressLabel] = model.LabelValue(madsTarget.Address) |
|
targetLabels[model.InstanceLabel] = model.LabelValue(madsTarget.Name) |
|
targetLabels[model.SchemeLabel] = model.LabelValue(madsTarget.Scheme) |
|
targetLabels[model.MetricsPathLabel] = model.LabelValue(madsTarget.MetricsPath) |
|
|
|
targets = append(targets, targetLabels) |
|
} |
|
|
|
return targets |
|
} |
|
|
|
func convertKumaUserLabels(labels map[string]string) model.LabelSet { |
|
labelSet := model.LabelSet{} |
|
for key, value := range labels { |
|
name := kumaUserLabelPrefix + strutil.SanitizeLabelName(key) |
|
labelSet[model.LabelName(name)] = model.LabelValue(value) |
|
} |
|
return labelSet |
|
} |
|
|
|
// kumaMadsV1ResourceParser is an xds.resourceParser. |
|
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) { |
|
if typeURL != KumaMadsV1ResourceTypeURL { |
|
return nil, errors.Errorf("received invalid typeURL for Kuma MADS v1 Resource: %s", typeURL) |
|
} |
|
|
|
var targets []model.LabelSet |
|
|
|
for _, resource := range resources { |
|
assignment := &MonitoringAssignment{} |
|
|
|
if err := anypb.UnmarshalTo(resource, assignment, protoUnmarshalOptions); err != nil { |
|
return nil, err |
|
} |
|
|
|
targets = append(targets, convertKumaV1MonitoringAssignment(assignment)...) |
|
} |
|
|
|
return targets, nil |
|
} |
|
|
|
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) { |
|
// Default to "prometheus" if hostname is unavailable. |
|
clientID, err := osutil.GetFQDN() |
|
if err != nil { |
|
level.Debug(logger).Log("msg", "error getting FQDN", "err", err) |
|
clientID = "prometheus" |
|
} |
|
|
|
clientConfig := &HTTPResourceClientConfig{ |
|
HTTPClientConfig: conf.HTTPClientConfig, |
|
ExtraQueryParams: url.Values{ |
|
"fetch-timeout": {conf.FetchTimeout.String()}, |
|
}, |
|
// Allow 15s of buffer over the timeout sent to the xDS server for connection overhead. |
|
Timeout: time.Duration(conf.FetchTimeout) + (15 * time.Second), |
|
ResourceType: KumaMadsV1ResourceType, |
|
ResourceTypeURL: KumaMadsV1ResourceTypeURL, |
|
Server: conf.Server, |
|
ClientID: clientID, |
|
} |
|
|
|
client, err := NewHTTPResourceClient(clientConfig, ProtocolV3) |
|
if err != nil { |
|
return nil, fmt.Errorf("kuma_sd: %w", err) |
|
} |
|
|
|
d := &fetchDiscovery{ |
|
client: client, |
|
logger: logger, |
|
refreshInterval: time.Duration(conf.RefreshInterval), |
|
source: "kuma", |
|
parseResources: kumaMadsV1ResourceParser, |
|
fetchFailuresCount: kumaFetchFailuresCount, |
|
fetchSkipUpdateCount: kumaFetchSkipUpdateCount, |
|
fetchDuration: kumaFetchDuration, |
|
} |
|
|
|
return d, nil |
|
}
|
|
|