diff --git a/config/config_test.go b/config/config_test.go index afdfcbc52..0d569c59f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -48,6 +48,7 @@ import ( "github.com/prometheus/prometheus/discovery/scaleway" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/triton" + "github.com/prometheus/prometheus/discovery/xds" "github.com/prometheus/prometheus/discovery/zookeeper" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" @@ -439,6 +440,26 @@ var expectedConf = &Config{ }, }, }, + { + JobName: "service-kuma", + + HonorTimestamps: true, + ScrapeInterval: model.Duration(15 * time.Second), + ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, + + MetricsPath: DefaultScrapeConfig.MetricsPath, + Scheme: DefaultScrapeConfig.Scheme, + HTTPClientConfig: config.DefaultHTTPClientConfig, + + ServiceDiscoveryConfigs: discovery.Configs{ + &xds.KumaSDConfig{ + Server: "http://kuma-control-plane.kuma-system.svc:5676", + HTTPClientConfig: config.DefaultHTTPClientConfig, + RefreshInterval: model.Duration(15 * time.Second), + FetchTimeout: model.Duration(2 * time.Minute), + }, + }, + }, { JobName: "service-marathon", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index a72031de3..19fe0c9b2 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -109,7 +109,6 @@ scrape_configs: - second.dns.address.domain.com - names: - first.dns.address.domain.com - # refresh_interval defaults to 30s. relabel_configs: - source_labels: [job] @@ -193,6 +192,11 @@ scrape_configs: username: "myusername" password_file: valid_password_file + - job_name: service-kuma + + kuma_sd_configs: + - server: http://kuma-control-plane.kuma-system.svc:5676 + - job_name: service-marathon marathon_sd_configs: - servers: diff --git a/config/testdata/roundtrip.good.yml b/config/testdata/roundtrip.good.yml index 51b59dd81..f2634d257 100644 --- a/config/testdata/roundtrip.good.yml +++ b/config/testdata/roundtrip.good.yml @@ -106,6 +106,9 @@ scrape_configs: username: username password_file: valid_password_file + kuma_sd_configs: + - server: http://kuma-control-plane.kuma-system.svc:5676 + marathon_sd_configs: - servers: - https://marathon.example.com:443 diff --git a/discovery/install/install.go b/discovery/install/install.go index 6258d6fc0..34ccf3d0f 100644 --- a/discovery/install/install.go +++ b/discovery/install/install.go @@ -33,5 +33,6 @@ import ( _ "github.com/prometheus/prometheus/discovery/openstack" // register openstack _ "github.com/prometheus/prometheus/discovery/scaleway" // register scaleway _ "github.com/prometheus/prometheus/discovery/triton" // register triton + _ "github.com/prometheus/prometheus/discovery/xds" // register xds _ "github.com/prometheus/prometheus/discovery/zookeeper" // register zookeeper ) diff --git a/discovery/xds/client.go b/discovery/xds/client.go new file mode 100644 index 000000000..cd8ffb017 --- /dev/null +++ b/discovery/xds/client.go @@ -0,0 +1,226 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "time" + + envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/prometheus/common/config" + "github.com/prometheus/common/version" +) + +var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) + +// ResourceClient exposes the xDS protocol for a single resource type. +// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#rest-json-polling-subscriptions . +type ResourceClient interface { + // ResourceTypeURL is the type URL of the resource. + ResourceTypeURL() string + + // Server is the xDS Management server. + Server() string + + // Fetch requests the latest view of the entire resource state. + // If no updates have been made since the last request, the response will be nil. + Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) + + // ID returns the ID of the client that is sent to the xDS server. + ID() string + + // Close releases currently held resources. + Close() +} + +type HTTPResourceClient struct { + client *http.Client + config *HTTPResourceClientConfig + + // endpoint is the fully-constructed xDS HTTP endpoint. + endpoint string + // Caching. + latestVersion string + latestNonce string +} + +type HTTPResourceClientConfig struct { + // HTTP config. + config.HTTPClientConfig + + Name string + + // ExtraQueryParams are extra query parameters to attach to the request URL. + ExtraQueryParams url.Values + + // General xDS config. + + // The timeout for a single fetch request. + Timeout time.Duration + + // Type is the xds type, e.g., clusters + // which is used in the discovery POST request. + ResourceType string + // ResourceTypeURL is the Google type url for the resource, e.g., type.googleapis.com/envoy.api.v2.Cluster. + ResourceTypeURL string + // Server is the xDS management server. + Server string + // ClientID is used to identify the client with the management server. + ClientID string +} + +func NewHTTPResourceClient(conf *HTTPResourceClientConfig, protocolVersion ProtocolVersion) (*HTTPResourceClient, error) { + if protocolVersion != ProtocolV3 { + return nil, errors.New("only the v3 protocol is supported") + } + + if len(conf.Server) == 0 { + return nil, errors.New("empty xDS server") + } + + serverURL, err := url.Parse(conf.Server) + if err != nil { + return nil, err + } + + endpointURL, err := makeXDSResourceHTTPEndpointURL(protocolVersion, serverURL, conf.ResourceType) + if err != nil { + return nil, err + } + + if conf.ExtraQueryParams != nil { + endpointURL.RawQuery = conf.ExtraQueryParams.Encode() + } + + client, err := config.NewClientFromConfig(conf.HTTPClientConfig, conf.Name, config.WithHTTP2Disabled(), config.WithIdleConnTimeout(conf.Timeout)) + if err != nil { + return nil, err + } + + client.Timeout = conf.Timeout + + return &HTTPResourceClient{ + client: client, + config: conf, + endpoint: endpointURL.String(), + latestVersion: "", + latestNonce: "", + }, nil +} + +func makeXDSResourceHTTPEndpointURL(protocolVersion ProtocolVersion, serverURL *url.URL, resourceType string) (*url.URL, error) { + if serverURL == nil { + return nil, errors.New("empty xDS server URL") + } + + if len(serverURL.Scheme) == 0 || len(serverURL.Host) == 0 { + return nil, errors.New("invalid xDS server URL") + } + + if serverURL.Scheme != "http" && serverURL.Scheme != "https" { + return nil, errors.New("invalid xDS server URL protocol. must be either 'http' or 'https'") + } + + serverURL.Path = path.Join(serverURL.Path, string(protocolVersion), fmt.Sprintf("discovery:%s", resourceType)) + + return serverURL, nil +} + +func (rc *HTTPResourceClient) Server() string { + return rc.config.Server +} + +func (rc *HTTPResourceClient) ResourceTypeURL() string { + return rc.config.ResourceTypeURL +} + +func (rc *HTTPResourceClient) ID() string { + return rc.config.ClientID +} + +func (rc *HTTPResourceClient) Close() { + rc.client.CloseIdleConnections() +} + +// Fetch requests the latest state of the resources from the xDS server and cache the version. +// Returns a nil response if the current local version is up to date. +func (rc *HTTPResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { + discoveryReq := &v3.DiscoveryRequest{ + VersionInfo: rc.latestVersion, + ResponseNonce: rc.latestNonce, + TypeUrl: rc.ResourceTypeURL(), + ResourceNames: []string{}, + Node: &envoy_core.Node{ + Id: rc.ID(), + }, + } + + reqBody, err := protoJSONMarshalOptions.Marshal(discoveryReq) + if err != nil { + return nil, err + } + + request, err := http.NewRequest("POST", rc.endpoint, bytes.NewBuffer(reqBody)) + if err != nil { + return nil, err + } + request = request.WithContext(ctx) + + request.Header.Add("User-Agent", userAgent) + request.Header.Add("Content-Type", "application/json") + request.Header.Add("Accept", "application/json") + + resp, err := rc.client.Do(request) + if err != nil { + return nil, err + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + + if resp.StatusCode == http.StatusNotModified { + // Empty response, already have the latest. + return nil, nil + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("non 200 status '%d' response during xDS fetch", resp.StatusCode) + } + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + discoveryRes := &v3.DiscoveryResponse{} + if err = protoJSONUnmarshalOptions.Unmarshal(respBody, discoveryRes); err != nil { + return nil, err + } + + // Cache the latest nonce + version info. + rc.latestNonce = discoveryRes.Nonce + rc.latestVersion = discoveryRes.VersionInfo + + return discoveryRes, nil +} diff --git a/discovery/xds/client_test.go b/discovery/xds/client_test.go new file mode 100644 index 000000000..1c0e321d3 --- /dev/null +++ b/discovery/xds/client_test.go @@ -0,0 +1,161 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "context" + "errors" + "net/url" + "testing" + "time" + + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/prometheus/common/config" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" +) + +var ( + httpResourceConf = &HTTPResourceClientConfig{ + HTTPClientConfig: config.HTTPClientConfig{ + TLSConfig: config.TLSConfig{InsecureSkipVerify: true}, + }, + ResourceType: "monitoring", + // Some known type. + ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", + Server: "http://localhost", + ClientID: "test-id", + } +) + +func urlMustParse(str string) *url.URL { + parsed, err := url.Parse(str) + + if err != nil { + panic(err) + } + + return parsed +} + +func TestMakeXDSResourceHttpEndpointEmptyServerURLScheme(t *testing.T) { + endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("127.0.0.1"), "monitoring") + + require.Empty(t, endpointURL) + require.Error(t, err) + require.Equal(t, err.Error(), "invalid xDS server URL") +} + +func TestMakeXDSResourceHttpEndpointEmptyServerURLHost(t *testing.T) { + endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("grpc://127.0.0.1"), "monitoring") + + require.Empty(t, endpointURL) + require.NotNil(t, err) + require.Contains(t, err.Error(), "must be either 'http' or 'https'") +} + +func TestMakeXDSResourceHttpEndpoint(t *testing.T) { + endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("http://127.0.0.1:5000"), "monitoring") + + require.NoError(t, err) + require.Equal(t, endpointURL.String(), "http://127.0.0.1:5000/v3/discovery:monitoring") +} + +func TestCreateNewHTTPResourceClient(t *testing.T) { + c := &HTTPResourceClientConfig{ + HTTPClientConfig: sdConf.HTTPClientConfig, + Name: "test", + ExtraQueryParams: url.Values{ + "param1": {"v1"}, + }, + Timeout: 1 * time.Minute, + ResourceType: "monitoring", + ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", + Server: "http://127.0.0.1:5000", + ClientID: "client", + } + + client, err := NewHTTPResourceClient(c, ProtocolV3) + + require.NoError(t, err) + + require.Equal(t, client.endpoint, "http://127.0.0.1:5000/v3/discovery:monitoring?param1=v1") + require.Equal(t, client.client.Timeout, 1*time.Minute) + +} + +func createTestHTTPResourceClient(t *testing.T, conf *HTTPResourceClientConfig, protocolVersion ProtocolVersion, responder discoveryResponder) (*HTTPResourceClient, func()) { + s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { + require.Equal(t, conf.ResourceTypeURL, request.TypeUrl) + require.Equal(t, conf.ClientID, request.Node.Id) + return responder(request) + }) + + conf.Server = s.URL + client, err := NewHTTPResourceClient(conf, protocolVersion) + require.NoError(t, err) + + return client, s.Close +} + +func TestHTTPResourceClientFetchEmptyResponse(t *testing.T) { + client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { + return nil, nil + }) + defer cleanup() + + res, err := client.Fetch(context.Background()) + require.Nil(t, res) + require.NoError(t, err) +} + +func TestHTTPResourceClientFetchFullResponse(t *testing.T) { + client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { + if request.VersionInfo == "1" { + return nil, nil + } + + return &v3.DiscoveryResponse{ + TypeUrl: request.TypeUrl, + VersionInfo: "1", + Nonce: "abc", + Resources: []*anypb.Any{}, + }, nil + }) + defer cleanup() + + res, err := client.Fetch(context.Background()) + require.NoError(t, err) + require.NotNil(t, res) + + require.Equal(t, client.ResourceTypeURL(), res.TypeUrl) + require.Len(t, res.Resources, 0) + require.Equal(t, "abc", client.latestNonce, "Nonce not cached") + require.Equal(t, "1", client.latestVersion, "Version not cached") + + res, err = client.Fetch(context.Background()) + require.Nil(t, res, "Update not expected") + require.NoError(t, err) +} + +func TestHTTPResourceClientServerError(t *testing.T) { + client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { + return nil, errors.New("server error") + }) + defer cleanup() + + res, err := client.Fetch(context.Background()) + require.Nil(t, res) + require.Error(t, err) +} diff --git a/discovery/xds/kuma.go b/discovery/xds/kuma.go new file mode 100644 index 000000000..d4071be9c --- /dev/null +++ b/discovery/xds/kuma.go @@ -0,0 +1,226 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "fmt" + "net/url" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/osutil" + "github.com/prometheus/prometheus/util/strutil" +) + +var ( + // DefaultKumaSDConfig is the default Kuma MADS SD configuration. + DefaultKumaSDConfig = KumaSDConfig{ + HTTPClientConfig: config.DefaultHTTPClientConfig, + RefreshInterval: model.Duration(15 * time.Second), + FetchTimeout: model.Duration(2 * time.Minute), + } + + kumaFetchFailuresCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_failures_total", + Help: "The number of Kuma MADS fetch call failures.", + }) + kumaFetchSkipUpdateCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_skipped_updates_total", + Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.", + }) + kumaFetchDuration = prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "sd_kuma_fetch_duration_seconds", + Help: "The duration of a Kuma MADS fetch call.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + ) +) + +const ( + // kumaMetaLabelPrefix is the meta prefix used for all kuma meta labels. + kumaMetaLabelPrefix = model.MetaLabelPrefix + "kuma_" + + // kumaMeshLabel is the name of the label that holds the mesh name. + kumaMeshLabel = kumaMetaLabelPrefix + "mesh" + // kumaServiceLabel is the name of the label that holds the service name. + kumaServiceLabel = kumaMetaLabelPrefix + "service" + // kumaDataplaneLabel is the name of the label that holds the dataplane name. + kumaDataplaneLabel = kumaMetaLabelPrefix + "dataplane" + // kumaUserLabelPrefix is the name of the label that namespaces all user-defined labels. + kumaUserLabelPrefix = kumaMetaLabelPrefix + "label_" +) + +const ( + KumaMadsV1ResourceTypeURL = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment" + KumaMadsV1ResourceType = "monitoringassignments" +) + +type KumaSDConfig = SDConfig + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *KumaSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultKumaSDConfig + type plainKumaConf KumaSDConfig + err := unmarshal((*plainKumaConf)(c)) + if err != nil { + return err + } + + if len(c.Server) == 0 { + return errors.Errorf("kuma SD server must not be empty: %s", c.Server) + } + parsedURL, err := url.Parse(c.Server) + if err != nil { + return err + } + + if len(parsedURL.Scheme) == 0 || len(parsedURL.Host) == 0 { + return errors.Errorf("kuma SD server must not be empty and have a scheme: %s", c.Server) + } + + if err := c.HTTPClientConfig.Validate(); err != nil { + return err + } + + return nil +} + +func (c *KumaSDConfig) Name() string { + return "kuma" +} + +// SetDirectory joins any relative file paths with dir. +func (c *KumaSDConfig) SetDirectory(dir string) { + c.HTTPClientConfig.SetDirectory(dir) +} + +func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { + logger := opts.Logger + if logger == nil { + logger = log.NewNopLogger() + } + + return NewKumaHTTPDiscovery(c, logger) +} + +func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group { + commonLabels := convertKumaUserLabels(assignment.Labels) + + commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh) + commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service) + + var targetLabelSets []model.LabelSet + + for _, target := range assignment.Targets { + targetLabels := convertKumaUserLabels(target.Labels) + + targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name) + targetLabels[model.InstanceLabel] = model.LabelValue(target.Name) + targetLabels[model.AddressLabel] = model.LabelValue(target.Address) + targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme) + targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath) + + targetLabelSets = append(targetLabelSets, targetLabels) + } + + return &targetgroup.Group{ + Labels: commonLabels, + Targets: targetLabelSets, + } +} + +func convertKumaUserLabels(labels map[string]string) model.LabelSet { + labelSet := model.LabelSet{} + for key, value := range labels { + name := kumaUserLabelPrefix + strutil.SanitizeLabelName(key) + labelSet[model.LabelName(name)] = model.LabelValue(value) + } + return labelSet +} + +// kumaMadsV1ResourceParser is an xds.resourceParser. +func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) { + if typeURL != KumaMadsV1ResourceTypeURL { + return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL) + } + + var groups []*targetgroup.Group + + for _, resource := range resources { + assignment := &MonitoringAssignment{} + + if err := anypb.UnmarshalTo(resource, assignment, protoUnmarshalOptions); err != nil { + return nil, err + } + + groups = append(groups, convertKumaV1MonitoringAssignment(assignment)) + } + + return groups, nil +} + +func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) { + // Default to "prometheus" if hostname is unavailable. + clientID, err := osutil.GetFQDN() + if err != nil { + level.Debug(logger).Log("msg", "error getting FQDN", "err", err) + clientID = "prometheus" + } + + clientConfig := &HTTPResourceClientConfig{ + HTTPClientConfig: conf.HTTPClientConfig, + ExtraQueryParams: url.Values{ + "fetch-timeout": {conf.FetchTimeout.String()}, + }, + // Allow 15s of buffer over the timeout sent to the xDS server for connection overhead. + Timeout: time.Duration(conf.FetchTimeout) + (15 * time.Second), + ResourceType: KumaMadsV1ResourceType, + ResourceTypeURL: KumaMadsV1ResourceTypeURL, + Server: conf.Server, + ClientID: clientID, + } + + client, err := NewHTTPResourceClient(clientConfig, ProtocolV3) + if err != nil { + return nil, fmt.Errorf("kuma_sd: %w", err) + } + + d := &fetchDiscovery{ + client: client, + logger: logger, + refreshInterval: time.Duration(conf.RefreshInterval), + source: "kuma", + parseResources: kumaMadsV1ResourceParser, + fetchFailuresCount: kumaFetchFailuresCount, + fetchSkipUpdateCount: kumaFetchSkipUpdateCount, + fetchDuration: kumaFetchDuration, + } + + return d, nil +} diff --git a/discovery/xds/kuma_mads.pb.go b/discovery/xds/kuma_mads.pb.go new file mode 100644 index 000000000..b1079bf23 --- /dev/null +++ b/discovery/xds/kuma_mads.pb.go @@ -0,0 +1,398 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.14.0 +// source: observability/v1/mads.proto + +// gRPC-removed vendored file from Kuma. + +package xds + +import ( + context "context" + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + _ "github.com/envoyproxy/protoc-gen-validate/validate" + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// MADS resource type. +// +// Describes a group of targets on a single service that need to be monitored. +type MonitoringAssignment struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Mesh of the dataplane. + // + // E.g., `default` + Mesh string `protobuf:"bytes,2,opt,name=mesh,proto3" json:"mesh,omitempty"` + // Identifying service the dataplane is proxying. + // + // E.g., `backend` + Service string `protobuf:"bytes,3,opt,name=service,proto3" json:"service,omitempty"` + // List of targets that need to be monitored. + Targets []*MonitoringAssignment_Target `protobuf:"bytes,4,rep,name=targets,proto3" json:"targets,omitempty"` + // Arbitrary Labels associated with every target in the assignment. + // + // E.g., `{"zone" : "us-east-1", "team": "infra", "commit_hash": "620506a88"}`. + Labels map[string]string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *MonitoringAssignment) Reset() { + *x = MonitoringAssignment{} + if protoimpl.UnsafeEnabled { + mi := &file_observability_v1_mads_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MonitoringAssignment) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MonitoringAssignment) ProtoMessage() {} + +func (x *MonitoringAssignment) ProtoReflect() protoreflect.Message { + mi := &file_observability_v1_mads_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MonitoringAssignment.ProtoReflect.Descriptor instead. +func (*MonitoringAssignment) Descriptor() ([]byte, []int) { + return file_observability_v1_mads_proto_rawDescGZIP(), []int{0} +} + +func (x *MonitoringAssignment) GetMesh() string { + if x != nil { + return x.Mesh + } + return "" +} + +func (x *MonitoringAssignment) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +func (x *MonitoringAssignment) GetTargets() []*MonitoringAssignment_Target { + if x != nil { + return x.Targets + } + return nil +} + +func (x *MonitoringAssignment) GetLabels() map[string]string { + if x != nil { + return x.Labels + } + return nil +} + +// Describes a single target that needs to be monitored. +type MonitoringAssignment_Target struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // E.g., `backend-01` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // Scheme on which to scrape the target. + //E.g., `http` + Scheme string `protobuf:"bytes,2,opt,name=scheme,proto3" json:"scheme,omitempty"` + // Address (preferably IP) for the service + // E.g., `backend.svc` or `10.1.4.32:9090` + Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"` + // Optional path to append to the address for scraping + //E.g., `/metrics` + MetricsPath string `protobuf:"bytes,4,opt,name=metrics_path,json=metricsPath,proto3" json:"metrics_path,omitempty"` + // Arbitrary labels associated with that particular target. + // + // E.g., + // `{ + // "commit_hash" : "620506a88", + // }`. + Labels map[string]string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *MonitoringAssignment_Target) Reset() { + *x = MonitoringAssignment_Target{} + if protoimpl.UnsafeEnabled { + mi := &file_observability_v1_mads_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MonitoringAssignment_Target) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MonitoringAssignment_Target) ProtoMessage() {} + +func (x *MonitoringAssignment_Target) ProtoReflect() protoreflect.Message { + mi := &file_observability_v1_mads_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MonitoringAssignment_Target.ProtoReflect.Descriptor instead. +func (*MonitoringAssignment_Target) Descriptor() ([]byte, []int) { + return file_observability_v1_mads_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *MonitoringAssignment_Target) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *MonitoringAssignment_Target) GetScheme() string { + if x != nil { + return x.Scheme + } + return "" +} + +func (x *MonitoringAssignment_Target) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *MonitoringAssignment_Target) GetMetricsPath() string { + if x != nil { + return x.MetricsPath + } + return "" +} + +func (x *MonitoringAssignment_Target) GetLabels() map[string]string { + if x != nil { + return x.Labels + } + return nil +} + +var File_observability_v1_mads_proto protoreflect.FileDescriptor + +var file_observability_v1_mads_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2f, + 0x76, 0x31, 0x2f, 0x6d, 0x61, 0x64, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x6b, + 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, + 0x79, 0x2e, 0x76, 0x31, 0x1a, 0x2a, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2f, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x33, + 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, + 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, + 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd2, 0x04, 0x0a, 0x14, 0x4d, 0x6f, 0x6e, 0x69, + 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, + 0x12, 0x1b, 0x0a, 0x04, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, + 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, 0x52, 0x04, 0x6d, 0x65, 0x73, 0x68, 0x12, 0x21, 0x0a, + 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, + 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x4c, 0x0a, 0x07, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x32, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, + 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, + 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x54, + 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x07, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x12, 0x4f, + 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x37, + 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, + 0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, + 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x4c, 0x61, 0x62, 0x65, + 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x1a, + 0x9f, 0x02, 0x0a, 0x06, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x1b, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, + 0x01, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, + 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, + 0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, + 0x20, 0x01, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x6d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0b, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x50, 0x61, 0x74, 0x68, 0x12, 0x56, + 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3e, + 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, + 0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, + 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x61, 0x72, 0x67, + 0x65, 0x74, 0x2e, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, + 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x1a, 0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x1a, 0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xe6, 0x03, 0x0a, + 0x24, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, + 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x1a, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x4d, + 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x73, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, + 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, + 0x01, 0x12, 0x80, 0x01, 0x0a, 0x1b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x6f, 0x6e, 0x69, + 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, + 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, + 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, + 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x12, 0xae, 0x01, 0x0a, 0x1a, 0x46, 0x65, 0x74, 0x63, 0x68, 0x4d, 0x6f, + 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, + 0x6e, 0x74, 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, + 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x33, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x24, 0x22, 0x22, 0x2f, 0x76, 0x33, 0x2f, 0x64, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x3a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, + 0x6e, 0x67, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x03, 0x3a, 0x01, 0x2a, 0x42, 0x04, 0x5a, 0x02, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_observability_v1_mads_proto_rawDescOnce sync.Once + file_observability_v1_mads_proto_rawDescData = file_observability_v1_mads_proto_rawDesc +) + +func file_observability_v1_mads_proto_rawDescGZIP() []byte { + file_observability_v1_mads_proto_rawDescOnce.Do(func() { + file_observability_v1_mads_proto_rawDescData = protoimpl.X.CompressGZIP(file_observability_v1_mads_proto_rawDescData) + }) + return file_observability_v1_mads_proto_rawDescData +} + +var file_observability_v1_mads_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_observability_v1_mads_proto_goTypes = []interface{}{ + (*MonitoringAssignment)(nil), // 0: kuma.observability.v1.MonitoringAssignment + (*MonitoringAssignment_Target)(nil), // 1: kuma.observability.v1.MonitoringAssignment.Target + nil, // 2: kuma.observability.v1.MonitoringAssignment.LabelsEntry + nil, // 3: kuma.observability.v1.MonitoringAssignment.Target.LabelsEntry + (*v3.DeltaDiscoveryRequest)(nil), // 4: envoy.service.discovery.v3.DeltaDiscoveryRequest + (*v3.DiscoveryRequest)(nil), // 5: envoy.service.discovery.v3.DiscoveryRequest + (*v3.DeltaDiscoveryResponse)(nil), // 6: envoy.service.discovery.v3.DeltaDiscoveryResponse + (*v3.DiscoveryResponse)(nil), // 7: envoy.service.discovery.v3.DiscoveryResponse +} +var file_observability_v1_mads_proto_depIdxs = []int32{ + 1, // 0: kuma.observability.v1.MonitoringAssignment.targets:type_name -> kuma.observability.v1.MonitoringAssignment.Target + 2, // 1: kuma.observability.v1.MonitoringAssignment.labels:type_name -> kuma.observability.v1.MonitoringAssignment.LabelsEntry + 3, // 2: kuma.observability.v1.MonitoringAssignment.Target.labels:type_name -> kuma.observability.v1.MonitoringAssignment.Target.LabelsEntry + 4, // 3: kuma.observability.v1.MonitoringAssignmentDiscoveryService.DeltaMonitoringAssignments:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 5, // 4: kuma.observability.v1.MonitoringAssignmentDiscoveryService.StreamMonitoringAssignments:input_type -> envoy.service.discovery.v3.DiscoveryRequest + 5, // 5: kuma.observability.v1.MonitoringAssignmentDiscoveryService.FetchMonitoringAssignments:input_type -> envoy.service.discovery.v3.DiscoveryRequest + 6, // 6: kuma.observability.v1.MonitoringAssignmentDiscoveryService.DeltaMonitoringAssignments:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 7, // 7: kuma.observability.v1.MonitoringAssignmentDiscoveryService.StreamMonitoringAssignments:output_type -> envoy.service.discovery.v3.DiscoveryResponse + 7, // 8: kuma.observability.v1.MonitoringAssignmentDiscoveryService.FetchMonitoringAssignments:output_type -> envoy.service.discovery.v3.DiscoveryResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_observability_v1_mads_proto_init() } +func file_observability_v1_mads_proto_init() { + if File_observability_v1_mads_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_observability_v1_mads_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MonitoringAssignment); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_observability_v1_mads_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MonitoringAssignment_Target); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_observability_v1_mads_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_observability_v1_mads_proto_goTypes, + DependencyIndexes: file_observability_v1_mads_proto_depIdxs, + MessageInfos: file_observability_v1_mads_proto_msgTypes, + }.Build() + File_observability_v1_mads_proto = out.File + file_observability_v1_mads_proto_rawDesc = nil + file_observability_v1_mads_proto_goTypes = nil + file_observability_v1_mads_proto_depIdxs = nil +} + +// MonitoringAssignmentDiscoveryServiceServer is the server API for MonitoringAssignmentDiscoveryService service. +type MonitoringAssignmentDiscoveryServiceServer interface { + // HTTP + FetchMonitoringAssignments(context.Context, *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) +} diff --git a/discovery/xds/kuma_test.go b/discovery/xds/kuma_test.go new file mode 100644 index 000000000..8db2ce443 --- /dev/null +++ b/discovery/xds/kuma_test.go @@ -0,0 +1,340 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "context" + "fmt" + "testing" + "time" + + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +var ( + kumaConf KumaSDConfig = sdConf + + testKumaMadsV1Resources = []*MonitoringAssignment{ + { + Mesh: "metrics", + Service: "prometheus", + Targets: []*MonitoringAssignment_Target{ + { + Name: "prometheus-01", + Scheme: "http", + Address: "10.1.4.32:9090", + MetricsPath: "/custom-metrics", + Labels: map[string]string{ + "commit_hash": "620506a88", + }, + }, + { + Name: "prometheus-02", + Scheme: "http", + Address: "10.1.4.33:9090", + Labels: map[string]string{ + "commit_hash": "3513bba00", + }, + }, + }, + Labels: map[string]string{ + "kuma.io/zone": "us-east-1", + "team": "infra", + }, + }, + { + Mesh: "metrics", + Service: "grafana", + Targets: []*MonitoringAssignment_Target{}, + Labels: map[string]string{ + "kuma.io/zone": "us-east-1", + "team": "infra", + }, + }, + { + Mesh: "data", + Service: "elasticsearch", + Targets: []*MonitoringAssignment_Target{ + { + Name: "elasticsearch-01", + Scheme: "http", + Address: "10.1.1.1", + Labels: map[string]string{ + "role": "ml", + }, + }, + }, + }, + } +) + +func getKumaMadsV1DiscoveryResponse(resources ...*MonitoringAssignment) (*v3.DiscoveryResponse, error) { + serialized := make([]*anypb.Any, len(resources)) + for i, res := range resources { + data, err := proto.Marshal(res) + + if err != nil { + return nil, err + } + + serialized[i] = &anypb.Any{ + TypeUrl: KumaMadsV1ResourceTypeURL, + Value: data, + } + } + return &v3.DiscoveryResponse{ + TypeUrl: KumaMadsV1ResourceTypeURL, + Resources: serialized, + }, nil +} + +func newKumaTestHTTPDiscovery(c KumaSDConfig) (*fetchDiscovery, error) { + kd, err := NewKumaHTTPDiscovery(&c, nopLogger) + if err != nil { + return nil, err + } + + pd, ok := kd.(*fetchDiscovery) + if !ok { + return nil, errors.New("not a fetchDiscovery") + } + return pd, nil +} + +func TestKumaMadsV1ResourceParserInvalidTypeURL(t *testing.T) { + resources := make([]*anypb.Any, 0) + groups, err := kumaMadsV1ResourceParser(resources, "type.googleapis.com/some.api.v1.Monitoring") + require.Nil(t, groups) + require.Error(t, err) +} + +func TestKumaMadsV1ResourceParserEmptySlice(t *testing.T) { + resources := make([]*anypb.Any, 0) + groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) + require.Len(t, groups, 0) + require.NoError(t, err) +} + +func TestKumaMadsV1ResourceParserValidResources(t *testing.T) { + res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) + require.NoError(t, err) + + groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL) + require.NoError(t, err) + require.Len(t, groups, 3) + + expectedGroup1 := &targetgroup.Group{ + Targets: []model.LabelSet{ + { + "__address__": "10.1.4.32:9090", + "__meta_kuma_label_commit_hash": "620506a88", + "__meta_kuma_dataplane": "prometheus-01", + "__metrics_path__": "/custom-metrics", + "__scheme__": "http", + "instance": "prometheus-01", + }, + { + "__address__": "10.1.4.33:9090", + "__meta_kuma_label_commit_hash": "3513bba00", + "__meta_kuma_dataplane": "prometheus-02", + "__metrics_path__": "", + "__scheme__": "http", + "instance": "prometheus-02", + }, + }, + Labels: model.LabelSet{ + "__meta_kuma_mesh": "metrics", + "__meta_kuma_service": "prometheus", + "__meta_kuma_label_team": "infra", + "__meta_kuma_label_kuma_io_zone": "us-east-1", + }, + } + require.Equal(t, expectedGroup1, groups[0]) + + expectedGroup2 := &targetgroup.Group{ + Labels: model.LabelSet{ + "__meta_kuma_mesh": "metrics", + "__meta_kuma_service": "grafana", + "__meta_kuma_label_team": "infra", + "__meta_kuma_label_kuma_io_zone": "us-east-1", + }, + } + require.Equal(t, expectedGroup2, groups[1]) + + expectedGroup3 := &targetgroup.Group{ + Targets: []model.LabelSet{ + { + "__address__": "10.1.1.1", + "__meta_kuma_label_role": "ml", + "__meta_kuma_dataplane": "elasticsearch-01", + "__metrics_path__": "", + "__scheme__": "http", + "instance": "elasticsearch-01", + }, + }, + Labels: model.LabelSet{ + "__meta_kuma_mesh": "data", + "__meta_kuma_service": "elasticsearch", + }, + } + require.Equal(t, expectedGroup3, groups[2]) +} + +func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) { + data, err := protoJSONMarshalOptions.Marshal(&MonitoringAssignment_Target{}) + require.NoError(t, err) + + resources := []*anypb.Any{{ + TypeUrl: KumaMadsV1ResourceTypeURL, + Value: data, + }} + groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) + require.Nil(t, groups) + require.Error(t, err) + + require.Contains(t, err.Error(), "cannot parse") +} + +func TestNewKumaHTTPDiscovery(t *testing.T) { + kd, err := newKumaTestHTTPDiscovery(kumaConf) + require.NoError(t, err) + require.NotNil(t, kd) + + resClient, ok := kd.client.(*HTTPResourceClient) + require.True(t, ok) + require.Equal(t, kumaConf.Server, resClient.Server()) + require.Equal(t, KumaMadsV1ResourceTypeURL, resClient.ResourceTypeURL()) + require.NotEmpty(t, resClient.ID()) + require.Equal(t, KumaMadsV1ResourceType, resClient.config.ResourceType) +} + +func TestKumaHTTPDiscoveryRefresh(t *testing.T) { + s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { + if request.VersionInfo == "1" { + return nil, nil + } + + res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) + require.NoError(t, err) + + res.VersionInfo = "1" + res.Nonce = "abc" + + return res, nil + }) + defer s.Close() + + cfgString := fmt.Sprintf(` +--- +server: %s +refresh_interval: 10s +tls_config: + insecure_skip_verify: true +`, s.URL) + + var cfg KumaSDConfig + require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) + + kd, err := newKumaTestHTTPDiscovery(cfg) + require.NoError(t, err) + require.NotNil(t, kd) + + ch := make(chan []*targetgroup.Group, 1) + kd.poll(context.Background(), ch) + + groups := <-ch + require.Len(t, groups, 3) + + expectedGroup1 := &targetgroup.Group{ + Source: "kuma", + Targets: []model.LabelSet{ + { + "__address__": "10.1.4.32:9090", + "__meta_kuma_label_commit_hash": "620506a88", + "__meta_kuma_dataplane": "prometheus-01", + "__metrics_path__": "/custom-metrics", + "__scheme__": "http", + "instance": "prometheus-01", + }, + { + "__address__": "10.1.4.33:9090", + "__meta_kuma_label_commit_hash": "3513bba00", + "__meta_kuma_dataplane": "prometheus-02", + "__metrics_path__": "", + "__scheme__": "http", + "instance": "prometheus-02", + }, + }, + Labels: model.LabelSet{ + "__meta_kuma_mesh": "metrics", + "__meta_kuma_service": "prometheus", + "__meta_kuma_label_team": "infra", + "__meta_kuma_label_kuma_io_zone": "us-east-1", + }, + } + require.Equal(t, expectedGroup1, groups[0]) + + expectedGroup2 := &targetgroup.Group{ + Source: "kuma", + Labels: model.LabelSet{ + "__meta_kuma_mesh": "metrics", + "__meta_kuma_service": "grafana", + "__meta_kuma_label_team": "infra", + "__meta_kuma_label_kuma_io_zone": "us-east-1", + }, + } + require.Equal(t, expectedGroup2, groups[1]) + + expectedGroup3 := &targetgroup.Group{ + Source: "kuma", + Targets: []model.LabelSet{ + { + "__address__": "10.1.1.1", + "__meta_kuma_label_role": "ml", + "__meta_kuma_dataplane": "elasticsearch-01", + "__metrics_path__": "", + "__scheme__": "http", + "instance": "elasticsearch-01", + }, + }, + Labels: model.LabelSet{ + "__meta_kuma_mesh": "data", + "__meta_kuma_service": "elasticsearch", + }, + } + require.Equal(t, expectedGroup3, groups[2]) + + // Should skip the next update. + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + kd.poll(ctx, ch) + select { + case <-ctx.Done(): + return + case <-ch: + require.Fail(t, "no update expected") + } +} diff --git a/discovery/xds/xds.go b/discovery/xds/xds.go new file mode 100644 index 000000000..f99e03da9 --- /dev/null +++ b/discovery/xds/xds.go @@ -0,0 +1,176 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "context" + "github.com/prometheus/common/model" + "time" + + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +const ( + // Constants for instrumentation. + namespace = "prometheus" +) + +// ProtocolVersion is the xDS protocol version. +type ProtocolVersion string + +const ( + ProtocolV3 = ProtocolVersion("v3") +) + +type HTTPConfig struct { + config.HTTPClientConfig `yaml:",inline"` +} + +// SDConfig is a base config for xDS-based SD mechanisms. +type SDConfig struct { + HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + FetchTimeout model.Duration `yaml:"fetch_timeout,omitempty"` + Server string `yaml:"server,omitempty"` +} + +// mustRegisterMessage registers the provided message type in the typeRegistry, and panics +// if there is an error. +func mustRegisterMessage(typeRegistry *protoregistry.Types, mt protoreflect.MessageType) { + if err := typeRegistry.RegisterMessage(mt); err != nil { + panic(err) + } +} + +func init() { + // Register top-level SD Configs. + discovery.RegisterConfig(&KumaSDConfig{}) + + // Register metrics. + prometheus.MustRegister(kumaFetchDuration, kumaFetchSkipUpdateCount, kumaFetchFailuresCount) + + // Register protobuf types that need to be marshalled/ unmarshalled. + mustRegisterMessage(protoTypes, (&v3.DiscoveryRequest{}).ProtoReflect().Type()) + mustRegisterMessage(protoTypes, (&v3.DiscoveryResponse{}).ProtoReflect().Type()) + mustRegisterMessage(protoTypes, (&MonitoringAssignment{}).ProtoReflect().Type()) +} + +var ( + protoTypes = new(protoregistry.Types) + protoUnmarshalOptions = proto.UnmarshalOptions{ + DiscardUnknown: true, // Only want known fields. + Merge: true, // Always using new messages. + Resolver: protoTypes, // Only want known types. + } + protoJSONUnmarshalOptions = protojson.UnmarshalOptions{ + DiscardUnknown: true, // Only want known fields. + Resolver: protoTypes, // Only want known types. + } + protoJSONMarshalOptions = protojson.MarshalOptions{ + UseProtoNames: true, + Resolver: protoTypes, // Only want known types. + } +) + +type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) + +// fetchDiscovery implements long-polling via xDS Fetch REST-JSON. +type fetchDiscovery struct { + client ResourceClient + source string + + refreshInterval time.Duration + + parseResources resourceParser + logger log.Logger + + fetchDuration prometheus.Observer + fetchSkipUpdateCount prometheus.Counter + fetchFailuresCount prometheus.Counter +} + +func (d *fetchDiscovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + defer d.client.Close() + + ticker := time.NewTicker(d.refreshInterval) + + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + default: + d.poll(ctx, ch) + <-ticker.C + } + } +} + +func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Group) { + t0 := time.Now() + response, err := d.client.Fetch(ctx) + elapsed := time.Since(t0) + d.fetchDuration.Observe(elapsed.Seconds()) + + // Check the context before in order to exit early. + select { + case <-ctx.Done(): + return + default: + } + + if err != nil { + level.Error(d.logger).Log("msg", "error parsing resources", "err", err) + d.fetchFailuresCount.Inc() + return + } + + if response == nil { + // No update needed. + d.fetchSkipUpdateCount.Inc() + return + } + + parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl) + if err != nil { + level.Error(d.logger).Log("msg", "error parsing resources", "err", err) + d.fetchFailuresCount.Inc() + return + } + + for _, group := range parsedGroups { + group.Source = d.source + } + + level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups)) + + // Check the context before sending an update on the channel. + select { + case <-ctx.Done(): + return + case ch <- parsedGroups: + } +} diff --git a/discovery/xds/xds_test.go b/discovery/xds/xds_test.go new file mode 100644 index 000000000..412cbda68 --- /dev/null +++ b/discovery/xds/xds_test.go @@ -0,0 +1,201 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + "context" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +var ( + sdConf = SDConfig{ + Server: "http://127.0.0.1", + RefreshInterval: model.Duration(10 * time.Second), + } + + testFetchFailuresCount = prometheus.NewCounter( + prometheus.CounterOpts{}) + testFetchSkipUpdateCount = prometheus.NewCounter( + prometheus.CounterOpts{}) + testFetchDuration = prometheus.NewSummary( + prometheus.SummaryOpts{}, + ) +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +type discoveryResponder func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) + +func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.Server { + return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Validate req MIME types. + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + require.Equal(t, "application/json", r.Header.Get("Accept")) + + body, err := ioutil.ReadAll(r.Body) + defer func() { + _, _ = io.Copy(ioutil.Discard, r.Body) + _ = r.Body.Close() + }() + require.NotEmpty(t, body) + require.NoError(t, err) + + // Validate discovery request. + discoveryReq := &v3.DiscoveryRequest{} + err = protoJSONUnmarshalOptions.Unmarshal(body, discoveryReq) + require.NoError(t, err) + + discoveryRes, err := responder(discoveryReq) + if err != nil { + w.WriteHeader(500) + return + } + + if discoveryRes == nil { + w.WriteHeader(304) + return + } + + w.WriteHeader(200) + data, err := protoJSONMarshalOptions.Marshal(discoveryRes) + require.NoError(t, err) + + _, err = w.Write(data) + require.NoError(t, err) + })) +} + +func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser { + return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) { + return groups, err + } +} + +var nopLogger = log.NewNopLogger() + +type testResourceClient struct { + resourceTypeURL string + server string + protocolVersion ProtocolVersion + fetch func(ctx context.Context) (*v3.DiscoveryResponse, error) +} + +func (rc testResourceClient) ResourceTypeURL() string { + return rc.resourceTypeURL +} + +func (rc testResourceClient) Server() string { + return rc.server +} + +func (rc testResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { + return rc.fetch(ctx) +} + +func (rc testResourceClient) ID() string { + return "test-client" +} + +func (rc testResourceClient) Close() { +} + +func TestPollingRefreshSkipUpdate(t *testing.T) { + rc := &testResourceClient{ + fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { + return nil, nil + }, + } + pd := &fetchDiscovery{ + client: rc, + logger: nopLogger, + fetchDuration: testFetchDuration, + fetchFailuresCount: testFetchFailuresCount, + fetchSkipUpdateCount: testFetchSkipUpdateCount, + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + ch := make(chan []*targetgroup.Group, 1) + pd.poll(ctx, ch) + select { + case <-ctx.Done(): + return + case <-ch: + require.Fail(t, "no update expected") + } +} + +func TestPollingRefreshAttachesGroupMetadata(t *testing.T) { + server := "http://198.161.2.0" + source := "test" + rc := &testResourceClient{ + server: server, + protocolVersion: ProtocolV3, + fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { + return &v3.DiscoveryResponse{}, nil + }, + } + pd := &fetchDiscovery{ + source: source, + client: rc, + logger: nopLogger, + fetchDuration: testFetchDuration, + fetchFailuresCount: testFetchFailuresCount, + fetchSkipUpdateCount: testFetchSkipUpdateCount, + parseResources: constantResourceParser([]*targetgroup.Group{ + {}, + { + Source: "a-custom-source", + Labels: model.LabelSet{ + "__meta_custom_xds_label": "a-value", + }, + }, + }, nil), + } + ch := make(chan []*targetgroup.Group, 1) + pd.poll(context.Background(), ch) + groups := <-ch + require.NotNil(t, groups) + + require.Len(t, groups, 2) + + for _, group := range groups { + require.Equal(t, source, group.Source) + } + + group2 := groups[1] + require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label")) + require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"]) +} diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 8276ebbea..c0393c655 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -248,6 +248,10 @@ http_sd_configs: kubernetes_sd_configs: [ - ... ] +# List of Kuma service discovery configurations. +kuma_sd_configs: + [ - ... ] + # List of Lightsail service discovery configurations. lightsail_sd_configs: [ - ... ] @@ -1547,6 +1551,74 @@ for a detailed example of configuring Prometheus for Kubernetes. You may wish to check out the 3rd party [Prometheus Operator](https://github.com/coreos/prometheus-operator), which automates the Prometheus setup on top of Kubernetes. +### `` + +Kuma SD configurations allow retrieving scrape target from the [Kuma](https://kuma.io) control plane. + +This SD discovers "monitoring assignments" based on Kuma [Dataplane Proxies](https://kuma.io/docs/latest/documentation/dps-and-data-model), +via the MADS v1 (Monitoring Assignment Discovery Service) xDS API, and will create a target for each proxy +inside a Prometheus-enabled mesh. + +The following meta labels are available for each target: + +* `__meta_kuma_mesh`: the name of the proxy's Mesh +* `__meta_kuma_dataplane`: the name of the proxy +* `__meta_kuma_service`: the name of the proxy's associated Service +* `__meta_kuma_label_`: each tag of the proxy + +See below for the configuration options for Kuma MonitoringAssignment discovery: + +```yaml +# Address of the Kuma Control Plane's MADS xDS server. +server: + +# The time to wait between polling update requests. +[ refresh_interval: | default = 30s ] + +# The time after which the monitoring assignments are refreshed. +[ fetch_timeout: | default = 2m ] + +# Optional proxy URL. +[ proxy_url: ] + +# TLS configuration. +tls_config: + [ ] + +# Authentication information used to authenticate to the Docker daemon. +# Note that `basic_auth` and `authorization` options are +# mutually exclusive. +# password and password_file are mutually exclusive. + +# Optional HTTP basic authentication information. +basic_auth: + [ username: ] + [ password: ] + [ password_file: ] + +# Optional the `Authorization` header configuration. +authorization: + # Sets the authentication type. + [ type: | default: Bearer ] + # Sets the credentials. It is mutually exclusive with + # `credentials_file`. + [ credentials: ] + # Sets the credentials with the credentials read from the configured file. + # It is mutually exclusive with `credentials`. + [ credentials_file: ] + +# Optional OAuth 2.0 configuration. +# Cannot be used at the same time as basic_auth or authorization. +oauth2: + [ ] + +# Configure whether HTTP requests follow HTTP 3xx redirects. +[ follow_redirects: | default = true ] +``` + +The [relabeling phase](#relabel_config) is the preferred and more powerful way +to filter proxies and user-defined tags. + ### `` Lightsail SD configurations allow retrieving scrape targets from [AWS Lightsail](https://aws.amazon.com/lightsail/) diff --git a/go.mod b/go.mod index bf36d90d4..1610e5042 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,8 @@ require ( github.com/docker/docker v20.10.7+incompatible github.com/docker/go-connections v0.4.0 // indirect github.com/edsrzf/mmap-go v1.0.0 + github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d + github.com/envoyproxy/protoc-gen-validate v0.1.0 github.com/go-kit/kit v0.10.0 github.com/go-kit/log v0.1.0 github.com/go-logfmt/logfmt v0.5.0 @@ -62,6 +64,8 @@ require ( golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 golang.org/x/tools v0.1.3 google.golang.org/api v0.48.0 + google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 + google.golang.org/protobuf v1.26.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/fsnotify/fsnotify.v1 v1.4.7 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 57c620a52..70d13bd17 100644 --- a/go.sum +++ b/go.sum @@ -227,6 +227,7 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= @@ -393,7 +394,9 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= diff --git a/scrape/manager.go b/scrape/manager.go index 20932cd6d..e86221c66 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -14,11 +14,8 @@ package scrape import ( - "encoding" "fmt" "hash/fnv" - "net" - "os" "reflect" "sync" "time" @@ -32,6 +29,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/osutil" ) var targetMetadataCache = newMetadataMetricsCollector() @@ -206,7 +204,7 @@ func (m *Manager) reload() { // setJitterSeed calculates a global jitterSeed per server relying on extra label set. func (m *Manager) setJitterSeed(labels labels.Labels) error { h := fnv.New64a() - hostname, err := getFqdn() + hostname, err := osutil.GetFQDN() if err != nil { return err } @@ -319,46 +317,3 @@ func (m *Manager) TargetsDropped() map[string][]*Target { } return targets } - -// getFqdn returns a FQDN if it's possible, otherwise falls back to hostname. -func getFqdn() (string, error) { - hostname, err := os.Hostname() - if err != nil { - return "", err - } - - ips, err := net.LookupIP(hostname) - if err != nil { - // Return the system hostname if we can't look up the IP address. - return hostname, nil - } - - lookup := func(ipStr encoding.TextMarshaler) (string, error) { - ip, err := ipStr.MarshalText() - if err != nil { - return "", err - } - hosts, err := net.LookupAddr(string(ip)) - if err != nil || len(hosts) == 0 { - return "", err - } - return hosts[0], nil - } - - for _, addr := range ips { - if ip := addr.To4(); ip != nil { - if fqdn, err := lookup(ip); err == nil { - return fqdn, nil - } - - } - - if ip := addr.To16(); ip != nil { - if fqdn, err := lookup(ip); err == nil { - return fqdn, nil - } - - } - } - return hostname, nil -} diff --git a/util/osutil/hostname.go b/util/osutil/hostname.go new file mode 100644 index 000000000..224dffe7c --- /dev/null +++ b/util/osutil/hostname.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package osutil + +import ( + "encoding" + "net" + "os" +) + +// GetFQDN returns a FQDN if it's possible, otherwise falls back to hostname. +func GetFQDN() (string, error) { + hostname, err := os.Hostname() + if err != nil { + return "", err + } + + ips, err := net.LookupIP(hostname) + if err != nil { + // Return the system hostname if we can't look up the IP address. + return hostname, nil + } + + lookup := func(ipStr encoding.TextMarshaler) (string, error) { + ip, err := ipStr.MarshalText() + if err != nil { + return "", err + } + hosts, err := net.LookupAddr(string(ip)) + if err != nil || len(hosts) == 0 { + return "", err + } + return hosts[0], nil + } + + for _, addr := range ips { + if ip := addr.To4(); ip != nil { + if fqdn, err := lookup(ip); err == nil { + return fqdn, nil + } + + } + + if ip := addr.To16(); ip != nil { + if fqdn, err := lookup(ip); err == nil { + return fqdn, nil + } + + } + } + return hostname, nil +}