mirror of https://github.com/k3s-io/k3s
commit c7a50206c0
@@ -35,6 +35,7 @@ import (
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util"
     "k8s.io/kubernetes/pkg/util/integer"
+    "k8s.io/kubernetes/pkg/util/sets"
 )

 const (
@@ -243,11 +244,105 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
     return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
 }

-// NewControllerExpectations returns a store for ControlleeExpectations.
+// NewControllerExpectations returns a store for ControllerExpectations.
 func NewControllerExpectations() *ControllerExpectations {
     return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
 }

+// UIDSetKeyFunc to parse out the key from a UIDSet.
+var UIDSetKeyFunc = func(obj interface{}) (string, error) {
+    if u, ok := obj.(*UIDSet); ok {
+        return u.key, nil
+    }
+    return "", fmt.Errorf("Could not find key for obj %#v", obj)
+}
+
+// UIDSet holds a key and a set of UIDs. Used by the
+// UIDTrackingControllerExpectations to remember which UID it has seen/still
+// waiting for.
+type UIDSet struct {
+    sets.String
+    key string
+}
+
+// UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
+// This cache is needed over plain old expectations to safely handle graceful
+// deletion. The desired behavior is to treat an update that sets the
+// DeletionTimestamp on an object as a delete. To do so consistently, one needs
+// to remember the expected deletes so they aren't double counted.
+// TODO: Track creates as well (#22599)
+type UIDTrackingControllerExpectations struct {
+    ControllerExpectationsInterface
+    // TODO: There is a much nicer way to do this that involves a single store,
+    // a lock per entry, and a ControlleeExpectationsInterface type.
+    uidStoreLock sync.Mutex
+    // Store used for the UIDs associated with any expectation tracked via the
+    // ControllerExpectationsInterface.
+    uidStore cache.Store
+}
+
+// GetUIDs is a convenience method to avoid exposing the set of expected uids.
+// The returned set is not thread safe, all modifications must be made holding
+// the uidStoreLock.
+func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
+    if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
+        return uid.(*UIDSet).String
+    }
+    return nil
+}
+
+// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
+func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
+        glog.Errorf("Clobbering existing delete keys: %+v", existing)
+    }
+    expectedUIDs := sets.NewString()
+    for _, k := range deletedKeys {
+        expectedUIDs.Insert(k)
+    }
+    glog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
+    if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
+        return err
+    }
+    return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
+}
+
+// DeletionObserved records the given deleteKey as a deletion, for the given rc.
+func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    uids := u.GetUIDs(rcKey)
+    if uids != nil && uids.Has(deleteKey) {
+        glog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
+        u.ControllerExpectationsInterface.DeletionObserved(rcKey)
+        uids.Delete(deleteKey)
+    }
+}
+
+// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
+// underlying ControllerExpectationsInterface.
+func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
+    u.uidStoreLock.Lock()
+    defer u.uidStoreLock.Unlock()
+
+    u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
+    if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
+        if err := u.uidStore.Delete(uidExp); err != nil {
+            glog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
+        }
+    }
+}
+
+// NewUIDTrackingControllerExpectations returns a wrapper around
+// ControllerExpectations that is aware of deleteKeys.
+func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
+    return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
+}
+
 // PodControlInterface is an interface that knows how to add or delete pods
 // created as an interface to allow testing.
 type PodControlInterface interface {
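The wrapper above keeps a per-controller set of pod keys next to the numeric expectation counters, so a delete is only counted for keys that were actually expected. A minimal standalone sketch of that bookkeeping idea (plain Go, no Kubernetes imports; all names below are illustrative, not part of this package):

package main

import (
	"fmt"
	"sync"
)

// uidTracker mimics the bookkeeping of UIDTrackingControllerExpectations in a
// self-contained way: per controller it remembers which pod keys it still
// expects to see deleted, so a graceful-deletion update and the final delete
// event for the same pod can only be counted once.
type uidTracker struct {
	mu      sync.Mutex
	pending map[string]map[string]struct{} // controller key -> expected pod keys
}

func newUIDTracker() *uidTracker {
	return &uidTracker{pending: map[string]map[string]struct{}{}}
}

func (t *uidTracker) ExpectDeletions(controllerKey string, podKeys []string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	set := map[string]struct{}{}
	for _, k := range podKeys {
		set[k] = struct{}{}
	}
	t.pending[controllerKey] = set
}

// DeletionObserved is a no-op for keys that were never expected or were
// already observed, which is exactly what prevents double counting.
func (t *uidTracker) DeletionObserved(controllerKey, podKey string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.pending[controllerKey], podKey)
}

func (t *uidTracker) Satisfied(controllerKey string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending[controllerKey]) == 0
}

func main() {
	t := newUIDTracker()
	t.ExpectDeletions("default/frontend", []string{"default/frontend-a", "default/frontend-b"})
	t.DeletionObserved("default/frontend", "default/frontend-a") // graceful-deletion update
	t.DeletionObserved("default/frontend", "default/frontend-a") // later delete event: counted once
	fmt.Println(t.Satisfied("default/frontend")) // false, frontend-b still pending
	t.DeletionObserved("default/frontend", "default/frontend-b")
	fmt.Println(t.Satisfied("default/frontend")) // true
}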
@@ -517,6 +612,14 @@ func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions
     return active
 }

+// PodKey returns a key unique to the given pod within a cluster.
+// It's used so we consistently use the same key scheme in this module.
+// It does exactly what cache.MetaNamespaceKeyFunc would have done
+// except there's no possibility for error since we know the exact type.
+func PodKey(pod *api.Pod) string {
+    return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+}
+
 // ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
 type ControllersByCreationTimestamp []*api.ReplicationController
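PodKey reuses the namespace/name scheme that cache.MetaNamespaceKeyFunc produces for namespaced objects, minus the error return. A tiny hedged illustration with a stand-in pod type (not the real api.Pod):

package main

import "fmt"

// pod stands in for api.Pod here; only the fields the key scheme reads.
type pod struct{ Namespace, Name string }

// podKey mirrors the helper added above: a cluster-unique "namespace/name" key.
func podKey(p *pod) string { return fmt.Sprintf("%v/%v", p.Namespace, p.Name) }

func main() {
	fmt.Println(podKey(&pod{Namespace: "default", Name: "frontend-x7k2p"})) // default/frontend-x7k2p
}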
@@ -183,6 +183,57 @@ func TestControllerExpectations(t *testing.T) {
     }
 }

+func TestUIDExpectations(t *testing.T) {
+    uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
+    rcList := []*api.ReplicationController{
+        newReplicationController(2),
+        newReplicationController(1),
+        newReplicationController(0),
+        newReplicationController(5),
+    }
+    rcToPods := map[string][]string{}
+    rcKeys := []string{}
+    for i := range rcList {
+        rc := rcList[i]
+        rcName := fmt.Sprintf("rc-%v", i)
+        rc.Name = rcName
+        rc.Spec.Selector[rcName] = rcName
+        podList := newPodList(nil, 5, api.PodRunning, rc)
+        rcKey, err := KeyFunc(rc)
+        if err != nil {
+            t.Fatalf("Couldn't get key for object %+v: %v", rc, err)
+        }
+        rcKeys = append(rcKeys, rcKey)
+        rcPodNames := []string{}
+        for i := range podList.Items {
+            p := &podList.Items[i]
+            p.Name = fmt.Sprintf("%v-%v", p.Name, rc.Name)
+            rcPodNames = append(rcPodNames, PodKey(p))
+        }
+        rcToPods[rcKey] = rcPodNames
+        uidExp.ExpectDeletions(rcKey, rcPodNames)
+    }
+    for i := range rcKeys {
+        j := rand.Intn(i + 1)
+        rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i]
+    }
+    for _, rcKey := range rcKeys {
+        if uidExp.SatisfiedExpectations(rcKey) {
+            t.Errorf("Controller %v satisfied expectations before deletion", rcKey)
+        }
+        for _, p := range rcToPods[rcKey] {
+            uidExp.DeletionObserved(rcKey, p)
+        }
+        if !uidExp.SatisfiedExpectations(rcKey) {
+            t.Errorf("Controller %v didn't satisfy expectations after deletion", rcKey)
+        }
+        uidExp.DeleteExpectations(rcKey)
+        if uidExp.GetUIDs(rcKey) != nil {
+            t.Errorf("Failed to delete uid expectations for %v", rcKey)
+        }
+    }
+}
+
 func TestCreatePods(t *testing.T) {
     ns := api.NamespaceDefault
     body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}})
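The test above shuffles the controller keys before verifying them, so the satisfaction checks do not depend on insertion order. The loop is the usual in-place Fisher-Yates pass, shown standalone below (assumed equivalent, not taken from the package):

package main

import (
	"fmt"
	"math/rand"
)

// shuffle randomizes the slice in place, matching the
// `j := rand.Intn(i + 1); swap` pattern used in TestUIDExpectations.
func shuffle(keys []string) {
	for i := range keys {
		j := rand.Intn(i + 1)
		keys[i], keys[j] = keys[j], keys[i]
	}
}

func main() {
	keys := []string{"rc-0", "rc-1", "rc-2", "rc-3"}
	shuffle(keys)
	fmt.Println(keys) // some permutation of the four keys
}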
@@ -49,7 +49,7 @@ const (
     // happens based on contents in local pod storage.
     FullControllerResyncPeriod = 30 * time.Second

-    // Realistic value of the burstReplica field for the replication manager based off
+    // Realistic value of the burstReplica field for the replica set manager based off
     // performance requirements for kubernetes 1.0.
     BurstReplicas = 500
@@ -73,8 +73,8 @@ type ReplicaSetController struct {
     // To allow injection of syncReplicaSet for testing.
     syncHandler func(rsKey string) error

-    // A TTLCache of pod creates/deletes each ReplicaSet expects to see
-    expectations controller.ControllerExpectationsInterface
+    // A TTLCache of pod creates/deletes each rc expects to see.
+    expectations *controller.UIDTrackingControllerExpectations

     // A store of ReplicaSets, populated by the rsController
     rsStore cache.StoreToReplicaSetLister
@@ -107,7 +107,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
             Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
         },
         burstReplicas: burstReplicas,
-        expectations: controller.NewControllerExpectations(),
+        expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue: workqueue.New(),
     }

@@ -297,17 +297,16 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
     }
     rsKey, err := controller.KeyFunc(rs)
     if err != nil {
-        glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
+        glog.Errorf("Couldn't get key for replica set %#v: %v", rs, err)
         return
     }
     if pod.DeletionTimestamp != nil {
         // on a restart of the controller manager, it's possible a new pod shows up in a state that
         // is already pending deletion. Prevent the pod from being a creation observation.
-        glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rs %v", pod.Name, pod.DeletionTimestamp, rsKey)
-        rsc.expectations.DeletionObserved(rsKey)
-    } else {
-        rsc.expectations.CreationObserved(rsKey)
+        rsc.deletePod(pod)
+        return
     }
+    rsc.expectations.CreationObserved(rsKey)
     rsc.enqueueReplicaSet(rs)
 }

@@ -326,22 +325,15 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
     if rs == nil {
         return
     }
-    rsKey, err := controller.KeyFunc(rs)
-    if err != nil {
-        glog.Errorf("Couldn't get key for replication controller %#v: %v", rs, err)
-        return
-    }

-    if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
+    if curPod.DeletionTimestamp != nil {
         // when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
         // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
         // for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
         // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
         // an rs never initiates a phase change, and so is never asleep waiting for the same.
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
-        rsc.expectations.DeletionObserved(rsKey)
-    } else {
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rs %v", curPod.Name, curPod.DeletionTimestamp, rsKey)
+        rsc.deletePod(curPod)
+        return
     }

     rsc.enqueueReplicaSet(rs)
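With UID tracking in place, updatePod can treat the update that first sets a DeletionTimestamp as the deletion itself and route it through deletePod; the later watch delete for the same key is then deduplicated by the expectations cache. A simplified standalone sketch of that routing (types and handlers here are hypothetical, not the controller's real ones):

package main

import "fmt"

// miniPod carries only what the routing decision needs.
type miniPod struct {
	Key               string
	DeletionTimestamp *string
}

type handler struct{ deletions []string }

// onDelete records one deletion per key; in the real controller this is where
// DeletionObserved(rsKey, PodKey(pod)) dedupes repeated events for the same pod.
func (h *handler) onDelete(p *miniPod) { h.deletions = append(h.deletions, p.Key) }

// onUpdate routes a graceful deletion (timestamp just set) to the delete path,
// so the controller reacts during the grace period instead of waiting for the
// kubelet to finally remove the object.
func (h *handler) onUpdate(old, cur *miniPod) {
	if cur.DeletionTimestamp != nil {
		h.onDelete(cur)
		return
	}
	// ... normal sync path ...
}

func main() {
	ts := "2016-03-20T10:00:00Z"
	h := &handler{}
	h.onUpdate(&miniPod{Key: "default/frontend-a"},
		&miniPod{Key: "default/frontend-a", DeletionTimestamp: &ts})
	fmt.Println(h.deletions) // [default/frontend-a]
}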
@@ -375,21 +367,14 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
             return
         }
     }
-    glog.V(4).Infof("Pod %s deleted: %+v.", pod.Name, pod)
+    glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
     if rs := rsc.getPodReplicaSet(pod); rs != nil {
         rsKey, err := controller.KeyFunc(rs)
         if err != nil {
             glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
             return
         }
-        // This method only manages expectations for the case where a pod is
-        // deleted without a grace period.
-        if pod.DeletionTimestamp == nil {
-            glog.V(4).Infof("Received new delete for rs %v, pod %v", rsKey, pod.Name)
-            rsc.expectations.DeletionObserved(rsKey)
-        } else {
-            glog.V(4).Infof("Received delete for rs %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rsKey, pod.Name, pod.DeletionTimestamp)
-        }
+        rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
         rsc.enqueueReplicaSet(rs)
     }
 }
@@ -442,6 +427,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
         if diff > rsc.burstReplicas {
             diff = rsc.burstReplicas
         }
+        // TODO: Track UIDs of creates just like deletes. The problem currently
+        // is we'd need to wait on the result of a create to record the pod's
+        // UID, which would require locking *across* the create, which will turn
+        // into a performance bottleneck. We should generate a UID for the pod
+        // beforehand and store it via ExpectCreations.
         rsc.expectations.ExpectCreations(rsKey, diff)
         wait := sync.WaitGroup{}
         wait.Add(diff)
@@ -462,7 +452,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
         if diff > rsc.burstReplicas {
             diff = rsc.burstReplicas
         }
-        rsc.expectations.ExpectDeletions(rsKey, diff)
         glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff)
         // No need to sort pods if we are about to delete all of them
         if rs.Spec.Replicas != 0 {
@@ -471,7 +460,17 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
             // in the earlier stages whenever possible.
             sort.Sort(controller.ActivePods(filteredPods))
         }
-
+        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
+        // deleted, so we know to record their expectations exactly once either
+        // when we see it as an update of the deletion timestamp, or as a delete.
+        // Note that if the labels on a pod/rs change in a way that the pod gets
+        // orphaned, the rs will only wake up after the expectations have
+        // expired even if other pods are deleted.
+        deletedPodKeys := []string{}
+        for i := 0; i < diff; i++ {
+            deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
+        }
+        rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
         wait := sync.WaitGroup{}
         wait.Add(diff)
         for i := 0; i < diff; i++ {
@@ -479,8 +478,9 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
                 defer wait.Done()
                 if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
                     // Decrement the expected number of deletes because the informer won't observe this deletion
-                    glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
-                    rsc.expectations.DeletionObserved(rsKey)
+                    podKey := controller.PodKey(filteredPods[ix])
+                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
+                    rsc.expectations.DeletionObserved(rsKey, podKey)
                     utilruntime.HandleError(err)
                 }
             }(i)
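The delete branch above snapshots the exact ns/name keys it is about to remove, registers them as expectations, and un-counts any key whose DeletePod call fails, since no watch event will ever arrive for it. A self-contained sketch of that shape, with hypothetical callbacks standing in for the pod control and expectations interfaces:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// deleteExcess sketches the delete path: record the exact keys first, fan the
// deletes out, and roll back the expectation for any pod whose delete call
// failed. Helper names and signatures are illustrative only.
func deleteExcess(podKeys []string, deletePod func(key string) error,
	expectDeletions func(keys []string), deletionObserved func(key string)) {

	expectDeletions(podKeys) // snapshot before issuing any delete

	var wg sync.WaitGroup
	wg.Add(len(podKeys))
	for _, key := range podKeys {
		go func(key string) {
			defer wg.Done()
			if err := deletePod(key); err != nil {
				// The informer won't observe this deletion; un-count it.
				deletionObserved(key)
			}
		}(key)
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	pending := map[string]bool{}
	deleteExcess(
		[]string{"default/frontend-a", "default/frontend-b"},
		func(key string) error { // fake API call: one delete fails
			if key == "default/frontend-b" {
				return errors.New("server timeout")
			}
			return nil
		},
		func(keys []string) {
			mu.Lock()
			defer mu.Unlock()
			for _, k := range keys {
				pending[k] = true
			}
		},
		func(key string) {
			mu.Lock()
			defer mu.Unlock()
			delete(pending, key)
		},
	)
	mu.Lock()
	fmt.Println(pending) // only frontend-a remains expected
	mu.Unlock()
}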
@@ -22,6 +22,7 @@ import (
     "fmt"
     "math/rand"
     "net/http/httptest"
+    "strings"
     "testing"
     "time"

@@ -652,6 +653,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
     }
 }

+// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
     client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
     fakePodControl := controller.FakePodControl{}
@@ -703,7 +705,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)

         podExp, exists, err := manager.expectations.GetExpectations(rsKey)
         if !exists || err != nil {
-            t.Fatalf("Did not find expectations for ReplicaSet.")
+            t.Fatalf("Did not find expectations for rc.")
         }
         if add, _ := podExp.GetExpectations(); add != 1 {
             t.Fatalf("Expectations are wrong %v", podExp)
@@ -714,9 +716,27 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             expectedPods = burstReplicas
         }
         validateSyncReplicaSet(t, &fakePodControl, 0, expectedPods)
-        for i := 0; i < expectedPods-1; i++ {
-            manager.podStore.Store.Delete(&pods.Items[i])
-            manager.deletePod(&pods.Items[i])
+
+        // To accurately simulate a watch we must delete the exact pods
+        // the rs is waiting for.
+        expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t))
+        podsToDelete := []*api.Pod{}
+        for _, key := range expectedDels.List() {
+            nsName := strings.Split(key, "/")
+            podsToDelete = append(podsToDelete, &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    rsSpec.Spec.Selector.MatchLabels,
+                },
+            })
+        }
+        // Don't delete all pods because we confirm that the last pod
+        // has exactly one expectation at the end, to verify that we
+        // don't double delete.
+        for i := range podsToDelete[1:] {
+            manager.podStore.Delete(podsToDelete[i])
+            manager.deletePod(podsToDelete[i])
         }
         podExp, exists, err := manager.expectations.GetExpectations(rsKey)
         if !exists || err != nil {
@@ -739,8 +759,20 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             manager.podStore.Store.Add(&pods.Items[expectedPods-1])
             manager.addPod(&pods.Items[expectedPods-1])
         } else {
-            manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
-            manager.deletePod(&pods.Items[expectedPods-1])
+            expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t))
+            if expectedDel.Len() != 1 {
+                t.Fatalf("Waiting on unexpected number of deletes.")
+            }
+            nsName := strings.Split(expectedDel.List()[0], "/")
+            lastPod := &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    rsSpec.Spec.Selector.MatchLabels,
+                },
+            }
+            manager.podStore.Store.Delete(lastPod)
+            manager.deletePod(lastPod)
         }
         pods.Items = pods.Items[expectedPods:]
     }
@@ -788,14 +820,14 @@ func TestRSSyncExpectations(t *testing.T) {
     manager.podStore.Store.Add(&pods.Items[0])
     postExpectationsPod := pods.Items[1]

-    manager.expectations = FakeRSExpectations{
+    manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
         controller.NewControllerExpectations(), true, func() {
             // If we check active pods before checking expectations, the
             // ReplicaSet will create a new replica because it doesn't see
             // this pod, but has fulfilled its expectations.
             manager.podStore.Store.Add(&postExpectationsPod)
         },
-    }
+    })
     manager.syncReplicaSet(getKey(rsSpec, t))
     validateSyncReplicaSet(t, &fakePodControl, 0, 0)
 }
@@ -925,7 +957,7 @@ func TestDeletionTimestamp(t *testing.T) {
     }
     pod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
     pod.DeletionTimestamp = &unversioned.Time{time.Now()}
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})

     // A pod added with a deletion timestamp should decrement deletions, not creations.
     manager.addPod(&pod)
@@ -944,7 +976,7 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update from no deletion timestamp to having one should be treated
     // as a deletion.
     oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs).Items[0]
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
     manager.updatePod(&oldPod, &pod)

     queueRC, _ = manager.queue.Get()
@@ -960,7 +992,14 @@ func TestDeletionTimestamp(t *testing.T) {

     // An update to the pod (including an update to the deletion timestamp)
     // should not be counted as a second delete.
-    manager.expectations.SetExpectations(rsKey, 0, 1)
+    secondPod := &api.Pod{
+        ObjectMeta: api.ObjectMeta{
+            Namespace: pod.Namespace,
+            Name:      "secondPod",
+            Labels:    pod.Labels,
+        },
+    }
+    manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
     oldPod.DeletionTimestamp = &unversioned.Time{time.Now()}
     manager.updatePod(&oldPod, &pod)
@@ -977,9 +1016,8 @@ func TestDeletionTimestamp(t *testing.T) {
         t.Fatalf("Wrong expectations %+v", podExp)
     }

-    // A pod with a nil timestamp should be counted as a deletion.
-    pod.DeletionTimestamp = nil
-    manager.deletePod(&pod)
+    // Deleting the second pod should clear expectations.
+    manager.deletePod(secondPod)

     queueRC, _ = manager.queue.Get()
     if queueRC != rsKey {
@@ -72,8 +72,8 @@ type ReplicationManager struct {
     // To allow injection of syncReplicationController for testing.
     syncHandler func(rcKey string) error

-    // A TTLCache of pod creates/deletes each rc expects to see
-    expectations controller.ControllerExpectationsInterface
+    // A TTLCache of pod creates/deletes each rc expects to see.
+    expectations *controller.UIDTrackingControllerExpectations

     // A store of replication controllers, populated by the rcController
     rcStore cache.StoreToReplicationControllerLister
@@ -106,7 +106,7 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
             Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
         },
         burstReplicas: burstReplicas,
-        expectations: controller.NewControllerExpectations(),
+        expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue: workqueue.New(),
     }

@@ -300,11 +300,10 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
     if pod.DeletionTimestamp != nil {
         // on a restart of the controller manager, it's possible a new pod shows up in a state that
         // is already pending deletion. Prevent the pod from being a creation observation.
-        glog.V(4).Infof("Add for pod %v with deletion timestamp %+v, counted as new deletion for rc %v", pod.Name, pod.DeletionTimestamp, rcKey)
-        rm.expectations.DeletionObserved(rcKey)
-    } else {
-        rm.expectations.CreationObserved(rcKey)
+        rm.deletePod(pod)
+        return
     }
+    rm.expectations.CreationObserved(rcKey)
     rm.enqueueController(rc)
 }

@@ -321,25 +320,17 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
     if rc == nil {
         return
     }
-    rcKey, err := controller.KeyFunc(rc)
-    if err != nil {
-        glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
-        return
-    }
     oldPod := old.(*api.Pod)
-
-    if curPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil {
+    if curPod.DeletionTimestamp != nil {
         // when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
         // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
         // for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
         // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
         // an rc never initiates a phase change, and so is never asleep waiting for the same.
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v counted as delete for rc %v", curPod.Name, curPod.DeletionTimestamp, rcKey)
-        rm.expectations.DeletionObserved(rcKey)
-    } else {
-        glog.V(4).Infof("Update to pod %v with deletion timestamp %+v. Not counting it as a new deletion for rc %v.", curPod.Name, curPod.DeletionTimestamp, rcKey)
+        rm.deletePod(curPod)
+        return
     }

     rm.enqueueController(rc)
     // Only need to get the old controller if the labels changed.
     if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
@@ -372,20 +363,14 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
             return
         }
     }
+    glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels)
     if rc := rm.getPodController(pod); rc != nil {
         rcKey, err := controller.KeyFunc(rc)
         if err != nil {
             glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
             return
         }
-        // This method only manages expectations for the case where a pod is
-        // deleted without a grace period.
-        if pod.DeletionTimestamp == nil {
-            glog.V(4).Infof("Received new delete for rc %v, pod %v", rcKey, pod.Name)
-            rm.expectations.DeletionObserved(rcKey)
-        } else {
-            glog.V(4).Infof("Received delete for rc %v pod %v with non nil deletion timestamp %+v. Not counting it as a new deletion.", rcKey, pod.Name, pod.DeletionTimestamp)
-        }
+        rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
         rm.enqueueController(rc)
     }
 }
@@ -438,6 +423,11 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
         if diff > rm.burstReplicas {
             diff = rm.burstReplicas
         }
+        // TODO: Track UIDs of creates just like deletes. The problem currently
+        // is we'd need to wait on the result of a create to record the pod's
+        // UID, which would require locking *across* the create, which will turn
+        // into a performance bottleneck. We should generate a UID for the pod
+        // beforehand and store it via ExpectCreations.
         rm.expectations.ExpectCreations(rcKey, diff)
         wait := sync.WaitGroup{}
         wait.Add(diff)
@@ -458,7 +448,6 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
         if diff > rm.burstReplicas {
             diff = rm.burstReplicas
         }
-        rm.expectations.ExpectDeletions(rcKey, diff)
         glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
         // No need to sort pods if we are about to delete all of them
         if rc.Spec.Replicas != 0 {
@@ -467,7 +456,20 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
             // in the earlier stages whenever possible.
             sort.Sort(controller.ActivePods(filteredPods))
         }
-
+        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
+        // deleted, so we know to record their expectations exactly once either
+        // when we see it as an update of the deletion timestamp, or as a delete.
+        // Note that if the labels on a pod/rc change in a way that the pod gets
+        // orphaned, the rs will only wake up after the expectations have
+        // expired even if other pods are deleted.
+        deletedPodKeys := []string{}
+        for i := 0; i < diff; i++ {
+            deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
+        }
+        // We use pod namespace/name as a UID to wait for deletions, so if the
+        // labels on a pod/rc change in a way that the pod gets orphaned, the
+        // rc will only wake up after the expectation has expired.
+        rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
         wait := sync.WaitGroup{}
         wait.Add(diff)
         for i := 0; i < diff; i++ {
@@ -475,8 +477,9 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
                 defer wait.Done()
                 if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
                     // Decrement the expected number of deletes because the informer won't observe this deletion
-                    glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
-                    rm.expectations.DeletionObserved(rcKey)
+                    podKey := controller.PodKey(filteredPods[ix])
+                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
+                    rm.expectations.DeletionObserved(rcKey, podKey)
                     utilruntime.HandleError(err)
                 }
             }(i)
@@ -22,6 +22,7 @@ import (
     "fmt"
     "math/rand"
     "net/http/httptest"
+    "strings"
     "testing"
     "time"

@@ -638,6 +639,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
     }
 }

+// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
     c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
     fakePodControl := controller.FakePodControl{}
@@ -698,9 +700,27 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             expectedPods = burstReplicas
         }
         validateSyncReplication(t, &fakePodControl, 0, expectedPods)
-        for i := 0; i < expectedPods-1; i++ {
-            manager.podStore.Store.Delete(&pods.Items[i])
-            manager.deletePod(&pods.Items[i])
+
+        // To accurately simulate a watch we must delete the exact pods
+        // the rc is waiting for.
+        expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t))
+        podsToDelete := []*api.Pod{}
+        for _, key := range expectedDels.List() {
+            nsName := strings.Split(key, "/")
+            podsToDelete = append(podsToDelete, &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    controllerSpec.Spec.Selector,
+                },
+            })
+        }
+        // Don't delete all pods because we confirm that the last pod
+        // has exactly one expectation at the end, to verify that we
+        // don't double delete.
+        for i := range podsToDelete[1:] {
+            manager.podStore.Delete(podsToDelete[i])
+            manager.deletePod(podsToDelete[i])
         }
         podExp, exists, err := manager.expectations.GetExpectations(rcKey)
         if !exists || err != nil {
@@ -723,8 +743,20 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
             manager.podStore.Store.Add(&pods.Items[expectedPods-1])
             manager.addPod(&pods.Items[expectedPods-1])
         } else {
-            manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
-            manager.deletePod(&pods.Items[expectedPods-1])
+            expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t))
+            if expectedDel.Len() != 1 {
+                t.Fatalf("Waiting on unexpected number of deletes.")
+            }
+            nsName := strings.Split(expectedDel.List()[0], "/")
+            lastPod := &api.Pod{
+                ObjectMeta: api.ObjectMeta{
+                    Name:      nsName[1],
+                    Namespace: nsName[0],
+                    Labels:    controllerSpec.Spec.Selector,
+                },
+            }
+            manager.podStore.Store.Delete(lastPod)
+            manager.deletePod(lastPod)
         }
         pods.Items = pods.Items[expectedPods:]
     }
@@ -771,14 +803,14 @@ func TestRCSyncExpectations(t *testing.T) {
     manager.podStore.Store.Add(&pods.Items[0])
     postExpectationsPod := pods.Items[1]

-    manager.expectations = FakeRCExpectations{
+    manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{
         controller.NewControllerExpectations(), true, func() {
             // If we check active pods before checking expectations, the rc
             // will create a new replica because it doesn't see this pod, but
             // has fulfilled its expectations.
             manager.podStore.Store.Add(&postExpectationsPod)
         },
-    }
+    })
     manager.syncReplicationController(getKey(controllerSpec, t))
     validateSyncReplication(t, &fakePodControl, 0, 0)
 }
@@ -906,7 +938,7 @@ func TestDeletionTimestamp(t *testing.T) {
     }
     pod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
     pod.DeletionTimestamp = &unversioned.Time{time.Now()}
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})

     // A pod added with a deletion timestamp should decrement deletions, not creations.
     manager.addPod(&pod)
@@ -925,7 +957,7 @@ func TestDeletionTimestamp(t *testing.T) {
     // An update from no deletion timestamp to having one should be treated
     // as a deletion.
     oldPod := newPodList(nil, 1, api.PodPending, controllerSpec).Items[0]
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
     manager.updatePod(&oldPod, &pod)

     queueRC, _ = manager.queue.Get()
@@ -941,7 +973,14 @@ func TestDeletionTimestamp(t *testing.T) {

     // An update to the pod (including an update to the deletion timestamp)
     // should not be counted as a second delete.
-    manager.expectations.SetExpectations(rcKey, 0, 1)
+    secondPod := &api.Pod{
+        ObjectMeta: api.ObjectMeta{
+            Namespace: pod.Namespace,
+            Name:      "secondPod",
+            Labels:    pod.Labels,
+        },
+    }
+    manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)})
     oldPod.DeletionTimestamp = &unversioned.Time{time.Now()}
     manager.updatePod(&oldPod, &pod)
@@ -958,9 +997,8 @@ func TestDeletionTimestamp(t *testing.T) {
         t.Fatalf("Wrong expectations %+v", podExp)
     }

-    // A pod with a nil timestamp should be counted as a deletion.
-    pod.DeletionTimestamp = nil
-    manager.deletePod(&pod)
+    // Deleting the second pod should clear expectations.
+    manager.deletePod(secondPod)

     queueRC, _ = manager.queue.Get()
     if queueRC != rcKey {
@@ -76,3 +76,14 @@ func HandleError(err error) {
 func logError(err error) {
     glog.ErrorDepth(2, err)
 }
+
+// GetCaller returns the caller of the function that calls it.
+func GetCaller() string {
+    var pc [1]uintptr
+    runtime.Callers(3, pc[:])
+    f := runtime.FuncForPC(pc[0])
+    if f == nil {
+        return fmt.Sprintf("Unable to find caller")
+    }
+    return f.Name()
+}
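GetCaller skips three frames: runtime.Callers itself, GetCaller, and the function that invoked GetCaller, so it reports that function's caller; this is what lets deletePod log who triggered it. A standalone sketch of the same technique (function names here are illustrative):

package main

import (
	"fmt"
	"runtime"
)

// getCaller mirrors the helper added above: skip three frames
// (runtime.Callers, getCaller, and the function that called getCaller)
// so the program counter left in pc[0] belongs to that function's caller.
func getCaller() string {
	var pc [1]uintptr
	runtime.Callers(3, pc[:])
	f := runtime.FuncForPC(pc[0])
	if f == nil {
		return "unknown"
	}
	return f.Name()
}

func logDelete() {
	// Reports who triggered the delete, e.g. "main.main" when called from main.
	fmt.Printf("delete requested by %v\n", getCaller())
}

func main() {
	logDelete()
}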