k3s/pkg/controller/replicaset/replica_set.go

603 lines
23 KiB
Go
Raw Normal View History

2016-01-17 22:26:25 +00:00
/*
Copyright 2016 The Kubernetes Authors.
2016-01-17 22:26:25 +00:00
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// If you make changes to this file, you should also make the corresponding change in ReplicationController.
package replicaset
import (
"fmt"
2016-01-17 22:26:25 +00:00
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
2017-01-13 17:48:50 +00:00
"k8s.io/apimachinery/pkg/api/errors"
2017-01-11 14:09:48 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
2017-01-30 18:39:54 +00:00
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
2017-01-24 14:11:51 +00:00
"k8s.io/client-go/tools/cache"
2017-01-30 18:39:54 +00:00
"k8s.io/client-go/tools/record"
2017-01-27 15:20:40 +00:00
"k8s.io/client-go/util/workqueue"
2017-01-30 18:39:54 +00:00
"k8s.io/kubernetes/pkg/api"
2016-11-18 20:50:17 +00:00
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
2016-11-18 20:50:17 +00:00
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
2017-02-08 21:18:21 +00:00
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
2016-01-17 22:26:25 +00:00
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
2016-01-17 22:26:25 +00:00
)
const (
	// BurstReplicas is a realistic value of the burstReplica field for the
	// replica set manager, based off performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// statusUpdateRetries is the number of times we retry updating a ReplicaSet's status.
	statusUpdateRetries = 1
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
// It is built from the v1beta1 package's SchemeGroupVersion with the kind
// "ReplicaSet". The pod event handlers in this file compare its Kind against a
// pod's ControllerRef to decide whether the pod belongs to a ReplicaSet.
var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
2016-01-17 22:26:25 +00:00
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
kubeClient clientset.Interface
2016-01-17 22:26:25 +00:00
podControl controller.PodControlInterface
// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncReplicaSet for testing.
syncHandler func(rsKey string) error
// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations
2016-01-17 22:26:25 +00:00
// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
rsLister extensionslisters.ReplicaSetLister
// rsListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced cache.InformerSynced
// A store of pods, populated by the shared informer passed to NewReplicaSetController
podLister corelisters.PodLister
// podListerSynced returns true if the pod store has been synced at least once.
2016-01-17 22:26:25 +00:00
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
2016-01-17 22:26:25 +00:00
// Controllers that need to be synced
queue workqueue.RateLimitingInterface
2016-01-17 22:26:25 +00:00
}
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
2017-01-30 18:39:54 +00:00
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
2016-01-17 22:26:25 +00:00
rsc := &ReplicaSetController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
2017-01-30 18:39:54 +00:00
Recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "replicaset-controller"}),
2016-01-17 22:26:25 +00:00
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
2016-01-17 22:26:25 +00:00
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()
rsc.podListerSynced = podInformer.Informer().HasSynced
rsc.syncHandler = rsc.syncReplicaSet
2016-01-17 22:26:25 +00:00
return rsc
}
// SetEventRecorder swaps the controller's pod control for a fresh
// RealPodControl that reuses the existing kube client but reports events
// through the supplied recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	replacement := controller.RealPodControl{
		KubeClient: rsc.kubeClient,
		Recorder:   recorder,
	}
	rsc.podControl = replacement
}
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
2016-01-15 07:32:10 +00:00
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
glog.Infof("Starting ReplicaSet controller")
if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
2016-01-17 22:26:25 +00:00
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
2016-01-17 22:26:25 +00:00
}
2016-01-17 22:26:25 +00:00
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
}
// getPodReplicaSets returns a list of ReplicaSets matching the given pod.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.ReplicaSet {
rss, err := rsc.rsLister.GetPodReplicaSets(pod)
2016-01-17 22:26:25 +00:00
if err != nil {
return nil
}
if len(rss) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
2016-01-17 22:26:25 +00:00
}
return rss
2016-01-17 22:26:25 +00:00
}
// updateRS is the informer callback fired when a ReplicaSet is updated.
// old and cur must both be *extensions.ReplicaSet. The current object is
// always enqueued; a change in Status.Replicas is additionally logged at V(4).
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
	previous, current := old.(*extensions.ReplicaSet), cur.(*extensions.ReplicaSet)

	// You might imagine that we only really need to enqueue the
	// replica set when Spec changes, but it is safer to sync any
	// time this function is triggered. That way a full informer
	// resync can requeue any replica set that don't yet have pods
	// but whose last attempts at creating a pod have failed (since
	// we don't block on creation of pods) instead of those
	// replica sets stalling indefinitely. Enqueueing every time
	// does result in some spurious syncs (like when Status.Replica
	// is updated and the watch notification from it retriggers
	// this function), but in general extra resyncs shouldn't be
	// that bad as ReplicaSets that haven't met expectations yet won't
	// sync, and all the listing is done using local stores.
	before, after := previous.Status.Replicas, current.Status.Replicas
	if before != after {
		glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", current.Name, before, after)
	}
	rsc.enqueueReplicaSet(cur)
}
// When a pod is created, enqueue the replica set that manages it and update its expectations.
2016-01-17 22:26:25 +00:00
func (rsc *ReplicaSetController) addPod(obj interface{}) {
2016-11-18 20:50:17 +00:00
pod := obj.(*v1.Pod)
2016-02-28 08:23:47 +00:00
2016-01-17 22:26:25 +00:00
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
rsc.deletePod(pod)
return
2016-01-17 22:26:25 +00:00
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
return
}
rsc.expectations.CreationObserved(rsKey)
rsc.enqueueReplicaSet(rs)
return
}
// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
rss := rsc.getPodReplicaSets(pod)
if len(rss) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
for _, rs := range rss {
rsc.enqueueReplicaSet(rs)
}
2016-01-17 22:26:25 +00:00
}
// When a pod is updated, figure out what replica set/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
2016-11-18 20:50:17 +00:00
// and new replica set. old and cur must be *v1.Pod types.
2016-01-17 22:26:25 +00:00
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
2016-11-18 20:50:17 +00:00
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
2016-02-28 08:23:47 +00:00
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
2016-02-28 08:23:47 +00:00
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rs never initiates a phase change, and so is never asleep waiting for the same.
rsc.deletePod(curPod)
if labelChanged {
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
rsc.deletePod(oldPod)
}
return
2016-02-28 08:23:47 +00:00
}
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
rs, err := rsc.rsLister.ReplicaSets(oldPod.Namespace).Get(oldControllerRef.Name)
if err == nil {
rsc.enqueueReplicaSet(rs)
2016-01-17 22:26:25 +00:00
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
rs, err := rsc.rsLister.ReplicaSets(curPod.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
rsc.enqueueReplicaSet(rs)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after the it is requeued because
// a Pod transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replica set controller).
if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds)
rsc.enqueueReplicaSetAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second)
}
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
rss := rsc.getPodReplicaSets(curPod)
if len(rss) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, rs := range rss {
rsc.enqueueReplicaSet(rs)
}
}
2016-01-17 22:26:25 +00:00
}
// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
2016-11-18 20:50:17 +00:00
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
2016-01-17 22:26:25 +00:00
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
2016-11-18 20:50:17 +00:00
pod, ok := obj.(*v1.Pod)
2016-01-17 22:26:25 +00:00
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new ReplicaSet will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
2016-01-17 22:26:25 +00:00
return
}
2016-11-18 20:50:17 +00:00
pod, ok = tombstone.Obj.(*v1.Pod)
2016-01-17 22:26:25 +00:00
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
2016-01-17 22:26:25 +00:00
return
}
}
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
return
2016-01-17 22:26:25 +00:00
}
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rsc.enqueueReplicaSet(rs)
2016-01-17 22:26:25 +00:00
}
// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
2016-01-17 22:26:25 +00:00
return
}
rsc.queue.Add(key)
}
// enqueueReplicaSetAfter schedules the key of obj to be added to the work
// queue once the given delay has elapsed.
// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
// A key-derivation failure is reported through utilruntime.HandleError and
// nothing is scheduled.
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
	rsKey, keyErr := controller.KeyFunc(obj)
	if keyErr == nil {
		rsc.queue.AddAfter(rsKey, after)
		return
	}
	utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, keyErr))
}
2016-01-17 22:26:25 +00:00
// worker drains the work queue: it keeps calling processNextWorkItem, which
// dequeues one item, processes it and marks it done, and returns once that
// call reports false.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
	for {
		if more := rsc.processNextWorkItem(); !more {
			return
		}
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
2016-01-17 22:26:25 +00:00
}
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)
return true
2016-01-17 22:26:25 +00:00
}
// manageReplicas checks and updates replicas for the given ReplicaSet.
2016-08-17 14:16:01 +00:00
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
2016-11-18 20:50:17 +00:00
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *extensions.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
2016-01-17 22:26:25 +00:00
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
return nil
2016-01-17 22:26:25 +00:00
}
var errCh chan error
2016-01-17 22:26:25 +00:00
if diff < 0 {
diff *= -1
errCh = make(chan error, diff)
2016-01-17 22:26:25 +00:00
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
2016-01-17 22:26:25 +00:00
rsc.expectations.ExpectCreations(rsKey, diff)
var wg sync.WaitGroup
wg.Add(diff)
2016-11-18 20:50:17 +00:00
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
2016-01-17 22:26:25 +00:00
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
var err error
2017-02-23 19:16:13 +00:00
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
2017-02-23 19:16:13 +00:00
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil {
2016-01-17 22:26:25 +00:00
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.CreationObserved(rsKey)
errCh <- err
2016-01-17 22:26:25 +00:00
}
}()
}
wg.Wait()
2016-01-17 22:26:25 +00:00
} else if diff > 0 {
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
errCh = make(chan error, diff)
2016-11-18 20:50:17 +00:00
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
2016-01-17 22:26:25 +00:00
// No need to sort pods if we are about to delete all of them
2016-11-18 20:50:17 +00:00
if *(rs.Spec.Replicas) != 0 {
2016-01-17 22:26:25 +00:00
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rs change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
var wg sync.WaitGroup
wg.Add(diff)
2016-01-17 22:26:25 +00:00
for i := 0; i < diff; i++ {
go func(ix int) {
defer wg.Done()
2016-01-17 22:26:25 +00:00
if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey, podKey)
errCh <- err
2016-01-17 22:26:25 +00:00
}
}(i)
}
wg.Wait()
2016-01-17 22:26:25 +00:00
}
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
}
default:
}
return nil
2016-01-17 22:26:25 +00:00
}
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
2017-01-02 16:35:12 +00:00
if err != nil {
return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("ReplicaSet has been deleted %v", key)
2016-01-17 22:26:25 +00:00
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
2016-01-17 22:26:25 +00:00
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
2016-12-03 18:57:26 +00:00
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
2016-01-17 22:26:25 +00:00
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
return nil
2016-01-17 22:26:25 +00:00
}
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
// Ignore inactive pods.
var filteredPods []*v1.Pod
for _, pod := range pods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind)
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
filteredPods, err = cm.ClaimPods(filteredPods)
if err != nil {
return err
2016-01-17 22:26:25 +00:00
}
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
copy, err := api.Scheme.DeepCopy(rs)
if err != nil {
return err
2016-01-17 22:26:25 +00:00
}
rs = copy.(*extensions.ReplicaSet)
2016-01-17 22:26:25 +00:00
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
2016-01-17 22:26:25 +00:00
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
2016-01-17 22:26:25 +00:00
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// Returning an error causes a requeue without forcing a hotloop
return err
2016-01-17 22:26:25 +00:00
}
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
2016-01-17 22:26:25 +00:00
}