mirror of https://github.com/k3s-io/k3s
Merge pull request #77442 from cofyc/fix77084
Fix go lint failures in volume scheduling packagesk3s-v1.15.3
commit
43284ecbfc
|
@ -103,8 +103,6 @@ pkg/controller/volume/expand
|
|||
pkg/controller/volume/persistentvolume
|
||||
pkg/controller/volume/persistentvolume/config/v1alpha1
|
||||
pkg/controller/volume/persistentvolume/options
|
||||
pkg/controller/volume/persistentvolume/testing
|
||||
pkg/controller/volume/scheduling
|
||||
pkg/credentialprovider
|
||||
pkg/credentialprovider/gcp
|
||||
pkg/features
|
||||
|
|
|
@ -688,7 +688,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
|
|||
obj := reactor.PopChange()
|
||||
if obj == nil {
|
||||
// Nothing was changed, should we exit?
|
||||
if firstSync || reactor.ChangedSinceLastSync() > 0 {
|
||||
if firstSync || reactor.GetChangeCount() > 0 {
|
||||
// There were some changes after the last "periodic sync".
|
||||
// Simulate "periodic sync" of everything (until it produces
|
||||
// no changes).
|
||||
|
@ -712,7 +712,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
|
|||
ctrl.claims.Update(claim)
|
||||
err = ctrl.syncClaim(claim)
|
||||
if err != nil {
|
||||
if err == pvtesting.VersionConflictError {
|
||||
if err == pvtesting.ErrVersionConflict {
|
||||
// Ignore version errors
|
||||
klog.V(4).Infof("test intentionaly ignores version error.")
|
||||
} else {
|
||||
|
@ -729,7 +729,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
|
|||
ctrl.volumes.store.Update(volume)
|
||||
err = ctrl.syncVolume(volume)
|
||||
if err != nil {
|
||||
if err == pvtesting.VersionConflictError {
|
||||
if err == pvtesting.ErrVersionConflict {
|
||||
// Ignore version errors
|
||||
klog.V(4).Infof("test intentionaly ignores version error.")
|
||||
} else {
|
||||
|
|
|
@ -37,7 +37,9 @@ import (
|
|||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
var VersionConflictError = errors.New("VersionError")
|
||||
// ErrVersionConflict is the error returned when resource version of requested
|
||||
// object conflicts with the object in storage.
|
||||
var ErrVersionConflict = errors.New("VersionError")
|
||||
|
||||
// VolumeReactor is a core.Reactor that simulates etcd and API server. It
|
||||
// stores:
|
||||
|
@ -157,7 +159,7 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
|
||||
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
|
||||
if storedVer != requestedVer {
|
||||
return true, obj, VersionConflictError
|
||||
return true, obj, ErrVersionConflict
|
||||
}
|
||||
if reflect.DeepEqual(storedVolume, volume) {
|
||||
klog.V(4).Infof("nothing updated volume %s", volume.Name)
|
||||
|
@ -190,7 +192,7 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
|
||||
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
|
||||
if storedVer != requestedVer {
|
||||
return true, obj, VersionConflictError
|
||||
return true, obj, ErrVersionConflict
|
||||
}
|
||||
if reflect.DeepEqual(storedClaim, claim) {
|
||||
klog.V(4).Infof("nothing updated claim %s", claim.Name)
|
||||
|
@ -219,10 +221,9 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
if found {
|
||||
klog.V(4).Infof("GetVolume: found %s", volume.Name)
|
||||
return true, volume.DeepCopy(), nil
|
||||
} else {
|
||||
klog.V(4).Infof("GetVolume: volume %s not found", name)
|
||||
return true, nil, fmt.Errorf("Cannot find volume %s", name)
|
||||
}
|
||||
klog.V(4).Infof("GetVolume: volume %s not found", name)
|
||||
return true, nil, fmt.Errorf("Cannot find volume %s", name)
|
||||
|
||||
case action.Matches("get", "persistentvolumeclaims"):
|
||||
name := action.(core.GetAction).GetName()
|
||||
|
@ -230,10 +231,9 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
if found {
|
||||
klog.V(4).Infof("GetClaim: found %s", claim.Name)
|
||||
return true, claim.DeepCopy(), nil
|
||||
} else {
|
||||
klog.V(4).Infof("GetClaim: claim %s not found", name)
|
||||
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
|
||||
}
|
||||
klog.V(4).Infof("GetClaim: claim %s not found", name)
|
||||
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
|
||||
|
||||
case action.Matches("delete", "persistentvolumes"):
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
|
@ -246,9 +246,8 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
}
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
|
||||
}
|
||||
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
|
||||
|
||||
case action.Matches("delete", "persistentvolumeclaims"):
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
|
@ -261,9 +260,8 @@ func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
|||
}
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
|
||||
}
|
||||
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
|
||||
}
|
||||
|
||||
return false, nil, nil
|
||||
|
@ -299,12 +297,6 @@ func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) [
|
|||
return watches
|
||||
}
|
||||
|
||||
func (r *VolumeReactor) ChangedSinceLastSync() int {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
return r.changedSinceLastSync
|
||||
}
|
||||
|
||||
// injectReactError returns an error when the test requested given action to
|
||||
// fail. nil is returned otherwise.
|
||||
func (r *VolumeReactor) injectReactError(action core.Action) error {
|
||||
|
@ -435,6 +427,7 @@ func (r *VolumeReactor) SyncAll() {
|
|||
r.changedSinceLastSync = 0
|
||||
}
|
||||
|
||||
// GetChangeCount returns changes since last sync.
|
||||
func (r *VolumeReactor) GetChangeCount() int {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
@ -515,6 +508,7 @@ func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) {
|
|||
}
|
||||
}
|
||||
|
||||
// AddClaims adds PVCs into VolumeReactor.
|
||||
func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
@ -523,6 +517,7 @@ func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) {
|
|||
}
|
||||
}
|
||||
|
||||
// AddVolumes adds PVs into VolumeReactor.
|
||||
func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
@ -531,24 +526,28 @@ func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) {
|
|||
}
|
||||
}
|
||||
|
||||
// AddClaim adds a PVC into VolumeReactor.
|
||||
func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.claims[claim.Name] = claim
|
||||
}
|
||||
|
||||
// AddVolume adds a PV into VolumeReactor.
|
||||
func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.volumes[volume.Name] = volume
|
||||
}
|
||||
|
||||
// DeleteVolume deletes a PV by name.
|
||||
func (r *VolumeReactor) DeleteVolume(name string) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
delete(r.volumes, name)
|
||||
}
|
||||
|
||||
// AddClaimBoundToVolume adds a PVC and binds it to corresponding PV.
|
||||
func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
@ -558,6 +557,7 @@ func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) {
|
|||
}
|
||||
}
|
||||
|
||||
// MarkVolumeAvaiable marks a PV available by name.
|
||||
func (r *VolumeReactor) MarkVolumeAvaiable(name string) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
@ -568,6 +568,7 @@ func (r *VolumeReactor) MarkVolumeAvaiable(name string) {
|
|||
}
|
||||
}
|
||||
|
||||
// NewVolumeReactor creates a volume reactor.
|
||||
func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor {
|
||||
reactor := &VolumeReactor{
|
||||
volumes: make(map[string]*v1.PersistentVolume),
|
||||
|
|
|
@ -127,7 +127,8 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
|
|||
return c.indexFunc(objInfo.latestObj)
|
||||
}
|
||||
|
||||
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) *assumeCache {
|
||||
// NewAssumeCache creates an assume cache for genernal objects.
|
||||
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
|
||||
c := &assumeCache{
|
||||
description: description,
|
||||
indexFunc: indexFunc,
|
||||
|
@ -344,7 +345,7 @@ type PVAssumeCache interface {
|
|||
}
|
||||
|
||||
type pvAssumeCache struct {
|
||||
*assumeCache
|
||||
AssumeCache
|
||||
}
|
||||
|
||||
func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
|
||||
|
@ -354,8 +355,9 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
|
|||
return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
|
||||
}
|
||||
|
||||
// NewPVAssumeCache creates a PV assume cache.
|
||||
func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
|
||||
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
|
||||
return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
|
||||
}
|
||||
|
||||
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
|
||||
|
@ -411,11 +413,12 @@ type PVCAssumeCache interface {
|
|||
}
|
||||
|
||||
type pvcAssumeCache struct {
|
||||
*assumeCache
|
||||
AssumeCache
|
||||
}
|
||||
|
||||
// NewPVCAssumeCache creates a PVC assume cache.
|
||||
func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
|
||||
return &pvcAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
|
||||
return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
|
||||
}
|
||||
|
||||
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
|
||||
|
|
|
@ -109,13 +109,13 @@ func TestAssumePV(t *testing.T) {
|
|||
|
||||
for name, scenario := range scenarios {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
||||
// Add oldPV to cache
|
||||
internal_cache.add(scenario.oldPV)
|
||||
internalCache.add(scenario.oldPV)
|
||||
if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
|
||||
t.Errorf("Failed to GetPV() after initial update: %v", err)
|
||||
continue
|
||||
|
@ -143,7 +143,7 @@ func TestAssumePV(t *testing.T) {
|
|||
|
||||
func TestRestorePV(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ func TestRestorePV(t *testing.T) {
|
|||
cache.Restore("nothing")
|
||||
|
||||
// Add oldPV to cache
|
||||
internal_cache.add(oldPV)
|
||||
internalCache.add(oldPV)
|
||||
if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
|
||||
t.Fatalf("Failed to GetPV() after initial update: %v", err)
|
||||
}
|
||||
|
@ -183,7 +183,7 @@ func TestRestorePV(t *testing.T) {
|
|||
|
||||
func TestBasicPVCache(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ func TestBasicPVCache(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "")
|
||||
pvs[pv.Name] = pv
|
||||
internal_cache.add(pv)
|
||||
internalCache.add(pv)
|
||||
}
|
||||
|
||||
// List them
|
||||
|
@ -211,7 +211,7 @@ func TestBasicPVCache(t *testing.T) {
|
|||
// Update a PV
|
||||
updatedPV := makePV("test-pv3", "2", "")
|
||||
pvs[updatedPV.Name] = updatedPV
|
||||
internal_cache.update(nil, updatedPV)
|
||||
internalCache.update(nil, updatedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs, "")
|
||||
|
@ -219,7 +219,7 @@ func TestBasicPVCache(t *testing.T) {
|
|||
// Delete a PV
|
||||
deletedPV := pvs["test-pv7"]
|
||||
delete(pvs, deletedPV.Name)
|
||||
internal_cache.delete(deletedPV)
|
||||
internalCache.delete(deletedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs, "")
|
||||
|
@ -227,7 +227,7 @@ func TestBasicPVCache(t *testing.T) {
|
|||
|
||||
func TestPVCacheWithStorageClasses(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "class1")
|
||||
pvs1[pv.Name] = pv
|
||||
internal_cache.add(pv)
|
||||
internalCache.add(pv)
|
||||
}
|
||||
|
||||
// Add a bunch of PVs
|
||||
|
@ -245,7 +245,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
pv := makePV(fmt.Sprintf("test2-pv%v", i), "1", "class2")
|
||||
pvs2[pv.Name] = pv
|
||||
internal_cache.add(pv)
|
||||
internalCache.add(pv)
|
||||
}
|
||||
|
||||
// List them
|
||||
|
@ -255,7 +255,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
|
|||
// Update a PV
|
||||
updatedPV := makePV("test-pv3", "2", "class1")
|
||||
pvs1[updatedPV.Name] = updatedPV
|
||||
internal_cache.update(nil, updatedPV)
|
||||
internalCache.update(nil, updatedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs1, "class1")
|
||||
|
@ -264,7 +264,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
|
|||
// Delete a PV
|
||||
deletedPV := pvs1["test-pv7"]
|
||||
delete(pvs1, deletedPV.Name)
|
||||
internal_cache.delete(deletedPV)
|
||||
internalCache.delete(deletedPV)
|
||||
|
||||
// List them
|
||||
verifyListPVs(t, cache, pvs1, "class1")
|
||||
|
@ -273,7 +273,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
|
|||
|
||||
func TestAssumeUpdatePVCache(t *testing.T) {
|
||||
cache := NewPVAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvAssumeCache)
|
||||
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
|
|||
|
||||
// Add a PV
|
||||
pv := makePV(pvName, "1", "")
|
||||
internal_cache.add(pv)
|
||||
internalCache.add(pv)
|
||||
if err := verifyPV(cache, pvName, pv); err != nil {
|
||||
t.Fatalf("failed to get PV: %v", err)
|
||||
}
|
||||
|
@ -298,7 +298,7 @@ func TestAssumeUpdatePVCache(t *testing.T) {
|
|||
}
|
||||
|
||||
// Add old PV
|
||||
internal_cache.add(pv)
|
||||
internalCache.add(pv)
|
||||
if err := verifyPV(cache, pvName, newPV); err != nil {
|
||||
t.Fatalf("failed to get PV after old PV added: %v", err)
|
||||
}
|
||||
|
@ -366,13 +366,13 @@ func TestAssumePVC(t *testing.T) {
|
|||
|
||||
for name, scenario := range scenarios {
|
||||
cache := NewPVCAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvcAssumeCache)
|
||||
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
||||
// Add oldPVC to cache
|
||||
internal_cache.add(scenario.oldPVC)
|
||||
internalCache.add(scenario.oldPVC)
|
||||
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
|
||||
t.Errorf("Failed to GetPVC() after initial update: %v", err)
|
||||
continue
|
||||
|
@ -400,7 +400,7 @@ func TestAssumePVC(t *testing.T) {
|
|||
|
||||
func TestRestorePVC(t *testing.T) {
|
||||
cache := NewPVCAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvcAssumeCache)
|
||||
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -412,7 +412,7 @@ func TestRestorePVC(t *testing.T) {
|
|||
cache.Restore("nothing")
|
||||
|
||||
// Add oldPVC to cache
|
||||
internal_cache.add(oldPVC)
|
||||
internalCache.add(oldPVC)
|
||||
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
|
||||
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
|
||||
}
|
||||
|
@ -440,7 +440,7 @@ func TestRestorePVC(t *testing.T) {
|
|||
|
||||
func TestAssumeUpdatePVCCache(t *testing.T) {
|
||||
cache := NewPVCAssumeCache(nil)
|
||||
internal_cache, ok := cache.(*pvcAssumeCache)
|
||||
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to get internal cache")
|
||||
}
|
||||
|
@ -450,7 +450,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
|
|||
|
||||
// Add a PVC
|
||||
pvc := makeClaim(pvcName, "1", pvcNamespace)
|
||||
internal_cache.add(pvc)
|
||||
internalCache.add(pvc)
|
||||
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
|
||||
t.Fatalf("failed to get PVC: %v", err)
|
||||
}
|
||||
|
@ -466,7 +466,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
|
|||
}
|
||||
|
||||
// Add old PVC
|
||||
internal_cache.add(pvc)
|
||||
internalCache.add(pvc)
|
||||
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
|
||||
t.Fatalf("failed to get PVC after old PVC added: %v", err)
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
const VolumeSchedulerSubsystem = "scheduler_volume"
|
||||
|
||||
var (
|
||||
// VolumeBindingRequestSchedulerBinderCache tracks the number of volume binder cache operations.
|
||||
VolumeBindingRequestSchedulerBinderCache = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Subsystem: VolumeSchedulerSubsystem,
|
||||
|
@ -32,6 +33,7 @@ var (
|
|||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
// VolumeSchedulingStageLatency tracks the latency of volume scheduling operations.
|
||||
VolumeSchedulingStageLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: VolumeSchedulerSubsystem,
|
||||
|
@ -41,6 +43,7 @@ var (
|
|||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
// VolumeSchedulingStageFailed tracks the number of failed volume scheduling operations.
|
||||
VolumeSchedulingStageFailed = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Subsystem: VolumeSchedulerSubsystem,
|
||||
|
|
|
@ -397,14 +397,14 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
|
|||
// TODO: does it hurt if we make an api call and nothing needs to be updated?
|
||||
claimKey := claimToClaimKey(binding.pvc)
|
||||
klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name)
|
||||
if newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(binding.pv); err != nil {
|
||||
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(binding.pv)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", binding.pv.Name, claimKey, err)
|
||||
return err
|
||||
} else {
|
||||
klog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", binding.pv.Name, claimKey)
|
||||
// Save updated object from apiserver for later checking.
|
||||
binding.pv = newPV
|
||||
}
|
||||
klog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", binding.pv.Name, claimKey)
|
||||
// Save updated object from apiserver for later checking.
|
||||
binding.pv = newPV
|
||||
lastProcessedBinding++
|
||||
}
|
||||
|
||||
|
@ -412,12 +412,12 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
|
|||
// PV controller is expect to signal back by removing related annotations if actual provisioning fails
|
||||
for i, claim = range claimsToProvision {
|
||||
klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
|
||||
if newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
|
||||
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
// Save updated object from apiserver for later checking.
|
||||
claimsToProvision[i] = newClaim
|
||||
}
|
||||
// Save updated object from apiserver for later checking.
|
||||
claimsToProvision[i] = newClaim
|
||||
lastProcessedProvisioning++
|
||||
}
|
||||
|
||||
|
@ -527,9 +527,8 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
|
|||
// And if PV does not exist because it's deleted, PVC will
|
||||
// be unbound eventually.
|
||||
return false, nil
|
||||
} else {
|
||||
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
|
||||
}
|
||||
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
|
||||
}
|
||||
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
|
||||
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
"k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// podBindingCache stores PV binding decisions per pod per node.
|
||||
// PodBindingCache stores PV binding decisions per pod per node.
|
||||
// Pod entries are removed when the Pod is deleted or updated to
|
||||
// no longer be schedulable.
|
||||
type PodBindingCache interface {
|
||||
|
@ -69,6 +69,7 @@ type nodeDecision struct {
|
|||
provisionings []*v1.PersistentVolumeClaim
|
||||
}
|
||||
|
||||
// NewPodBindingCache creates a pod binding cache.
|
||||
func NewPodBindingCache() PodBindingCache {
|
||||
return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package scheduling
|
|||
|
||||
import "k8s.io/api/core/v1"
|
||||
|
||||
// FakeVolumeBinderConfig holds configurations for fake volume binder.
|
||||
type FakeVolumeBinderConfig struct {
|
||||
AllBound bool
|
||||
FindUnboundSatsified bool
|
||||
|
@ -27,7 +28,7 @@ type FakeVolumeBinderConfig struct {
|
|||
BindErr error
|
||||
}
|
||||
|
||||
// NewVolumeBinder sets up all the caches needed for the scheduler to make
|
||||
// NewFakeVolumeBinder sets up all the caches needed for the scheduler to make
|
||||
// topology-aware volume binding decisions.
|
||||
func NewFakeVolumeBinder(config *FakeVolumeBinderConfig) *FakeVolumeBinder {
|
||||
return &FakeVolumeBinder{
|
||||
|
@ -35,26 +36,31 @@ func NewFakeVolumeBinder(config *FakeVolumeBinderConfig) *FakeVolumeBinder {
|
|||
}
|
||||
}
|
||||
|
||||
// FakeVolumeBinder represents a fake volume binder for testing.
|
||||
type FakeVolumeBinder struct {
|
||||
config *FakeVolumeBinderConfig
|
||||
AssumeCalled bool
|
||||
BindCalled bool
|
||||
}
|
||||
|
||||
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
|
||||
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatsified bool, err error) {
|
||||
return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr
|
||||
}
|
||||
|
||||
// AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes.
|
||||
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, error) {
|
||||
b.AssumeCalled = true
|
||||
return b.config.AllBound, b.config.AssumeErr
|
||||
}
|
||||
|
||||
// BindPodVolumes implements SchedulerVolumeBinder.BindPodVolumes.
|
||||
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
|
||||
b.BindCalled = true
|
||||
return b.config.BindErr
|
||||
}
|
||||
|
||||
// GetBindingsCache implements SchedulerVolumeBinder.GetBindingsCache.
|
||||
func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -103,8 +103,8 @@ type testEnv struct {
|
|||
binder SchedulerVolumeBinder
|
||||
internalBinder *volumeBinder
|
||||
internalNodeInformer coreinformers.NodeInformer
|
||||
internalPVCache *pvAssumeCache
|
||||
internalPVCCache *pvcAssumeCache
|
||||
internalPVCache *assumeCache
|
||||
internalPVCCache *assumeCache
|
||||
}
|
||||
|
||||
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
|
||||
|
@ -206,13 +206,13 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
|
|||
}
|
||||
|
||||
pvCache := internalBinder.pvCache
|
||||
internalPVCache, ok := pvCache.(*pvAssumeCache)
|
||||
internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to convert to internal PV cache")
|
||||
}
|
||||
|
||||
pvcCache := internalBinder.pvcCache
|
||||
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
|
||||
internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
|
||||
if !ok {
|
||||
t.Fatalf("Failed to convert to internal PVC cache")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue