k3s/pkg/controller/replication/replication_controller.go

606 lines
23 KiB
Go
Raw Normal View History

2014-06-06 23:40:48 +00:00
/*
Copyright 2014 The Kubernetes Authors.
2014-06-06 23:40:48 +00:00
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2016-01-17 22:26:25 +00:00
// If you make changes to this file, you should also make the corresponding change in ReplicaSet.
package replication
2014-06-06 23:40:48 +00:00
import (
"fmt"
2015-04-21 20:40:35 +00:00
"reflect"
"sort"
"sync"
2014-06-06 23:40:48 +00:00
"time"
2015-08-05 22:05:17 +00:00
"github.com/golang/glog"
2017-01-13 17:48:50 +00:00
"k8s.io/apimachinery/pkg/api/errors"
2017-01-11 14:09:48 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utiltrace "k8s.io/apiserver/pkg/util/trace"
2017-01-30 18:39:54 +00:00
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
2017-01-24 14:11:51 +00:00
"k8s.io/client-go/tools/cache"
2017-01-30 18:39:54 +00:00
"k8s.io/client-go/tools/record"
2017-01-27 15:20:40 +00:00
"k8s.io/client-go/util/workqueue"
2017-01-30 18:39:54 +00:00
"k8s.io/kubernetes/pkg/api"
2016-06-10 23:28:42 +00:00
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
2017-02-08 21:18:21 +00:00
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
2014-06-06 23:40:48 +00:00
)
2015-04-21 20:40:35 +00:00
const (
2015-05-06 21:39:14 +00:00
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
2015-07-28 01:21:37 +00:00
// The number of times we retry updating a replication controller's status.
statusUpdateRetries = 1
2015-04-21 20:40:35 +00:00
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1.SchemeGroupVersion.WithKind("ReplicationController")
2016-06-10 23:28:42 +00:00
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
2015-07-31 11:38:04 +00:00
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
// is to distinguish this type from API object "ReplicationController". We should fix this.
2014-06-06 23:40:48 +00:00
type ReplicationManager struct {
kubeClient clientset.Interface
2015-07-28 01:21:37 +00:00
podControl controller.PodControlInterface
2015-05-06 21:39:14 +00:00
// An rc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncReplicationController for testing.
2015-04-21 20:40:35 +00:00
syncHandler func(rcKey string) error
// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations
2015-07-28 01:21:37 +00:00
rcLister corelisters.ReplicationControllerLister
rcListerSynced cache.InformerSynced
podLister corelisters.PodLister
2017-01-02 16:35:12 +00:00
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
// Controllers that need to be synced
2016-08-15 14:36:46 +00:00
queue workqueue.RateLimitingInterface
2014-06-06 23:40:48 +00:00
}
2017-01-02 16:35:12 +00:00
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
2017-01-02 16:35:12 +00:00
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
2017-01-30 18:39:54 +00:00
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
2017-01-02 16:35:12 +00:00
rm := &ReplicationManager{
2014-06-06 23:40:48 +00:00
kubeClient: kubeClient,
2015-07-28 01:21:37 +00:00
podControl: controller.RealPodControl{
KubeClient: kubeClient,
2017-01-30 18:39:54 +00:00
Recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "replication-controller"}),
2014-06-06 23:40:48 +00:00
},
2015-05-06 21:39:14 +00:00
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
2014-06-06 23:40:48 +00:00
}
rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
2017-01-02 16:35:12 +00:00
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
})
rm.rcLister = rcInformer.Lister()
rm.rcListerSynced = rcInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podLister = podInformer.Lister()
rm.podListerSynced = podInformer.Informer().HasSynced
2015-04-21 20:40:35 +00:00
rm.syncHandler = rm.syncReplicationController
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
2015-08-08 01:52:23 +00:00
rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}
2014-07-10 11:47:10 +00:00
// Run begins watching and syncing.
2015-04-21 20:40:35 +00:00
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
2016-01-15 07:32:10 +00:00
defer utilruntime.HandleCrash()
2017-01-02 16:35:12 +00:00
defer rm.queue.ShutDown()
2017-01-02 16:35:12 +00:00
glog.Infof("Starting RC Manager")
if !cache.WaitForCacheSync(stopCh, rm.podListerSynced, rm.rcListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
2017-01-02 16:35:12 +00:00
return
}
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
2015-04-21 20:40:35 +00:00
<-stopCh
glog.Infof("Shutting down RC Manager")
2014-06-17 23:42:29 +00:00
}
// getPodControllers returns a list of ReplicationControllers matching the given pod.
func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController {
rcs, err := rm.rcLister.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("No ReplicationControllers found for pod %v, controller will avoid syncing", pod.Name)
2015-04-21 20:40:35 +00:00
return nil
}
if len(rcs) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicationController is selecting pods with labels: %+v", pod.Labels))
}
return rcs
}
2016-06-10 23:28:42 +00:00
// callback when RC is updated
func (rm *ReplicationManager) updateRC(old, cur interface{}) {
2016-11-18 20:50:17 +00:00
oldRC := old.(*v1.ReplicationController)
curRC := cur.(*v1.ReplicationController)
2016-06-10 23:28:42 +00:00
2016-09-05 12:28:12 +00:00
// TODO: Remove when #31981 is resolved!
2016-11-18 20:50:17 +00:00
glog.Infof("Observed updated replication controller %v. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas))
2016-06-10 23:28:42 +00:00
// You might imagine that we only really need to enqueue the
// controller when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
// resync can requeue any controllers that don't yet have pods
// but whose last attempts at creating a pod have failed (since
// we don't block on creation of pods) instead of those
// controllers stalling indefinitely. Enqueueing every time
// does result in some spurious syncs (like when Status.Replica
// is updated and the watch notification from it retriggers
// this function), but in general extra resyncs shouldn't be
// that bad as rcs that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
if oldRC.Status.Replicas != curRC.Status.Replicas {
2016-09-06 06:39:06 +00:00
// TODO: Should we log status or spec?
2016-06-10 23:28:42 +00:00
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
rm.enqueueController(cur)
}
// When a pod is created, enqueue the ReplicationController that manages it and update its expectations.
2015-04-21 20:40:35 +00:00
func (rm *ReplicationManager) addPod(obj interface{}) {
2016-11-18 20:50:17 +00:00
pod := obj.(*v1.Pod)
glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
2016-02-28 08:23:47 +00:00
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
rm.deletePod(pod)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
rsKey, err := controller.KeyFunc(rc)
if err != nil {
return
}
rm.expectations.CreationObserved(rsKey)
rm.enqueueController(rc)
return
}
// Otherwise, it's an orphan. Get a list of all matching ReplicationControllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, rc := range rm.getPodControllers(pod) {
rm.enqueueController(rc)
}
2015-04-21 20:40:35 +00:00
}
2014-06-14 01:11:32 +00:00
// When a pod is updated, figure out what ReplicationController/s manage it and wake them
2015-04-21 20:40:35 +00:00
// up. If the labels of the pod have changed we need to awaken both the old
// and new ReplicationController. old and cur must be *v1.Pod types.
2015-04-21 20:40:35 +00:00
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
2016-11-18 20:50:17 +00:00
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rc never initiates a phase change, and so is never asleep waiting for the same.
rm.deletePod(curPod)
if labelChanged {
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
rm.deletePod(oldPod)
}
return
2015-04-21 20:40:35 +00:00
}
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
rc, err := rm.rcLister.ReplicationControllers(oldPod.Namespace).Get(oldControllerRef.Name)
if err == nil {
rm.enqueueController(rc)
}
2014-06-06 23:40:48 +00:00
}
2016-06-10 23:28:42 +00:00
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rc, err := rm.rcLister.ReplicationControllers(curPod.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
rm.enqueueController(rc)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning ReplicationController thus
// having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after the it is requeued because
// a Pod transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the ReplicationController controller).
if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rc.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", rc.Name, rc.Spec.MinReadySeconds)
rm.enqueueControllerAfter(rc, time.Duration(rc.Spec.MinReadySeconds)*time.Second)
}
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, rc := range rm.getPodControllers(curPod) {
rm.enqueueController(rc)
}
2016-06-10 23:28:42 +00:00
}
2014-06-06 23:40:48 +00:00
}
// When a pod is deleted, enqueue the ReplicationController that manages the pod and update its expectations.
2016-11-18 20:50:17 +00:00
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
2015-04-21 20:40:35 +00:00
func (rm *ReplicationManager) deletePod(obj interface{}) {
2016-11-18 20:50:17 +00:00
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new ReplicationController will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
2016-11-18 20:50:17 +00:00
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
return
2014-06-06 23:40:48 +00:00
}
2015-04-21 20:40:35 +00:00
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
2014-06-06 23:40:48 +00:00
}
rsKey, err := controller.KeyFunc(rc)
if err != nil {
return
}
rm.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rm.enqueueController(rc)
2014-06-06 23:40:48 +00:00
}
2016-11-18 20:50:17 +00:00
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
2015-04-21 20:40:35 +00:00
func (rm *ReplicationManager) enqueueController(obj interface{}) {
2015-07-28 01:21:37 +00:00
key, err := controller.KeyFunc(obj)
2015-04-21 20:40:35 +00:00
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
2015-04-21 20:40:35 +00:00
return
}
rm.queue.Add(key)
}
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
// The key is queued only after the given delay has elapsed.
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rm.queue.AddAfter(key, after)
}
2015-04-21 20:40:35 +00:00
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	for {
		if !rm.processNextWorkItem() {
			break
		}
	}
	glog.Infof("replication controller worker shutting down")
}
2016-08-15 14:36:46 +00:00
// processNextWorkItem dequeues a single key, syncs it, and reports whether the
// worker loop should keep going (false only once the queue has been shut down).
func (rm *ReplicationManager) processNextWorkItem() bool {
	key, quit := rm.queue.Get()
	if quit {
		return false
	}
	defer rm.queue.Done(key)

	if err := rm.syncHandler(key.(string)); err != nil {
		// Requeue with rate limiting so a persistently failing rc doesn't hot-loop.
		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return true
	}
	// Sync succeeded; clear any rate-limiting history for this key.
	rm.queue.Forget(key)
	return true
}
2015-04-21 20:40:35 +00:00
// manageReplicas checks and updates replicas for the given replication controller.
2016-08-17 14:16:01 +00:00
// Does NOT modify <filteredPods>.
2016-11-18 20:50:17 +00:00
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
diff := len(filteredPods) - int(*(rc.Spec.Replicas))
2015-07-28 01:21:37 +00:00
rcKey, err := controller.KeyFunc(rc)
if err != nil {
2016-08-15 14:36:46 +00:00
return err
}
if diff == 0 {
return nil
2015-07-28 01:21:37 +00:00
}
2016-08-15 14:36:46 +00:00
2014-06-06 23:40:48 +00:00
if diff < 0 {
diff *= -1
2015-05-06 21:39:14 +00:00
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
2016-08-15 14:36:46 +00:00
errCh := make(chan error, diff)
2015-07-28 01:21:37 +00:00
rm.expectations.ExpectCreations(rcKey, diff)
var wg sync.WaitGroup
wg.Add(diff)
2016-11-18 20:50:17 +00:00
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
2014-06-06 23:40:48 +00:00
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
2016-06-10 23:28:42 +00:00
var err error
2017-02-23 19:16:13 +00:00
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
2017-02-23 19:16:13 +00:00
Name: rc.Name,
UID: rc.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
2016-06-10 23:28:42 +00:00
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
2016-06-10 23:28:42 +00:00
if err != nil {
2015-04-21 20:40:35 +00:00
// Decrement the expected number of creates because the informer won't observe this pod
2015-07-28 01:21:37 +00:00
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
2016-08-15 14:36:46 +00:00
errCh <- err
2016-01-15 07:32:10 +00:00
utilruntime.HandleError(err)
2015-04-21 20:40:35 +00:00
}
}()
2014-06-06 23:40:48 +00:00
}
wg.Wait()
2016-08-15 14:36:46 +00:00
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
}
default:
}
2016-08-15 14:36:46 +00:00
return nil
}
if diff > rm.burstReplicas {
diff = rm.burstReplicas
}
2016-11-18 20:50:17 +00:00
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
2016-08-15 14:36:46 +00:00
// No need to sort pods if we are about to delete all of them
2016-11-18 20:50:17 +00:00
if *(rc.Spec.Replicas) != 0 {
2016-08-15 14:36:46 +00:00
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rc change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
// We use pod namespace/name as a UID to wait for deletions, so if the
// labels on a pod/rc change in a way that the pod gets orphaned, the
// rc will only wake up after the expectation has expired.
errCh := make(chan error, diff)
rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
var wg sync.WaitGroup
wg.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wg.Done()
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey, podKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
wg.Wait()
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
2014-06-06 23:40:48 +00:00
}
2016-08-15 14:36:46 +00:00
default:
2014-06-06 23:40:48 +00:00
}
2016-08-15 14:36:46 +00:00
return nil
2014-06-06 23:40:48 +00:00
}
2015-04-21 20:40:35 +00:00
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
trace := utiltrace.New("syncReplicationController: " + key)
2016-05-13 08:30:45 +00:00
defer trace.LogIfLong(250 * time.Millisecond)
2015-04-21 20:40:35 +00:00
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
2017-01-02 16:35:12 +00:00
if err != nil {
return err
2016-02-13 03:47:33 +00:00
}
rc, err := rm.rcLister.ReplicationControllers(namespace).Get(name)
if errors.IsNotFound(err) {
2015-04-21 20:40:35 +00:00
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)
2015-04-21 20:40:35 +00:00
return nil
}
if err != nil {
return err
}
2015-04-21 20:40:35 +00:00
2016-05-13 08:30:45 +00:00
trace.Step("ReplicationController restored")
rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
2016-05-13 08:30:45 +00:00
trace.Step("Expectations restored")
2016-08-17 14:16:01 +00:00
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
2016-06-10 23:28:42 +00:00
// TODO: Do the List and Filter in a single pass, or use an index.
2016-11-18 20:50:17 +00:00
var filteredPods []*v1.Pod
// list all pods to include the pods that don't match the rc's selector
// anymore but has the stale controller ref.
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
return err
}
cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind)
filteredPods, err = cm.ClaimPods(pods)
if err != nil {
return err
2016-06-10 23:28:42 +00:00
}
2016-08-15 14:36:46 +00:00
var manageReplicasErr error
2016-06-10 23:28:42 +00:00
if rcNeedsSync && rc.DeletionTimestamp == nil {
manageReplicasErr = rm.manageReplicas(filteredPods, rc)
2014-06-06 23:40:48 +00:00
}
2016-05-13 08:30:45 +00:00
trace.Step("manageReplicas done")
2015-04-21 20:40:35 +00:00
copy, err := api.Scheme.DeepCopy(rc)
if err != nil {
return err
}
rc = copy.(*v1.ReplicationController)
newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus)
if err != nil {
2016-08-15 14:36:46 +00:00
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
return err
2015-04-21 20:40:35 +00:00
}
// Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 &&
updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) &&
updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) {
rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second)
}
2016-08-15 14:36:46 +00:00
return manageReplicasErr
2014-06-06 23:40:48 +00:00
}