/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// If you make changes to this file, you should also make the corresponding change in ReplicaSet.

package replication
import (
	"reflect"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/v1"
	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/runtime/schema"
	"k8s.io/kubernetes/pkg/util"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/metrics"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)
const (
	// We'll attempt to recompute the required replicas of all replication controllers
	// that have fulfilled their expectations at least this often. This recomputation
	// happens based on contents in local pod storage.
	// A full resync shouldn't be needed at all in a healthy system. This is a protection
	// against disappearing objects and missed watch notifications, both of which we
	// believe should not happen at all.
	// TODO: We should get rid of it completely in the fullness of time.
	FullControllerResyncPeriod = 10 * time.Minute

	// Realistic value of the burstReplica field for the replication manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
	// avoid a hot loop, we'll wait this long between checks.
	PodStoreSyncedPollPeriod = 100 * time.Millisecond

	// The number of times we retry updating a replication controller's status.
	statusUpdateRetries = 1
)
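
// getRCKind returns the GroupVersionKind of the ReplicationController API object.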
func getRCKind() schema.GroupVersionKind {
	return v1.SchemeGroupVersion.WithKind("ReplicationController")
}
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
// is to distinguish this type from the API object "ReplicationController". We should fix this.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer. If we're using
	// a normal shared informer, then the informer will be started for us. If
	// we have a personal informer, we must start it ourselves. If you start
	// the controller using NewReplicationManager (passing a SharedInformer), this
	// will be nil.
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController.
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers.
	rcController *cache.Controller
	// A store of pods, populated by the podController.
	podStore cache.StoreToPodLister
	// Watches changes to all pods.
	podController cache.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced.
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. The RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}
// NewReplicationManager creates a replication manager.
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}

// newReplicationManager configures a replication manager with the specified event recorder.
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas:           burstReplicas,
		expectations:            controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have a much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, e.g. host assignment. Though this might seem like
		// overkill, the most frequent pod update is status, and the associated rc will only list from
		// local storage, so it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}

// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
	garbageCollectorEnabled := false
	rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
	rm.internalPodInformer = podInformer
	return rm
}

// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
	garbageCollectorEnabled := false
	rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
	rm.internalPodInformer = podInformer
	return rm
}
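
// Illustrative wiring, given an existing clientset kubeClient (a sketch only; the
// resync period, lookup cache size, and worker count below are placeholders, not
// recommendations):
//
//	resync := func() time.Duration { return 30 * time.Second }
//	rm := NewReplicationManagerFromClient(kubeClient, resync, BurstReplicas, 4096)
//	go rm.Run(5, wait.NeverStop)
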
// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}

// getPodController returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
// TODO: use ownerReference.Controller to determine if the rc controls the pod.
func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationController {
	// Look up in the cache; if cached and the cache is valid, just return the cached value.
	if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached {
		controller, ok := obj.(*v1.ReplicationController)
		if !ok {
			// This should not happen
			glog.Errorf("lookup cache does not return a ReplicationController object")
			return nil
		}
		if cached && rm.isCacheValid(pod, controller) {
			return controller
		}
	}

	// If not cached, or the cached value is invalid, search all rcs to find the matching one, and update the cache.
	controllers, err := rm.rcStore.GetPodControllers(pod)
	if err != nil {
		glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
		return nil
	}
	// In theory, overlapping controllers is user error. This sorting will not prevent
	// oscillation of replicas in all cases, eg:
	// rc1 (older rc): [(k1=v1)], replicas=1 rc2: [(k2=v2)], replicas=2
	// pod: [(k1:v1), (k2:v2)] will wake both rc1 and rc2, and we will sync rc1.
	// pod: [(k2:v2)] will wake rc2 which creates a new replica.
	if len(controllers) > 1 {
		// More than one item in this list indicates user error. If replication controllers
		// overlap, sort by creation timestamp, subsort by name, then pick
		// the first.
		glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
		sort.Sort(OverlappingControllers(controllers))
	}

	// update lookup cache
	rm.lookupCache.Update(pod, controllers[0])

	return controllers[0]
}

// isCacheValid checks if the cached rc is still valid for the given pod.
func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool {
	_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
	// rc has been deleted or updated, cache is invalid
	if err != nil || !isControllerMatch(pod, cachedRC) {
		return false
	}
	return true
}

// isControllerMatch takes a Pod and a ReplicationController and returns whether they match.
// TODO(mqliang): This logic is a copy from GetPodControllers(), remove the duplication.
func isControllerMatch(pod *v1.Pod, rc *v1.ReplicationController) bool {
	if rc.Namespace != pod.Namespace {
		return false
	}
	selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated()

	// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
	if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
		return false
	}
	return true
}

// updateRC is the callback invoked when a replication controller is updated.
func (rm *ReplicationManager) updateRC(old, cur interface{}) {
	oldRC := old.(*v1.ReplicationController)
	curRC := cur.(*v1.ReplicationController)

	// We should invalidate the whole lookup cache if an RC's selector has been updated.
	//
	// Imagine that you have two RCs:
	// * old RC1
	// * new RC2
	// You also have a pod that is attached to RC2 (because it doesn't match the RC1 selector).
	// Now imagine that you are changing the RC1 selector so that it now matches that pod;
	// in such a case we must invalidate the whole cache so that the pod can be adopted by RC1.
	//
	// This makes the lookup cache less helpful, but selector updates do not happen often,
	// so it's not a big problem.
	if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
		rm.lookupCache.InvalidateAll()
	}

	// TODO: Remove when #31981 is resolved!
	glog.Infof("Observed updated replication controller %v. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas))

	// You might imagine that we only really need to enqueue the
	// controller when Spec changes, but it is safer to sync any
	// time this function is triggered. That way a full informer
	// resync can requeue any controllers that don't yet have pods
	// but whose last attempts at creating a pod have failed (since
	// we don't block on creation of pods) instead of those
	// controllers stalling indefinitely. Enqueueing every time
	// does result in some spurious syncs (like when Status.Replicas
	// is updated and the watch notification from it retriggers
	// this function), but in general extra resyncs shouldn't be
	// that bad as rcs that haven't met expectations yet won't
	// sync, and all the listing is done using local stores.
	if oldRC.Status.Replicas != curRC.Status.Replicas {
		// TODO: Should we log status or spec?
		glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
	}
	rm.enqueueController(cur)
}

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	rc := rm.getPodController(pod)
	if rc == nil {
		return
	}
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
	}

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being counted as a creation observation.
		rm.deletePod(pod)
		return
	}
	rm.expectations.CreationObserved(rcKey)
	rm.enqueueController(rc)
}

// When a pod is updated, figure out what controller(s) manage it and wake them
// up. If the labels of the pod have changed, we need to awaken both the old
// and new controller. old and cur must be *v1.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}
	glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
		// an rc never initiates a phase change, and so is never asleep waiting for the same.
		rm.deletePod(curPod)
		if labelChanged {
			// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
			rm.deletePod(oldPod)
		}
		return
	}

	// Only need to get the old controller if the labels changed.
	// Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod.
	if labelChanged {
		// If the old and new rc are the same, the first one that syncs
		// will set expectations preventing any damage from the second.
		if oldRC := rm.getPodController(oldPod); oldRC != nil {
			rm.enqueueController(oldRC)
		}
	}

	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
	if curRC := rm.getPodController(curPod); curRC != nil {
		rm.enqueueController(curRC)
		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
		// the Pod status which in turn will trigger a requeue of the owning replication controller
		// thus having its status updated with the newly available replica. For now, we can fake the
		// update by resyncing the controller MinReadySeconds after it is requeued because a Pod
		// transitioned to Ready.
		// Note that this still suffers from #29229, we are just moving the problem one level
		// "closer" to kubelet (from the deployment to the replication controller manager).
		if changedToReady && curRC.Spec.MinReadySeconds > 0 {
			glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", curRC.Name, curRC.Spec.MinReadySeconds)
			rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second)
		}
	}
}

// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels, the new rc will not be woken up until the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %#v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
			return
		}
	}
	glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels)
	if rc := rm.getPodController(pod); rc != nil {
		rcKey, err := controller.KeyFunc(rc)
		if err != nil {
			glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
			return
		}
		rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
		rm.enqueueController(rc)
	}
}

// obj could be a *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this rc overlaps, as well as all
	// controllers that overlap this rc, and sorting them.
	rm.queue.Add(key)
}

// obj could be a *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this rc overlaps, as well as all
	// controllers that overlap this rc, and sorting them.
	rm.queue.AddAfter(key, after)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}
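
		// The sync failed; requeue with rate limiting so a persistently failing
		// controller is retried with backoff instead of hot-looping.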
		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}

	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}
	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rc will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil
}

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy them first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but have the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
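		// Classify splits the listed pods into those already controlled by this rc,
		// those matching its selector but still needing adoption, and those carrying
		// its controllerRef but no longer matching the selector.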
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to the next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into the work queue again. We need to try to free the
			// pods again, otherwise they will be stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always update the status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}