Add DisruptedPod map to PodDisruptionBudgetStatus

pull/6/head
Marcin 2016-11-04 00:40:59 +01:00
parent b7512d9c8b
commit 47a1458ff3
5 changed files with 211 additions and 36 deletions


@ -51,6 +51,19 @@ type PodDisruptionBudgetStatus struct {
// total number of pods counted by this disruption budget
ExpectedPods int32 `json:"expectedPods"`
// DisruptedPods contains information about pods whose eviction was
// processed by the API server eviction subresource handler but has not
// yet been observed by the PodDisruptionBudget controller.
// A pod will be in this map from the time when the API server processed the
// eviction request to the time when the pod is seen by the PDB controller
// as having been marked for deletion (or after a timeout). The key in the map
// is the name of the pod and the value is the time when the API server
// processed the eviction request. If the deletion didn't occur and a pod is
// still there, it will be removed from the map automatically by the
// PodDisruptionBudget controller after some time.
// If everything goes smoothly, this map should be empty most of the time.
// A large number of entries in the map may indicate problems with pod deletions.
DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
}
// +genclient=true
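The semantics described above reduce to a single time comparison. Below is a minimal standalone sketch of that check, using plain time.Time instead of unversioned.Time; the function and names are illustrative, not part of this change:

package main

import (
"fmt"
"time"
)

// isPendingDisruption reports whether podName has a DisruptedPods entry that is
// still fresh, i.e. the eviction was approved less than timeout ago. Stale
// entries mean the approved eviction apparently never led to a deletion.
func isPendingDisruption(disruptedPods map[string]time.Time, podName string, timeout time.Duration, now time.Time) bool {
evictedAt, found := disruptedPods[podName]
return found && evictedAt.Add(timeout).After(now)
}

func main() {
now := time.Now()
m := map[string]time.Time{"web-0": now.Add(-30 * time.Second)}
fmt.Println(isPendingDisruption(m, "web-0", 2*time.Minute, now)) // true: still pending
fmt.Println(isPendingDisruption(m, "web-0", 10*time.Second, now)) // false: entry is stale
}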


@ -49,6 +49,19 @@ type PodDisruptionBudgetStatus struct {
// total number of pods counted by this disruption budget
ExpectedPods int32 `json:"expectedPods" protobuf:"varint,4,opt,name=expectedPods"`
// DisruptedPods contains information about pods whose eviction was
// processed by the API server eviction subresource handler but has not
// yet been observed by the PodDisruptionBudget controller.
// A pod will be in this map from the time when the API server processed the
// eviction request to the time when the pod is seen by the PDB controller
// as having been marked for deletion (or after a timeout). The key in the map
// is the name of the pod and the value is the time when the API server
// processed the eviction request. If the deletion didn't occur and a pod is
// still there, it will be removed from the map automatically by the
// PodDisruptionBudget controller after some time.
// If everything goes smoothly, this map should be empty most of the time.
// A large number of entries in the map may indicate problems with pod deletions.
DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
}
// +genclient=true


@ -18,6 +18,7 @@ package disruption
import (
"fmt"
"reflect"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -43,6 +44,17 @@ import (
const statusUpdateRetries = 2
// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (1-2 seconds), so the value below should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synchronized
// clocks (e.g. via NTP). Otherwise the PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second
type updater func(*policy.PodDisruptionBudget) error
type DisruptionController struct {
@ -68,7 +80,8 @@ type DisruptionController struct {
dLister cache.StoreToDeploymentLister
// PodDisruptionBudget keys that need to be synced.
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
@ -92,6 +105,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
kubeClient: kubeClient,
podController: podInformer.GetController(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
}
dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
@ -270,6 +284,8 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
go dc.rsController.Run(stopCh)
go dc.dController.Run(stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)
<-stopCh
glog.V(0).Infof("Shutting down disruption controller")
}
@ -355,6 +371,15 @@ func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
dc.queue.Add(key)
}
func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
key, err := controller.KeyFunc(pdb)
if err != nil {
glog.Errorf("Cound't get key for PodDisruptionBudget object %+v: %v", pdb, err)
return
}
dc.recheckQueue.AddAfter(key, delay)
}
func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the
@ -417,6 +442,21 @@ func (dc *DisruptionController) processNextWorkItem() bool {
return true
}
func (dc *DisruptionController) recheckWorker() {
for dc.processNextRecheckWorkItem() {
}
}
func (dc *DisruptionController) processNextRecheckWorkItem() bool {
dKey, quit := dc.recheckQueue.Get()
if quit {
return false
}
defer dc.recheckQueue.Done(dKey)
dc.queue.AddRateLimited(dKey)
return true
}
func (dc *DisruptionController) sync(key string) error {
startTime := time.Now()
defer func() {
@ -452,9 +492,17 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
return err
}
currentTime := time.Now()
disruptedPods, recheckTime := buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
if err == nil && recheckTime != nil {
// There is always at most one PDB waiting with a particular name in the queue,
// and each PDB in the queue is associated with the lowest timestamp
// that was supplied when a PDB with that name was added.
dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
}
return err
}
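The dedup property mentioned in the comment above is provided by the delaying queue itself: re-adding a key that is already waiting coalesces into a single entry that fires at the earliest requested deadline. A minimal standalone sketch of that behavior (the key is hypothetical, not part of this change):

package main

import (
"fmt"
"time"

"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
q := workqueue.NewNamedDelayingQueue("demo")
// The same key added twice with different delays: one entry remains and
// it becomes ready at the earlier deadline.
q.AddAfter("default/my-pdb", 2*time.Second)
q.AddAfter("default/my-pdb", 500*time.Millisecond)
start := time.Now()
key, _ := q.Get() // blocks until the key is ready (~500ms here)
fmt.Printf("got %v after %v\n", key, time.Since(start))
q.Done(key)
}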
@ -527,20 +575,60 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
return
}
func countHealthyPods(pods []*api.Pod, disruptedPods map[string]unversioned.Time, currentTime time.Time) (currentHealthy int32) {
Pod:
for _, pod := range pods {
// Pod is being deleted.
if pod.DeletionTimestamp != nil {
continue
}
// Pod is expected to be deleted soon.
if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
continue
}
if api.IsPodReady(pod) {
currentHealthy++
continue Pod
}
}
return
}
// buildDisruptedPodMap builds a new DisruptedPods map, dropping entries for pods that
// no longer exist, are already being deleted, or were not deleted within the expected
// time. It also returns the time when this check should be repeated.
func buildDisruptedPodMap(pods []*api.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]unversioned.Time, *time.Time) {
disruptedPods := pdb.Status.DisruptedPods
result := make(map[string]unversioned.Time)
var recheckTime *time.Time
if disruptedPods == nil || len(disruptedPods) == 0 {
return result, recheckTime
}
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
// Already being deleted.
continue
}
disruptionTime, found := disruptedPods[pod.Name]
if !found {
// Pod not on the list.
continue
}
expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
if expectedDeletion.Before(currentTime) {
glog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
} else {
if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
recheckTime = &expectedDeletion
}
result[pod.Name] = disruptionTime
}
}
return result, recheckTime
}
// failSafe is an attempt to at least update the PodDisruptionsAllowed field to
// 0 if everything else has failed. This is one place we
// implement the "fail open" part of the design since if we manage to update
@ -557,7 +645,9 @@ func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error
return dc.getUpdater()(&newPdb)
}
func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
disruptedPods map[string]unversioned.Time) error {
// We require expectedCount to be > 0 so that PDBs which currently match no
// pods are in a safe state when their first pods appear but this controller
// has not updated their status yet. This isn't the only race, but it's a
@ -567,7 +657,11 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
disruptionsAllowed = 0
}
if pdb.Status.CurrentHealthy == currentHealthy &&
pdb.Status.DesiredHealthy == desiredHealthy &&
pdb.Status.ExpectedPods == expectedCount &&
pdb.Status.PodDisruptionsAllowed == disruptionsAllowed &&
reflect.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) {
return nil
}
@ -582,6 +676,7 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
DesiredHealthy: desiredHealthy,
ExpectedPods: expectedCount,
PodDisruptionsAllowed: disruptionsAllowed,
DisruptedPods: disruptedPods,
}
return dc.getUpdater()(&newPdb)


@ -21,6 +21,7 @@ import (
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
@ -32,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/workqueue"
)
type pdbStates map[string]policy.PodDisruptionBudget
@ -54,12 +56,14 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
return (*ps)[key]
}
func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32,
disruptedPodMap map[string]unversioned.Time) {
expectedStatus := policy.PodDisruptionBudgetStatus{
PodDisruptionsAllowed: disruptionsAllowed,
CurrentHealthy: currentHealthy,
DesiredHealthy: desiredHealthy,
ExpectedPods: expectedPods,
DisruptedPods: disruptedPodMap,
}
actualStatus := ps.Get(key).Status
if !reflect.DeepEqual(actualStatus, expectedStatus) {
@ -251,11 +255,11 @@ func TestNoSelector(t *testing.T) {
add(t, dc.pdbLister.Store, pdb)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]unversioned.Time{})
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]unversioned.Time{})
}
// Verify that available/expected counts go up as we add pods, then verify that
@ -270,13 +274,13 @@ func TestUnavailable(t *testing.T) {
// Add three pods, verifying that the counts go up at each step.
pods := []*api.Pod{}
for i := int32(0); i < 4; i++ {
ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i, map[string]unversioned.Time{})
pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
pods = append(pods, pod)
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
}
ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]unversioned.Time{})
// Now set one pod as unavailable
pods[0].Status.Conditions = []api.PodCondition{}
@ -284,7 +288,7 @@ func TestUnavailable(t *testing.T) {
dc.sync(pdbName)
// Verify expected update
ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4, map[string]unversioned.Time{})
}
// Create a pod with no controller, and verify that a PDB with a percentage
@ -318,7 +322,7 @@ func TestReplicaSet(t *testing.T) {
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
add(t, dc.podLister.Indexer, pod) add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName) dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10) ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]unversioned.Time{})
} }
// Verify that multiple controllers don't allow the PDB to be set true.
@ -376,9 +380,10 @@ func TestReplicationController(t *testing.T) {
rc.Spec.Selector = labels
add(t, dc.rcLister.Indexer, rc)
dc.sync(pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know // It starts out at 0 expected because, with no pods, the PDB doesn't know
// about the RC. This is a known bug. TODO(mml): file issue // about the RC. This is a known bug. TODO(mml): file issue
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]unversioned.Time{})
pods := []*api.Pod{}
@ -389,9 +394,9 @@ func TestReplicationController(t *testing.T) {
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]unversioned.Time{})
} else {
ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, map[string]unversioned.Time{})
}
}
@ -430,7 +435,7 @@ func TestTwoControllers(t *testing.T) {
add(t, dc.rcLister.Indexer, rc)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]unversioned.Time{})
pods := []*api.Pod{}
@ -445,11 +450,11 @@ func TestTwoControllers(t *testing.T) {
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]unversioned.Time{})
} else if i-unavailablePods <= minimumOne {
ps.VerifyPdbStatus(t, pdbName, 0, i-unavailablePods, minimumOne, collectionSize, map[string]unversioned.Time{})
} else {
ps.VerifyPdbStatus(t, pdbName, 1, i-unavailablePods, minimumOne, collectionSize, map[string]unversioned.Time{})
}
}
@ -457,14 +462,14 @@ func TestTwoControllers(t *testing.T) {
d.Spec.Selector = newSel(dLabels)
add(t, dc.dLister.Indexer, d)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]unversioned.Time{})
rs, _ := newReplicaSet(t, collectionSize)
rs.Spec.Selector = newSel(dLabels)
rs.Labels = dLabels
add(t, dc.rsLister.Indexer, rs)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]unversioned.Time{})
// By the end of this loop, the number of ready pods should be N+2 (hence minimumTwo+2).
unavailablePods = 2*collectionSize - (minimumTwo + 2) - unavailablePods
@ -478,33 +483,33 @@ func TestTwoControllers(t *testing.T) {
add(t, dc.podLister.Indexer, pod)
dc.sync(pdbName)
if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
} else if i-unavailablePods <= minimumTwo-(minimumOne+1) {
ps.VerifyPdbStatus(t, pdbName, 0, (minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
} else {
ps.VerifyPdbStatus(t, pdbName, i-unavailablePods-(minimumTwo-(minimumOne+1)),
(minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
}
}
// Now we verify we can bring down 1 pod and a disruption is still permitted,
// but if we bring down two, it's not. Then we make the pod ready again and
// verify that a disruption is permitted again.
ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
pods[collectionSize-1].Status.Conditions = []api.PodCondition{}
update(t, dc.podLister.Indexer, pods[collectionSize-1])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
pods[collectionSize-2].Status.Conditions = []api.PodCondition{}
update(t, dc.podLister.Indexer, pods[collectionSize-2])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
pods[collectionSize-1].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
update(t, dc.podLister.Indexer, pods[collectionSize-1])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
}
// Test pdb doesn't exist
@ -516,3 +521,30 @@ func TestPDBNotExist(t *testing.T) {
t.Errorf("Unexpected error: %v, expect nil", err) t.Errorf("Unexpected error: %v, expect nil", err)
} }
} }
func TestUpdateDisruptedPods(t *testing.T) {
dc, ps := newFakeDisruptionController()
dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb-queue")
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(1))
currentTime := time.Now()
pdb.Status.DisruptedPods = map[string]unversioned.Time{
"p1": {Time: currentTime}, // Should be removed, pod deletion started.
"p2": {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired.
"p3": {Time: currentTime}, // Should remain, pod untouched.
"notthere": {Time: currentTime}, // Should be removed, pod deleted.
}
add(t, dc.pdbLister.Store, pdb)
pod1, _ := newPod(t, "p1")
pod1.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
pod2, _ := newPod(t, "p2")
pod3, _ := newPod(t, "p3")
add(t, dc.podLister.Indexer, pod1)
add(t, dc.podLister.Indexer, pod2)
add(t, dc.podLister.Indexer, pod3)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]unversioned.Time{"p3": {Time: currentTime}})
}


@ -18,6 +18,7 @@ package etcd
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/rest"
@ -29,6 +30,16 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
) )
const (
// MaxDisruptedPodSize is the max size of PodDisruptionBudgetStatus.DisruptedPods. The API server
// eviction subresource handler will refuse to evict pods covered by the corresponding PDB
// if the size of the map exceeds this value. A map that large means a high number of
// evictions has been approved by the API server but not noticed by the PDB controller yet.
// This situation should self-correct because the PDB controller removes entries from the
// map automatically after DeletionTimeout, regardless of whether the pods were deleted.
MaxDisruptedPodSize = 2000
)
func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
}
@ -72,7 +83,7 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
// If it was false already, or if it becomes false during the course of our retries,
// raise an error marked as a 429.
ok, err := r.checkAndDecrement(pod.Namespace, pod.Name, pdb)
if err != nil {
return nil, err
}
@ -104,14 +115,25 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
}
func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policy.PodDisruptionBudget) (ok bool, err error) {
if pdb.Status.PodDisruptionsAllowed < 0 {
return false, fmt.Errorf("pdb disruptions allowed is negative")
}
if len(pdb.Status.DisruptedPods) > MaxDisruptedPodSize {
return false, fmt.Errorf("DisrputedPods map too big - too many evictions not confirmed by PDB controller")
}
if pdb.Status.PodDisruptionsAllowed == 0 {
return false, nil
}
pdb.Status.PodDisruptionsAllowed--
if pdb.Status.DisruptedPods == nil {
pdb.Status.DisruptedPods = make(map[string]unversioned.Time)
}
// The eviction handler needs to inform the PDB controller that it is about to delete a pod,
// so that the controller does not count the pod as available when recomputing
// PodDisruptionsAllowed. If the pod is not deleted within a reasonable time limit, the PDB
// controller will assume that it won't be deleted at all and remove it from the DisruptedPods map.
pdb.Status.DisruptedPods[podName] = unversioned.Time{Time: time.Now()}
if _, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(namespace).UpdateStatus(&pdb); err != nil {
return false, err
}
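For context, checkAndDecrement runs when a client POSTs an Eviction object to the pod's eviction subresource. A minimal sketch of such a request using only the standard library; it assumes an API server reachable through kubectl proxy on localhost:8001, and the pod name and namespace are hypothetical:

package main

import (
"bytes"
"fmt"
"net/http"
)

func main() {
body := []byte(`{"apiVersion": "policy/v1beta1", "kind": "Eviction", "metadata": {"name": "my-pod", "namespace": "default"}}`)
resp, err := http.Post(
"http://localhost:8001/api/v1/namespaces/default/pods/my-pod/eviction",
"application/json", bytes.NewReader(body))
if err != nil {
panic(err)
}
defer resp.Body.Close()
// 200 means the eviction was approved and the pod was recorded in
// DisruptedPods; 429 means the budget currently forbids the disruption.
fmt.Println(resp.Status)
}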