mirror of https://github.com/k3s-io/k3s
Merge pull request #67068 from sttts/sttts-non-fatal-ratelimitermetircs-reuse
Automatic merge from submit-queue (batch tested with PRs 66793, 67405, 67068, 67501, 67484). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. pkg/util/metrics: make re-registration of RateLimiterMetric non-fatal In integration tests we might register these metrics multiple times in parallel. Instead of unregistering and making somehow sure those tests can run in parallel, we just make the registration idem-potent. Prerequisite for controller manager integration tests https://github.com/kubernetes/kubernetes/pull/64149.pull/8/head
commit
40af953850
|
@ -12,7 +12,6 @@ go_library(
|
||||||
importpath = "k8s.io/kubernetes/pkg/util/metrics",
|
importpath = "k8s.io/kubernetes/pkg/util/metrics",
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
|
||||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -19,21 +19,15 @@ package metrics
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/client-go/util/flowcontrol"
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
updatePeriod = 5 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
metricsLock sync.Mutex
|
metricsLock sync.Mutex
|
||||||
rateLimiterMetrics = make(map[string]rateLimiterMetric)
|
rateLimiterMetrics = make(map[string]*rateLimiterMetric)
|
||||||
)
|
)
|
||||||
|
|
||||||
type rateLimiterMetric struct {
|
type rateLimiterMetric struct {
|
||||||
|
@ -46,7 +40,8 @@ func registerRateLimiterMetric(ownerName string) error {
|
||||||
defer metricsLock.Unlock()
|
defer metricsLock.Unlock()
|
||||||
|
|
||||||
if _, ok := rateLimiterMetrics[ownerName]; ok {
|
if _, ok := rateLimiterMetrics[ownerName]; ok {
|
||||||
return fmt.Errorf("Rate Limiter Metric for %v already registered", ownerName)
|
// only register once in Prometheus. We happen to see an ownerName reused in parallel integration tests.
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
metric := prometheus.NewGauge(prometheus.GaugeOpts{
|
metric := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "rate_limiter_use",
|
Name: "rate_limiter_use",
|
||||||
|
@ -57,7 +52,7 @@ func registerRateLimiterMetric(ownerName string) error {
|
||||||
return fmt.Errorf("error registering rate limiter usage metric: %v", err)
|
return fmt.Errorf("error registering rate limiter usage metric: %v", err)
|
||||||
}
|
}
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
rateLimiterMetrics[ownerName] = rateLimiterMetric{
|
rateLimiterMetrics[ownerName] = &rateLimiterMetric{
|
||||||
metric: metric,
|
metric: metric,
|
||||||
stopCh: stopCh,
|
stopCh: stopCh,
|
||||||
}
|
}
|
||||||
|
@ -79,22 +74,3 @@ func RegisterMetricAndTrackRateLimiterUsage(ownerName string, rateLimiter flowco
|
||||||
// }, updatePeriod, rateLimiterMetrics[ownerName].stopCh)
|
// }, updatePeriod, rateLimiterMetrics[ownerName].stopCh)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnregisterMetricAndUntrackRateLimiterUsage unregisters a metric ownerName_rate_limiter_use from prometheus and
|
|
||||||
// stops the goroutine that updates this metric
|
|
||||||
func UnregisterMetricAndUntrackRateLimiterUsage(ownerName string) bool {
|
|
||||||
metricsLock.Lock()
|
|
||||||
defer metricsLock.Unlock()
|
|
||||||
|
|
||||||
rlm, ok := rateLimiterMetrics[ownerName]
|
|
||||||
if !ok {
|
|
||||||
glog.Warningf("Rate Limiter Metric for %v not registered", ownerName)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
close(rlm.stopCh)
|
|
||||||
prometheus.Unregister(rlm.metric)
|
|
||||||
delete(rateLimiterMetrics, ownerName)
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -57,27 +57,3 @@ func TestRegisterMetricAndTrackRateLimiterUsage(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUnregisterMetricAndUntrackRateLimiterUsage(t *testing.T) {
|
|
||||||
RegisterMetricAndTrackRateLimiterUsage("owner_name", flowcontrol.NewTokenBucketRateLimiter(1, 1))
|
|
||||||
testCases := []struct {
|
|
||||||
ownerName string
|
|
||||||
ok bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
ownerName: "owner_name",
|
|
||||||
ok: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ownerName: "owner_name",
|
|
||||||
ok: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, tc := range testCases {
|
|
||||||
ok := UnregisterMetricAndUntrackRateLimiterUsage(tc.ownerName)
|
|
||||||
if tc.ok != ok {
|
|
||||||
t.Errorf("Case[%d] Expected %v, got %v", i, tc.ok, ok)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ go_test(
|
||||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||||
"//pkg/scheduler/factory:go_default_library",
|
"//pkg/scheduler/factory:go_default_library",
|
||||||
"//pkg/util/labels:go_default_library",
|
"//pkg/util/labels:go_default_library",
|
||||||
"//pkg/util/metrics:go_default_library",
|
|
||||||
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
|
|
@ -52,7 +52,6 @@ import (
|
||||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,7 +68,6 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
|
||||||
}
|
}
|
||||||
resyncPeriod := 12 * time.Hour
|
resyncPeriod := 12 * time.Hour
|
||||||
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
|
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
|
||||||
metrics.UnregisterMetricAndUntrackRateLimiterUsage("daemon_controller")
|
|
||||||
dc, err := daemon.NewDaemonSetsController(
|
dc, err := daemon.NewDaemonSetsController(
|
||||||
informers.Apps().V1().DaemonSets(),
|
informers.Apps().V1().DaemonSets(),
|
||||||
informers.Apps().V1().ControllerRevisions(),
|
informers.Apps().V1().ControllerRevisions(),
|
||||||
|
|
|
@ -38,7 +38,6 @@ go_library(
|
||||||
"//pkg/controller/deployment:go_default_library",
|
"//pkg/controller/deployment:go_default_library",
|
||||||
"//pkg/controller/deployment/util:go_default_library",
|
"//pkg/controller/deployment/util:go_default_library",
|
||||||
"//pkg/controller/replicaset:go_default_library",
|
"//pkg/controller/replicaset:go_default_library",
|
||||||
"//pkg/util/metrics:go_default_library",
|
|
||||||
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
|
||||||
|
|
|
@ -36,7 +36,6 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/controller/deployment"
|
"k8s.io/kubernetes/pkg/controller/deployment"
|
||||||
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
||||||
"k8s.io/kubernetes/pkg/controller/replicaset"
|
"k8s.io/kubernetes/pkg/controller/replicaset"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
testutil "k8s.io/kubernetes/test/utils"
|
testutil "k8s.io/kubernetes/test/utils"
|
||||||
)
|
)
|
||||||
|
@ -156,7 +155,6 @@ func dcSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *replicaset.R
|
||||||
resyncPeriod := 12 * time.Hour
|
resyncPeriod := 12 * time.Hour
|
||||||
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "deployment-informers")), resyncPeriod)
|
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "deployment-informers")), resyncPeriod)
|
||||||
|
|
||||||
metrics.UnregisterMetricAndUntrackRateLimiterUsage("deployment_controller")
|
|
||||||
dc, err := deployment.NewDeploymentController(
|
dc, err := deployment.NewDeploymentController(
|
||||||
informers.Apps().V1().Deployments(),
|
informers.Apps().V1().Deployments(),
|
||||||
informers.Apps().V1().ReplicaSets(),
|
informers.Apps().V1().ReplicaSets(),
|
||||||
|
|
|
@ -19,7 +19,6 @@ go_test(
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/controller/serviceaccount:go_default_library",
|
"//pkg/controller/serviceaccount:go_default_library",
|
||||||
"//pkg/serviceaccount:go_default_library",
|
"//pkg/serviceaccount:go_default_library",
|
||||||
"//pkg/util/metrics:go_default_library",
|
|
||||||
"//plugin/pkg/admission/serviceaccount:go_default_library",
|
"//plugin/pkg/admission/serviceaccount:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
|
|
|
@ -50,7 +50,6 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
|
||||||
serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
|
serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
)
|
)
|
||||||
|
@ -437,7 +436,6 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
|
||||||
apiServer.Close()
|
apiServer.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.UnregisterMetricAndUntrackRateLimiterUsage("serviceaccount_tokens_controller")
|
|
||||||
tokenController, err := serviceaccountcontroller.NewTokensController(
|
tokenController, err := serviceaccountcontroller.NewTokensController(
|
||||||
informers.Core().V1().ServiceAccounts(),
|
informers.Core().V1().ServiceAccounts(),
|
||||||
informers.Core().V1().Secrets(),
|
informers.Core().V1().Secrets(),
|
||||||
|
@ -449,7 +447,6 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
|
||||||
}
|
}
|
||||||
go tokenController.Run(1, stopCh)
|
go tokenController.Run(1, stopCh)
|
||||||
|
|
||||||
metrics.UnregisterMetricAndUntrackRateLimiterUsage("serviceaccount_controller")
|
|
||||||
serviceAccountController, err := serviceaccountcontroller.NewServiceAccountsController(
|
serviceAccountController, err := serviceaccountcontroller.NewServiceAccountsController(
|
||||||
informers.Core().V1().ServiceAccounts(),
|
informers.Core().V1().ServiceAccounts(),
|
||||||
informers.Core().V1().Namespaces(),
|
informers.Core().V1().Namespaces(),
|
||||||
|
|
Loading…
Reference in New Issue