mirror of https://github.com/k3s-io/k3s
record provision and deletion latency metric
instead of using the provisioner from the storage class directly, use the plugin name first

k3s-v1.15.3
parent a4fc418c84
commit 9688511595
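
The gist of the change: latency and error metrics are now labeled with the in-tree plugin name when one exists, falling back to the storage class provisioner (translated for CSI-migrated plugins). A minimal sketch of that naming rule, with stub types standing in for the controller's real inputs (the actual helper is the getProvisionerName function added near the end of this diff):

package main

import "fmt"

// plugin is a stand-in for vol.ProvisionableVolumePlugin, reduced to the
// two facts this decision reads (an assumption for illustration only).
type plugin struct {
	name          string
	migratedToCSI bool
}

// reportedName mirrors the rule adopted by this commit: prefer the in-tree
// plugin name; otherwise fall back to the storage class provisioner
// (the CSI name translation step is elided here).
func reportedName(p *plugin, storageClassProvisioner string) string {
	if p != nil && !p.migratedToCSI {
		return p.name // in-tree plugin: use the plugin name first
	}
	return storageClassProvisioner // external/CSI: storage class provisioner
}

func main() {
	fmt.Println(reportedName(&plugin{name: "kubernetes.io/gce-pd"}, "kubernetes.io/gce-pd"))
	fmt.Println(reportedName(nil, "vendor.com/my-volume"))
}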
@@ -419,6 +419,26 @@ func claimWithAnnotation(name, value string, claims []*v1.PersistentVolumeClaim)
 	return claims
 }
 
+// volumeWithAnnotation saves given annotation into given volume.
+// Meant to be used to compose volume specified inline in a test.
+func volumeWithAnnotation(name, value string, volume *v1.PersistentVolume) *v1.PersistentVolume {
+	if volume.Annotations == nil {
+		volume.Annotations = map[string]string{name: value}
+	} else {
+		volume.Annotations[name] = value
+	}
+	return volume
+}
+
+// volumesWithAnnotation saves given annotation into given volumes.
+// Meant to be used to compose volumes specified inline in a test.
+func volumesWithAnnotation(name, value string, volumes []*v1.PersistentVolume) []*v1.PersistentVolume {
+	for _, volume := range volumes {
+		volumeWithAnnotation(name, value, volume)
+	}
+	return volumes
+}
+
 // claimWithAccessMode saves given access into given claims.
 // Meant to be used to compose claims specified inline in a test.
 func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.PersistentVolumeClaim) []*v1.PersistentVolumeClaim {

@@ -6,6 +6,7 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/volume/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",

@@ -18,11 +18,12 @@ package metrics
 
 import (
 	"sync"
-
-	"k8s.io/api/core/v1"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
+	metricutil "k8s.io/kubernetes/pkg/volume/util"
 )
 
 const (

@@ -56,7 +57,6 @@ type PVCLister interface {
 func Register(pvLister PVLister, pvcLister PVCLister) {
 	registerMetrics.Do(func() {
 		prometheus.MustRegister(newPVAndPVCCountCollector(pvLister, pvcLister))
-		prometheus.MustRegister(volumeOperationMetric)
 		prometheus.MustRegister(volumeOperationErrorsMetric)
 	})
 }

@@ -92,12 +92,6 @@ var (
 		"Gauge measuring number of persistent volume claim currently unbound",
 		[]string{namespaceLabel}, nil)
 
-	volumeOperationMetric = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Name: "volume_operation_total_seconds",
-			Help: "Total volume operation time",
-		},
-		[]string{"plugin_name", "operation_name"})
 	volumeOperationErrorsMetric = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "volume_operation_total_errors",

@@ -198,14 +192,79 @@ func (collector *pvAndPVCCountCollector) pvcCollect(ch chan<- prometheus.Metric)
 	}
 }
 
-// RecordVolumeOperationMetric records the latency and errors of volume operations.
-func RecordVolumeOperationMetric(pluginName, opName string, timeTaken float64, err error) {
+// RecordVolumeOperationErrorMetric records error count into metric
+// volume_operation_total_errors for provisioning/deletion operations
+func RecordVolumeOperationErrorMetric(pluginName, opName string) {
 	if pluginName == "" {
 		pluginName = "N/A"
 	}
-	if err != nil {
-		volumeOperationErrorsMetric.WithLabelValues(pluginName, opName).Inc()
-	}
+	volumeOperationErrorsMetric.WithLabelValues(pluginName, opName).Inc()
+}
+
+// operationTimestamp stores the start time of an operation by a plugin
+type operationTimestamp struct {
+	pluginName string
+	operation  string
+	startTs    time.Time
+}
+
+func newOperationTimestamp(pluginName, operationName string) *operationTimestamp {
+	return &operationTimestamp{
+		pluginName: pluginName,
+		operation:  operationName,
+		startTs:    time.Now(),
+	}
+}
+
+// OperationStartTimeCache is a concurrency-safe cache for operation start timestamps
+type OperationStartTimeCache struct {
+	cache sync.Map // [string]operationTimestamp
+}
+
+// NewOperationStartTimeCache creates an operation timestamp cache
+func NewOperationStartTimeCache() OperationStartTimeCache {
+	return OperationStartTimeCache{
+		cache: sync.Map{}, // [string]operationTimestamp{}
+	}
+}
+
+// AddIfNotExist returns directly if there exists an entry with the key. Otherwise, it
+// creates a new operation timestamp using operationName, pluginName, and the current
+// timestamp, and stores the operation timestamp with the key
+func (c *OperationStartTimeCache) AddIfNotExist(key, pluginName, operationName string) {
+	ts := newOperationTimestamp(pluginName, operationName)
+	c.cache.LoadOrStore(key, ts)
+}
+
+// Delete deletes a value for a key.
+func (c *OperationStartTimeCache) Delete(key string) {
+	c.cache.Delete(key)
+}
+
+// Has returns a bool value indicating the existence of a key in the cache
+func (c *OperationStartTimeCache) Has(key string) bool {
+	_, exists := c.cache.Load(key)
+	return exists
+}
+
+// RecordMetric records either an error count metric or a latency metric if there
+// exists a start timestamp entry in the cache. For a successful operation, i.e.,
+// err == nil, the corresponding timestamp entry will be removed from the cache
+func RecordMetric(key string, c *OperationStartTimeCache, err error) {
+	obj, exists := c.cache.Load(key)
+	if !exists {
+		return
+	}
-	volumeOperationMetric.WithLabelValues(pluginName, opName).Observe(timeTaken)
+	ts, ok := obj.(*operationTimestamp)
+	if !ok {
+		return
+	}
+	if err != nil {
+		RecordVolumeOperationErrorMetric(ts.pluginName, ts.operation)
+	} else {
+		timeTaken := time.Since(ts.startTs).Seconds()
+		metricutil.RecordOperationLatencyMetric(ts.pluginName, ts.operation, timeTaken)
+		// end of this operation, remove the timestamp entry from the cache
+		c.Delete(key)
+	}
+}

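For orientation, a minimal sketch of the lifecycle the new cache is designed around, as a standalone program (the import is the very package this hunk modifies; key and plugin names are illustrative): a failed attempt only bumps the error counter, while the start timestamp survives so the eventual success observes true end-to-end latency.

package main

import (
	"errors"
	"fmt"

	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
)

func main() {
	cache := metrics.NewOperationStartTimeCache()

	// provisionClaim: start the clock once per claim; repeated calls are no-ops.
	cache.AddIfNotExist("default/claim-1", "kubernetes.io/gce-pd", "provision")

	// A failed attempt only increments volume_operation_total_errors;
	// the start timestamp stays cached.
	metrics.RecordMetric("default/claim-1", &cache, errors.New("transient API error"))
	fmt.Println(cache.Has("default/claim-1")) // true

	// Successful bind: observes volume_operation_total_seconds and
	// removes the entry, ending the lifecycle.
	metrics.RecordMetric("default/claim-1", &cache, nil)
	fmt.Println(cache.Has("default/claim-1")) // false
}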
@@ -20,7 +20,7 @@ import (
 	"errors"
 	"testing"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -432,6 +432,17 @@ func TestProvisionSync(t *testing.T) {
 			[]string{"Normal ExternalProvisioning"},
 			noerrors, wrapTestWithCSIMigrationProvisionCalls(testSyncClaim),
 		},
+		{
+			// volume provisioned and available
+			// in this case, NO normal event with external provisioner should be issued
+			"11-22 - external provisioner with volume available",
+			newVolumeArray("volume11-22", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classExternal),
+			newVolumeArray("volume11-22", "1Gi", "uid11-22", "claim11-22", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController),
+			newClaimArray("claim11-22", "uid11-22", "1Gi", "", v1.ClaimPending, &classExternal),
+			newClaimArray("claim11-22", "uid11-22", "1Gi", "volume11-22", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted),
+			noevents,
+			noerrors, wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim),
+		},
 	}
 	runSyncTests(t, tests, storageClasses, []*v1.Pod{})
 }

@@ -461,6 +472,68 @@ func TestProvisionMultiSync(t *testing.T) {
 			newClaimArray("claim12-1", "uid12-1", "1Gi", "pvc-uid12-1", v1.ClaimBound, &classGold, pvutil.AnnBoundByController, pvutil.AnnBindCompleted, pvutil.AnnStorageProvisioner),
 			noevents, noerrors, wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
 		},
+		{
+			// provision a volume (external provisioner) and binding + normal event with external provisioner
+			"12-2 - external provisioner with volume provisioned success",
+			novolumes,
+			newVolumeArray("pvc-uid12-2", "1Gi", "uid12-2", "claim12-2", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController),
+			newClaimArray("claim12-2", "uid12-2", "1Gi", "", v1.ClaimPending, &classExternal),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume",
+				newClaimArray("claim12-2", "uid12-2", "1Gi", "pvc-uid12-2", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
+				// Create a volume before syncClaim tries to bind a PV to the PVC.
+				// This simulates the external provisioner creating a volume while the controller
+				// is waiting for a volume to bind to the existing claim.
+				// The external provisioner workflow implemented in "provisionClaimOperationExternal"
+				// should issue an ExternalProvisioning event to signal that some external provisioner
+				// is working on provisioning the PV, and also add the operation start timestamp to the
+				// local cache operationTimestamps. Rely on the existence of the start timestamp to
+				// create a PV for binding.
+				if ctrl.operationTimestamps.Has("default/claim12-2") {
+					volume := newVolume("pvc-uid12-2", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classExternal)
+					ctrl.volumes.store.Add(volume) // add the volume to the controller
+					reactor.AddVolume(volume)
+				}
+			}),
+		},
+		{
+			// provision a volume (external provisioner) but binding will not happen + normal event with external provisioner
+			"12-3 - external provisioner with volume to be provisioned",
+			novolumes,
+			novolumes,
+			newClaimArray("claim12-3", "uid12-3", "1Gi", "", v1.ClaimPending, &classExternal),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume",
+				newClaimArray("claim12-3", "uid12-3", "1Gi", "", v1.ClaimPending, &classExternal)),
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
+		},
+		{
+			// provision a volume (external provisioner) and binding + normal event with external provisioner
+			"12-4 - external provisioner with volume provisioned/bound success",
+			novolumes,
+			newVolumeArray("pvc-uid12-4", "1Gi", "uid12-4", "claim12-4", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController),
+			newClaimArray("claim12-4", "uid12-4", "1Gi", "", v1.ClaimPending, &classExternal),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume",
+				newClaimArray("claim12-4", "uid12-4", "1Gi", "pvc-uid12-4", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
+				// Create a volume before syncClaim tries to bind a PV to the PVC.
+				// This simulates the external provisioner creating a volume while the controller
+				// is waiting for a volume to bind to the existing claim.
+				// The external provisioner workflow implemented in "provisionClaimOperationExternal"
+				// should issue an ExternalProvisioning event to signal that some external provisioner
+				// is working on provisioning the PV, and also add the operation start timestamp to the
+				// local cache operationTimestamps. Rely on the existence of the start timestamp to
+				// create a PV for binding.
+				if ctrl.operationTimestamps.Has("default/claim12-4") {
+					volume := newVolume("pvc-uid12-4", "1Gi", "uid12-4", "claim12-4", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController)
+					ctrl.volumes.store.Add(volume) // add the volume to the controller
+					reactor.AddVolume(volume)
+				}
+			}),
+		},
 	}
 
 	runMultisyncTests(t, tests, storageClasses, storageClasses[0].Name)

@@ -22,7 +22,7 @@ import (
 	"strings"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -203,6 +203,28 @@ type PersistentVolumeController struct {
 	// For testing only: hook to intercept CSI driver name <=> Intree plugin name mapping
 	// Not used when set to nil
 	csiNameFromIntreeNameHook func(pluginName string) (string, error)
+
+	// operationTimestamps caches the start timestamp of operations
+	// (currently provision + binding/deletion) for metric recording.
+	// Detailed lifecycle/key for each operation:
+	// 1. provision + binding
+	//    key:        claimKey
+	//    start time: user has NOT provided any volume ref in the claim AND
+	//                there is no existing volume found for the claim,
+	//                "provisionClaim" is called with a valid plugin/external provisioner
+	//                to provision a volume
+	//    end time:   after a volume has been provisioned and bound to the claim successfully,
+	//                the corresponding timestamp entry will be deleted from the cache
+	//    abort:      claim has not been bound to a volume yet but a claim deleted event
+	//                has been received from the API server
+	// 2. deletion
+	//    key:        volumeName
+	//    start time: when "reclaimVolume" processes a volume with reclaim policy
+	//                set to "PersistentVolumeReclaimDelete"
+	//    end time:   after a volume deleted event has been received from the API server,
+	//                the corresponding timestamp entry will be deleted from the cache
+	//    abort:      N/A
+	operationTimestamps metrics.OperationStartTimeCache
 }
 
 // syncClaim is the main controller method to decide what to do with a claim.

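A small sketch of the two key shapes described in the comment block above; the "namespace/name" form for claim keys is an assumption that matches the keys the tests use (e.g. "default/claim12-2"), while deletion keys are plain PV names:

package main

import "fmt"

// provisionKey builds the provision/binding cache key (assumed
// "namespace/name", the usual object key shape).
func provisionKey(namespace, claimName string) string {
	return fmt.Sprintf("%s/%s", namespace, claimName)
}

// deletionKey is simply the PV name; PersistentVolumes are cluster-scoped.
func deletionKey(volumeName string) string {
	return volumeName
}

func main() {
	fmt.Println(provisionKey("default", "claim12-2")) // default/claim12-2
	fmt.Println(deletionKey("volume5-6"))             // volume5-6
}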
@@ -323,13 +345,21 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
 		} else /* pv != nil */ {
 			// Found a PV for this claim
 			// OBSERVATION: pvc is "Pending", pv is "Available"
-			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
+			claimKey := claimToClaimKey(claim)
+			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
 			if err = ctrl.bind(volume, claim); err != nil {
 				// On any error saving the volume or the claim, subsequent
 				// syncClaim will finish the binding.
+				// record an error count for provision if a timestamp entry exists;
+				// the timestamp entry will remain in the cache until a successful binding has happened
+				metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
 				return err
 			}
 			// OBSERVATION: claim is "Bound", pv is "Bound"
+			// if a timestamp entry exists in the cache, record the end-to-end provision latency and clean up the cache.
+			// End of the provision + binding operation lifecycle, the cache will be cleaned by "RecordMetric"
+			// [Unit test 12-1, 12-2, 12-4]
+			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
 			return nil
 		}
 	} else /* pvc.Spec.VolumeName != nil */ {

@@ -1011,11 +1041,17 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolum
 	case v1.PersistentVolumeReclaimDelete:
 		klog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
 		opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
-		startTime := time.Now()
+		// create a start timestamp entry in the cache for the deletion operation if none exists with
+		// key = volume.Name, pluginName = provisionerName, operation = "delete"
+		ctrl.operationTimestamps.AddIfNotExist(volume.Name, ctrl.getProvisionerNameFromVolume(volume), "delete")
 		ctrl.scheduleOperation(opName, func() error {
-			pluginName, err := ctrl.deleteVolumeOperation(volume)
-			timeTaken := time.Since(startTime).Seconds()
-			metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)
+			_, err := ctrl.deleteVolumeOperation(volume)
+			if err != nil {
+				// only report the error count to "volume_operation_total_errors";
+				// latency reporting will happen when the volume gets finally
+				// deleted and a volume deleted event is captured
+				metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, err)
+			}
 			return err
 		})

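The deletion flow above splits metric recording across two call sites; a hedged sketch of the sequence against the same metrics package (names illustrative), showing that latency is only observed once the volume-deleted event arrives:

package main

import (
	"errors"

	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
)

func main() {
	cache := metrics.NewOperationStartTimeCache()

	// reclaimVolume: the clock starts when a Delete-reclaim volume is first
	// processed, keyed by the PV name.
	cache.AddIfNotExist("volume5-6", "gcr.io/vendor-csi", "delete")

	// deleteVolumeOperation failed: only the error counter moves; the entry
	// stays so a later success still reports full end-to-end latency.
	metrics.RecordMetric("volume5-6", &cache, errors.New("deleter not ready"))

	// deleteVolume (volume-deleted event from the API server): latency is
	// observed and the cache entry is cleaned up.
	metrics.RecordMetric("volume5-6", &cache, nil)
}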
@@ -1309,11 +1345,31 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
 	}
 	klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
 	opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-	startTime := time.Now()
+	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
+	if err != nil {
+		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
+		klog.Errorf("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
+		// failed to find the requested provisioning plugin, directly return for now;
+		// the controller will retry the provisioning in every syncUnboundClaim() call.
+		// retain the original behavior of returning nil from a provisionClaim call
+		return nil
+	}
 	ctrl.scheduleOperation(opName, func() error {
-		pluginName, err := ctrl.provisionClaimOperation(claim)
-		timeTaken := time.Since(startTime).Seconds()
-		metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)
+		// create a start timestamp entry in the cache for the provision operation if none exists with
+		// key = claimKey, pluginName = provisionerName, operation = "provision"
+		claimKey := claimToClaimKey(claim)
+		ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision")
+		var err error
+		if plugin == nil || plugin.IsMigratedToCSI() {
+			_, err = ctrl.provisionClaimOperationExternal(claim, plugin, storageClass)
+		} else {
+			_, err = ctrl.provisionClaimOperation(claim, plugin, storageClass)
+		}
+		// if an error happened, record an error count metric;
+		// the timestamp entry will remain in the cache until a successful binding has happened
+		if err != nil {
+			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
+		}
 		return err
 	})
 	return nil

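A compact sketch of the dispatch decision inside the scheduled closure above (the interface is stubbed; the real parameter type is vol.ProvisionableVolumePlugin):

package main

import "fmt"

type provisionablePlugin interface {
	IsMigratedToCSI() bool
}

// dispatch mirrors the branch in provisionClaim: no in-tree plugin, or a
// plugin migrated to CSI, means the external provisioner path.
func dispatch(p provisionablePlugin) string {
	if p == nil || p.IsMigratedToCSI() {
		return "provisionClaimOperationExternal"
	}
	return "provisionClaimOperation"
}

type fakePlugin struct{ migrated bool }

func (f fakePlugin) IsMigratedToCSI() bool { return f.migrated }

func main() {
	fmt.Println(dispatch(nil))                        // external provisioner requested
	fmt.Println(dispatch(fakePlugin{migrated: true})) // CSI-migrated: external path
	fmt.Println(dispatch(fakePlugin{}))               // in-tree path
}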
@@ -1328,39 +1384,20 @@ func (ctrl *PersistentVolumeController) getCSINameFromIntreeName(pluginName stri
 
 // provisionClaimOperation provisions a volume. This method is running in
 // standalone goroutine and already has all necessary locks.
-func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
+func (ctrl *PersistentVolumeController) provisionClaimOperation(
+	claim *v1.PersistentVolumeClaim,
+	plugin vol.ProvisionableVolumePlugin,
+	storageClass *storage.StorageClass) (string, error) {
 	claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
 	klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)
 
-	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
-	if err != nil {
-		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
-		klog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
-		// The controller will retry provisioning the volume in every
-		// syncVolume() call.
-		return "", err
-	}
-
-	var pluginName string
+	// called from provisionClaim(), in this case, plugin MUST NOT be nil and
+	// plugin.IsMigratedToCSI() MUST return FALSE
+	// NOTE: checks on plugin/storageClass have been saved
+	pluginName := plugin.GetPluginName()
 	provisionerName := storageClass.Provisioner
-	if plugin != nil {
-		if plugin.IsMigratedToCSI() {
-			// pluginName is not set here to align with existing behavior
-			// of not setting pluginName for external provisioners (including CSI)
-			// Set provisionerName to CSI plugin name for setClaimProvisioner
-			provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
-			if err != nil {
-				strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
-				klog.V(2).Infof("%s", strerr)
-				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
-				return "", err
-			}
-		} else {
-			pluginName = plugin.GetPluginName()
-		}
-	}
 
-	// Add provisioner annotation so external provisioners know when to start
+	// Add provisioner annotation to be consistent with external provisioner workflow
 	newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
 	if err != nil {
 		// Save failed, the controller will retry in the next sync

@@ -1369,19 +1406,9 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
 	}
 	claim = newClaim
 
-	if plugin == nil || plugin.IsMigratedToCSI() {
-		// findProvisionablePlugin returned no error nor plugin.
-		// This means that an unknown provisioner is requested. Report an event
-		// and wait for the external provisioner
-		msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
-		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
-		klog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
-		return pluginName, nil
-	}
-
 	// internal provisioning
 
-	// A previous doProvisionClaim may just have finished while we were waiting for
+	// A previous provisionClaimOperation may just have finished while we were waiting for
 	// the locks. Check that PV (with deterministic name) hasn't been provisioned
 	// yet.

@@ -1547,6 +1574,44 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
 	return pluginName, nil
 }
 
+// provisionClaimOperationExternal provisions a volume using an external provisioner asynchronously.
+// This method will be running in a standalone goroutine scheduled in "provisionClaim"
+func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
+	claim *v1.PersistentVolumeClaim,
+	plugin vol.ProvisionableVolumePlugin,
+	storageClass *storage.StorageClass) (string, error) {
+	claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
+	klog.V(4).Infof("provisionClaimOperationExternal [%s] started, class: %q", claimToClaimKey(claim), claimClass)
+	// Set provisionerName to the external provisioner name by setClaimProvisioner
+	var err error
+	provisionerName := storageClass.Provisioner
+	if plugin != nil {
+		// update the provisioner name to use the CSI translated name
+		provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
+		if err != nil {
+			strerr := fmt.Sprintf("error getting CSI name for in-tree plugin %s: %v", storageClass.Provisioner, err)
+			klog.V(2).Infof("%s", strerr)
+			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
+			return provisionerName, err
+		}
+	}
+	// Add provisioner annotation so external provisioners know when to start
+	newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
+	if err != nil {
+		// Save failed, the controller will retry in the next sync
+		klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
+		return provisionerName, err
+	}
+	claim = newClaim
+	msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", provisionerName)
+	// An external provisioner has been requested for provisioning the volume.
+	// Report an event and wait for the external provisioner to finish
+	ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
+	klog.V(3).Infof("provisionClaimOperationExternal provisioning claim %q: %s", claimToClaimKey(claim), msg)
+	// return the provisioner name here for metric reporting
+	return provisionerName, nil
+}
+
 // rescheduleProvisioning signals back to the scheduler to retry dynamic provisioning
 // by removing the AnnSelectedNode annotation
 func (ctrl *PersistentVolumeController) rescheduleProvisioning(claim *v1.PersistentVolumeClaim) {

@@ -1661,3 +1726,47 @@ func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.Persisten
 	}
 	return plugin, nil
 }
+
+// getProvisionerNameFromVolume obtains the provisioner/deleter name for a volume
+func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.PersistentVolume) string {
+	plugin, err := ctrl.findDeletablePlugin(volume)
+	if err != nil {
+		return "N/A"
+	}
+	// if an external provisioner was used for provisioning,
+	// the volume MUST have the AnnDynamicallyProvisioned annotation; use its value
+	// as the provisioner name
+	if plugin == nil {
+		return volume.Annotations[pvutil.AnnDynamicallyProvisioned]
+	} else if plugin.IsMigratedToCSI() {
+		// in the case where a plugin has been migrated to CSI,
+		// use the CSI name instead of the in-tree name
+		storageClass := v1helper.GetPersistentVolumeClass(volume)
+		class, err := ctrl.classLister.Get(storageClass)
+		if err != nil {
+			return "N/A"
+		}
+		provisionerName, err := ctrl.getCSINameFromIntreeName(class.Provisioner)
+		if err != nil {
+			return "N/A"
+		}
+		return provisionerName
+	}
+	return plugin.GetPluginName()
+}
+
+// getProvisionerName obtains the plugin/external provisioner name from the plugin and the storage class
+func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.ProvisionableVolumePlugin, storageClass *storage.StorageClass) string {
+	// in-tree plugin: return the plugin's name
+	if plugin != nil && !plugin.IsMigratedToCSI() {
+		return plugin.GetPluginName()
+	} else if plugin != nil {
+		// get the CSI translated name from the storage class provisioner name
+		provisionerName, err := ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
+		if err != nil {
+			return "N/A"
+		}
+		return provisionerName
+	}
+	return storageClass.Provisioner
+}

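A hedged sketch of the fallback order getProvisionerNameFromVolume implements, flattened to plain values (the annotation key shown is the value of pvutil.AnnDynamicallyProvisioned; the CSI translation branch is elided):

package main

import "fmt"

// deleterName approximates the lookup order: matched in-tree plugin name,
// then the dynamically-provisioned annotation, then the diff's "N/A".
func deleterName(pluginName string, annotations map[string]string) string {
	if pluginName != "" {
		return pluginName // an in-tree deletable plugin matched the volume
	}
	if name, ok := annotations["pv.kubernetes.io/provisioned-by"]; ok {
		return name // externally provisioned: the annotation carries the name
	}
	return "N/A"
}

func main() {
	fmt.Println(deleterName("kubernetes.io/gce-pd", nil))
	fmt.Println(deleterName("", map[string]string{"pv.kubernetes.io/provisioned-by": "gcr.io/vendor-csi"}))
	fmt.Println(deleterName("", nil))
}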
@@ -21,7 +21,7 @@ import (
 	"strconv"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -92,6 +92,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
 		claimQueue:          workqueue.NewNamed("claims"),
 		volumeQueue:         workqueue.NewNamed("volumes"),
 		resyncPeriod:        p.SyncPeriod,
+		operationTimestamps: metrics.NewOperationStartTimeCache(),
 	}
 
 	// Prober is nil because PV is not aware of Flexvolume.

@@ -209,6 +210,10 @@ func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume
 func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) {
 	_ = ctrl.volumes.store.Delete(volume)
 	klog.V(4).Infof("volume %q deleted", volume.Name)
+	// record the deletion metric if a deletion start timestamp is in the cache;
+	// the following call will be a no-op if there is nothing for this volume in the cache.
+	// end of the timestamp cache entry lifecycle, "RecordMetric" will do the clean up
+	metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, nil)
 
 	if volume.Spec.ClaimRef == nil {
 		return

@@ -245,20 +250,26 @@ func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeCl
 	}
 }
 
+// Unit test [5-5] [5-6] [5-7]
 // deleteClaim runs in worker thread and handles "claim deleted" event.
 func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) {
 	_ = ctrl.claims.Delete(claim)
-	klog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))
+	claimKey := claimToClaimKey(claim)
+	klog.V(4).Infof("claim %q deleted", claimKey)
+	// clean any possible unfinished provision start timestamp from the cache
+	// Unit test [5-8] [5-9]
+	ctrl.operationTimestamps.Delete(claimKey)
 
 	volumeName := claim.Spec.VolumeName
 	if volumeName == "" {
-		klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimToClaimKey(claim))
+		klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimKey)
 		return
 	}
 
 	// sync the volume when its claim is deleted. Explicitly sync'ing the
 	// volume here in response to claim deletion prevents the volume from
 	// waiting until the next sync period for its Release.
-	klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimToClaimKey(claim), volumeName)
+	klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName)
 	ctrl.volumeQueue.Add(volumeName)
 }

@@ -17,6 +17,7 @@ limitations under the License.
 package persistentvolume
 
 import (
 	"errors"
 	"testing"
+	"time"
 

@@ -26,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	storagelisters "k8s.io/client-go/listers/storage/v1"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"

@@ -100,6 +102,130 @@ func TestControllerSync(t *testing.T) {
 				return nil
 			},
 		},
+		{
+			// deleteClaim with a bound claim makes the bound volume released with the external deleter.
+			// delete the corresponding volume from the apiserver, and report the latency metric
+			"5-5 - delete claim and delete volume report metric",
+			volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi",
+				newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)),
+			novolumes,
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi",
+				newClaimArray("claim5-5", "uid5-5", "1Gi", "volume5-5", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			noclaims,
+			noevents, noerrors,
+			// Custom test function that generates a delete claim event which should have been caught by
+			// "deleteClaim" to remove the claim from the controller's cache; after that, a volume deleted
+			// event will be generated to trigger the "deleteVolume" call for metric reporting
+			func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
+				test.initialVolumes[0].Annotations[pvutil.AnnDynamicallyProvisioned] = "gcr.io/vendor-csi"
+				obj := ctrl.claims.List()[0]
+				claim := obj.(*v1.PersistentVolumeClaim)
+				reactor.DeleteClaimEvent(claim)
+				for len(ctrl.claims.ListKeys()) > 0 {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// the claim has been removed from the controller's cache, generate a volume deleted event
+				volume := ctrl.volumes.store.List()[0].(*v1.PersistentVolume)
+				reactor.DeleteVolumeEvent(volume)
+				return nil
+			},
+		},
+		{
+			// deleteClaim with a bound claim makes the bound volume released, with external deletion pending;
+			// there should be an entry in the operation timestamps cache in the controller
+			"5-6 - delete claim and waiting for external volume deletion",
+			volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi",
+				newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)),
+			volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi",
+				newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeReleased, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi",
+				newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			noclaims,
+			noevents, noerrors,
+			// Custom test function that generates a delete claim event which should have been caught by
+			// "deleteClaim" to remove the claim from the controller's cache and mark the bound volume to be released
+			func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
+				// should have been provisioned by the external provisioner
+				obj := ctrl.claims.List()[0]
+				claim := obj.(*v1.PersistentVolumeClaim)
+				reactor.DeleteClaimEvent(claim)
+				// wait until the claim is cleared from the cache, i.e., deleteClaim is called
+				for len(ctrl.claims.ListKeys()) > 0 {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// make sure the operation timestamp cache is NOT empty
+				if !ctrl.operationTimestamps.Has("volume5-6") {
+					return errors.New("failed checking timestamp cache: should not be empty")
+				}
+				return nil
+			},
+		},
+		{
+			// deleteVolume event issued before deleteClaim, no metric should have been reported
+			// and no delete operation start timestamp should be inserted into the controller.operationTimestamps cache
+			"5-7 - delete volume event makes claim lost, delete claim event will not report metric",
+			newVolumeArray("volume5-7", "10Gi", "uid5-7", "claim5-7", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController, pvutil.AnnDynamicallyProvisioned),
+			novolumes,
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi",
+				newClaimArray("claim5-7", "uid5-7", "1Gi", "volume5-7", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			noclaims,
+			[]string{"Warning ClaimLost"},
+			noerrors,
+			// Custom test function that generates a delete volume event first, then a delete claim event;
+			// neither should leave a delete operation timestamp behind
+			func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
+				volume := ctrl.volumes.store.List()[0].(*v1.PersistentVolume)
+				reactor.DeleteVolumeEvent(volume)
+				for len(ctrl.volumes.store.ListKeys()) > 0 {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// try to remove the claim as well
+				obj := ctrl.claims.List()[0]
+				claim := obj.(*v1.PersistentVolumeClaim)
+				reactor.DeleteClaimEvent(claim)
+				// wait until the claim is cleared from the cache, i.e., deleteClaim is called
+				for len(ctrl.claims.ListKeys()) > 0 {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// make sure the operation timestamp cache is empty
+				if ctrl.operationTimestamps.Has("volume5-7") {
+					return errors.New("failed checking timestamp cache")
+				}
+				return nil
+			},
+		},
+		{
+			// deleting a claim waiting to be bound cleans up the provision (volume ref == "") entry from the timestamp cache
+			"5-8 - delete claim cleans up operation timestamp cache for provision",
+			novolumes,
+			novolumes,
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi",
+				newClaimArray("claim5-8", "uid5-8", "1Gi", "", v1.ClaimPending, &classExternal)),
+			noclaims,
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			// Custom test function that generates a delete claim event which should have been caught by
+			// "deleteClaim" to remove the claim and its provision start timestamp from the controller's cache
+			func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
+				// wait until the provision timestamp has been inserted
+				for !ctrl.operationTimestamps.Has("default/claim5-8") {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// delete the claim
+				obj := ctrl.claims.List()[0]
+				claim := obj.(*v1.PersistentVolumeClaim)
+				reactor.DeleteClaimEvent(claim)
+				// wait until the claim is cleared from the cache, i.e., deleteClaim is called
+				for len(ctrl.claims.ListKeys()) > 0 {
+					time.Sleep(10 * time.Millisecond)
+				}
+				// make sure the operation timestamp cache is empty
+				if ctrl.operationTimestamps.Has("default/claim5-8") {
+					return errors.New("failed checking timestamp cache")
+				}
+				return nil
+			},
+		},
 	}
 
 	for _, test := range tests {

@@ -120,6 +246,18 @@ func TestControllerSync(t *testing.T) {
 			t.Fatalf("Test %q construct persistent volume failed: %v", test.name, err)
 		}
 
+		// Inject storage classes into controller via a custom lister for test [5-5]
+		storageClasses := []*storagev1.StorageClass{
+			makeStorageClass(classExternal, &modeImmediate),
+		}
+
+		storageClasses[0].Provisioner = "gcr.io/vendor-csi"
+		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+		for _, class := range storageClasses {
+			indexer.Add(class)
+		}
+		ctrl.classLister = storagelisters.NewStorageClassLister(indexer)
+
 		reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors)
 		for _, claim := range test.initialClaims {
 			reactor.AddClaim(claim)

@@ -54,6 +54,15 @@ var storageOperationStatusMetric = prometheus.NewCounterVec(
 	[]string{"volume_plugin", "operation_name", "status"},
 )
 
+var storageOperationEndToEndLatencyMetric = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Name:    "volume_operation_total_seconds",
+		Help:    "Storage operation end to end duration in seconds",
+		Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
+	},
+	[]string{"volume_plugin", "operation_name"},
+)
+
 func init() {
 	registerMetrics()
 }

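A runnable sketch of what the new histogram records, renamed and registered on a private registry so it cannot be mistaken for the real package-level metric (bucket boundaries copied from the hunk above):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_volume_operation_total_seconds",
		Help:    "Storage operation end to end duration in seconds",
		Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
	}, []string{"volume_plugin", "operation_name"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(latency)

	// One 3.7s provision for an illustrative plugin label.
	latency.WithLabelValues("kubernetes.io/gce-pd", "provision").Observe(3.7)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		h := mf.GetMetric()[0].GetHistogram()
		fmt.Println(mf.GetName(), h.GetSampleCount(), h.GetSampleSum())
	}
}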
@@ -62,6 +71,7 @@ func registerMetrics() {
	prometheus.MustRegister(storageOperationMetric)
 	prometheus.MustRegister(storageOperationErrorMetric)
 	prometheus.MustRegister(storageOperationStatusMetric)
+	prometheus.MustRegister(storageOperationEndToEndLatencyMetric)
 }
 
 // OperationCompleteHook returns a hook to call when an operation is completed

@@ -95,3 +105,9 @@ func GetFullQualifiedPluginNameForVolume(pluginName string, spec *volume.Spec) s
 	}
 	return pluginName
 }
+
+// RecordOperationLatencyMetric records the end-to-end latency for a certain operation
+// into the metric volume_operation_total_seconds
+func RecordOperationLatencyMetric(plugin, operationName string, secondsTaken float64) {
+	storageOperationEndToEndLatencyMetric.WithLabelValues(plugin, operationName).Observe(secondsTaken)
+}