/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package replicationcontroller

import (
	"reflect"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/client/record"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)

const (
	// We'll attempt to recompute the required replicas of all replication controllers
	// that have fulfilled their expectations at least this often. This recomputation
	// happens based on contents in local pod storage.
	FullControllerResyncPeriod = 30 * time.Second

	// If a watch misdelivers info about a pod, it'll take at least this long
	// to rectify the number of replicas. Note that dropped deletes are only
	// rectified after the expectation times out because we don't know the
	// final resting state of the pod.
	PodRelistPeriod = 5 * time.Minute

	// Realistic value of the burstReplicas field for the replication manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
	// avoid a hot loop, we'll wait this long between checks.
	PodStoreSyncedPollPeriod = 100 * time.Millisecond

	// The number of times we retry updating a replication controller's status.
	statusUpdateRetries = 1
)

// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
// is to distinguish this type from the API object "ReplicationController". We should fix this.
type ReplicationManager struct {
	kubeClient client.Interface
	podControl controller.PodControlInterface

	// An rc is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool
	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations controller.ControllerExpectationsInterface
	// A store of replication controllers, populated by the rcController.
	rcStore cache.StoreToReplicationControllerLister
	// A store of pods, populated by the podController.
	podStore cache.StoreToPodLister
	// Watches changes to all replication controllers.
	rcController *framework.Controller
	// Watches changes to all pods.
	podController *framework.Controller
	// Controllers that need to be updated.
	queue *workqueue.Type
}

// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewControllerExpectations(),
		queue:         workqueue.New(),
	}

	rm.rcStore.Store, rm.rcController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything())
			},
			WatchFunc: func(rv string) (watch.Interface, error) {
				return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
			},
		},
		&api.ReplicationController{},
		FullControllerResyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc: rm.enqueueController,
			UpdateFunc: func(old, cur interface{}) {
				// We only really need to do this when spec changes, but for correctness it is safer to
				// periodically double check. It is overkill for 2 reasons:
				// 1. Status.Replicas updates will cause a sync
				// 2. Every 30s we will get a full resync (this will happen anyway every 5 minutes when pods relist)
				// However, it shouldn't be that bad as rcs that haven't met expectations won't sync, and all
				// the listing is done using local stores.
				oldRC := old.(*api.ReplicationController)
				curRC := cur.(*api.ReplicationController)
				if oldRC.Status.Replicas != curRC.Status.Replicas {
					glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
				}
				rm.enqueueController(cur)
			},
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
	)
	rm.podStore.Store, rm.podController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
			},
			WatchFunc: func(rv string) (watch.Interface, error) {
				return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
			},
		},
		&api.Pod{},
		PodRelistPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc: rm.addPod,
			// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
			// the most frequent pod update is status, and the associated rc will only list from local storage, so
			// it should be ok.
			UpdateFunc: rm.updatePod,
			DeleteFunc: rm.deletePod,
		},
	)

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	return rm
}
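
// A minimal wiring sketch (hypothetical caller code; the real wiring lives in
// the kube-controller-manager binary, and the client configuration below is
// only an assumption for illustration):
//
//	kubeClient := client.NewOrDie(&client.Config{Host: "http://localhost:8080"})
//	rm := NewReplicationManager(kubeClient, BurstReplicas)
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	go rm.Run(5, stopCh) // 5 concurrent sync workers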

// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer util.HandleCrash()
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go util.Until(rm.worker, time.Second, stopCh)
	}
	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}

// getPodController returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationController {
	controllers, err := rm.rcStore.GetPodControllers(pod)
	if err != nil {
		glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
		return nil
	}
	// In theory, overlapping controllers is user error. This sorting will not prevent
	// oscillation of replicas in all cases, eg:
	// rc1 (older rc): [(k1=v1)], replicas=1 rc2: [(k2=v2)], replicas=2
	// pod: [(k1:v1), (k2:v2)] will wake both rc1 and rc2, and we will sync rc1.
	// pod: [(k2:v2)] will wake rc2 which creates a new replica.
	sort.Sort(overlappingControllers(controllers))
	return &controllers[0]
}

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
	pod := obj.(*api.Pod)
	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being counted as a creation observation.
		rm.deletePod(pod)
		return
	}
	if rc := rm.getPodController(pod); rc != nil {
		rcKey, err := controller.KeyFunc(rc)
		if err != nil {
			glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
			return
		}
		rm.expectations.CreationObserved(rcKey)
		rm.enqueueController(rc)
	}
}

// When a pod is updated, figure out what controller(s) manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new controller. old and cur must be *api.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
	if api.Semantic.DeepEqual(old, cur) {
		// A periodic relist will send update events for all known pods.
		return
	}
	// TODO: Write a unittest for this case
	curPod := cur.(*api.Pod)
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
		// an rc never initiates a phase change, and so is never asleep waiting for the same.
		rm.deletePod(curPod)
		return
	}
	if rc := rm.getPodController(curPod); rc != nil {
		rm.enqueueController(rc)
	}
	oldPod := old.(*api.Pod)
	// Only need to get the old controller if the labels changed.
	if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
		// If the old and new rc are the same, the first one that syncs
		// will set expectations, preventing any damage from the second.
		if oldRC := rm.getPodController(oldPod); oldRC != nil {
			rm.enqueueController(oldRC)
		}
	}
}
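
// For orientation, a concrete label-change example: if a pod's labels change
// from {app: v1} to {app: v2}, the rc selecting app=v1 is enqueued so it can
// create a replacement, and the rc selecting app=v2 is enqueued because it may
// now exceed its desired replica count.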

// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
	pod, ok := obj.(*api.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new rc will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
			return
		}
	}
	if rc := rm.getPodController(pod); rc != nil {
		rcKey, err := controller.KeyFunc(rc)
		if err != nil {
			glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
			return
		}
		rm.expectations.DeletionObserved(rcKey)
		rm.enqueueController(rc)
	}
}

// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this rc overlaps, as well as all
	// controllers that overlap this rc, and sorting them.
	rm.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	for {
		key, quit := rm.queue.Get()
		if quit {
			// The queue was shut down; exit instead of spinning on Get.
			return
		}
		// Wrap the sync in a closure so Done is always called, even if the
		// handler panics.
		func() {
			defer rm.queue.Done(key)
			if err := rm.syncHandler(key.(string)); err != nil {
				glog.Errorf("Error syncing replication controller: %v", err)
			}
		}()
	}
}
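
// A test-injection sketch (hypothetical test code, assuming a fake client):
// because syncHandler is a plain struct field, a test can observe which keys
// the worker dequeues without touching a real API server:
//
//	manager := NewReplicationManager(fakeClient, BurstReplicas)
//	received := make(chan string)
//	manager.syncHandler = func(key string) error {
//		received <- key
//		return nil
//	}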

// manageReplicas checks and updates replicas for the given replication controller.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
	// diff < 0 means we have too few replicas and must create pods;
	// diff > 0 means we have too many and must delete some.
	diff := len(filteredPods) - rc.Spec.Replicas
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
	}
	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		rm.expectations.ExpectCreations(rcKey, diff)
		wait := sync.WaitGroup{}
		wait.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wait.Done()
				if err := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc); err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod.
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					util.HandleError(err)
				}
			}()
		}
		wait.Wait()
	} else if diff > 0 {
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		rm.expectations.ExpectDeletions(rcKey, diff)
		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
		// No need to sort pods if we are about to delete all of them.
		if rc.Spec.Replicas != 0 {
			// Sort the pods in the order such that not-ready < ready, unscheduled
			// < scheduled, and pending < running. This ensures that we delete pods
			// in the earlier stages whenever possible.
			sort.Sort(controller.ActivePods(filteredPods))
		}
		wait := sync.WaitGroup{}
		wait.Add(diff)
		for i := 0; i < diff; i++ {
			go func(ix int) {
				defer wait.Done()
				if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion.
					glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.DeletionObserved(rcKey)
				}
			}(i)
		}
		wait.Wait()
	}
}
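
// Expectations lifecycle, informally (all methods are from the controller
// package and are the same calls used above):
//
//	rm.expectations.ExpectCreations(rcKey, n)    // before issuing n creates
//	rm.expectations.CreationObserved(rcKey)      // on a watch add event, or on a failed create
//	rm.expectations.SatisfiedExpectations(rcKey) // true once the expected events are observed
//	                                             // (or the expectations have expired)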

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	obj, exists, err := rm.rcStore.Store.GetByKey(key)
	if err != nil {
		glog.Infof("Unable to retrieve rc %v from store: %v", key, err)
		rm.queue.Add(key)
		return err
	}
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	rc := *obj.(*api.ReplicationController)
	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", rc.Name)
		rm.enqueueController(&rc)
		return nil
	}

	// Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the rc sync is just deferred till the next relist.
	rcKey, err := controller.KeyFunc(&rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return err
	}
	rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
	podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
	if err != nil {
		glog.Errorf("Error getting pods for rc %q: %v", key, err)
		rm.queue.Add(key)
		return err
	}

	// TODO: Do this in a single pass, or use an index.
	filteredPods := controller.FilterActivePods(podList.Items)
	if rcNeedsSync {
		rm.manageReplicas(filteredPods, &rc)
	}

	// Always update the status as pods come up or die.
	if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil {
		// Multiple things could lead to this update failing. Requeuing the controller ensures
		// we retry with some fairness.
		glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", rc.Name)
		rm.enqueueController(&rc)
	}
	return nil
}