mirror of https://github.com/k3s-io/k3s
commit 533296b265
pkg/controller/controller_utils.go:

@@ -33,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util"
 )

 const CreatedByAnnotation = "kubernetes.io/created-by"

@@ -91,9 +92,11 @@ type ControllerExpectationsInterface interface {
 	ExpectDeletions(controllerKey string, dels int) error
 	CreationObserved(controllerKey string)
 	DeletionObserved(controllerKey string)
+	RaiseExpectations(controllerKey string, add, del int)
+	LowerExpectations(controllerKey string, add, del int)
 }

-// ControllerExpectations is a ttl cache mapping controllers to what they expect to see before being woken up for a sync.
+// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
 type ControllerExpectations struct {
 	cache.Store
 }

@@ -123,6 +126,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
 	if exp, exists, err := r.GetExpectations(controllerKey); exists {
 		if exp.Fulfilled() {
 			return true
+		} else if exp.isExpired() {
+			glog.V(4).Infof("Controller expectations expired %#v", exp)
+			return true
 		} else {
 			glog.V(4).Infof("Controller still waiting on expectations %#v", exp)
 			return false

@@ -142,9 +148,17 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
 	return true
 }

+// TODO: Extend ExpirationCache to support explicit expiration.
+// TODO: Make this possible to disable in tests.
+// TODO: Parameterize timeout.
+// TODO: Support injection of clock.
+func (exp *ControlleeExpectations) isExpired() bool {
+	return util.RealClock{}.Since(exp.timestamp) > 10*time.Second
+}
+
 // SetExpectations registers new expectations for the given controller. Forgets existing expectations.
 func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
-	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey}
+	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: util.RealClock{}.Now()}
 	glog.V(4).Infof("Setting expectations %+v", exp)
 	return r.Add(exp)
 }

@@ -158,22 +172,31 @@ func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
 }

 // Decrements the expectation counts of the given controller.
-func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) {
+func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
 	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
-		exp.Seen(int64(add), int64(del))
+		exp.Add(int64(-add), int64(-del))
 		// The expectations might've been modified since the update on the previous line.
-		glog.V(4).Infof("Lowering expectations %+v", exp)
+		glog.V(4).Infof("Lowered expectations %+v", exp)
 	}
 }

+// Increments the expectation counts of the given controller.
+func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
+	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
+		exp.Add(int64(add), int64(del))
+		// The expectations might've been modified since the update on the previous line.
+		glog.V(4).Infof("Raised expectations %+v", exp)
+	}
+}
+
 // CreationObserved atomically decrements the `add` expectation count of the given controller.
 func (r *ControllerExpectations) CreationObserved(controllerKey string) {
-	r.lowerExpectations(controllerKey, 1, 0)
+	r.LowerExpectations(controllerKey, 1, 0)
 }

 // DeletionObserved atomically decrements the `del` expectation count of the given controller.
 func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
-	r.lowerExpectations(controllerKey, 0, 1)
+	r.LowerExpectations(controllerKey, 0, 1)
 }

 // Expectations are either fulfilled, or expire naturally.

@@ -186,12 +209,13 @@ type ControlleeExpectations struct {
 	add       int64
 	del       int64
 	key       string
+	timestamp time.Time
 }

-// Seen decrements the add and del counters.
-func (e *ControlleeExpectations) Seen(add, del int64) {
-	atomic.AddInt64(&e.add, -add)
-	atomic.AddInt64(&e.del, -del)
+// Add increments the add and del counters.
+func (e *ControlleeExpectations) Add(add, del int64) {
+	atomic.AddInt64(&e.add, add)
+	atomic.AddInt64(&e.del, del)
 }

 // Fulfilled returns true if this expectation has been fulfilled.
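Note: the rework above implements the controller "expectations" pattern. A sync records how many creations and deletions it is about to cause, the watch event handlers lower those counts as the events arrive, and the next sync is gated until the counts are satisfied or the record outlives its 10-second TTL. A minimal, self-contained Go sketch of the pattern (illustrative names only, not the real k8s.io/kubernetes/pkg/controller API):

// Toy version of ControlleeExpectations: two atomic counters plus a
// timestamp that lets a stale record expire instead of wedging the sync.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// expectationsTTL mirrors the hard-coded 10*time.Second timeout in the diff.
const expectationsTTL = 10 * time.Second

type expectations struct {
	add, del  int64
	timestamp time.Time
}

// Raise mirrors RaiseExpectations: atomically bump both counters.
func (e *expectations) Raise(add, del int64) {
	atomic.AddInt64(&e.add, add)
	atomic.AddInt64(&e.del, del)
}

// Lower mirrors LowerExpectations: atomically decrement both counters.
func (e *expectations) Lower(add, del int64) {
	atomic.AddInt64(&e.add, -add)
	atomic.AddInt64(&e.del, -del)
}

// Satisfied mirrors SatisfiedExpectations: fulfilled, or expired by TTL.
func (e *expectations) Satisfied() bool {
	fulfilled := atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
	return fulfilled || time.Since(e.timestamp) > expectationsTTL
}

func main() {
	exp := &expectations{timestamp: time.Now()}
	exp.Raise(1, 0)              // a sync is about to create one object
	fmt.Println(exp.Satisfied()) // false: the creation has not been observed yet
	exp.Lower(1, 0)              // the AddFunc event handler observed the creation
	fmt.Println(exp.Satisfied()) // true: safe to run the next sync
}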
pkg/controller/deployment/deployment_controller.go:

@@ -173,6 +173,8 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
 		&api.Pod{},
 		resyncPeriod(),
 		framework.ResourceEventHandlerFuncs{
+			// When pod is created, we need to update deployment's expectations
+			AddFunc: dc.addPod,
 			// When pod updates (becomes ready), we need to enqueue deployment
 			UpdateFunc: dc.updatePod,
 			// When pod is deleted, we need to update deployment's expectations

@@ -210,7 +212,8 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
 			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
 			return
 		}
-		dc.rsExpectations.CreationObserved(dKey)
+		// Decrement expected creations
+		dc.rsExpectations.LowerExpectations(dKey, 1, 0)
 		dc.enqueueDeployment(d)
 	}
 }

@@ -302,6 +305,25 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
 	return nil
 }

+// When a pod is created, update expectations of the controller that manages the pod.
+func (dc *DeploymentController) addPod(obj interface{}) {
+	pod, ok := obj.(*api.Pod)
+	if !ok {
+		return
+	}
+	glog.V(4).Infof("Pod %s created.", pod.Name)
+	if d := dc.getDeploymentForPod(pod); d != nil {
+		dKey, err := controller.KeyFunc(d)
+		if err != nil {
+			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
+			return
+		}
+		// Decrement expected creations
+		dc.podExpectations.LowerExpectations(dKey, 1, 0)
+		dc.enqueueDeployment(d)
+	}
+}
+
 // updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
 // is updated and wake them up. If anything of the Pods have changed, we need to awaken both
 // the old and new deployments. old and cur must be *api.Pod types.

@@ -350,7 +372,9 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
 			glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
 			return
 		}
-		dc.podExpectations.DeletionObserved(dKey)
+		// Decrement expected deletions
+		dc.podExpectations.LowerExpectations(dKey, 0, 1)
 		dc.enqueueDeployment(d)
 	}
 }

@@ -383,7 +407,7 @@ func (dc *DeploymentController) worker() {
 		defer dc.queue.Done(key)
 		err := dc.syncHandler(key.(string))
 		if err != nil {
-			glog.Errorf("Error syncing deployment: %v", err)
+			glog.Errorf("Error syncing deployment %v: %v", key, err)
 		}
 	}()
 }

@@ -417,8 +441,26 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 		dc.rsExpectations.DeleteExpectations(key)
 		return nil
 	}

 	d := *obj.(*extensions.Deployment)

+	// Note: The expectations cache is not thread-safe for a given key.
+	// Check the replica set expectations of the deployment before creating a new one.
+	// TODO: Explicitly expire expectations if we haven't sync'ed in a long time.
+	dKey, err := controller.KeyFunc(&d)
+	if err != nil {
+		return fmt.Errorf("couldn't get key for deployment %#v: %v", d, err)
+	}
+	if !dc.rsExpectations.SatisfiedExpectations(dKey) {
+		return fmt.Errorf("replicaset expectations not met yet for %v in syncDeployment", dKey)
+	}
+	if !dc.podExpectations.SatisfiedExpectations(dKey) {
+		return fmt.Errorf("pod expectations not met yet for %v in syncDeployment", dKey)
+	}
+	// Ensure that an expectations record exists and clear previous expectations.
+	dc.rsExpectations.SetExpectations(dKey, 0, 0)
+	dc.podExpectations.SetExpectations(dKey, 0, 0)
+
 	if d.Spec.Paused {
 		// TODO: Implement scaling for paused deployments.
 		// Don't take any action for paused deployment.
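The syncDeployment hunk above turns the expectations into a sync gate: the sync returns early with an error (which requeues the key) until both caches are satisfied, and only then resets both records before taking any action. A rough, self-contained sketch of that control flow, where the hypothetical satisfied and reset callbacks stand in for SatisfiedExpectations and SetExpectations(key, 0, 0):

package main

import "fmt"

// syncGate is an illustrative helper, not part of the real controller.
func syncGate(key string, rsSatisfied, podSatisfied func(string) bool, reset func(string)) error {
	if !rsSatisfied(key) {
		// Returning an error requeues the key, so the sync is retried later.
		return fmt.Errorf("replicaset expectations not met yet for %v", key)
	}
	if !podSatisfied(key) {
		return fmt.Errorf("pod expectations not met yet for %v", key)
	}
	// Both caches are satisfied: clear previous expectations before acting.
	reset(key)
	return nil
}

func main() {
	always := func(string) bool { return true }
	never := func(string) bool { return false }
	noop := func(string) {}

	fmt.Println(syncGate("default/web", always, always, noop)) // <nil>: sync proceeds
	fmt.Println(syncGate("default/web", never, always, noop))  // error: key gets retried
}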
@@ -573,7 +615,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
 	}

 	// Scale down, if we can.
-	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment, true)
+	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
 	if err != nil {
 		return err
 	}

@@ -701,15 +743,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment) (*extensions.ReplicaSet, error) {
 		return nil, nil
 	}

-	// Check the replica set expectations of the deployment before creating a new one.
-	dKey, err := controller.KeyFunc(&deployment)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
-	}
-	if !dc.rsExpectations.SatisfiedExpectations(dKey) {
-		dc.enqueueDeployment(&deployment)
-		return nil, fmt.Errorf("replica set expectations not met yet before getting new replica set\n")
-	}
 	// new ReplicaSet does not exist, create one.
 	namespace := deployment.ObjectMeta.Namespace
 	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)

@@ -717,12 +750,15 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment) (*extensions.ReplicaSet, error) {
 	// Add podTemplateHash label to selector.
 	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

-	// Set ReplicaSet expectations (1 ReplicaSet should be created)
-	dKey, err = controller.KeyFunc(&deployment)
+	// Set ReplicaSet expectations (1 ReplicaSet should be created).
+	// This clobbers previous expectations, but we checked that in syncDeployment.
+	// We don't set expectations for deletions of 0-replica ReplicaSets because re-setting
+	// expectations would clobber these, and redundant deletions shouldn't cause harm.
+	dKey, err := controller.KeyFunc(&deployment)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
+		return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
 	}
 	dc.rsExpectations.ExpectCreations(dKey, 1)

 	// Create new ReplicaSet
 	newRS := extensions.ReplicaSet{
 		ObjectMeta: api.ObjectMeta{

@@ -742,11 +778,23 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment) (*extensions.ReplicaSet, error) {
 	if err != nil {
 		return nil, err
 	}

+	// Increment expected creations
+	dc.rsExpectations.RaiseExpectations(dKey, 1, 0)
+	if newReplicasCount != 0 {
+		dc.podExpectations.RaiseExpectations(dKey, newReplicasCount, 0)
+	}
+
 	newRS.Spec.Replicas = newReplicasCount
 	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
 	if err != nil {
-		dc.rsExpectations.DeleteExpectations(dKey)
-		return nil, fmt.Errorf("error creating replica set: %v", err)
+		// Decrement expected creations
+		dc.rsExpectations.LowerExpectations(dKey, 1, 0)
+		if newReplicasCount != 0 {
+			dc.podExpectations.LowerExpectations(dKey, newReplicasCount, 0)
+		}
+		dc.enqueueDeployment(deployment)
+		return nil, fmt.Errorf("error creating replica set %v: %v", dKey, err)
 	}
 	if newReplicasCount > 0 {
 		dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
@@ -822,24 +870,13 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
 	return scaled, err
 }

-// Set expectationsCheck to false to bypass expectations check when testing
-func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment, expectationsCheck bool) (bool, error) {
+func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
 	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
 	if oldPodsCount == 0 {
 		// Can't scale down further
 		return false, nil
 	}

-	// Check the expectations of deployment before reconciling
-	dKey, err := controller.KeyFunc(&deployment)
-	if err != nil {
-		return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
-	}
-	if expectationsCheck && !dc.podExpectations.SatisfiedExpectations(dKey) {
-		glog.V(4).Infof("Pod expectations not met yet before reconciling old replica sets\n")
-		return false, nil
-	}
-
 	minReadySeconds := deployment.Spec.MinReadySeconds
 	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
 	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{newRS}, minReadySeconds)

@@ -903,10 +940,6 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
 	}

 	totalScaledDown := cleanupCount + scaledDownCount
-	if expectationsCheck {
-		dc.podExpectations.ExpectDeletions(dKey, totalScaledDown)
-	}
-
 	return totalScaledDown > 0, nil
 }
@@ -1081,13 +1114,35 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
 	if rs.Spec.Replicas == newScale {
 		return false, rs, nil
 	}
-	scalingOperation := "down"
+	dKey, err := controller.KeyFunc(&deployment)
+	if err != nil {
+		return false, nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
+	}
+	var scalingOperation string
+	// Set expectations first, because if the update is successful, the expectations will be handled asynchronously immediately.
 	if rs.Spec.Replicas < newScale {
 		scalingOperation = "up"
+		// Increment expected creations
+		dc.podExpectations.RaiseExpectations(dKey, newScale-rs.Spec.Replicas, 0)
+	} else {
+		scalingOperation = "down"
+		// Increment expected deletions
+		dc.podExpectations.RaiseExpectations(dKey, 0, rs.Spec.Replicas-newScale)
 	}
 	newRS, err := dc.scaleReplicaSet(rs, newScale)
 	if err == nil {
 		dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
+	} else {
+		// Back out the expectation changes. If we observed a failure even though the update succeeded, this will be wrong.
+		if rs.Spec.Replicas < newScale {
+			// Decrement expected creations
+			dc.podExpectations.LowerExpectations(dKey, newScale-rs.Spec.Replicas, 0)
+			dc.enqueueDeployment(deployment)
+		} else {
+			// Decrement expected deletions
+			dc.podExpectations.LowerExpectations(dKey, 0, rs.Spec.Replicas-newScale)
+			dc.enqueueDeployment(deployment)
+		}
 	}
 	return true, newRS, err
 }
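A pattern repeats in getNewReplicaSet and scaleReplicaSetAndRecordEvent above: raise expectations before issuing the API call (the resulting watch events may be observed immediately), and back the expectations out and requeue if the call fails, because the expected events will then never arrive. A compact, self-contained sketch under those assumptions (the counter type and helper names are illustrative):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type counter struct{ n int64 }

func (c *counter) Raise(d int64) { atomic.AddInt64(&c.n, d) }
func (c *counter) Lower(d int64) { atomic.AddInt64(&c.n, -d) }

// createWithExpectations raises expectations before acting and compensates
// on failure, mirroring the error paths added in the diff above.
func createWithExpectations(exp *counter, create func() error, requeue func()) error {
	exp.Raise(1) // expect one creation event before issuing the call
	if err := create(); err != nil {
		exp.Lower(1) // the creation failed, so no event will ever arrive
		requeue()    // let the controller retry the whole sync later
		return err
	}
	return nil
}

func main() {
	exp := &counter{}
	fail := func() error { return errors.New("apiserver unavailable") }
	requeued := false

	err := createWithExpectations(exp, fail, func() { requeued = true })
	fmt.Println(err, atomic.LoadInt64(&exp.n), requeued) // apiserver unavailable 0 true
}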
pkg/controller/deployment/deployment_controller_test.go:

@@ -95,6 +95,8 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
 		controller := &DeploymentController{
 			client:          &fake,
 			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
 		if err != nil {

@@ -269,9 +271,11 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
 		controller := &DeploymentController{
 			client:          &fakeClientset,
 			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}

-		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment, false)
+		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue

@@ -373,6 +377,8 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
 		controller := &DeploymentController{
 			client:          &fakeClientset,
 			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, test.maxCleanupCount)
 		if err != nil {

@@ -460,6 +466,8 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing.T) {
 		controller := &DeploymentController{
 			client:          &fakeClientset,
 			eventRecorder:   &record.FakeRecorder{},
+			podExpectations: controller.NewControllerExpectations(),
+			rsExpectations:  controller.NewControllerExpectations(),
 		}
 		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
 		if err != nil {

pkg/controller/replicaset/replica_set_test.go:

@@ -836,7 +836,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	}

 	// This should have no effect, since we've deleted the ReplicaSet.
-	podExp.Seen(1, 0)
+	podExp.Add(-1, 0)
 	manager.podStore.Store.Replace(make([]interface{}, 0), "0")
 	manager.syncReplicaSet(getKey(rs, t))
 	validateSyncReplicaSet(t, &fakePodControl, 0, 0)

pkg/controller/replication/replication_controller_test.go:

@@ -819,7 +819,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	}

 	// This should have no effect, since we've deleted the rc.
-	podExp.Seen(1, 0)
+	podExp.Add(-1, 0)
 	manager.podStore.Store.Replace(make([]interface{}, 0), "0")
 	manager.syncReplicationController(getKey(rc, t))
 	validateSyncReplication(t, &fakePodControl, 0, 0)