mirror of https://github.com/prometheus/prometheus
Julien Pivotto
3 years ago
committed by
GitHub
16 changed files with 1902 additions and 48 deletions
@ -0,0 +1,226 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/url" |
||||
"path" |
||||
"time" |
||||
|
||||
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/prometheus/common/version" |
||||
) |
||||
|
||||
var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) |
||||
|
||||
// ResourceClient exposes the xDS protocol for a single resource type.
|
||||
// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#rest-json-polling-subscriptions .
|
||||
type ResourceClient interface { |
||||
// ResourceTypeURL is the type URL of the resource.
|
||||
ResourceTypeURL() string |
||||
|
||||
// Server is the xDS Management server.
|
||||
Server() string |
||||
|
||||
// Fetch requests the latest view of the entire resource state.
|
||||
// If no updates have been made since the last request, the response will be nil.
|
||||
Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) |
||||
|
||||
// ID returns the ID of the client that is sent to the xDS server.
|
||||
ID() string |
||||
|
||||
// Close releases currently held resources.
|
||||
Close() |
||||
} |
||||
|
||||
type HTTPResourceClient struct { |
||||
client *http.Client |
||||
config *HTTPResourceClientConfig |
||||
|
||||
// endpoint is the fully-constructed xDS HTTP endpoint.
|
||||
endpoint string |
||||
// Caching.
|
||||
latestVersion string |
||||
latestNonce string |
||||
} |
||||
|
||||
type HTTPResourceClientConfig struct { |
||||
// HTTP config.
|
||||
config.HTTPClientConfig |
||||
|
||||
Name string |
||||
|
||||
// ExtraQueryParams are extra query parameters to attach to the request URL.
|
||||
ExtraQueryParams url.Values |
||||
|
||||
// General xDS config.
|
||||
|
||||
// The timeout for a single fetch request.
|
||||
Timeout time.Duration |
||||
|
||||
// Type is the xds type, e.g., clusters
|
||||
// which is used in the discovery POST request.
|
||||
ResourceType string |
||||
// ResourceTypeURL is the Google type url for the resource, e.g., type.googleapis.com/envoy.api.v2.Cluster.
|
||||
ResourceTypeURL string |
||||
// Server is the xDS management server.
|
||||
Server string |
||||
// ClientID is used to identify the client with the management server.
|
||||
ClientID string |
||||
} |
||||
|
||||
func NewHTTPResourceClient(conf *HTTPResourceClientConfig, protocolVersion ProtocolVersion) (*HTTPResourceClient, error) { |
||||
if protocolVersion != ProtocolV3 { |
||||
return nil, errors.New("only the v3 protocol is supported") |
||||
} |
||||
|
||||
if len(conf.Server) == 0 { |
||||
return nil, errors.New("empty xDS server") |
||||
} |
||||
|
||||
serverURL, err := url.Parse(conf.Server) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(protocolVersion, serverURL, conf.ResourceType) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if conf.ExtraQueryParams != nil { |
||||
endpointURL.RawQuery = conf.ExtraQueryParams.Encode() |
||||
} |
||||
|
||||
client, err := config.NewClientFromConfig(conf.HTTPClientConfig, conf.Name, config.WithHTTP2Disabled(), config.WithIdleConnTimeout(conf.Timeout)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
client.Timeout = conf.Timeout |
||||
|
||||
return &HTTPResourceClient{ |
||||
client: client, |
||||
config: conf, |
||||
endpoint: endpointURL.String(), |
||||
latestVersion: "", |
||||
latestNonce: "", |
||||
}, nil |
||||
} |
||||
|
||||
func makeXDSResourceHTTPEndpointURL(protocolVersion ProtocolVersion, serverURL *url.URL, resourceType string) (*url.URL, error) { |
||||
if serverURL == nil { |
||||
return nil, errors.New("empty xDS server URL") |
||||
} |
||||
|
||||
if len(serverURL.Scheme) == 0 || len(serverURL.Host) == 0 { |
||||
return nil, errors.New("invalid xDS server URL") |
||||
} |
||||
|
||||
if serverURL.Scheme != "http" && serverURL.Scheme != "https" { |
||||
return nil, errors.New("invalid xDS server URL protocol. must be either 'http' or 'https'") |
||||
} |
||||
|
||||
serverURL.Path = path.Join(serverURL.Path, string(protocolVersion), fmt.Sprintf("discovery:%s", resourceType)) |
||||
|
||||
return serverURL, nil |
||||
} |
||||
|
||||
func (rc *HTTPResourceClient) Server() string { |
||||
return rc.config.Server |
||||
} |
||||
|
||||
func (rc *HTTPResourceClient) ResourceTypeURL() string { |
||||
return rc.config.ResourceTypeURL |
||||
} |
||||
|
||||
func (rc *HTTPResourceClient) ID() string { |
||||
return rc.config.ClientID |
||||
} |
||||
|
||||
func (rc *HTTPResourceClient) Close() { |
||||
rc.client.CloseIdleConnections() |
||||
} |
||||
|
||||
// Fetch requests the latest state of the resources from the xDS server and cache the version.
|
||||
// Returns a nil response if the current local version is up to date.
|
||||
func (rc *HTTPResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
discoveryReq := &v3.DiscoveryRequest{ |
||||
VersionInfo: rc.latestVersion, |
||||
ResponseNonce: rc.latestNonce, |
||||
TypeUrl: rc.ResourceTypeURL(), |
||||
ResourceNames: []string{}, |
||||
Node: &envoy_core.Node{ |
||||
Id: rc.ID(), |
||||
}, |
||||
} |
||||
|
||||
reqBody, err := protoJSONMarshalOptions.Marshal(discoveryReq) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
request, err := http.NewRequest("POST", rc.endpoint, bytes.NewBuffer(reqBody)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
request = request.WithContext(ctx) |
||||
|
||||
request.Header.Add("User-Agent", userAgent) |
||||
request.Header.Add("Content-Type", "application/json") |
||||
request.Header.Add("Accept", "application/json") |
||||
|
||||
resp, err := rc.client.Do(request) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
io.Copy(ioutil.Discard, resp.Body) |
||||
resp.Body.Close() |
||||
}() |
||||
|
||||
if resp.StatusCode == http.StatusNotModified { |
||||
// Empty response, already have the latest.
|
||||
return nil, nil |
||||
} |
||||
|
||||
if resp.StatusCode != http.StatusOK { |
||||
return nil, fmt.Errorf("non 200 status '%d' response during xDS fetch", resp.StatusCode) |
||||
} |
||||
|
||||
respBody, err := ioutil.ReadAll(resp.Body) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
discoveryRes := &v3.DiscoveryResponse{} |
||||
if err = protoJSONUnmarshalOptions.Unmarshal(respBody, discoveryRes); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Cache the latest nonce + version info.
|
||||
rc.latestNonce = discoveryRes.Nonce |
||||
rc.latestVersion = discoveryRes.VersionInfo |
||||
|
||||
return discoveryRes, nil |
||||
} |
@ -0,0 +1,161 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"net/url" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/stretchr/testify/require" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
httpResourceConf = &HTTPResourceClientConfig{ |
||||
HTTPClientConfig: config.HTTPClientConfig{ |
||||
TLSConfig: config.TLSConfig{InsecureSkipVerify: true}, |
||||
}, |
||||
ResourceType: "monitoring", |
||||
// Some known type.
|
||||
ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", |
||||
Server: "http://localhost", |
||||
ClientID: "test-id", |
||||
} |
||||
) |
||||
|
||||
func urlMustParse(str string) *url.URL { |
||||
parsed, err := url.Parse(str) |
||||
|
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
return parsed |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpointEmptyServerURLScheme(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("127.0.0.1"), "monitoring") |
||||
|
||||
require.Empty(t, endpointURL) |
||||
require.Error(t, err) |
||||
require.Equal(t, err.Error(), "invalid xDS server URL") |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpointEmptyServerURLHost(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("grpc://127.0.0.1"), "monitoring") |
||||
|
||||
require.Empty(t, endpointURL) |
||||
require.NotNil(t, err) |
||||
require.Contains(t, err.Error(), "must be either 'http' or 'https'") |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpoint(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("http://127.0.0.1:5000"), "monitoring") |
||||
|
||||
require.NoError(t, err) |
||||
require.Equal(t, endpointURL.String(), "http://127.0.0.1:5000/v3/discovery:monitoring") |
||||
} |
||||
|
||||
func TestCreateNewHTTPResourceClient(t *testing.T) { |
||||
c := &HTTPResourceClientConfig{ |
||||
HTTPClientConfig: sdConf.HTTPClientConfig, |
||||
Name: "test", |
||||
ExtraQueryParams: url.Values{ |
||||
"param1": {"v1"}, |
||||
}, |
||||
Timeout: 1 * time.Minute, |
||||
ResourceType: "monitoring", |
||||
ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", |
||||
Server: "http://127.0.0.1:5000", |
||||
ClientID: "client", |
||||
} |
||||
|
||||
client, err := NewHTTPResourceClient(c, ProtocolV3) |
||||
|
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, client.endpoint, "http://127.0.0.1:5000/v3/discovery:monitoring?param1=v1") |
||||
require.Equal(t, client.client.Timeout, 1*time.Minute) |
||||
|
||||
} |
||||
|
||||
func createTestHTTPResourceClient(t *testing.T, conf *HTTPResourceClientConfig, protocolVersion ProtocolVersion, responder discoveryResponder) (*HTTPResourceClient, func()) { |
||||
s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
require.Equal(t, conf.ResourceTypeURL, request.TypeUrl) |
||||
require.Equal(t, conf.ClientID, request.Node.Id) |
||||
return responder(request) |
||||
}) |
||||
|
||||
conf.Server = s.URL |
||||
client, err := NewHTTPResourceClient(conf, protocolVersion) |
||||
require.NoError(t, err) |
||||
|
||||
return client, s.Close |
||||
} |
||||
|
||||
func TestHTTPResourceClientFetchEmptyResponse(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
return nil, nil |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.Nil(t, res) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestHTTPResourceClientFetchFullResponse(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
if request.VersionInfo == "1" { |
||||
return nil, nil |
||||
} |
||||
|
||||
return &v3.DiscoveryResponse{ |
||||
TypeUrl: request.TypeUrl, |
||||
VersionInfo: "1", |
||||
Nonce: "abc", |
||||
Resources: []*anypb.Any{}, |
||||
}, nil |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, res) |
||||
|
||||
require.Equal(t, client.ResourceTypeURL(), res.TypeUrl) |
||||
require.Len(t, res.Resources, 0) |
||||
require.Equal(t, "abc", client.latestNonce, "Nonce not cached") |
||||
require.Equal(t, "1", client.latestVersion, "Version not cached") |
||||
|
||||
res, err = client.Fetch(context.Background()) |
||||
require.Nil(t, res, "Update not expected") |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestHTTPResourceClientServerError(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
return nil, errors.New("server error") |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.Nil(t, res) |
||||
require.Error(t, err) |
||||
} |
@ -0,0 +1,226 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net/url" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/prometheus/common/model" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
|
||||
"github.com/prometheus/prometheus/discovery" |
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
"github.com/prometheus/prometheus/util/osutil" |
||||
"github.com/prometheus/prometheus/util/strutil" |
||||
) |
||||
|
||||
var ( |
||||
// DefaultKumaSDConfig is the default Kuma MADS SD configuration.
|
||||
DefaultKumaSDConfig = KumaSDConfig{ |
||||
HTTPClientConfig: config.DefaultHTTPClientConfig, |
||||
RefreshInterval: model.Duration(15 * time.Second), |
||||
FetchTimeout: model.Duration(2 * time.Minute), |
||||
} |
||||
|
||||
kumaFetchFailuresCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{ |
||||
Namespace: namespace, |
||||
Name: "sd_kuma_fetch_failures_total", |
||||
Help: "The number of Kuma MADS fetch call failures.", |
||||
}) |
||||
kumaFetchSkipUpdateCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{ |
||||
Namespace: namespace, |
||||
Name: "sd_kuma_fetch_skipped_updates_total", |
||||
Help: "The number of Kuma MADS fetch calls that result in no updates to the targets.", |
||||
}) |
||||
kumaFetchDuration = prometheus.NewSummary( |
||||
prometheus.SummaryOpts{ |
||||
Namespace: namespace, |
||||
Name: "sd_kuma_fetch_duration_seconds", |
||||
Help: "The duration of a Kuma MADS fetch call.", |
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, |
||||
}, |
||||
) |
||||
) |
||||
|
||||
const ( |
||||
// kumaMetaLabelPrefix is the meta prefix used for all kuma meta labels.
|
||||
kumaMetaLabelPrefix = model.MetaLabelPrefix + "kuma_" |
||||
|
||||
// kumaMeshLabel is the name of the label that holds the mesh name.
|
||||
kumaMeshLabel = kumaMetaLabelPrefix + "mesh" |
||||
// kumaServiceLabel is the name of the label that holds the service name.
|
||||
kumaServiceLabel = kumaMetaLabelPrefix + "service" |
||||
// kumaDataplaneLabel is the name of the label that holds the dataplane name.
|
||||
kumaDataplaneLabel = kumaMetaLabelPrefix + "dataplane" |
||||
// kumaUserLabelPrefix is the name of the label that namespaces all user-defined labels.
|
||||
kumaUserLabelPrefix = kumaMetaLabelPrefix + "label_" |
||||
) |
||||
|
||||
const ( |
||||
KumaMadsV1ResourceTypeURL = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment" |
||||
KumaMadsV1ResourceType = "monitoringassignments" |
||||
) |
||||
|
||||
type KumaSDConfig = SDConfig |
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
func (c *KumaSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { |
||||
*c = DefaultKumaSDConfig |
||||
type plainKumaConf KumaSDConfig |
||||
err := unmarshal((*plainKumaConf)(c)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if len(c.Server) == 0 { |
||||
return errors.Errorf("kuma SD server must not be empty: %s", c.Server) |
||||
} |
||||
parsedURL, err := url.Parse(c.Server) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if len(parsedURL.Scheme) == 0 || len(parsedURL.Host) == 0 { |
||||
return errors.Errorf("kuma SD server must not be empty and have a scheme: %s", c.Server) |
||||
} |
||||
|
||||
if err := c.HTTPClientConfig.Validate(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (c *KumaSDConfig) Name() string { |
||||
return "kuma" |
||||
} |
||||
|
||||
// SetDirectory joins any relative file paths with dir.
|
||||
func (c *KumaSDConfig) SetDirectory(dir string) { |
||||
c.HTTPClientConfig.SetDirectory(dir) |
||||
} |
||||
|
||||
func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { |
||||
logger := opts.Logger |
||||
if logger == nil { |
||||
logger = log.NewNopLogger() |
||||
} |
||||
|
||||
return NewKumaHTTPDiscovery(c, logger) |
||||
} |
||||
|
||||
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group { |
||||
commonLabels := convertKumaUserLabels(assignment.Labels) |
||||
|
||||
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh) |
||||
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service) |
||||
|
||||
var targetLabelSets []model.LabelSet |
||||
|
||||
for _, target := range assignment.Targets { |
||||
targetLabels := convertKumaUserLabels(target.Labels) |
||||
|
||||
targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name) |
||||
targetLabels[model.InstanceLabel] = model.LabelValue(target.Name) |
||||
targetLabels[model.AddressLabel] = model.LabelValue(target.Address) |
||||
targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme) |
||||
targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath) |
||||
|
||||
targetLabelSets = append(targetLabelSets, targetLabels) |
||||
} |
||||
|
||||
return &targetgroup.Group{ |
||||
Labels: commonLabels, |
||||
Targets: targetLabelSets, |
||||
} |
||||
} |
||||
|
||||
func convertKumaUserLabels(labels map[string]string) model.LabelSet { |
||||
labelSet := model.LabelSet{} |
||||
for key, value := range labels { |
||||
name := kumaUserLabelPrefix + strutil.SanitizeLabelName(key) |
||||
labelSet[model.LabelName(name)] = model.LabelValue(value) |
||||
} |
||||
return labelSet |
||||
} |
||||
|
||||
// kumaMadsV1ResourceParser is an xds.resourceParser.
|
||||
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) { |
||||
if typeURL != KumaMadsV1ResourceTypeURL { |
||||
return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL) |
||||
} |
||||
|
||||
var groups []*targetgroup.Group |
||||
|
||||
for _, resource := range resources { |
||||
assignment := &MonitoringAssignment{} |
||||
|
||||
if err := anypb.UnmarshalTo(resource, assignment, protoUnmarshalOptions); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
groups = append(groups, convertKumaV1MonitoringAssignment(assignment)) |
||||
} |
||||
|
||||
return groups, nil |
||||
} |
||||
|
||||
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) { |
||||
// Default to "prometheus" if hostname is unavailable.
|
||||
clientID, err := osutil.GetFQDN() |
||||
if err != nil { |
||||
level.Debug(logger).Log("msg", "error getting FQDN", "err", err) |
||||
clientID = "prometheus" |
||||
} |
||||
|
||||
clientConfig := &HTTPResourceClientConfig{ |
||||
HTTPClientConfig: conf.HTTPClientConfig, |
||||
ExtraQueryParams: url.Values{ |
||||
"fetch-timeout": {conf.FetchTimeout.String()}, |
||||
}, |
||||
// Allow 15s of buffer over the timeout sent to the xDS server for connection overhead.
|
||||
Timeout: time.Duration(conf.FetchTimeout) + (15 * time.Second), |
||||
ResourceType: KumaMadsV1ResourceType, |
||||
ResourceTypeURL: KumaMadsV1ResourceTypeURL, |
||||
Server: conf.Server, |
||||
ClientID: clientID, |
||||
} |
||||
|
||||
client, err := NewHTTPResourceClient(clientConfig, ProtocolV3) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("kuma_sd: %w", err) |
||||
} |
||||
|
||||
d := &fetchDiscovery{ |
||||
client: client, |
||||
logger: logger, |
||||
refreshInterval: time.Duration(conf.RefreshInterval), |
||||
source: "kuma", |
||||
parseResources: kumaMadsV1ResourceParser, |
||||
fetchFailuresCount: kumaFetchFailuresCount, |
||||
fetchSkipUpdateCount: kumaFetchSkipUpdateCount, |
||||
fetchDuration: kumaFetchDuration, |
||||
} |
||||
|
||||
return d, nil |
||||
} |
@ -0,0 +1,398 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.25.0
|
||||
// protoc v3.14.0
|
||||
// source: observability/v1/mads.proto
|
||||
|
||||
// gRPC-removed vendored file from Kuma.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
context "context" |
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
_ "github.com/envoyproxy/protoc-gen-validate/validate" |
||||
_ "google.golang.org/genproto/googleapis/api/annotations" |
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect" |
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl" |
||||
reflect "reflect" |
||||
sync "sync" |
||||
) |
||||
|
||||
const ( |
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) |
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) |
||||
) |
||||
|
||||
// MADS resource type.
|
||||
//
|
||||
// Describes a group of targets on a single service that need to be monitored.
|
||||
type MonitoringAssignment struct { |
||||
state protoimpl.MessageState |
||||
sizeCache protoimpl.SizeCache |
||||
unknownFields protoimpl.UnknownFields |
||||
|
||||
// Mesh of the dataplane.
|
||||
//
|
||||
// E.g., `default`
|
||||
Mesh string `protobuf:"bytes,2,opt,name=mesh,proto3" json:"mesh,omitempty"` |
||||
// Identifying service the dataplane is proxying.
|
||||
//
|
||||
// E.g., `backend`
|
||||
Service string `protobuf:"bytes,3,opt,name=service,proto3" json:"service,omitempty"` |
||||
// List of targets that need to be monitored.
|
||||
Targets []*MonitoringAssignment_Target `protobuf:"bytes,4,rep,name=targets,proto3" json:"targets,omitempty"` |
||||
// Arbitrary Labels associated with every target in the assignment.
|
||||
//
|
||||
// E.g., `{"zone" : "us-east-1", "team": "infra", "commit_hash": "620506a88"}`.
|
||||
Labels map[string]string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) Reset() { |
||||
*x = MonitoringAssignment{} |
||||
if protoimpl.UnsafeEnabled { |
||||
mi := &file_observability_v1_mads_proto_msgTypes[0] |
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
||||
ms.StoreMessageInfo(mi) |
||||
} |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) String() string { |
||||
return protoimpl.X.MessageStringOf(x) |
||||
} |
||||
|
||||
func (*MonitoringAssignment) ProtoMessage() {} |
||||
|
||||
func (x *MonitoringAssignment) ProtoReflect() protoreflect.Message { |
||||
mi := &file_observability_v1_mads_proto_msgTypes[0] |
||||
if protoimpl.UnsafeEnabled && x != nil { |
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
||||
if ms.LoadMessageInfo() == nil { |
||||
ms.StoreMessageInfo(mi) |
||||
} |
||||
return ms |
||||
} |
||||
return mi.MessageOf(x) |
||||
} |
||||
|
||||
// Deprecated: Use MonitoringAssignment.ProtoReflect.Descriptor instead.
|
||||
func (*MonitoringAssignment) Descriptor() ([]byte, []int) { |
||||
return file_observability_v1_mads_proto_rawDescGZIP(), []int{0} |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) GetMesh() string { |
||||
if x != nil { |
||||
return x.Mesh |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) GetService() string { |
||||
if x != nil { |
||||
return x.Service |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) GetTargets() []*MonitoringAssignment_Target { |
||||
if x != nil { |
||||
return x.Targets |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (x *MonitoringAssignment) GetLabels() map[string]string { |
||||
if x != nil { |
||||
return x.Labels |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Describes a single target that needs to be monitored.
|
||||
type MonitoringAssignment_Target struct { |
||||
state protoimpl.MessageState |
||||
sizeCache protoimpl.SizeCache |
||||
unknownFields protoimpl.UnknownFields |
||||
|
||||
// E.g., `backend-01`
|
||||
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` |
||||
// Scheme on which to scrape the target.
|
||||
//E.g., `http`
|
||||
Scheme string `protobuf:"bytes,2,opt,name=scheme,proto3" json:"scheme,omitempty"` |
||||
// Address (preferably IP) for the service
|
||||
// E.g., `backend.svc` or `10.1.4.32:9090`
|
||||
Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"` |
||||
// Optional path to append to the address for scraping
|
||||
//E.g., `/metrics`
|
||||
MetricsPath string `protobuf:"bytes,4,opt,name=metrics_path,json=metricsPath,proto3" json:"metrics_path,omitempty"` |
||||
// Arbitrary labels associated with that particular target.
|
||||
//
|
||||
// E.g.,
|
||||
// `{
|
||||
// "commit_hash" : "620506a88",
|
||||
// }`.
|
||||
Labels map[string]string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) Reset() { |
||||
*x = MonitoringAssignment_Target{} |
||||
if protoimpl.UnsafeEnabled { |
||||
mi := &file_observability_v1_mads_proto_msgTypes[1] |
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
||||
ms.StoreMessageInfo(mi) |
||||
} |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) String() string { |
||||
return protoimpl.X.MessageStringOf(x) |
||||
} |
||||
|
||||
func (*MonitoringAssignment_Target) ProtoMessage() {} |
||||
|
||||
func (x *MonitoringAssignment_Target) ProtoReflect() protoreflect.Message { |
||||
mi := &file_observability_v1_mads_proto_msgTypes[1] |
||||
if protoimpl.UnsafeEnabled && x != nil { |
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
||||
if ms.LoadMessageInfo() == nil { |
||||
ms.StoreMessageInfo(mi) |
||||
} |
||||
return ms |
||||
} |
||||
return mi.MessageOf(x) |
||||
} |
||||
|
||||
// Deprecated: Use MonitoringAssignment_Target.ProtoReflect.Descriptor instead.
|
||||
func (*MonitoringAssignment_Target) Descriptor() ([]byte, []int) { |
||||
return file_observability_v1_mads_proto_rawDescGZIP(), []int{0, 0} |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) GetName() string { |
||||
if x != nil { |
||||
return x.Name |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) GetScheme() string { |
||||
if x != nil { |
||||
return x.Scheme |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) GetAddress() string { |
||||
if x != nil { |
||||
return x.Address |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) GetMetricsPath() string { |
||||
if x != nil { |
||||
return x.MetricsPath |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
func (x *MonitoringAssignment_Target) GetLabels() map[string]string { |
||||
if x != nil { |
||||
return x.Labels |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
var File_observability_v1_mads_proto protoreflect.FileDescriptor |
||||
|
||||
var file_observability_v1_mads_proto_rawDesc = []byte{ |
||||
0x0a, 0x1b, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2f, |
||||
0x76, 0x31, 0x2f, 0x6d, 0x61, 0x64, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x6b, |
||||
0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, |
||||
0x79, 0x2e, 0x76, 0x31, 0x1a, 0x2a, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2f, 0x73, 0x65, 0x72, 0x76, |
||||
0x69, 0x63, 0x65, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x33, |
||||
0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, |
||||
0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, |
||||
0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, |
||||
0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, |
||||
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd2, 0x04, 0x0a, 0x14, 0x4d, 0x6f, 0x6e, 0x69, |
||||
0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, |
||||
0x12, 0x1b, 0x0a, 0x04, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, |
||||
0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, 0x52, 0x04, 0x6d, 0x65, 0x73, 0x68, 0x12, 0x21, 0x0a, |
||||
0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, |
||||
0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, |
||||
0x12, 0x4c, 0x0a, 0x07, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, |
||||
0x0b, 0x32, 0x32, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, |
||||
0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, |
||||
0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x54, |
||||
0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x07, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x12, 0x4f, |
||||
0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x37, |
||||
0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, |
||||
0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, |
||||
0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x4c, 0x61, 0x62, 0x65, |
||||
0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x1a, |
||||
0x9f, 0x02, 0x0a, 0x06, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x1b, 0x0a, 0x04, 0x6e, 0x61, |
||||
0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, |
||||
0x01, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, |
||||
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x20, 0x01, |
||||
0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, |
||||
0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, |
||||
0x20, 0x01, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x6d, |
||||
0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, |
||||
0x09, 0x52, 0x0b, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x50, 0x61, 0x74, 0x68, 0x12, 0x56, |
||||
0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3e, |
||||
0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x62, 0x69, 0x6c, |
||||
0x69, 0x74, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, |
||||
0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x61, 0x72, 0x67, |
||||
0x65, 0x74, 0x2e, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, |
||||
0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x1a, 0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, |
||||
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, |
||||
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, |
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, |
||||
0x01, 0x1a, 0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, |
||||
0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, |
||||
0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, |
||||
0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xe6, 0x03, 0x0a, |
||||
0x24, 0x4d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, |
||||
0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, |
||||
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x1a, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x4d, |
||||
0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, |
||||
0x65, 0x6e, 0x74, 0x73, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, |
||||
0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, |
||||
0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, |
||||
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, |
||||
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, |
||||
0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, |
||||
0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, |
||||
0x01, 0x12, 0x80, 0x01, 0x0a, 0x1b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x6f, 0x6e, 0x69, |
||||
0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, |
||||
0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, |
||||
0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, |
||||
0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, |
||||
0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, |
||||
0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, |
||||
0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, |
||||
0x28, 0x01, 0x30, 0x01, 0x12, 0xae, 0x01, 0x0a, 0x1a, 0x46, 0x65, 0x74, 0x63, 0x68, 0x4d, 0x6f, |
||||
0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, |
||||
0x6e, 0x74, 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, |
||||
0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, |
||||
0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, |
||||
0x74, 0x1a, 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, |
||||
0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, |
||||
0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, |
||||
0x22, 0x33, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x24, 0x22, 0x22, 0x2f, 0x76, 0x33, 0x2f, 0x64, 0x69, |
||||
0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x3a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x69, |
||||
0x6e, 0x67, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x82, 0xd3, 0xe4, 0x93, |
||||
0x02, 0x03, 0x3a, 0x01, 0x2a, 0x42, 0x04, 0x5a, 0x02, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, |
||||
0x74, 0x6f, 0x33, |
||||
} |
||||
|
||||
var ( |
||||
file_observability_v1_mads_proto_rawDescOnce sync.Once |
||||
file_observability_v1_mads_proto_rawDescData = file_observability_v1_mads_proto_rawDesc |
||||
) |
||||
|
||||
func file_observability_v1_mads_proto_rawDescGZIP() []byte { |
||||
file_observability_v1_mads_proto_rawDescOnce.Do(func() { |
||||
file_observability_v1_mads_proto_rawDescData = protoimpl.X.CompressGZIP(file_observability_v1_mads_proto_rawDescData) |
||||
}) |
||||
return file_observability_v1_mads_proto_rawDescData |
||||
} |
||||
|
||||
var file_observability_v1_mads_proto_msgTypes = make([]protoimpl.MessageInfo, 4) |
||||
var file_observability_v1_mads_proto_goTypes = []interface{}{ |
||||
(*MonitoringAssignment)(nil), // 0: kuma.observability.v1.MonitoringAssignment
|
||||
(*MonitoringAssignment_Target)(nil), // 1: kuma.observability.v1.MonitoringAssignment.Target
|
||||
nil, // 2: kuma.observability.v1.MonitoringAssignment.LabelsEntry
|
||||
nil, // 3: kuma.observability.v1.MonitoringAssignment.Target.LabelsEntry
|
||||
(*v3.DeltaDiscoveryRequest)(nil), // 4: envoy.service.discovery.v3.DeltaDiscoveryRequest
|
||||
(*v3.DiscoveryRequest)(nil), // 5: envoy.service.discovery.v3.DiscoveryRequest
|
||||
(*v3.DeltaDiscoveryResponse)(nil), // 6: envoy.service.discovery.v3.DeltaDiscoveryResponse
|
||||
(*v3.DiscoveryResponse)(nil), // 7: envoy.service.discovery.v3.DiscoveryResponse
|
||||
} |
||||
var file_observability_v1_mads_proto_depIdxs = []int32{ |
||||
1, // 0: kuma.observability.v1.MonitoringAssignment.targets:type_name -> kuma.observability.v1.MonitoringAssignment.Target
|
||||
2, // 1: kuma.observability.v1.MonitoringAssignment.labels:type_name -> kuma.observability.v1.MonitoringAssignment.LabelsEntry
|
||||
3, // 2: kuma.observability.v1.MonitoringAssignment.Target.labels:type_name -> kuma.observability.v1.MonitoringAssignment.Target.LabelsEntry
|
||||
4, // 3: kuma.observability.v1.MonitoringAssignmentDiscoveryService.DeltaMonitoringAssignments:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest
|
||||
5, // 4: kuma.observability.v1.MonitoringAssignmentDiscoveryService.StreamMonitoringAssignments:input_type -> envoy.service.discovery.v3.DiscoveryRequest
|
||||
5, // 5: kuma.observability.v1.MonitoringAssignmentDiscoveryService.FetchMonitoringAssignments:input_type -> envoy.service.discovery.v3.DiscoveryRequest
|
||||
6, // 6: kuma.observability.v1.MonitoringAssignmentDiscoveryService.DeltaMonitoringAssignments:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse
|
||||
7, // 7: kuma.observability.v1.MonitoringAssignmentDiscoveryService.StreamMonitoringAssignments:output_type -> envoy.service.discovery.v3.DiscoveryResponse
|
||||
7, // 8: kuma.observability.v1.MonitoringAssignmentDiscoveryService.FetchMonitoringAssignments:output_type -> envoy.service.discovery.v3.DiscoveryResponse
|
||||
6, // [6:9] is the sub-list for method output_type
|
||||
3, // [3:6] is the sub-list for method input_type
|
||||
3, // [3:3] is the sub-list for extension type_name
|
||||
3, // [3:3] is the sub-list for extension extendee
|
||||
0, // [0:3] is the sub-list for field type_name
|
||||
} |
||||
|
||||
func init() { file_observability_v1_mads_proto_init() } |
||||
func file_observability_v1_mads_proto_init() { |
||||
if File_observability_v1_mads_proto != nil { |
||||
return |
||||
} |
||||
if !protoimpl.UnsafeEnabled { |
||||
file_observability_v1_mads_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { |
||||
switch v := v.(*MonitoringAssignment); i { |
||||
case 0: |
||||
return &v.state |
||||
case 1: |
||||
return &v.sizeCache |
||||
case 2: |
||||
return &v.unknownFields |
||||
default: |
||||
return nil |
||||
} |
||||
} |
||||
file_observability_v1_mads_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { |
||||
switch v := v.(*MonitoringAssignment_Target); i { |
||||
case 0: |
||||
return &v.state |
||||
case 1: |
||||
return &v.sizeCache |
||||
case 2: |
||||
return &v.unknownFields |
||||
default: |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
type x struct{} |
||||
out := protoimpl.TypeBuilder{ |
||||
File: protoimpl.DescBuilder{ |
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), |
||||
RawDescriptor: file_observability_v1_mads_proto_rawDesc, |
||||
NumEnums: 0, |
||||
NumMessages: 4, |
||||
NumExtensions: 0, |
||||
NumServices: 1, |
||||
}, |
||||
GoTypes: file_observability_v1_mads_proto_goTypes, |
||||
DependencyIndexes: file_observability_v1_mads_proto_depIdxs, |
||||
MessageInfos: file_observability_v1_mads_proto_msgTypes, |
||||
}.Build() |
||||
File_observability_v1_mads_proto = out.File |
||||
file_observability_v1_mads_proto_rawDesc = nil |
||||
file_observability_v1_mads_proto_goTypes = nil |
||||
file_observability_v1_mads_proto_depIdxs = nil |
||||
} |
||||
|
||||
// MonitoringAssignmentDiscoveryServiceServer is the server API for MonitoringAssignmentDiscoveryService service.
|
||||
type MonitoringAssignmentDiscoveryServiceServer interface { |
||||
// HTTP
|
||||
FetchMonitoringAssignments(context.Context, *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) |
||||
} |
@ -0,0 +1,340 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
"gopkg.in/yaml.v2" |
||||
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
) |
||||
|
||||
var ( |
||||
kumaConf KumaSDConfig = sdConf |
||||
|
||||
testKumaMadsV1Resources = []*MonitoringAssignment{ |
||||
{ |
||||
Mesh: "metrics", |
||||
Service: "prometheus", |
||||
Targets: []*MonitoringAssignment_Target{ |
||||
{ |
||||
Name: "prometheus-01", |
||||
Scheme: "http", |
||||
Address: "10.1.4.32:9090", |
||||
MetricsPath: "/custom-metrics", |
||||
Labels: map[string]string{ |
||||
"commit_hash": "620506a88", |
||||
}, |
||||
}, |
||||
{ |
||||
Name: "prometheus-02", |
||||
Scheme: "http", |
||||
Address: "10.1.4.33:9090", |
||||
Labels: map[string]string{ |
||||
"commit_hash": "3513bba00", |
||||
}, |
||||
}, |
||||
}, |
||||
Labels: map[string]string{ |
||||
"kuma.io/zone": "us-east-1", |
||||
"team": "infra", |
||||
}, |
||||
}, |
||||
{ |
||||
Mesh: "metrics", |
||||
Service: "grafana", |
||||
Targets: []*MonitoringAssignment_Target{}, |
||||
Labels: map[string]string{ |
||||
"kuma.io/zone": "us-east-1", |
||||
"team": "infra", |
||||
}, |
||||
}, |
||||
{ |
||||
Mesh: "data", |
||||
Service: "elasticsearch", |
||||
Targets: []*MonitoringAssignment_Target{ |
||||
{ |
||||
Name: "elasticsearch-01", |
||||
Scheme: "http", |
||||
Address: "10.1.1.1", |
||||
Labels: map[string]string{ |
||||
"role": "ml", |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
func getKumaMadsV1DiscoveryResponse(resources ...*MonitoringAssignment) (*v3.DiscoveryResponse, error) { |
||||
serialized := make([]*anypb.Any, len(resources)) |
||||
for i, res := range resources { |
||||
data, err := proto.Marshal(res) |
||||
|
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
serialized[i] = &anypb.Any{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Value: data, |
||||
} |
||||
} |
||||
return &v3.DiscoveryResponse{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Resources: serialized, |
||||
}, nil |
||||
} |
||||
|
||||
func newKumaTestHTTPDiscovery(c KumaSDConfig) (*fetchDiscovery, error) { |
||||
kd, err := NewKumaHTTPDiscovery(&c, nopLogger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
pd, ok := kd.(*fetchDiscovery) |
||||
if !ok { |
||||
return nil, errors.New("not a fetchDiscovery") |
||||
} |
||||
return pd, nil |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserInvalidTypeURL(t *testing.T) { |
||||
resources := make([]*anypb.Any, 0) |
||||
groups, err := kumaMadsV1ResourceParser(resources, "type.googleapis.com/some.api.v1.Monitoring") |
||||
require.Nil(t, groups) |
||||
require.Error(t, err) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserEmptySlice(t *testing.T) { |
||||
resources := make([]*anypb.Any, 0) |
||||
groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) |
||||
require.Len(t, groups, 0) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserValidResources(t *testing.T) { |
||||
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) |
||||
require.NoError(t, err) |
||||
|
||||
groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL) |
||||
require.NoError(t, err) |
||||
require.Len(t, groups, 3) |
||||
|
||||
expectedGroup1 := &targetgroup.Group{ |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.4.32:9090", |
||||
"__meta_kuma_label_commit_hash": "620506a88", |
||||
"__meta_kuma_dataplane": "prometheus-01", |
||||
"__metrics_path__": "/custom-metrics", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-01", |
||||
}, |
||||
{ |
||||
"__address__": "10.1.4.33:9090", |
||||
"__meta_kuma_label_commit_hash": "3513bba00", |
||||
"__meta_kuma_dataplane": "prometheus-02", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-02", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "prometheus", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup1, groups[0]) |
||||
|
||||
expectedGroup2 := &targetgroup.Group{ |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "grafana", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup2, groups[1]) |
||||
|
||||
expectedGroup3 := &targetgroup.Group{ |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.1.1", |
||||
"__meta_kuma_label_role": "ml", |
||||
"__meta_kuma_dataplane": "elasticsearch-01", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "elasticsearch-01", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "data", |
||||
"__meta_kuma_service": "elasticsearch", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup3, groups[2]) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) { |
||||
data, err := protoJSONMarshalOptions.Marshal(&MonitoringAssignment_Target{}) |
||||
require.NoError(t, err) |
||||
|
||||
resources := []*anypb.Any{{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Value: data, |
||||
}} |
||||
groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) |
||||
require.Nil(t, groups) |
||||
require.Error(t, err) |
||||
|
||||
require.Contains(t, err.Error(), "cannot parse") |
||||
} |
||||
|
||||
func TestNewKumaHTTPDiscovery(t *testing.T) { |
||||
kd, err := newKumaTestHTTPDiscovery(kumaConf) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, kd) |
||||
|
||||
resClient, ok := kd.client.(*HTTPResourceClient) |
||||
require.True(t, ok) |
||||
require.Equal(t, kumaConf.Server, resClient.Server()) |
||||
require.Equal(t, KumaMadsV1ResourceTypeURL, resClient.ResourceTypeURL()) |
||||
require.NotEmpty(t, resClient.ID()) |
||||
require.Equal(t, KumaMadsV1ResourceType, resClient.config.ResourceType) |
||||
} |
||||
|
||||
func TestKumaHTTPDiscoveryRefresh(t *testing.T) { |
||||
s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
if request.VersionInfo == "1" { |
||||
return nil, nil |
||||
} |
||||
|
||||
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) |
||||
require.NoError(t, err) |
||||
|
||||
res.VersionInfo = "1" |
||||
res.Nonce = "abc" |
||||
|
||||
return res, nil |
||||
}) |
||||
defer s.Close() |
||||
|
||||
cfgString := fmt.Sprintf(` |
||||
--- |
||||
server: %s |
||||
refresh_interval: 10s |
||||
tls_config: |
||||
insecure_skip_verify: true |
||||
`, s.URL) |
||||
|
||||
var cfg KumaSDConfig |
||||
require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) |
||||
|
||||
kd, err := newKumaTestHTTPDiscovery(cfg) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, kd) |
||||
|
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
kd.poll(context.Background(), ch) |
||||
|
||||
groups := <-ch |
||||
require.Len(t, groups, 3) |
||||
|
||||
expectedGroup1 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.4.32:9090", |
||||
"__meta_kuma_label_commit_hash": "620506a88", |
||||
"__meta_kuma_dataplane": "prometheus-01", |
||||
"__metrics_path__": "/custom-metrics", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-01", |
||||
}, |
||||
{ |
||||
"__address__": "10.1.4.33:9090", |
||||
"__meta_kuma_label_commit_hash": "3513bba00", |
||||
"__meta_kuma_dataplane": "prometheus-02", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-02", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "prometheus", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup1, groups[0]) |
||||
|
||||
expectedGroup2 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "grafana", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup2, groups[1]) |
||||
|
||||
expectedGroup3 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.1.1", |
||||
"__meta_kuma_label_role": "ml", |
||||
"__meta_kuma_dataplane": "elasticsearch-01", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "elasticsearch-01", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "data", |
||||
"__meta_kuma_service": "elasticsearch", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup3, groups[2]) |
||||
|
||||
// Should skip the next update.
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
go func() { |
||||
time.Sleep(1 * time.Second) |
||||
cancel() |
||||
}() |
||||
|
||||
kd.poll(ctx, ch) |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-ch: |
||||
require.Fail(t, "no update expected") |
||||
} |
||||
} |
@ -0,0 +1,176 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"github.com/prometheus/common/model" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/config" |
||||
"google.golang.org/protobuf/encoding/protojson" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/reflect/protoreflect" |
||||
"google.golang.org/protobuf/reflect/protoregistry" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
|
||||
"github.com/prometheus/prometheus/discovery" |
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
) |
||||
|
||||
const ( |
||||
// Constants for instrumentation.
|
||||
namespace = "prometheus" |
||||
) |
||||
|
||||
// ProtocolVersion is the xDS protocol version.
|
||||
type ProtocolVersion string |
||||
|
||||
const ( |
||||
ProtocolV3 = ProtocolVersion("v3") |
||||
) |
||||
|
||||
type HTTPConfig struct { |
||||
config.HTTPClientConfig `yaml:",inline"` |
||||
} |
||||
|
||||
// SDConfig is a base config for xDS-based SD mechanisms.
|
||||
type SDConfig struct { |
||||
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` |
||||
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` |
||||
FetchTimeout model.Duration `yaml:"fetch_timeout,omitempty"` |
||||
Server string `yaml:"server,omitempty"` |
||||
} |
||||
|
||||
// mustRegisterMessage registers the provided message type in the typeRegistry, and panics
|
||||
// if there is an error.
|
||||
func mustRegisterMessage(typeRegistry *protoregistry.Types, mt protoreflect.MessageType) { |
||||
if err := typeRegistry.RegisterMessage(mt); err != nil { |
||||
panic(err) |
||||
} |
||||
} |
||||
|
||||
func init() { |
||||
// Register top-level SD Configs.
|
||||
discovery.RegisterConfig(&KumaSDConfig{}) |
||||
|
||||
// Register metrics.
|
||||
prometheus.MustRegister(kumaFetchDuration, kumaFetchSkipUpdateCount, kumaFetchFailuresCount) |
||||
|
||||
// Register protobuf types that need to be marshalled/ unmarshalled.
|
||||
mustRegisterMessage(protoTypes, (&v3.DiscoveryRequest{}).ProtoReflect().Type()) |
||||
mustRegisterMessage(protoTypes, (&v3.DiscoveryResponse{}).ProtoReflect().Type()) |
||||
mustRegisterMessage(protoTypes, (&MonitoringAssignment{}).ProtoReflect().Type()) |
||||
} |
||||
|
||||
var ( |
||||
protoTypes = new(protoregistry.Types) |
||||
protoUnmarshalOptions = proto.UnmarshalOptions{ |
||||
DiscardUnknown: true, // Only want known fields.
|
||||
Merge: true, // Always using new messages.
|
||||
Resolver: protoTypes, // Only want known types.
|
||||
} |
||||
protoJSONUnmarshalOptions = protojson.UnmarshalOptions{ |
||||
DiscardUnknown: true, // Only want known fields.
|
||||
Resolver: protoTypes, // Only want known types.
|
||||
} |
||||
protoJSONMarshalOptions = protojson.MarshalOptions{ |
||||
UseProtoNames: true, |
||||
Resolver: protoTypes, // Only want known types.
|
||||
} |
||||
) |
||||
|
||||
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) |
||||
|
||||
// fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
|
||||
type fetchDiscovery struct { |
||||
client ResourceClient |
||||
source string |
||||
|
||||
refreshInterval time.Duration |
||||
|
||||
parseResources resourceParser |
||||
logger log.Logger |
||||
|
||||
fetchDuration prometheus.Observer |
||||
fetchSkipUpdateCount prometheus.Counter |
||||
fetchFailuresCount prometheus.Counter |
||||
} |
||||
|
||||
func (d *fetchDiscovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { |
||||
defer d.client.Close() |
||||
|
||||
ticker := time.NewTicker(d.refreshInterval) |
||||
|
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
ticker.Stop() |
||||
return |
||||
default: |
||||
d.poll(ctx, ch) |
||||
<-ticker.C |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Group) { |
||||
t0 := time.Now() |
||||
response, err := d.client.Fetch(ctx) |
||||
elapsed := time.Since(t0) |
||||
d.fetchDuration.Observe(elapsed.Seconds()) |
||||
|
||||
// Check the context before in order to exit early.
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
default: |
||||
} |
||||
|
||||
if err != nil { |
||||
level.Error(d.logger).Log("msg", "error parsing resources", "err", err) |
||||
d.fetchFailuresCount.Inc() |
||||
return |
||||
} |
||||
|
||||
if response == nil { |
||||
// No update needed.
|
||||
d.fetchSkipUpdateCount.Inc() |
||||
return |
||||
} |
||||
|
||||
parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl) |
||||
if err != nil { |
||||
level.Error(d.logger).Log("msg", "error parsing resources", "err", err) |
||||
d.fetchFailuresCount.Inc() |
||||
return |
||||
} |
||||
|
||||
for _, group := range parsedGroups { |
||||
group.Source = d.source |
||||
} |
||||
|
||||
level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups)) |
||||
|
||||
// Check the context before sending an update on the channel.
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case ch <- parsedGroups: |
||||
} |
||||
} |
@ -0,0 +1,201 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"io" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/go-kit/log" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
"go.uber.org/goleak" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
) |
||||
|
||||
var ( |
||||
sdConf = SDConfig{ |
||||
Server: "http://127.0.0.1", |
||||
RefreshInterval: model.Duration(10 * time.Second), |
||||
} |
||||
|
||||
testFetchFailuresCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{}) |
||||
testFetchSkipUpdateCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{}) |
||||
testFetchDuration = prometheus.NewSummary( |
||||
prometheus.SummaryOpts{}, |
||||
) |
||||
) |
||||
|
||||
func TestMain(m *testing.M) { |
||||
goleak.VerifyTestMain(m) |
||||
} |
||||
|
||||
type discoveryResponder func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) |
||||
|
||||
func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.Server { |
||||
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
||||
// Validate req MIME types.
|
||||
require.Equal(t, "application/json", r.Header.Get("Content-Type")) |
||||
require.Equal(t, "application/json", r.Header.Get("Accept")) |
||||
|
||||
body, err := ioutil.ReadAll(r.Body) |
||||
defer func() { |
||||
_, _ = io.Copy(ioutil.Discard, r.Body) |
||||
_ = r.Body.Close() |
||||
}() |
||||
require.NotEmpty(t, body) |
||||
require.NoError(t, err) |
||||
|
||||
// Validate discovery request.
|
||||
discoveryReq := &v3.DiscoveryRequest{} |
||||
err = protoJSONUnmarshalOptions.Unmarshal(body, discoveryReq) |
||||
require.NoError(t, err) |
||||
|
||||
discoveryRes, err := responder(discoveryReq) |
||||
if err != nil { |
||||
w.WriteHeader(500) |
||||
return |
||||
} |
||||
|
||||
if discoveryRes == nil { |
||||
w.WriteHeader(304) |
||||
return |
||||
} |
||||
|
||||
w.WriteHeader(200) |
||||
data, err := protoJSONMarshalOptions.Marshal(discoveryRes) |
||||
require.NoError(t, err) |
||||
|
||||
_, err = w.Write(data) |
||||
require.NoError(t, err) |
||||
})) |
||||
} |
||||
|
||||
func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser { |
||||
return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) { |
||||
return groups, err |
||||
} |
||||
} |
||||
|
||||
var nopLogger = log.NewNopLogger() |
||||
|
||||
type testResourceClient struct { |
||||
resourceTypeURL string |
||||
server string |
||||
protocolVersion ProtocolVersion |
||||
fetch func(ctx context.Context) (*v3.DiscoveryResponse, error) |
||||
} |
||||
|
||||
func (rc testResourceClient) ResourceTypeURL() string { |
||||
return rc.resourceTypeURL |
||||
} |
||||
|
||||
func (rc testResourceClient) Server() string { |
||||
return rc.server |
||||
} |
||||
|
||||
func (rc testResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return rc.fetch(ctx) |
||||
} |
||||
|
||||
func (rc testResourceClient) ID() string { |
||||
return "test-client" |
||||
} |
||||
|
||||
func (rc testResourceClient) Close() { |
||||
} |
||||
|
||||
func TestPollingRefreshSkipUpdate(t *testing.T) { |
||||
rc := &testResourceClient{ |
||||
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return nil, nil |
||||
}, |
||||
} |
||||
pd := &fetchDiscovery{ |
||||
client: rc, |
||||
logger: nopLogger, |
||||
fetchDuration: testFetchDuration, |
||||
fetchFailuresCount: testFetchFailuresCount, |
||||
fetchSkipUpdateCount: testFetchSkipUpdateCount, |
||||
} |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
go func() { |
||||
time.Sleep(1 * time.Second) |
||||
cancel() |
||||
}() |
||||
|
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
pd.poll(ctx, ch) |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-ch: |
||||
require.Fail(t, "no update expected") |
||||
} |
||||
} |
||||
|
||||
func TestPollingRefreshAttachesGroupMetadata(t *testing.T) { |
||||
server := "http://198.161.2.0" |
||||
source := "test" |
||||
rc := &testResourceClient{ |
||||
server: server, |
||||
protocolVersion: ProtocolV3, |
||||
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return &v3.DiscoveryResponse{}, nil |
||||
}, |
||||
} |
||||
pd := &fetchDiscovery{ |
||||
source: source, |
||||
client: rc, |
||||
logger: nopLogger, |
||||
fetchDuration: testFetchDuration, |
||||
fetchFailuresCount: testFetchFailuresCount, |
||||
fetchSkipUpdateCount: testFetchSkipUpdateCount, |
||||
parseResources: constantResourceParser([]*targetgroup.Group{ |
||||
{}, |
||||
{ |
||||
Source: "a-custom-source", |
||||
Labels: model.LabelSet{ |
||||
"__meta_custom_xds_label": "a-value", |
||||
}, |
||||
}, |
||||
}, nil), |
||||
} |
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
pd.poll(context.Background(), ch) |
||||
groups := <-ch |
||||
require.NotNil(t, groups) |
||||
|
||||
require.Len(t, groups, 2) |
||||
|
||||
for _, group := range groups { |
||||
require.Equal(t, source, group.Source) |
||||
} |
||||
|
||||
group2 := groups[1] |
||||
require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label")) |
||||
require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"]) |
||||
} |
@ -0,0 +1,63 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package osutil |
||||
|
||||
import ( |
||||
"encoding" |
||||
"net" |
||||
"os" |
||||
) |
||||
|
||||
// GetFQDN returns a FQDN if it's possible, otherwise falls back to hostname.
|
||||
func GetFQDN() (string, error) { |
||||
hostname, err := os.Hostname() |
||||
if err != nil { |
||||
return "", err |
||||
} |
||||
|
||||
ips, err := net.LookupIP(hostname) |
||||
if err != nil { |
||||
// Return the system hostname if we can't look up the IP address.
|
||||
return hostname, nil |
||||
} |
||||
|
||||
lookup := func(ipStr encoding.TextMarshaler) (string, error) { |
||||
ip, err := ipStr.MarshalText() |
||||
if err != nil { |
||||
return "", err |
||||
} |
||||
hosts, err := net.LookupAddr(string(ip)) |
||||
if err != nil || len(hosts) == 0 { |
||||
return "", err |
||||
} |
||||
return hosts[0], nil |
||||
} |
||||
|
||||
for _, addr := range ips { |
||||
if ip := addr.To4(); ip != nil { |
||||
if fqdn, err := lookup(ip); err == nil { |
||||
return fqdn, nil |
||||
} |
||||
|
||||
} |
||||
|
||||
if ip := addr.To16(); ip != nil { |
||||
if fqdn, err := lookup(ip); err == nil { |
||||
return fqdn, nil |
||||
} |
||||
|
||||
} |
||||
} |
||||
return hostname, nil |
||||
} |
Loading…
Reference in new issue