mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
227 lines
7.1 KiB
227 lines
7.1 KiB
4 years ago
|
// Copyright 2021 The Prometheus Authors
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package xds
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"net/url"
|
||
|
"time"
|
||
|
|
||
|
"github.com/go-kit/log"
|
||
|
"github.com/go-kit/log/level"
|
||
|
"github.com/pkg/errors"
|
||
|
"github.com/prometheus/client_golang/prometheus"
|
||
|
"github.com/prometheus/common/config"
|
||
|
"github.com/prometheus/common/model"
|
||
|
"google.golang.org/protobuf/types/known/anypb"
|
||
|
|
||
|
"github.com/prometheus/prometheus/discovery"
|
||
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||
|
"github.com/prometheus/prometheus/util/osutil"
|
||
|
"github.com/prometheus/prometheus/util/strutil"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// DefaultKumaSDConfig is the default Kuma MADS SD configuration.
|
||
|
DefaultKumaSDConfig = KumaSDConfig{
|
||
|
HTTPClientConfig: config.DefaultHTTPClientConfig,
|
||
|
RefreshInterval: model.Duration(15 * time.Second),
|
||
|
FetchTimeout: model.Duration(2 * time.Minute),
|
||
|
}
|
||
|
|
||
|
kumaFetchFailuresCount = prometheus.NewCounter(
|
||
|
prometheus.CounterOpts{
|
||
|
Namespace: namespace,
|
||
|
Name: "sd_kuma_fetch_failures_total",
|
||
|
Help: "The number of Kuma MADS fetch call failures.",
|
||
|
})
|
||
|
kumaFetchSkipUpdateCount = prometheus.NewCounter(
|
||
|
prometheus.CounterOpts{
|
||
|
Namespace: namespace,
|
||
|
Name: "sd_kuma_fetch_skipped_updates_total",
|
||
|
Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.",
|
||
|
})
|
||
|
kumaFetchDuration = prometheus.NewSummary(
|
||
|
prometheus.SummaryOpts{
|
||
|
Namespace: namespace,
|
||
|
Name: "sd_kuma_fetch_duration_seconds",
|
||
|
Help: "The duration of a Kuma MADS fetch call.",
|
||
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||
|
},
|
||
|
)
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
// kumaMetaLabelPrefix is the meta prefix used for all kuma meta labels.
|
||
|
kumaMetaLabelPrefix = model.MetaLabelPrefix + "kuma_"
|
||
|
|
||
|
// kumaMeshLabel is the name of the label that holds the mesh name.
|
||
|
kumaMeshLabel = kumaMetaLabelPrefix + "mesh"
|
||
|
// kumaServiceLabel is the name of the label that holds the service name.
|
||
|
kumaServiceLabel = kumaMetaLabelPrefix + "service"
|
||
|
// kumaDataplaneLabel is the name of the label that holds the dataplane name.
|
||
|
kumaDataplaneLabel = kumaMetaLabelPrefix + "dataplane"
|
||
|
// kumaUserLabelPrefix is the name of the label that namespaces all user-defined labels.
|
||
|
kumaUserLabelPrefix = kumaMetaLabelPrefix + "label_"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
KumaMadsV1ResourceTypeURL = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
|
||
|
KumaMadsV1ResourceType = "monitoringassignments"
|
||
|
)
|
||
|
|
||
|
type KumaSDConfig = SDConfig
|
||
|
|
||
|
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||
|
func (c *KumaSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||
|
*c = DefaultKumaSDConfig
|
||
|
type plainKumaConf KumaSDConfig
|
||
|
err := unmarshal((*plainKumaConf)(c))
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if len(c.Server) == 0 {
|
||
|
return errors.Errorf("kuma SD server must not be empty: %s", c.Server)
|
||
|
}
|
||
|
parsedURL, err := url.Parse(c.Server)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if len(parsedURL.Scheme) == 0 || len(parsedURL.Host) == 0 {
|
||
|
return errors.Errorf("kuma SD server must not be empty and have a scheme: %s", c.Server)
|
||
|
}
|
||
|
|
||
|
if err := c.HTTPClientConfig.Validate(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *KumaSDConfig) Name() string {
|
||
|
return "kuma"
|
||
|
}
|
||
|
|
||
|
// SetDirectory joins any relative file paths with dir.
|
||
|
func (c *KumaSDConfig) SetDirectory(dir string) {
|
||
|
c.HTTPClientConfig.SetDirectory(dir)
|
||
|
}
|
||
|
|
||
|
func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
|
||
|
logger := opts.Logger
|
||
|
if logger == nil {
|
||
|
logger = log.NewNopLogger()
|
||
|
}
|
||
|
|
||
|
return NewKumaHTTPDiscovery(c, logger)
|
||
|
}
|
||
|
|
||
|
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group {
|
||
|
commonLabels := convertKumaUserLabels(assignment.Labels)
|
||
|
|
||
|
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh)
|
||
|
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service)
|
||
|
|
||
|
var targetLabelSets []model.LabelSet
|
||
|
|
||
|
for _, target := range assignment.Targets {
|
||
|
targetLabels := convertKumaUserLabels(target.Labels)
|
||
|
|
||
|
targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name)
|
||
|
targetLabels[model.InstanceLabel] = model.LabelValue(target.Name)
|
||
|
targetLabels[model.AddressLabel] = model.LabelValue(target.Address)
|
||
|
targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme)
|
||
|
targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath)
|
||
|
|
||
|
targetLabelSets = append(targetLabelSets, targetLabels)
|
||
|
}
|
||
|
|
||
|
return &targetgroup.Group{
|
||
|
Labels: commonLabels,
|
||
|
Targets: targetLabelSets,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func convertKumaUserLabels(labels map[string]string) model.LabelSet {
|
||
|
labelSet := model.LabelSet{}
|
||
|
for key, value := range labels {
|
||
|
name := kumaUserLabelPrefix + strutil.SanitizeLabelName(key)
|
||
|
labelSet[model.LabelName(name)] = model.LabelValue(value)
|
||
|
}
|
||
|
return labelSet
|
||
|
}
|
||
|
|
||
|
// kumaMadsV1ResourceParser is an xds.resourceParser.
|
||
|
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) {
|
||
|
if typeURL != KumaMadsV1ResourceTypeURL {
|
||
|
return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL)
|
||
|
}
|
||
|
|
||
|
var groups []*targetgroup.Group
|
||
|
|
||
|
for _, resource := range resources {
|
||
|
assignment := &MonitoringAssignment{}
|
||
|
|
||
|
if err := anypb.UnmarshalTo(resource, assignment, protoUnmarshalOptions); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
groups = append(groups, convertKumaV1MonitoringAssignment(assignment))
|
||
|
}
|
||
|
|
||
|
return groups, nil
|
||
|
}
|
||
|
|
||
|
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) {
|
||
|
// Default to "prometheus" if hostname is unavailable.
|
||
|
clientID, err := osutil.GetFQDN()
|
||
|
if err != nil {
|
||
|
level.Debug(logger).Log("msg", "error getting FQDN", "err", err)
|
||
|
clientID = "prometheus"
|
||
|
}
|
||
|
|
||
|
clientConfig := &HTTPResourceClientConfig{
|
||
|
HTTPClientConfig: conf.HTTPClientConfig,
|
||
|
ExtraQueryParams: url.Values{
|
||
|
"fetch-timeout": {conf.FetchTimeout.String()},
|
||
|
},
|
||
|
// Allow 15s of buffer over the timeout sent to the xDS server for connection overhead.
|
||
|
Timeout: time.Duration(conf.FetchTimeout) + (15 * time.Second),
|
||
|
ResourceType: KumaMadsV1ResourceType,
|
||
|
ResourceTypeURL: KumaMadsV1ResourceTypeURL,
|
||
|
Server: conf.Server,
|
||
|
ClientID: clientID,
|
||
|
}
|
||
|
|
||
|
client, err := NewHTTPResourceClient(clientConfig, ProtocolV3)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("kuma_sd: %w", err)
|
||
|
}
|
||
|
|
||
|
d := &fetchDiscovery{
|
||
|
client: client,
|
||
|
logger: logger,
|
||
|
refreshInterval: time.Duration(conf.RefreshInterval),
|
||
|
source: "kuma",
|
||
|
parseResources: kumaMadsV1ResourceParser,
|
||
|
fetchFailuresCount: kumaFetchFailuresCount,
|
||
|
fetchSkipUpdateCount: kumaFetchSkipUpdateCount,
|
||
|
fetchDuration: kumaFetchDuration,
|
||
|
}
|
||
|
|
||
|
return d, nil
|
||
|
}
|