Fix `kuma_sd` targetgroup reporting (#9157)

* Bundle all xDS targets into a single group

Signed-off-by: austin ce <austin.cawley@gmail.com>
pull/9161/head
Austin Cawley-Edwards 2021-08-05 19:29:15 -04:00 committed by Julien Pivotto
parent b86bdddb8c
commit 39a8db22eb
4 changed files with 170 additions and 140 deletions

View File

@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/osutil" "github.com/prometheus/prometheus/util/osutil"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@ -129,30 +128,27 @@ func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discover
return NewKumaHTTPDiscovery(c, logger) return NewKumaHTTPDiscovery(c, logger)
} }
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group { func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet {
commonLabels := convertKumaUserLabels(assignment.Labels) commonLabels := convertKumaUserLabels(assignment.Labels)
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh) commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh)
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service) commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service)
var targetLabelSets []model.LabelSet var targets []model.LabelSet
for _, target := range assignment.Targets { for _, madsTarget := range assignment.Targets {
targetLabels := convertKumaUserLabels(target.Labels) targetLabels := convertKumaUserLabels(madsTarget.Labels).Merge(commonLabels)
targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name) targetLabels[kumaDataplaneLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.InstanceLabel] = model.LabelValue(target.Name) targetLabels[model.AddressLabel] = model.LabelValue(madsTarget.Address)
targetLabels[model.AddressLabel] = model.LabelValue(target.Address) targetLabels[model.InstanceLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme) targetLabels[model.SchemeLabel] = model.LabelValue(madsTarget.Scheme)
targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath) targetLabels[model.MetricsPathLabel] = model.LabelValue(madsTarget.MetricsPath)
targetLabelSets = append(targetLabelSets, targetLabels) targets = append(targets, targetLabels)
} }
return &targetgroup.Group{ return targets
Labels: commonLabels,
Targets: targetLabelSets,
}
} }
func convertKumaUserLabels(labels map[string]string) model.LabelSet { func convertKumaUserLabels(labels map[string]string) model.LabelSet {
@ -165,12 +161,12 @@ func convertKumaUserLabels(labels map[string]string) model.LabelSet {
} }
// kumaMadsV1ResourceParser is an xds.resourceParser. // kumaMadsV1ResourceParser is an xds.resourceParser.
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) { func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) {
if typeURL != KumaMadsV1ResourceTypeURL { if typeURL != KumaMadsV1ResourceTypeURL {
return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL) return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL)
} }
var groups []*targetgroup.Group var targets []model.LabelSet
for _, resource := range resources { for _, resource := range resources {
assignment := &MonitoringAssignment{} assignment := &MonitoringAssignment{}
@ -179,10 +175,10 @@ func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*target
return nil, err return nil, err
} }
groups = append(groups, convertKumaV1MonitoringAssignment(assignment)) targets = append(targets, convertKumaV1MonitoringAssignment(assignment)...)
} }
return groups, nil return targets, nil
} }
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) { func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) {

View File

@ -138,65 +138,47 @@ func TestKumaMadsV1ResourceParserValidResources(t *testing.T) {
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...)
require.NoError(t, err) require.NoError(t, err)
groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL) targets, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, groups, 3) require.Len(t, targets, 3)
expectedGroup1 := &targetgroup.Group{ expectedTargets := []model.LabelSet{
Targets: []model.LabelSet{
{ {
"__address__": "10.1.4.32:9090", "__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics", "__metrics_path__": "/custom-metrics",
"__scheme__": "http", "__scheme__": "http",
"instance": "prometheus-01", "instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "metrics", "__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus", "__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra", "__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1", "__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
}, },
} {
require.Equal(t, expectedGroup1, groups[0]) "__address__": "10.1.4.33:9090",
"__metrics_path__": "",
expectedGroup2 := &targetgroup.Group{ "__scheme__": "http",
Labels: model.LabelSet{ "instance": "prometheus-02",
"__meta_kuma_mesh": "metrics", "__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana", "__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra", "__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1", "__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
}, },
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Targets: []model.LabelSet{
{ {
"__address__": "10.1.1.1", "__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "", "__metrics_path__": "",
"__scheme__": "http", "__scheme__": "http",
"instance": "elasticsearch-01", "instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data", "__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch", "__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
}, },
} }
require.Equal(t, expectedGroup3, groups[2]) require.Equal(t, expectedTargets, targets)
} }
func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) { func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) {
@ -262,66 +244,48 @@ tls_config:
kd.poll(context.Background(), ch) kd.poll(context.Background(), ch)
groups := <-ch groups := <-ch
require.Len(t, groups, 3) require.Len(t, groups, 1)
expectedGroup1 := &targetgroup.Group{ targets := groups[0].Targets
Source: "kuma", require.Len(t, targets, 3)
Targets: []model.LabelSet{
expectedTargets := []model.LabelSet{
{ {
"__address__": "10.1.4.32:9090", "__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics", "__metrics_path__": "/custom-metrics",
"__scheme__": "http", "__scheme__": "http",
"instance": "prometheus-01", "instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "metrics", "__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus", "__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra", "__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1", "__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
}, },
} {
require.Equal(t, expectedGroup1, groups[0]) "__address__": "10.1.4.33:9090",
"__metrics_path__": "",
expectedGroup2 := &targetgroup.Group{ "__scheme__": "http",
Source: "kuma", "instance": "prometheus-02",
Labels: model.LabelSet{
"__meta_kuma_mesh": "metrics", "__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana", "__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra", "__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1", "__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
}, },
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Source: "kuma",
Targets: []model.LabelSet{
{ {
"__address__": "10.1.1.1", "__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "", "__metrics_path__": "",
"__scheme__": "http", "__scheme__": "http",
"instance": "elasticsearch-01", "instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data", "__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch", "__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
}, },
} }
require.Equal(t, expectedGroup3, groups[2]) require.Equal(t, expectedTargets, targets)
// Should skip the next update. // Should skip the next update.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View File

@ -15,7 +15,6 @@ package xds
import ( import (
"context" "context"
"github.com/prometheus/common/model"
"time" "time"
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@ -23,6 +22,7 @@ import (
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config" "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
@ -95,7 +95,9 @@ var (
} }
) )
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) // resourceParser is a function that takes raw discovered objects and translates them into
// targetgroup.Group Targets. On error, no updates are sent to the scrape manager and the failure count is incremented.
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error)
// fetchDiscovery implements long-polling via xDS Fetch REST-JSON. // fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
type fetchDiscovery struct { type fetchDiscovery struct {
@ -154,23 +156,18 @@ func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Grou
return return
} }
parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl) parsedTargets, err := d.parseResources(response.Resources, response.TypeUrl)
if err != nil { if err != nil {
level.Error(d.logger).Log("msg", "error parsing resources", "err", err) level.Error(d.logger).Log("msg", "error parsing resources", "err", err)
d.fetchFailuresCount.Inc() d.fetchFailuresCount.Inc()
return return
} }
for _, group := range parsedGroups { level.Debug(d.logger).Log("msg", "Updated to version", "version", response.VersionInfo, "targets", len(parsedTargets))
group.Source = d.source
}
level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups))
// Check the context before sending an update on the channel.
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case ch <- parsedGroups: case ch <- []*targetgroup.Group{{Source: d.source, Targets: parsedTargets}}:
} }
} }

View File

@ -93,9 +93,9 @@ func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.
})) }))
} }
func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser { func constantResourceParser(targets []model.LabelSet, err error) resourceParser {
return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) { return func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
return groups, err return targets, err
} }
} }
@ -174,13 +174,16 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
fetchDuration: testFetchDuration, fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount, fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount, fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: constantResourceParser([]*targetgroup.Group{ parseResources: constantResourceParser([]model.LabelSet{
{},
{ {
Source: "a-custom-source",
Labels: model.LabelSet{
"__meta_custom_xds_label": "a-value", "__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
}, },
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
}, },
}, nil), }, nil),
} }
@ -189,13 +192,83 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
groups := <-ch groups := <-ch
require.NotNil(t, groups) require.NotNil(t, groups)
require.Len(t, groups, 2) require.Len(t, groups, 1)
for _, group := range groups { group := groups[0]
require.Equal(t, source, group.Source) require.Equal(t, source, group.Source)
require.Len(t, group.Targets, 2)
target2 := group.Targets[1]
require.Contains(t, target2, model.LabelName("__meta_custom_xds_label"))
require.Equal(t, model.LabelValue("a-value"), target2["__meta_custom_xds_label"])
}
func TestPollingDisappearingTargets(t *testing.T) {
server := "http://198.161.2.0"
source := "test"
rc := &testResourceClient{
server: server,
protocolVersion: ProtocolV3,
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) {
return &v3.DiscoveryResponse{}, nil
},
} }
group2 := groups[1] // On the first poll, send back two targets. On the next, send just one.
require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label")) counter := 0
require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"]) parser := func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
counter++
if counter == 1 {
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
},
}, nil
}
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
}, nil
}
pd := &fetchDiscovery{
source: source,
client: rc,
logger: nopLogger,
fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: parser,
}
ch := make(chan []*targetgroup.Group, 1)
pd.poll(context.Background(), ch)
groups := <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 2)
pd.poll(context.Background(), ch)
groups = <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 1)
} }