Merge pull request #66666 from bertinatto/metrics_ad_controller

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add more metrics for A/D Controller:

**What this PR does / why we need it**:

This PR adds a few metrics described in the [Metrics Spec](https://docs.google.com/document/d/1Fh0T60T_y888LsRwC51CQHO75b2IZ3A34ZQS71s_F0g/edit#heading=h.ys6pjpbasqdu):

* Number of Volumes in ActualStateOfWorld and DesiredStateOfWorld
* Number of times the A/D Controller performs a force detach

**Release note**:

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-08-15 09:42:06 -07:00 committed by GitHub
commit 4414ae3d75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 274 additions and 38 deletions

View File

@ -323,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)
metrics.Register(adc.pvcLister,
adc.pvLister,
adc.podLister,
adc.actualStateOfWorld,
adc.desiredStateOfWorld,
&adc.volumePluginMgr)
<-stopCh
}

View File

@ -6,10 +6,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
@ -22,10 +22,14 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],

View File

@ -22,69 +22,94 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/volume"
)
const pluginNameNotAvailable = "N/A"
var (
inUseVolumeMetricDesc = prometheus.NewDesc(
prometheus.BuildFQName("", "storage_count", "attachable_volumes_in_use"),
"Measure number of volumes in use",
[]string{"node", "volume_plugin"}, nil)
totalVolumesMetricDesc = prometheus.NewDesc(
prometheus.BuildFQName("", "attachdetach_controller", "total_volumes"),
"Number of volumes in A/D Controller",
[]string{"plugin_name", "state"}, nil)
forcedDetachMetricCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "attachdetach_controller_forced_detaches",
Help: "Number of times the A/D Controller performed a forced detach"})
)
var registerMetrics sync.Once
type volumeInUseCollector struct {
pvcLister corelisters.PersistentVolumeClaimLister
podLister corelisters.PodLister
pvLister corelisters.PersistentVolumeLister
volumePluginMgr *volume.VolumePluginMgr
}
// nodeVolumeCount contains map of {"nodeName": {"pluginName": volume_count }}
// For example :
// node 172.168.1.100.ec2.internal has 10 EBS and 3 glusterfs PVC in use
// {"172.168.1.100.ec2.internal": {"aws-ebs": 10, "glusterfs": 3}}
type nodeVolumeCount map[types.NodeName]map[string]int
// Register registers pvc's in-use metrics
// Register registers metrics in A/D Controller.
func Register(pvcLister corelisters.PersistentVolumeClaimLister,
pvLister corelisters.PersistentVolumeLister,
podLister corelisters.PodLister,
asw cache.ActualStateOfWorld,
dsw cache.DesiredStateOfWorld,
pluginMgr *volume.VolumePluginMgr) {
registerMetrics.Do(func() {
prometheus.MustRegister(newVolumeInUseCollector(pvcLister, podLister, pvLister, pluginMgr))
prometheus.MustRegister(newAttachDetachStateCollector(pvcLister,
podLister,
pvLister,
asw,
dsw,
pluginMgr))
prometheus.MustRegister(forcedDetachMetricCounter)
})
}
func (volumeInUse nodeVolumeCount) add(nodeName types.NodeName, pluginName string) {
nodeCount, ok := volumeInUse[nodeName]
// attachDetachStateCollector is a custom Prometheus collector for the A/D
// Controller. On each scrape it derives volume counts from the listers and
// from the controller's actual/desired state-of-world caches.
type attachDetachStateCollector struct {
	pvcLister corelisters.PersistentVolumeClaimLister
	podLister corelisters.PodLister
	pvLister  corelisters.PersistentVolumeLister
	// asw/dsw are queried for attached volumes and volumes-to-attach,
	// respectively, when computing the total-volumes metric.
	asw cache.ActualStateOfWorld
	dsw cache.DesiredStateOfWorld
	// volumePluginMgr resolves a volume spec to its plugin name.
	volumePluginMgr *volume.VolumePluginMgr
}
// volumeCount is a map of maps used as a counter, e.g.:
// node 172.168.1.100.ec2.internal has 10 EBS and 3 glusterfs PVC in use:
// {"172.168.1.100.ec2.internal": {"aws-ebs": 10, "glusterfs": 3}}
// state actual_state_of_world contains a total of 10 EBS volumes:
// {"actual_state_of_world": {"aws-ebs": 10}}
type volumeCount map[string]map[string]int64
func (v volumeCount) add(typeKey, counterKey string) {
count, ok := v[typeKey]
if !ok {
nodeCount = map[string]int{}
count = map[string]int64{}
}
nodeCount[pluginName]++
volumeInUse[nodeName] = nodeCount
count[counterKey]++
v[typeKey] = count
}
func newVolumeInUseCollector(
func newAttachDetachStateCollector(
pvcLister corelisters.PersistentVolumeClaimLister,
podLister corelisters.PodLister,
pvLister corelisters.PersistentVolumeLister,
pluginMgr *volume.VolumePluginMgr) *volumeInUseCollector {
return &volumeInUseCollector{pvcLister, podLister, pvLister, pluginMgr}
asw cache.ActualStateOfWorld,
dsw cache.DesiredStateOfWorld,
pluginMgr *volume.VolumePluginMgr) *attachDetachStateCollector {
return &attachDetachStateCollector{pvcLister, podLister, pvLister, asw, dsw, pluginMgr}
}
// Check if our collector implements necessary collector interface
var _ prometheus.Collector = &volumeInUseCollector{}
var _ prometheus.Collector = &attachDetachStateCollector{}
func (collector *volumeInUseCollector) Describe(ch chan<- *prometheus.Desc) {
func (collector *attachDetachStateCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- inUseVolumeMetricDesc
ch <- totalVolumesMetricDesc
}
func (collector *volumeInUseCollector) Collect(ch chan<- prometheus.Metric) {
func (collector *attachDetachStateCollector) Collect(ch chan<- prometheus.Metric) {
nodeVolumeMap := collector.getVolumeInUseCount()
for nodeName, pluginCount := range nodeVolumeMap {
for pluginName, count := range pluginCount {
@ -99,23 +124,37 @@ func (collector *volumeInUseCollector) Collect(ch chan<- prometheus.Metric) {
ch <- metric
}
}
stateVolumeMap := collector.getTotalVolumesCount()
for stateName, pluginCount := range stateVolumeMap {
for pluginName, count := range pluginCount {
metric, err := prometheus.NewConstMetric(totalVolumesMetricDesc,
prometheus.GaugeValue,
float64(count),
pluginName,
string(stateName))
if err != nil {
glog.Warningf("Failed to create metric : %v", err)
}
ch <- metric
}
}
}
func (collector *volumeInUseCollector) getVolumeInUseCount() nodeVolumeCount {
func (collector *attachDetachStateCollector) getVolumeInUseCount() volumeCount {
pods, err := collector.podLister.List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pod list")
return nil
}
nodeVolumeMap := make(nodeVolumeCount)
nodeVolumeMap := make(volumeCount)
for _, pod := range pods {
if len(pod.Spec.Volumes) <= 0 {
continue
}
nodeName := types.NodeName(pod.Spec.NodeName)
if nodeName == "" {
if pod.Spec.NodeName == "" {
continue
}
for _, podVolume := range pod.Spec.Volumes {
@ -127,8 +166,36 @@ func (collector *volumeInUseCollector) getVolumeInUseCount() nodeVolumeCount {
if err != nil {
continue
}
nodeVolumeMap.add(nodeName, volumePlugin.GetPluginName())
nodeVolumeMap.add(pod.Spec.NodeName, volumePlugin.GetPluginName())
}
}
return nodeVolumeMap
}
// getTotalVolumesCount tallies the volumes tracked by the A/D Controller,
// keyed first by state ("desired_state_of_world" / "actual_state_of_world")
// and then by volume plugin name.
func (collector *attachDetachStateCollector) getTotalVolumesCount() volumeCount {
	stateVolumeMap := make(volumeCount)

	// countVolume resolves spec to a plugin name (or the "N/A" placeholder
	// when the lookup returns a nil plugin) and bumps the counter for state.
	// Specs whose plugin lookup fails are silently skipped.
	countVolume := func(state string, spec *volume.Spec) {
		plugin, err := collector.volumePluginMgr.FindPluginBySpec(spec)
		if err != nil {
			return
		}
		name := pluginNameNotAvailable
		if plugin != nil {
			name = plugin.GetPluginName()
		}
		stateVolumeMap.add(state, name)
	}

	for _, volumeToAttach := range collector.dsw.GetVolumesToAttach() {
		countVolume("desired_state_of_world", volumeToAttach.VolumeSpec)
	}
	for _, attachedVolume := range collector.asw.GetAttachedVolumes() {
		countVolume("actual_state_of_world", attachedVolume.VolumeSpec)
	}
	return stateVolumeMap
}
// RecordForcedDetachMetric increments the counter of forced detaches
// performed by the A/D Controller.
func RecordForcedDetachMetric() {
	forcedDetachMetricCounter.Inc()
}

View File

@ -22,13 +22,17 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/types"
)
func TestMetricCollection(t *testing.T) {
func TestVolumesInUseMetricCollection(t *testing.T) {
fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
fakeClient := &fake.Clientset{}
@ -103,7 +107,13 @@ func TestMetricCollection(t *testing.T) {
pvcLister := pvcInformer.Lister()
pvLister := pvInformer.Lister()
metricCollector := newVolumeInUseCollector(pvcLister, fakePodInformer.Lister(), pvLister, fakeVolumePluginMgr)
metricCollector := newAttachDetachStateCollector(
pvcLister,
fakePodInformer.Lister(),
pvLister,
nil,
nil,
fakeVolumePluginMgr)
nodeUseMap := metricCollector.getVolumeInUseCount()
if len(nodeUseMap) < 1 {
t.Errorf("Expected one volume in use got %d", len(nodeUseMap))
@ -117,5 +127,54 @@ func TestMetricCollection(t *testing.T) {
if pluginUseCount < 1 {
t.Errorf("Expected at least in-use volume metric got %d", pluginUseCount)
}
}
// TestTotalVolumesMetricCollection seeds one volume into both the desired and
// actual state-of-world caches, then verifies getTotalVolumesCount reports
// exactly one "fake-plugin" volume under each of the two state keys.
func TestTotalVolumesMetricCollection(t *testing.T) {
	fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
	// Fresh caches backed by the fake plugin manager.
	dsw := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
	asw := cache.NewActualStateOfWorld(fakeVolumePluginMgr)
	podName := "pod-uid"
	volumeName := v1.UniqueVolumeName("volume-name")
	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
	nodeName := k8stypes.NodeName("node-name")

	// Populate DSW: register the node, then a pod referencing the volume.
	dsw.AddNode(nodeName, false)
	_, err := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}
	// Populate ASW: mark the same volume as attached to the node.
	asw.AddVolumeNode(volumeName, volumeSpec, nodeName, "")

	// The lister arguments are unused by getTotalVolumesCount, hence nil.
	metricCollector := newAttachDetachStateCollector(
		nil,
		nil,
		nil,
		asw,
		dsw,
		fakeVolumePluginMgr)
	totalVolumesMap := metricCollector.getTotalVolumesCount()
	if len(totalVolumesMap) != 2 {
		t.Errorf("Expected 2 states, got %d", len(totalVolumesMap))
	}

	dswCount, ok := totalVolumesMap["desired_state_of_world"]
	if !ok {
		t.Errorf("Expected desired_state_of_world, got nothing")
	}
	fakePluginCount := dswCount["fake-plugin"]
	if fakePluginCount != 1 {
		t.Errorf("Expected 1 fake-plugin volume in DesiredStateOfWorld, got %d", fakePluginCount)
	}

	aswCount, ok := totalVolumesMap["actual_state_of_world"]
	if !ok {
		t.Errorf("Expected actual_state_of_world, got nothing")
	}
	fakePluginCount = aswCount["fake-plugin"]
	if fakePluginCount != 1 {
		t.Errorf("Expected 1 fake-plugin volume in ActualStateOfWorld, got %d", fakePluginCount)
	}
}

View File

@ -12,6 +12,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler",
deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/metrics:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@ -232,6 +233,7 @@ func (rc *reconciler) reconcile() {
if !timeout {
glog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", ""))
} else {
metrics.RecordForcedDetachMetric()
glog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
}
}

View File

@ -228,6 +228,66 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod))
})
It("should create metrics for total number of volumes in A/D Controller", func() {
var err error
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc)
Expect(err).NotTo(HaveOccurred())
Expect(pvc).ToNot(Equal(nil))
claims := []*v1.PersistentVolumeClaim{pvc}
pod := framework.MakePod(ns, nil, claims, false, "")
// Get metrics
controllerMetrics, err := metricsGrabber.GrabFromControllerManager()
if err != nil {
framework.Skipf("Could not get controller-manager metrics - skipping")
}
// Create pod
pod, err = c.CoreV1().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForPodRunningInNamespace(c, pod)
framework.ExpectNoError(framework.WaitForPodRunningInNamespace(c, pod), "Error starting pod ", pod.Name)
pod, err = c.CoreV1().Pods(ns).Get(pod.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
// Get updated metrics
updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager()
if err != nil {
framework.Skipf("Could not get controller-manager metrics - skipping")
}
// Forced detach metric should be present
forceDetachKey := "attachdetach_controller_forced_detaches"
_, ok := updatedControllerMetrics[forceDetachKey]
Expect(ok).To(BeTrue(), "Key %q not found in A/D Controller metrics", forceDetachKey)
// Wait and validate
totalVolumesKey := "attachdetach_controller_total_volumes"
states := []string{"actual_state_of_world", "desired_state_of_world"}
dimensions := []string{"state", "plugin_name"}
waitForADControllerStatesMetrics(metricsGrabber, totalVolumesKey, dimensions, states)
// Total number of volumes in both ActualStateofWorld and DesiredStateOfWorld
// states should be higher or equal than it used to be
oldStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(controllerMetrics))
updatedStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(updatedControllerMetrics))
for _, stateName := range states {
if _, ok := oldStates[stateName]; !ok {
continue
}
for pluginName, numVolumes := range updatedStates[stateName] {
oldNumVolumes := oldStates[stateName][pluginName]
Expect(numVolumes).To(BeNumerically(">=", oldNumVolumes),
"Wrong number of volumes in state %q, plugin %q: wanted >=%d, got %d",
stateName, pluginName, oldNumVolumes, numVolumes)
}
}
framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod))
})
// Test for pv controller metrics, concretely: bound/unbound pv/pvc count.
Describe("PVController", func() {
const (
@ -535,3 +595,41 @@ func hasValidMetrics(metrics metrics.Metrics, metricKey string, dimensions ...st
}
return errCount == 0
}
// getStatesMetrics extracts per-state, per-plugin volume counts from the
// samples of the given metric, e.g.:
// {"actual_state_of_world": {"aws-ebs": 10, "glusterfs": 3}}
func getStatesMetrics(metricKey string, givenMetrics metrics.Metrics) map[string]map[string]int64 {
	states := make(map[string]map[string]int64)
	for _, sample := range givenMetrics[metricKey] {
		framework.Logf("Found sample %q", sample.String())
		state := string(sample.Metric["state"])
		pluginName := string(sample.Metric["plugin_name"])
		// BUGFIX: accumulate into the existing inner map instead of replacing
		// it wholesale — the original dropped all but the last plugin sample
		// seen for a given state.
		if _, ok := states[state]; !ok {
			states[state] = make(map[string]int64)
		}
		states[state][pluginName] = int64(sample.Value)
	}
	return states
}
// waitForADControllerStatesMetrics polls the controller-manager metrics with
// exponential backoff until metricName exposes all the given label dimensions
// and contains a sample for every state in stateNames, failing the spec via
// Gomega if the backoff is exhausted first.
func waitForADControllerStatesMetrics(metricsGrabber *metrics.MetricsGrabber, metricName string, dimensions []string, stateNames []string) {
	// 21 attempts starting at 10s, growing by 1.2x per step.
	backoff := wait.Backoff{
		Duration: 10 * time.Second,
		Factor:   1.2,
		Steps:    21,
	}
	verifyMetricFunc := func() (bool, error) {
		updatedMetrics, err := metricsGrabber.GrabFromControllerManager()
		if err != nil {
			// NOTE(review): Skipf presumably aborts the test as skipped, making
			// the return below defensive dead code — confirm against framework.
			framework.Skipf("Could not get controller-manager metrics - skipping")
			return false, err
		}
		if !hasValidMetrics(metrics.Metrics(updatedMetrics), metricName, dimensions...) {
			return false, fmt.Errorf("could not get valid metrics for %q", metricName)
		}
		// Succeed only once every expected state is reported.
		states := getStatesMetrics(metricName, metrics.Metrics(updatedMetrics))
		for _, name := range stateNames {
			if _, ok := states[name]; !ok {
				return false, fmt.Errorf("could not get state %q from A/D Controller metrics", name)
			}
		}
		return true, nil
	}
	waitErr := wait.ExponentialBackoff(backoff, verifyMetricFunc)
	Expect(waitErr).NotTo(HaveOccurred(), "Timeout error fetching A/D controller metrics : %v", waitErr)
}