Merge pull request #68860 from bsalamat/remove-pdb-cache

Remove PDB and its event handlers from the scheduler cache
k8s-ci-robot 2018-09-26 19:22:34 -07:00 committed by GitHub
commit d89418a083
21 changed files with 42 additions and 444 deletions
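
In short: the scheduler cache kept its own copy of every PodDisruptionBudget, maintained by dedicated add/update/delete event handlers, purely so that preemption could list PDBs. This change introduces an algorithm.PDBLister interface, threads an informer-backed lister into the generic scheduler, and deletes the cache copy, its event handlers, and the cache-comparer support for PDBs.

A minimal sketch of the new read path (the fake client and variable names here are illustrative assumptions, not part of this diff):

package main

import (
	"fmt"

	policyv1beta1 "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// PDBLister mirrors the interface added to pkg/scheduler/algorithm below.
type PDBLister interface {
	List(labels.Selector) ([]*policyv1beta1.PodDisruptionBudget, error)
}

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	// client-go's informer-backed PodDisruptionBudgetLister has the same List
	// signature, so it satisfies PDBLister directly; no scheduler-cache copy
	// or event handlers are required.
	var lister PDBLister = factory.Policy().V1beta1().PodDisruptionBudgets().Lister()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	pdbs, err := lister.List(labels.Everything())
	fmt.Printf("listed %d PDBs (err: %v)\n", len(pdbs), err)
}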

View File

@@ -21,6 +21,7 @@ go_library(
"//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)

View File

@@ -19,6 +19,7 @@ package algorithm
import (
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/labels"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@@ -122,6 +123,12 @@ type ReplicaSetLister interface {
GetPodReplicaSets(*v1.Pod) ([]*apps.ReplicaSet, error)
}
// PDBLister interface represents anything that can list PodDisruptionBudget objects.
type PDBLister interface {
// List returns a list of PodDisruptionBudgets matching the selector.
List(labels.Selector) ([]*policyv1beta1.PodDisruptionBudget, error)
}
var _ ControllerLister = &EmptyControllerLister{}
// EmptyControllerLister implements ControllerLister on []v1.ReplicationController returning empty data
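
Any type with a matching List method satisfies the new interface. A minimal sketch of an implementation (a hypothetical helper, not part of this diff) that, unlike the selector-ignoring FakePDBLister added later in this change, filters by the given selector, mirroring the filtering the removed cache used to do:

package example

import (
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/labels"
)

// staticPDBLister serves PDBs from a fixed slice, honoring the selector.
type staticPDBLister []*policyv1beta1.PodDisruptionBudget

// List returns the PDBs whose labels match the given selector.
func (s staticPDBLister) List(selector labels.Selector) ([]*policyv1beta1.PodDisruptionBudget, error) {
	var matched []*policyv1beta1.PodDisruptionBudget
	for _, pdb := range s {
		if selector.Matches(labels.Set(pdb.Labels)) {
			matched = append(matched, pdb)
		}
	}
	return matched, nil
}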

View File

@@ -18,7 +18,6 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -43,12 +42,10 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],

View File

@@ -29,7 +29,6 @@ import (
"k8s.io/kubernetes/pkg/features"
"github.com/golang/glog"
policy "k8s.io/api/policy/v1beta1"
)
var (
@@ -60,7 +59,6 @@ type schedulerCache struct {
podStates map[string]*podState
nodes map[string]*NodeInfo
nodeTree *NodeTree
pdbs map[string]*policy.PodDisruptionBudget
// A map from image name to its imageState.
imageStates map[string]*imageState
}
@@ -106,7 +104,6 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
imageStates: make(map[string]*imageState),
}
}
@@ -127,15 +124,9 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
assumedPods[k] = v
}
pdbs := make(map[string]*policy.PodDisruptionBudget)
for k, v := range cache.pdbs {
pdbs[k] = v.DeepCopy()
}
return &Snapshot{
Nodes: nodes,
AssumedPods: assumedPods,
Pdbs: pdbs,
}
}
@@ -522,39 +513,6 @@ func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
}
}
func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// Unconditionally update cache.
cache.pdbs[string(pdb.UID)] = pdb
return nil
}
func (cache *schedulerCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error {
return cache.AddPDB(newPDB)
}
func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.pdbs, string(pdb.UID))
return nil
}
func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
var pdbs []*policy.PodDisruptionBudget
for _, pdb := range cache.pdbs {
if selector.Matches(labels.Set(pdb.Labels)) {
pdbs = append(pdbs, pdb)
}
}
return pdbs, nil
}
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
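
One behavioral note: the removed Snapshot path deep-copied every PDB before handing it out, so callers could mutate the result freely. Objects returned by an informer-backed lister are shared with the informer's store and must be treated as read-only. A minimal sketch of the copy-before-mutate discipline (hypothetical helper and label key, not part of this diff):

package example

import (
	policyv1beta1 "k8s.io/api/policy/v1beta1"
)

// withLabel returns a mutated copy of a listed PDB. The input object is shared
// with the informer cache, so it is deep-copied before any write.
func withLabel(shared *policyv1beta1.PodDisruptionBudget, key, value string) *policyv1beta1.PodDisruptionBudget {
	out := shared.DeepCopy() // DeepCopy is generated for all API types
	if out.Labels == nil {
		out.Labels = map[string]string{}
	}
	out.Labels[key] = value
	return out
}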

View File

@@ -24,12 +24,10 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@@ -1230,108 +1228,3 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
}
return cache
}
func makePDB(name, namespace string, uid types.UID, labels map[string]string, minAvailable int) *v1beta1.PodDisruptionBudget {
intstrMin := intstr.FromInt(minAvailable)
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
Labels: labels,
UID: uid,
},
Spec: v1beta1.PodDisruptionBudgetSpec{
MinAvailable: &intstrMin,
Selector: &metav1.LabelSelector{MatchLabels: labels},
},
}
return pdb
}
// TestPDBOperations tests that a PDB will be added/updated/deleted correctly.
func TestPDBOperations(t *testing.T) {
ttl := 10 * time.Second
testPDBs := []*v1beta1.PodDisruptionBudget{
makePDB("pdb0", "ns1", "uid0", map[string]string{"tkey1": "tval1"}, 3),
makePDB("pdb1", "ns1", "uid1", map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, 1),
makePDB("pdb2", "ns3", "uid2", map[string]string{"tkey3": "tval3", "tkey2": "tval2"}, 10),
}
updatedPDBs := []*v1beta1.PodDisruptionBudget{
makePDB("pdb0", "ns1", "uid0", map[string]string{"tkey4": "tval4"}, 8),
makePDB("pdb1", "ns1", "uid1", map[string]string{"tkey1": "tval1"}, 1),
makePDB("pdb2", "ns3", "uid2", map[string]string{"tkey3": "tval3", "tkey1": "tval1", "tkey2": "tval2"}, 10),
}
tests := []struct {
pdbsToAdd []*v1beta1.PodDisruptionBudget
pdbsToUpdate []*v1beta1.PodDisruptionBudget
pdbsToDelete []*v1beta1.PodDisruptionBudget
expectedPDBs []*v1beta1.PodDisruptionBudget // Expected PDBs after all operations
}{
{
pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1], testPDBs[0]},
expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1]}, // both will be in the cache as they have different names
},
{
pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]},
expectedPDBs: []*v1beta1.PodDisruptionBudget{updatedPDBs[0]},
},
{
pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[2]},
pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]},
pdbsToDelete: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[2]},
},
}
for _, test := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pdbToAdd := range test.pdbsToAdd {
if err := cache.AddPDB(pdbToAdd); err != nil {
t.Fatalf("AddPDB failed: %v", err)
}
}
for i := range test.pdbsToUpdate {
if i == 0 {
continue
}
if err := cache.UpdatePDB(test.pdbsToUpdate[i-1], test.pdbsToUpdate[i]); err != nil {
t.Fatalf("UpdatePDB failed: %v", err)
}
}
for _, pdb := range test.pdbsToDelete {
if err := cache.RemovePDB(pdb); err != nil {
t.Fatalf("RemovePDB failed: %v", err)
}
}
cachedPDBs, err := cache.ListPDBs(labels.Everything())
if err != nil {
t.Fatalf("ListPDBs failed: %v", err)
}
if len(cachedPDBs) != len(test.expectedPDBs) {
t.Errorf("Expected %d PDBs, got %d", len(test.expectedPDBs), len(cachedPDBs))
}
for _, pdb := range test.expectedPDBs {
found := false
// find it among the cached ones
for _, cpdb := range cachedPDBs {
if pdb.UID == cpdb.UID {
found = true
if !reflect.DeepEqual(pdb, cpdb) {
t.Errorf("%v is not equal to %v", pdb, cpdb)
}
break
}
}
if !found {
t.Errorf("PDB with uid '%v' was not found in the cache.", pdb.UID)
}
}
}
}

View File

@@ -18,7 +18,6 @@ package cache
import (
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/labels"
)
@@ -97,18 +96,6 @@ type Cache interface {
// RemoveNode removes overall information about node.
RemoveNode(node *v1.Node) error
// AddPDB adds a PodDisruptionBudget object to the cache.
AddPDB(pdb *policy.PodDisruptionBudget) error
// UpdatePDB updates a PodDisruptionBudget object in the cache.
UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error
// RemovePDB removes a PodDisruptionBudget object from the cache.
RemovePDB(pdb *policy.PodDisruptionBudget) error
// ListPDBs lists all cached PDBs matching the selector.
ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error)
// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
@@ -131,5 +118,4 @@ type Cache interface {
type Snapshot struct {
AssumedPods map[string]bool
Nodes map[string]*NodeInfo
Pdbs map[string]*policy.PodDisruptionBudget
}

View File

@@ -513,6 +513,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
schedulertesting.FakePDBLister{},
false,
false,
schedulerapi.DefaultPercentageOfNodesToScore)

View File

@@ -107,6 +107,7 @@ type genericScheduler struct {
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister
disablePreemption bool
percentageOfNodesToScore int32
}
@@ -266,7 +267,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
pdbs, err := g.cache.ListPDBs(labels.Everything())
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
@@ -1146,6 +1147,7 @@ func NewGenericScheduler(
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister algorithm.PDBLister,
alwaysCheckAllPredicates bool,
disablePreemption bool,
percentageOfNodesToScore int32,
@@ -1162,6 +1164,7 @@
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,

View File

@@ -425,6 +425,7 @@ func TestGenericScheduler(t *testing.T) {
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
schedulertesting.FakePDBLister{},
test.alwaysCheckAllPredicates,
false,
schedulerapi.DefaultPercentageOfNodesToScore)
@@ -457,7 +458,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, nil, false, false,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
return s.(*genericScheduler)
@@ -1381,6 +1382,7 @@ func TestPreempt(t *testing.T) {
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
schedulertesting.FakePDBLister{},
false,
false,
schedulerapi.DefaultPercentageOfNodesToScore)
@@ -1495,6 +1497,7 @@ func TestCacheInvalidationRace(t *testing.T) {
// Set up the scheduler.
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
pdbLister := schedulertesting.FakePDBLister{}
scheduler := NewGenericScheduler(
mockCache,
eCache,
@@ -1503,7 +1506,8 @@
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false,
nil, nil, pvcLister, pdbLister,
true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
// First scheduling attempt should fail.
@@ -1576,6 +1580,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
// Set up the scheduler.
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
pdbLister := schedulertesting.FakePDBLister{}
scheduler := NewGenericScheduler(
cache,
eCache,
@@ -1584,7 +1589,7 @@
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false,
nil, nil, pvcLister, pdbLister, true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
// First scheduling attempt should fail.

View File

@@ -33,7 +33,6 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -78,7 +77,6 @@ go_test(
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@@ -22,10 +22,8 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
v1beta1 "k8s.io/client-go/listers/policy/v1beta1"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
)
@@ -33,7 +31,6 @@ import (
type cacheComparer struct {
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
pdbLister v1beta1.PodDisruptionBudgetLister
cache schedulercache.Cache
podQueue core.SchedulingQueue
@@ -54,11 +51,6 @@ func (c *cacheComparer) Compare() error {
return err
}
pdbs, err := c.pdbLister.List(labels.Everything())
if err != nil {
return err
}
snapshot := c.cache.Snapshot()
waitingPods := c.podQueue.WaitingPods()
@@ -71,10 +63,6 @@ func (c *cacheComparer) Compare() error {
glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant)
}
if missed, redundant := c.ComparePdbs(pdbs, snapshot.Pdbs); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed pdbs: %s; redundant pdbs: %s", missed, redundant)
}
return nil
}
@@ -114,20 +102,6 @@ func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[
return compareStrings(actual, cached)
}
func (c compareStrategy) ComparePdbs(pdbs []*policy.PodDisruptionBudget, pdbCache map[string]*policy.PodDisruptionBudget) (missed, redundant []string) {
actual := []string{}
for _, pdb := range pdbs {
actual = append(actual, string(pdb.UID))
}
cached := []string{}
for pdbUID := range pdbCache {
cached = append(cached, pdbUID)
}
return compareStrings(actual, cached)
}
func compareStrings(actual, cached []string) (missed, redundant []string) {
missed, redundant = []string{}, []string{}

View File

@@ -21,7 +21,6 @@ import (
"testing"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/types"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
@@ -191,68 +190,3 @@ func testComparePods(actual, cached, queued, missing, redundant []string, t *tes
t.Errorf("redundant expected to be %s; got %s", redundant, r)
}
}
func TestComparePdbs(t *testing.T) {
tests := []struct {
name string
actual []string
cached []string
missing []string
redundant []string
}{
{
name: "redundant cache value",
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
name: "missing cache value",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
missing: []string{"foobar"},
redundant: []string{},
},
{
name: "correct cache",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testComparePdbs(test.actual, test.cached, test.missing, test.redundant, t)
})
}
}
func testComparePdbs(actual, cached, missing, redundant []string, t *testing.T) {
compare := compareStrategy{}
pdbs := []*policy.PodDisruptionBudget{}
for _, uid := range actual {
pdb := &policy.PodDisruptionBudget{}
pdb.UID = types.UID(uid)
pdbs = append(pdbs, pdb)
}
cache := make(map[string]*policy.PodDisruptionBudget)
for _, uid := range cached {
pdb := &policy.PodDisruptionBudget{}
pdb.UID = types.UID(uid)
cache[uid] = pdb
}
m, r := compare.ComparePdbs(pdbs, cache)
if !reflect.DeepEqual(m, missing) {
t.Errorf("missing expected to be %s; got %s", missing, m)
}
if !reflect.DeepEqual(r, redundant) {
t.Errorf("redundant expected to be %s; got %s", redundant, r)
}
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -258,15 +257,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
)
c.nodeLister = args.NodeInformer.Lister()
args.PdbInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addPDBToCache,
UpdateFunc: c.updatePDBInCache,
DeleteFunc: c.deletePDBFromCache,
},
)
c.pdbLister = args.PdbInformer.Lister()
// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
args.PvInformer.Informer().AddEventHandler(
@@ -320,7 +310,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
comparer := &cacheComparer{
podLister: args.PodInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
@@ -1003,56 +992,6 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}
}
func (c *configFactory) addPDBToCache(obj interface{}) {
pdb, ok := obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", obj)
return
}
if err := c.schedulerCache.AddPDB(pdb); err != nil {
glog.Errorf("scheduler cache AddPDB failed: %v", err)
}
}
func (c *configFactory) updatePDBInCache(oldObj, newObj interface{}) {
oldPDB, ok := oldObj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert oldObj to *v1beta1.PodDisruptionBudget: %v", oldObj)
return
}
newPDB, ok := newObj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert newObj to *v1beta1.PodDisruptionBudget: %v", newObj)
return
}
if err := c.schedulerCache.UpdatePDB(oldPDB, newPDB); err != nil {
glog.Errorf("scheduler cache UpdatePDB failed: %v", err)
}
}
func (c *configFactory) deletePDBFromCache(obj interface{}) {
var pdb *v1beta1.PodDisruptionBudget
switch t := obj.(type) {
case *v1beta1.PodDisruptionBudget:
pdb = t
case cache.DeletedFinalStateUnknown:
var ok bool
pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t)
return
}
if err := c.schedulerCache.RemovePDB(pdb); err != nil {
glog.Errorf("scheduler cache RemovePDB failed: %v", err)
}
}
// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*scheduler.Config, error) {
return c.CreateFromProvider(DefaultProvider)
@@ -1203,6 +1142,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
@@ -1281,6 +1221,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
NodeLister: &nodeLister{c.nodeLister},
PDBLister: c.pdbLister,
NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},

View File

@@ -41,6 +41,7 @@ type PluginFactoryArgs struct {
ReplicaSetLister algorithm.ReplicaSetLister
StatefulSetLister algorithm.StatefulSetLister
NodeLister algorithm.NodeLister
PDBLister algorithm.PDBLister
NodeInfo predicates.NodeInfo
PVInfo predicates.PersistentVolumeInfo
PVCInfo predicates.PersistentVolumeClaimInfo

View File

@@ -561,6 +561,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
[]algorithm.SchedulerExtender{},
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
schedulertesting.FakePDBLister{},
false,
false,
api.DefaultPercentageOfNodesToScore)
@@ -611,6 +612,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
[]algorithm.SchedulerExtender{},
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
schedulertesting.FakePDBLister{},
false,
false,
api.DefaultPercentageOfNodesToScore)

View File

@@ -18,7 +18,6 @@ package testing
import (
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/labels"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
@@ -79,20 +78,6 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N
return nil
}
// AddPDB is a fake method for testing.
func (f *FakeCache) AddPDB(pdb *policy.PodDisruptionBudget) error { return nil }
// UpdatePDB is a fake method for testing.
func (f *FakeCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { return nil }
// RemovePDB is a fake method for testing.
func (f *FakeCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { return nil }
// ListPDBs is a fake method for testing.
func (f *FakeCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
return nil, nil
}
// List is a fake method for testing.
func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }

View File

@@ -21,6 +21,7 @@ import (
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -214,3 +215,11 @@ func (f *fakePersistentVolumeClaimNamespaceLister) Get(name string) (*v1.Persist
func (f fakePersistentVolumeClaimNamespaceLister) List(selector labels.Selector) (ret []*v1.PersistentVolumeClaim, err error) {
return nil, fmt.Errorf("not implemented")
}
// FakePDBLister implements PDBLister on a slice of PodDisruptionBudgets for test purposes.
type FakePDBLister []*policy.PodDisruptionBudget
// List returns a list of PodDisruptionBudgets.
func (f FakePDBLister) List(labels.Selector) ([]*policy.PodDisruptionBudget, error) {
return f, nil
}
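
A usage sketch for the new fake (a hypothetical test, not part of this diff); note that FakePDBLister returns its entire backing slice and ignores the selector, which is sufficient for these scheduler tests:

package example

import (
	"testing"

	policy "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/labels"
	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)

func TestFakePDBListerUsage(t *testing.T) {
	pdb := &policy.PodDisruptionBudget{}
	pdb.Name = "example-pdb" // hypothetical name
	lister := schedulertesting.FakePDBLister{pdb}

	pdbs, err := lister.List(labels.Everything())
	if err != nil || len(pdbs) != 1 {
		t.Fatalf("expected 1 PDB, got %d (err: %v)", len(pdbs), err)
	}
}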

View File

@@ -50,10 +50,8 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",

View File

@@ -842,8 +842,8 @@ func TestPDBInPreemption(t *testing.T) {
t.Fatalf("Failed to create PDB: %v", err)
}
}
// Wait for PDBs to show up in the scheduler's cache and become stable.
if err := waitCachedPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil {
// Wait for PDBs to become stable.
if err := waitForPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil {
t.Fatalf("Not all pdbs are stable in the cache: %v", err)
}

View File

@@ -20,19 +20,13 @@ package scheduler
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@@ -671,95 +665,6 @@ func TestAllocatable(t *testing.T) {
}
}
// TestPDBCache verifies that the scheduler cache works as expected when handling
// PodDisruptionBudgets.
func TestPDBCache(t *testing.T) {
context := initTest(t, "pdbcache")
defer cleanupTest(t, context)
intstrMin := intstr.FromInt(4)
pdb := &policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: context.ns.Name,
Name: "test-pdb",
UID: types.UID("test-pdb-uid"),
Labels: map[string]string{"tkey1": "tval1", "tkey2": "tval2"},
},
Spec: policy.PodDisruptionBudgetSpec{
MinAvailable: &intstrMin,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"tkey": "tvalue"}},
},
}
createdPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
if err != nil {
t.Errorf("Failed to create PDB: %v", err)
}
// Wait for PDB to show up in the scheduler's cache.
if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs) > 0, err
}); err != nil {
t.Fatalf("No PDB was added to the cache: %v", err)
}
// Read PDB from the cache and compare it.
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if len(cachedPDBs) != 1 {
t.Fatalf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs))
}
if !reflect.DeepEqual(createdPDB, cachedPDBs[0]) {
t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(createdPDB, cachedPDBs[0]))
}
// Update PDB and change its labels.
pdbCopy := *cachedPDBs[0]
pdbCopy.Labels = map[string]string{}
updatedPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Update(&pdbCopy)
if err != nil {
t.Errorf("Failed to update PDB: %v", err)
}
// Wait for PDB to be updated in the scheduler's cache.
if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs[0].Labels) == 0, err
}); err != nil {
t.Fatalf("No PDB was updated in the cache: %v", err)
}
// Read PDB from the cache and compare it.
cachedPDBs, err = context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if len(cachedPDBs) != 1 {
t.Errorf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs))
}
if !reflect.DeepEqual(updatedPDB, cachedPDBs[0]) {
t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(updatedPDB, cachedPDBs[0]))
}
// Delete PDB.
err = context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Delete(pdb.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("Failed to delete PDB: %v", err)
}
// Wait for PDB to be deleted from the scheduler's cache.
if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs) == 0, err
}); err != nil {
t.Errorf("No PDB was deleted from the cache: %v", err)
}
}
// TestSchedulerInformers tests that the scheduler receives informer events and updates its cache when
// pods are scheduled by other schedulers.
func TestSchedulerInformers(t *testing.T) {

View File

@@ -631,20 +631,20 @@ func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
// waitCachedPDBsStable waits for PDBs in scheduler cache to have "CurrentHealthy" status equal to
// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
// the expected values.
func waitCachedPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
func waitForPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
pdbList, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).List(metav1.ListOptions{})
if err != nil {
return false, err
}
if len(cachedPDBs) != len(pdbs) {
if len(pdbList.Items) != len(pdbs) {
return false, nil
}
for i, pdb := range pdbs {
found := false
for _, cpdb := range cachedPDBs {
for _, cpdb := range pdbList.Items {
if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
found = true
if cpdb.Status.CurrentHealthy != pdbPodNum[i] {