discovery(k8s): add a metric to track failed requests, failures will still be logged.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/13554/head
machine424 10 months ago
parent e79b9ed2ab
commit 0e81ab44a2
No known key found for this signature in database
GPG Key ID: A4B001A4FDEE017D

@ -485,8 +485,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpointSlice( eps := NewEndpointSlice(
log.With(d.logger, "role", "endpointslice"), log.With(d.logger, "role", "endpointslice"),
informer, informer,
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf, nodeInf,
d.metrics.eventCount, d.metrics.eventCount,
) )
@ -545,8 +545,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpoints( eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"), log.With(d.logger, "role", "endpoint"),
d.newEndpointsByNodeInformer(elw), d.newEndpointsByNodeInformer(elw),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf, nodeInf,
d.metrics.eventCount, d.metrics.eventCount,
) )
@ -602,7 +602,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
} }
svc := NewService( svc := NewService(
log.With(d.logger, "role", "service"), log.With(d.logger, "role", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.metrics.eventCount, d.metrics.eventCount,
) )
d.discoverers = append(d.discoverers, svc) d.discoverers = append(d.discoverers, svc)
@ -641,7 +641,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options) return i.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled) informer = d.mustNewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
} else { } else {
i := d.client.NetworkingV1beta1().Ingresses(namespace) i := d.client.NetworkingV1beta1().Ingresses(namespace)
ilw := &cache.ListWatch{ ilw := &cache.ListWatch{
@ -656,7 +656,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options) return i.Watch(ctx, options)
}, },
} }
informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled) informer = d.mustNewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
} }
ingress := NewIngress( ingress := NewIngress(
log.With(d.logger, "role", "ingress"), log.With(d.logger, "role", "ingress"),
@ -747,7 +747,7 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
return d.client.CoreV1().Nodes().Watch(ctx, options) return d.client.CoreV1().Nodes().Watch(ctx, options)
}, },
} }
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled) return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
} }
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
@ -762,7 +762,7 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
} }
} }
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers) return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
} }
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
@ -783,7 +783,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
return pods, nil return pods, nil
} }
if !d.attachMetadata.Node { if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers) return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
} }
indexers[nodeIndex] = func(obj interface{}) ([]string, error) { indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -809,13 +809,13 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
return nodes, nil return nodes, nil
} }
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers) return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
} }
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc) indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node { if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers) return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
} }
indexers[nodeIndex] = func(obj interface{}) ([]string, error) { indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -854,7 +854,32 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return nodes, nil return nodes, nil
} }
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers) return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
func (d *Discovery) informerWatchErrorHandler(r *cache.Reflector, err error) {
d.metrics.failuresCount.Inc()
cache.DefaultWatchErrorHandler(r, err)
}
func (d *Discovery) mustNewSharedInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) cache.SharedInformer {
informer := cache.NewSharedInformer(lw, exampleObject, defaultEventHandlerResyncPeriod)
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
// Such a scenario would suggest an incorrect use of the API, thus the panic.
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
panic(err)
}
return informer
}
func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
informer := cache.NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, indexers)
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
// Such a scenario would suggest an incorrect use of the API, thus the panic.
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
panic(err)
}
return informer
} }
func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) { func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {

@ -21,12 +21,16 @@ import (
"time" "time"
"github.com/go-kit/log" "github.com/go-kit/log"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
fakediscovery "k8s.io/client-go/discovery/fake" fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
kubetesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -314,3 +318,39 @@ func TestCheckNetworkingV1Supported(t *testing.T) {
}) })
} }
} }
func TestFailuresCountMetric(t *testing.T) {
tests := []struct {
role Role
minFailedWatches int
}{
{RoleNode, 1},
{RolePod, 1},
{RoleService, 1},
{RoleEndpoint, 3},
{RoleEndpointSlice, 3},
{RoleIngress, 1},
}
for _, tc := range tests {
tc := tc
t.Run(string(tc.role), func(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(tc.role, NamespaceDiscovery{})
// The counter is initialized and no failures at the beginning.
require.Equal(t, float64(0), prom_testutil.ToFloat64(n.metrics.failuresCount))
// Simulate an error on watch requests.
c.Discovery().(*fakediscovery.FakeDiscovery).PrependWatchReactor("*", func(action kubetesting.Action) (bool, watch.Interface, error) {
return true, nil, apierrors.NewUnauthorized("unauthorized")
})
// Start the discovery.
k8sDiscoveryTest{discovery: n}.Run(t)
// At least the errors of the initial watches should be caught (watches are retried on errors).
require.GreaterOrEqual(t, prom_testutil.ToFloat64(n.metrics.failuresCount), float64(tc.minFailedWatches))
})
}
}

@ -23,6 +23,7 @@ var _ discovery.DiscovererMetrics = (*kubernetesMetrics)(nil)
type kubernetesMetrics struct { type kubernetesMetrics struct {
eventCount *prometheus.CounterVec eventCount *prometheus.CounterVec
failuresCount prometheus.Counter
metricRegisterer discovery.MetricRegisterer metricRegisterer discovery.MetricRegisterer
} }
@ -37,10 +38,18 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
}, },
[]string{"role", "event"}, []string{"role", "event"},
), ),
failuresCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: discovery.KubernetesMetricsNamespace,
Name: "failures_total",
Help: "The number of failed WATCH/LIST requests.",
},
),
} }
m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{ m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
m.eventCount, m.eventCount,
m.failuresCount,
}) })
// Initialize metric vectors. // Initialize metric vectors.
@ -61,6 +70,8 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
} }
} }
m.failuresCount.Add(0)
return m return m
} }

Loading…
Cancel
Save