Merge pull request #39454 from foxish/fix-stateful-set-detection

Automatic merge from submit-queue (batch tested with PRs 39435, 39454)

Fix PDB by percentages for StatefulSet pods

Previously, PDBs defined in terms of percentages would error out with StatefulSet as they did not know how to find the scale associated.
This change teaches the disruption controller to also look at StatefulSets and their scale.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes https://github.com/kubernetes/kubernetes/issues/39125

**Release note**:
```release-note
Fix issue with PodDisruptionBudgets in which `minAvailable` specified as a percentage did not work with StatefulSet Pods.
```

cc @a-robinson @kow3ns @kubernetes/sig-apps-misc
pull/6/head
Kubernetes Submit Queue 2017-01-05 02:50:20 -08:00 committed by GitHub
commit f8b708385e
3 changed files with 110 additions and 3 deletions

View File

@ -15,6 +15,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
@ -44,6 +45,7 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
@ -51,7 +52,7 @@ const statusUpdateRetries = 2
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the cotroller is running on a different node it is important that the two nodes have synced
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second
@ -80,6 +81,10 @@ type DisruptionController struct {
dController *cache.Controller
dLister cache.StoreToDeploymentLister
ssStore cache.Store
ssController *cache.Controller
ssLister cache.StoreToStatefulSetLister
// PodDisruptionBudget keys that need to be synced.
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface
@ -187,9 +192,23 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.dLister.Indexer = dc.dIndexer
dc.ssStore, dc.ssController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
},
},
&apps.StatefulSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
)
dc.ssLister.Store = dc.ssStore
return dc
}
@ -201,7 +220,8 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
// and we may well need further tweaks just to be able to access scale
// subresources.
func (dc *DisruptionController) finders() []podControllerFinder {
return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets}
return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets,
dc.getPodStatefulSets}
}
// getPodReplicaSets finds replicasets which have no matching deployments.
@ -231,6 +251,29 @@ func (dc *DisruptionController) getPodReplicaSets(pod *v1.Pod) ([]controllerAndS
return cas, nil
}
// getPodStatefulSet returns the statefulset managing the given pod.
func (dc *DisruptionController) getPodStatefulSets(pod *v1.Pod) ([]controllerAndScale, error) {
cas := []controllerAndScale{}
ss, err := dc.ssLister.GetPodStatefulSets(pod)
// GetPodStatefulSets returns an error only if no StatefulSets are found. We
// don't return that as an error to the caller.
if err != nil {
return cas, nil
}
controllerScale := map[types.UID]int32{}
for _, s := range ss {
controllerScale[s.UID] = *(s.Spec.Replicas)
}
for uid, scale := range controllerScale {
cas = append(cas, controllerAndScale{UID: uid, scale: scale})
}
return cas, nil
}
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
func (dc *DisruptionController) getPodDeployments(pod *v1.Pod) ([]controllerAndScale, error) {
cas := []controllerAndScale{}
@ -284,6 +327,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
go dc.rcController.Run(stopCh)
go dc.rsController.Run(stopCh)
go dc.dController.Run(stopCh)
go dc.ssController.Run(stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
@ -92,6 +93,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
rsLister: cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
ssLister: cache.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)},
getUpdater: func() updater { return ps.Set },
broadcaster: record.NewBroadcaster(),
}
@ -236,6 +238,30 @@ func newReplicaSet(t *testing.T, size int32) (*extensions.ReplicaSet, string) {
return rs, rsName
}
func newStatefulSet(t *testing.T, size int32) (*apps.StatefulSet, string) {
ss := &apps.StatefulSet{
TypeMeta: metav1.TypeMeta{APIVersion: registered.GroupOrDie(v1.GroupName).GroupVersion.String()},
ObjectMeta: v1.ObjectMeta{
UID: uuid.NewUUID(),
Name: "foobar",
Namespace: v1.NamespaceDefault,
ResourceVersion: "18",
Labels: fooBar(),
},
Spec: apps.StatefulSetSpec{
Replicas: &size,
Selector: newSelFooBar(),
},
}
ssName, err := controller.KeyFunc(ss)
if err != nil {
t.Fatalf("Unexpected error naming StatefulSet %q: %v", ss.Name, err)
}
return ss, ssName
}
func update(t *testing.T, store cache.Store, obj interface{}) {
if err := store.Update(obj); err != nil {
t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
@ -409,6 +435,41 @@ func TestReplicationController(t *testing.T) {
ps.VerifyDisruptionAllowed(t, pdbName, 0)
}
func TestStatefulSetController(t *testing.T) {
labels := map[string]string{
"foo": "bar",
"baz": "quux",
}
dc, ps := newFakeDisruptionController()
// 34% should round up to 2
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
add(t, dc.pdbLister.Store, pdb)
ss, _ := newStatefulSet(t, 3)
add(t, dc.ssLister.Store, ss)
dc.sync(pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know
// about the SS. This is a known bug. TODO(mml): file issue
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
pods := []*v1.Pod{}
for i := int32(0); i < 3; i++ {
pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
pods = append(pods, pod)
pod.Labels = labels
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
} else {
ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, map[string]metav1.Time{})
}
}
}
func TestTwoControllers(t *testing.T) {
// Most of this test is in verifying intermediate cases as we define the
// three controllers and create the pods.