/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package deployment contains all the logic for handling Kubernetes Deployments.
// It implements a set of strategies (rolling, recreate) for deploying an application,
// the means to rollback to previous versions, proportional scaling for mitigating
// risk, cleanup policy, and other useful features of Deployments.
package deployment

import (
	"fmt"
	"reflect"
	"sort"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/v1"
	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
	"k8s.io/kubernetes/pkg/client/legacylisters"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/deployment/util"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/util/metrics"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

const (
	// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
	// of all deployments.
	// This recomputation happens based on contents in the local caches.
	FullDeploymentResyncPeriod = 30 * time.Second

	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
	StoreSyncedPollPeriod = 100 * time.Millisecond

	// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	MaxRetries = 5
)
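
// getDeploymentKind returns the GroupVersionKind of the Deployment resource
// (extensions/v1beta1). It is used below when building the controller ref
// manager that adopts and releases replica sets.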
func getDeploymentKind() schema.GroupVersionKind {
	return extensions.SchemeGroupVersion.WithKind("Deployment")
}

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	rsControl     controller.RSControlInterface
	client        clientset.Interface
	eventRecorder record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *extensions.Deployment)

	// A store of deployments, populated by the dController
	dLister *listers.StoreToDeploymentLister
	// A store of ReplicaSets, populated by the rsController
	rsLister *listers.StoreToReplicaSetLister
	// A store of pods, populated by the podController
	podLister *listers.StoreToPodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
	// Deployments that need to be checked for progress.
	progressQueue workqueue.RateLimitingInterface
}

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, client clientset.Interface) *DeploymentController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper once all clients have moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.Core().Events("")})

	if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.Core().RESTClient().GetRateLimiter())
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
		progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue
	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	// Each synced flag must track its own informer; wiring all three to dInformer
	// would let the controller start before the ReplicaSet and Pod caches are warm.
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc
}
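
// Example wiring (an illustrative sketch only; the real wiring lives in the
// kube-controller-manager, and the exact shared-informer-factory constructor
// signature is an assumption here):
//
//	factory := informers.NewSharedInformerFactory(client, resyncPeriod) // assumed constructor
//	dc := NewDeploymentController(factory.Deployments(), factory.ReplicaSets(), factory.Pods(), client)
//	factory.Start(stopCh)
//	go dc.Run(5, stopCh)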

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()
	defer dc.progressQueue.ShutDown()

	glog.Infof("Starting deployment controller")

	if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}
	go wait.Until(dc.progressWorker, time.Second, stopCh)

	<-stopCh
	glog.Infof("Shutting down deployment controller")
}
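
// addDeployment enqueues a newly created deployment for an initial sync.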
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*extensions.Deployment)
	glog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d)
}
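
// updateDeployment enqueues the updated deployment and, when its selector has
// changed, also requeues older deployments that used to overlap with it so
// they can be cleaned up.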
func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*extensions.Deployment)
	curD := cur.(*extensions.Deployment)
	glog.V(4).Infof("Updating deployment %s", oldD.Name)
	dc.enqueueDeployment(curD)
	// If the selector of the current deployment just changed, we need to requeue any old
	// overlapping deployments. If the new selector steps on another deployment, the current
	// deployment will get denied during the resync loop.
	if !reflect.DeepEqual(curD.Spec.Selector, oldD.Spec.Selector) {
		deployments, err := dc.dLister.Deployments(curD.Namespace).List(labels.Everything())
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", curD.Namespace, err))
			return
		}
		// Trigger cleanup of any old overlapping deployments; we don't care about any error
		// returned here.
		for i := range deployments {
			otherD := deployments[i]

			oldOverlaps, oldErr := util.OverlapsWith(oldD, otherD)
			curOverlaps, curErr := util.OverlapsWith(curD, otherD)
			// Enqueue otherD so it gets cleaned up
			if oldErr == nil && curErr == nil && oldOverlaps && !curOverlaps {
				dc.enqueueDeployment(otherD)
			}
		}
	}
}
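
// deleteDeployment enqueues a deleted deployment (or its tombstone) and requeues
// any deployments that overlapped with it so their overlap state can be cleaned up.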
func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*extensions.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*extensions.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	glog.V(4).Infof("Deleting deployment %s", d.Name)
	dc.enqueueDeployment(d)
	deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err))
		return
	}
	// Trigger cleanup of any old overlapping deployments; we don't care about any error
	// returned here.
	for i := range deployments {
		otherD := deployments[i]

		overlaps, err := util.OverlapsWith(d, otherD)
		// Enqueue otherD so it gets cleaned up
		if err == nil && overlaps {
			dc.enqueueDeployment(otherD)
		}
	}
}

// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*extensions.ReplicaSet)
	glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
	if d := dc.getDeploymentForReplicaSet(rs); d != nil {
		dc.enqueueDeployment(d)
	}
}

// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
	deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
	if err != nil || len(deployments) == 0 {
		glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
		return nil
	}
	// Because all ReplicaSets belonging to a deployment should have a unique label key,
	// there should never be more than one deployment returned by the above method.
	// If that happens we should probably dynamically repair the situation by ultimately
	// trying to clean up one of the controllers; for now we just return the older one.
	if len(deployments) > 1 {
		sort.Sort(util.BySelectorLastUpdateTime(deployments))
		glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
			rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
	}
	return deployments[0]
}

// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
// is updated and wakes them up. If anything in the ReplicaSet has changed, we need to
// awaken both the old and new deployments. old and cur must be *extensions.ReplicaSet
// types.
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*extensions.ReplicaSet)
	oldRS := old.(*extensions.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}
	// TODO: Write a unittest for this case
	glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
	if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
		dc.enqueueDeployment(d)
	}
	// A number of things could affect the old deployment: labels changing,
	// pod template changing, etc.
	if !api.Semantic.DeepEqual(oldRS, curRS) {
		if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
			dc.enqueueDeployment(oldD)
		}
	}
}

// deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
// the ReplicaSet is deleted. obj could be an *extensions.ReplicaSet, or
// a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
	rs, ok := obj.(*extensions.ReplicaSet)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
			return
		}
		rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
			return
		}
	}
	glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
	if d := dc.getDeploymentForReplicaSet(rs); d != nil {
		dc.enqueueDeployment(d)
	}
}

// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
func (dc *DeploymentController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the Pod
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
			return
		}
	}
	if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
		podList, err := dc.listPods(d)
		if err == nil && len(podList.Items) == 0 {
			dc.enqueueDeployment(d)
		}
	}
}
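
// enqueue adds the given deployment's key to the main work queue.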
func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.queue.Add(key)
}

// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
// back to the main queue iff it has failed progressing.
func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.progressQueue.AddAfter(key, after)
}

// getDeploymentForPod returns the deployment managing the given Pod.
func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Deployment {
	// Find the owning replica set
	var rs *extensions.ReplicaSet
	var err error
	// Look at the owner reference
	controllerRef := controller.GetControllerOf(&pod.ObjectMeta)
	if controllerRef != nil {
		// Not a pod owned by a replica set.
		if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
			return nil
		}
		rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
		if err != nil {
			glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
			return nil
		}
	} else {
		// Fallback to listing replica sets.
		rss, err := dc.rsLister.GetPodReplicaSets(pod)
		if err != nil {
			glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
			return nil
		}
		// TODO: Handle multiple replica sets gracefully
		// For now we return the oldest replica set.
		if len(rss) > 1 {
			utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
			sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
		}
		rs = rss[0]
	}
	return dc.getDeploymentForReplicaSet(rs)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}
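
// processNextWorkItem pops a single key off the main queue, runs the sync handler
// for it, and lets handleErr decide whether to requeue. It returns false only
// when the queue is shutting down.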
func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
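
// handleErr forgets the key on success, requeues it with rate limiting while it
// is under MaxRetries, and drops it from the queue after too many failures.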
func (dc *DeploymentController) handleErr(err error, key interface{}) {
	if err == nil {
		dc.queue.Forget(key)
		return
	}

	if dc.queue.NumRequeues(key) < MaxRetries {
		glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
		dc.queue.AddRateLimited(key)
		return
	}

	utilruntime.HandleError(err)
	glog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
	dc.queue.Forget(key)
}

// classifyReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets
// and adopts them if their labels match the Deployment but are missing the reference.
// It also removes the controllerRef from ReplicaSets whose labels no longer match
// the deployment.
func (dc *DeploymentController) classifyReplicaSets(deployment *extensions.Deployment) error {
	rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}

	deploymentSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
	if err != nil {
		return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err)
	}
	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment.ObjectMeta, deploymentSelector, getDeploymentKind())
	matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(rsList)
	// Adopt replica sets only if this deployment is not going to be deleted.
	if deployment.DeletionTimestamp == nil {
		for _, replicaSet := range matchesNeedsController {
			err := cm.AdoptReplicaSet(replicaSet)
			// continue to next RS if adoption fails.
			if err != nil {
				// If the RS no longer exists, don't even log the error.
				if !errors.IsNotFound(err) {
					utilruntime.HandleError(err)
				}
			} else {
				matchesAndControlled = append(matchesAndControlled, replicaSet)
			}
		}
	}
	// remove the controllerRef for the RSes that no longer have matching labels
	var errlist []error
	for _, replicaSet := range controlledDoesNotMatch {
		err := cm.ReleaseReplicaSet(replicaSet)
		if err != nil {
			errlist = append(errlist, err)
		}
	}
	return utilerrors.NewAggregate(errlist)
}

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
	}()

	obj, exists, err := dc.dLister.Indexer.GetByKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve deployment %v from store: %v", key, err))
		return err
	}
	if !exists {
		glog.Infof("Deployment has been deleted %v", key)
		return nil
	}

	deployment := obj.(*extensions.Deployment)
	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d, err := util.DeploymentDeepCopy(deployment)
	if err != nil {
		return err
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d)
		}
		return nil
	}

	deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
	}

	// Handle overlapping deployments by deterministically avoiding syncing deployments that fight over ReplicaSets.
	overlaps, err := dc.handleOverlap(d, deployments)
	if err != nil {
		if overlaps {
			// Emit an event and return a nil error for overlapping deployments so we won't resync them again.
			dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectorOverlap", err.Error())
			return nil
		}
		// For any other failure, we should retry the deployment.
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d)
	}

	// Why run the cleanup policy only when there is no rollback request?
	// The thing with the cleanup policy currently is that it is far from smart because it takes into account
	// the latest replica sets while it should instead retain the latest *working* replica sets. This means that
	// you can have a cleanup policy of 1 but your last known working replica set may be 2 or 3 versions back
	// in the history.
	// Eventually we will want to find a way to recognize replica sets that have worked at some point in time
	// (and chances are higher that they will work again as opposed to others that didn't) for candidates to
	// automatically roll back to (#23211) and the cleanup policy should help.
	if d.Spec.RollbackTo == nil {
		_, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
		if err != nil {
			return err
		}
		// So far the cleanup policy was executed once a deployment was paused, scaled up/down, or it
		// successfully completed deploying a replica set. Decouple it from the strategies and have it
		// run almost unconditionally - cleanupDeployment is safe by default.
		dc.cleanupDeployment(oldRSs, d)
	}

	err = dc.classifyReplicaSets(d)
	if err != nil {
		return err
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	_, err = dc.hasFailed(d)
	if err != nil {
		return err
	}
	// TODO: Automatically rollback here if we failed above. Locate the last complete
	// revision and populate the rollback spec with it.
	// See https://github.com/kubernetes/kubernetes/issues/23211.

	if d.Spec.Paused {
		return dc.sync(d)
	}

	if d.Spec.RollbackTo != nil {
		revision := d.Spec.RollbackTo.Revision
		if d, err = dc.rollback(d, &revision); err != nil {
			return err
		}
	}

	scalingEvent, err := dc.isScalingEvent(d)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d)
	}

	switch d.Spec.Strategy.Type {
	case extensions.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d)
	case extensions.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

// handleOverlap avoids syncing the newer of two overlapping deployments (only the oldest one
// is synced). New/old is determined by when the deployment's selector was last updated.
func (dc *DeploymentController) handleOverlap(d *extensions.Deployment, deployments []*extensions.Deployment) (bool, error) {
	overlapping := false
	var errs []error

	for i := range deployments {
		otherD := deployments[i]

		if d.Name == otherD.Name {
			continue
		}

		// Error is already checked during validation
		foundOverlaps, _ := util.OverlapsWith(d, otherD)

		// If otherD overlaps with the current deployment, we need to identify which one has
		// held the selector longer and mark the other as overlapping. Requeue the overlapping
		// deployments if this one has been marked for deletion; we only update its status as
		// long as it is not actually deleted.
		if foundOverlaps && d.DeletionTimestamp == nil {
			overlapping = true
			// Look at the overlapping annotation in both deployments. If one of them has it and points
			// to the other one then we don't need to compare their timestamps.
			otherOverlapsWith := otherD.Annotations[util.OverlapAnnotation]
			currentOverlapsWith := d.Annotations[util.OverlapAnnotation]
			// The other deployment is already marked as overlapping with the current one.
			if otherOverlapsWith == d.Name {
				var err error
				if d, err = dc.clearDeploymentOverlap(d, otherD.Name); err != nil {
					errs = append(errs, err)
				}
				continue
			}

			otherCopy, err := util.DeploymentDeepCopy(otherD)
			if err != nil {
				return false, err
			}

			// Skip syncing this one if an older overlapping one is found.
			if currentOverlapsWith == otherCopy.Name || util.SelectorUpdatedBefore(otherCopy, d) {
				if _, err = dc.markDeploymentOverlap(d, otherCopy.Name); err != nil {
					return false, err
				}
				if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
					return false, err
				}
				return true, fmt.Errorf("deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, otherCopy.Namespace, otherCopy.Name)
			}

			// TODO: We need to support annotations in deployments that overlap with multiple other
			// deployments.
			if _, err = dc.markDeploymentOverlap(otherCopy, d.Name); err != nil {
				errs = append(errs, err)
			}
			// This is going to get some deployments into update hotlooping if we remove the overlapping
			// annotation unconditionally.
			//
			// Scenario:
			// --> Deployment foo with label selector A=A is created.
			// --> Deployment bar with label selector A=A,B=B is created. Marked as overlapping since it
			//     overlaps with foo.
			// --> Deployment baz with label selector B=B is created. Marked as overlapping, since it
			//     overlaps with bar, bar overlapping annotation is cleaned up. Next sync loop marks bar
			//     as overlapping and it gets in an update hotloop.
			if d, err = dc.clearDeploymentOverlap(d, otherCopy.Name); err != nil {
				errs = append(errs, err)
			}
			continue
		}

		// If the otherD deployment does not overlap with the current deployment *anymore*
		// we need to cleanup otherD from the overlapping annotation so it can be synced by
		// the deployment controller.
		dName, hasOverlappingAnnotation := otherD.Annotations[util.OverlapAnnotation]
		if hasOverlappingAnnotation && dName == d.Name {
			otherCopy, err := util.DeploymentDeepCopy(otherD)
			if err != nil {
				return false, err
			}
			if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
				errs = append(errs, err)
			}
		}
	}

	if !overlapping {
		var err error
		if d, err = dc.clearDeploymentOverlap(d, ""); err != nil {
			errs = append(errs, err)
		}
	}

	return false, utilerrors.NewAggregate(errs)
}
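
// markDeploymentOverlap records in the deployment's annotations that it overlaps with
// the named deployment, bumps observedGeneration so deletion won't be blocked, and
// persists the change via a status update.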
func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) {
	if deployment.Annotations[util.OverlapAnnotation] == withDeployment && deployment.Status.ObservedGeneration >= deployment.Generation {
		return deployment, nil
	}
	if deployment.Annotations == nil {
		deployment.Annotations = make(map[string]string)
	}
	// Update observedGeneration for overlapping deployments so that their deletion won't be blocked.
	deployment.Status.ObservedGeneration = deployment.Generation
	deployment.Annotations[util.OverlapAnnotation] = withDeployment
	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}
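
// clearDeploymentOverlap removes the overlap annotation when it points at otherName
// (or when otherName is empty) and persists the change via a status update; it is a
// no-op when the annotation is unset or names a different deployment.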
func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment, otherName string) (*extensions.Deployment, error) {
	overlapsWith := deployment.Annotations[util.OverlapAnnotation]
	if len(overlapsWith) == 0 {
		return deployment, nil
	}
	// This is not the deployment found in the annotation - do not remove the annotation.
	if len(otherName) > 0 && otherName != overlapsWith {
		return deployment, nil
	}
	delete(deployment.Annotations, util.OverlapAnnotation)
	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}

// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
// have failed progressing and if so it adds them back to the main queue.
func (dc *DeploymentController) progressWorker() {
	for dc.checkNextItemForProgress() {
	}
}

// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
// to the main queue.
func (dc *DeploymentController) checkNextItemForProgress() bool {
	key, quit := dc.progressQueue.Get()
	if quit {
		return false
	}
	defer dc.progressQueue.Done(key)

	needsResync, err := dc.checkForProgress(key.(string))
	if err != nil {
		utilruntime.HandleError(err)
	}
	if err == nil && needsResync {
		dc.queue.AddRateLimited(key)
	}
	dc.progressQueue.Forget(key)
	return true
}

// checkForProgress checks the progress for the provided deployment. Meant to be called
// by the progressWorker and work on items synced in a secondary queue.
func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
	obj, exists, err := dc.dLister.Indexer.GetByKey(key)
	if err != nil {
		glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
		return false, err
	}
	if !exists {
		return false, nil
	}
	deployment := obj.(*extensions.Deployment)
	cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
	// Already marked with a terminal reason - no need to add it back to the main queue.
	if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
		return false, nil
	}
	// Deep-copy otherwise we may mutate our cache.
	// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
	// in the deployment.
	d, err := util.DeploymentDeepCopy(deployment)
	if err != nil {
		return false, err
	}
	return dc.hasFailed(d)
}