mirror of https://github.com/k3s-io/k3s
controller: cleanup workload controllers a bit
* Switches glog.Errorf to utilruntime.HandleError in DS and RC controllers
* Drops a couple of unused variables in the DS, SS, and Deployment controllers
* Updates some comments
parent 7738f41b95
commit 10b4ec7b47
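Every hunk in this commit applies the same mechanical change: error paths that used to write straight to the log via glog.Errorf now route through utilruntime.HandleError. A minimal before/after sketch, assuming the current apimachinery import path (the tree at the time vendored its own util/runtime); the object value is a placeholder:

package main

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

func main() {
	obj := struct{ Name string }{"example"}

	// Before: the error is logged and the caller returns; nothing else sees it.
	// glog.Errorf("Couldn't get key for object %#v", obj)

	// After: the error is handed to every registered error handler. The
	// defaults log it and apply a small backoff so a tight failure loop
	// cannot spin at full speed.
	utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v", obj))
}

Besides the default logging-plus-backoff behavior, HandleError gives tests and embedders one hook point (the ErrorHandlers slice) to intercept controller failures, which a bare glog call never could.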
@@ -52,9 +52,6 @@ import (
 )
 
 const (
-	// Daemon sets will periodically check that their daemon pods are running as expected.
-	FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
-
 	// The value of 250 is chosen b/c values that are too high can cause registry DoS issues
 	BurstReplicas = 250
 
@@ -188,12 +185,12 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		ds, ok = tombstone.Obj.(*extensions.DaemonSet)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
 			return
 		}
 	}
@@ -249,7 +246,7 @@ func (dsc *DaemonSetsController) processNextWorkItem() bool {
 func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
 	key, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
 		return
 	}
 
@@ -263,7 +260,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon
 	ds, ok := obj.(*extensions.DaemonSet)
 	if !ok {
 		// This should not happen
-		glog.Errorf("lookup cache does not retuen a ReplicationController object")
+		utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object"))
 		return nil
 	}
 	if dsc.isCacheValid(pod, ds) {
@@ -279,7 +276,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon
 		// More than two items in this list indicates user error. If two daemon
 		// sets overlap, sort by creation timestamp, subsort by name, then pick
 		// the first.
-		glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)
+		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
 		sort.Sort(byCreationTimestamp(sets))
 	}
 
@@ -324,7 +321,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
 	if ds := dsc.getPodDaemonSet(pod); ds != nil {
 		dsKey, err := controller.KeyFunc(ds)
 		if err != nil {
-			glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
 			return
 		}
 		dsc.expectations.CreationObserved(dsKey)
@@ -369,12 +366,12 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*v1.Pod)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
 			return
 		}
 	}
@@ -382,7 +379,7 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 	if ds := dsc.getPodDaemonSet(pod); ds != nil {
 		dsKey, err := controller.KeyFunc(ds)
 		if err != nil {
-			glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
 			return
 		}
 		dsc.expectations.DeletionObserved(dsKey)
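The two tombstone hunks above are instances of a standard informer idiom. When an informer misses the actual delete event (for example across a watch reconnect), it delivers a cache.DeletedFinalStateUnknown tombstone instead of the object, so delete handlers unwrap two layers before trusting the type. A compact sketch under current client-go import paths; onPodDelete and the trailing hand-off are hypothetical stand-ins:

package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// onPodDelete mirrors the shape of deletePod above.
func onPodDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// Not a Pod: it may be a tombstone carrying the last state the
		// informer saw before the delete was observed.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	_ = pod // hand the pod to the controller's bookkeeping (expectations, queue)
}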
@@ -54,15 +54,8 @@ import (
 )
 
 const (
-	// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
-	// of all deployments.
-	// This recomputation happens based on contents in the local caches.
-	FullDeploymentResyncPeriod = 30 * time.Second
-	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
-	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
-	StoreSyncedPollPeriod = 100 * time.Millisecond
-	// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
-	MaxRetries = 5
+	// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
+	maxRetries = 5
 )
 
 func getDeploymentKind() schema.GroupVersionKind {
@@ -72,6 +65,7 @@ func getDeploymentKind() schema.GroupVersionKind {
 // DeploymentController is responsible for synchronizing Deployment objects stored
 // in the system with actual running replica sets and pods.
 type DeploymentController struct {
+	// rsControl is used for adopting/releasing replica sets.
 	rsControl controller.RSControlInterface
 	client clientset.Interface
 	eventRecorder record.EventRecorder
@@ -310,12 +304,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
 		if !ok {
-			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
 			return
 		}
 	}
@@ -336,12 +330,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*v1.Pod)
 		if !ok {
-			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
 			return
 		}
 	}
@@ -438,7 +432,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
 		return
 	}
 
-	if dc.queue.NumRequeues(key) < MaxRetries {
+	if dc.queue.NumRequeues(key) < maxRetries {
 		glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
 		dc.queue.AddRateLimited(key)
 		return
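Alongside dropping the unused resync constants, the deployment hunks demote MaxRetries to the unexported maxRetries, which only handleErr needs. That function is the usual bounded-retry idiom on a rate-limited workqueue; a sketch assuming client-go's workqueue package, with the queue passed in rather than read from a controller struct:

package sketch

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// maxRetries bounds how often one key may fail before it is dropped.
const maxRetries = 5

func handleErr(queue workqueue.RateLimitingInterface, err error, key interface{}) {
	if err == nil {
		// Success resets the per-key failure counter.
		queue.Forget(key)
		return
	}
	if queue.NumRequeues(key) < maxRetries {
		// Requeue with the backoff supplied by the queue's rate limiter.
		queue.AddRateLimited(key)
		return
	}
	// Persistent failure: report it once and stop retrying this key.
	utilruntime.HandleError(err)
	queue.Forget(key)
}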
@@ -182,7 +182,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
 	controller, ok := obj.(*v1.ReplicationController)
 	if !ok {
 		// This should not happen
-		glog.Errorf("lookup cache does not return a ReplicationController object")
+		utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicationController object"))
 		return nil
 	}
 	if cached && rm.isCacheValid(pod, controller) {
@@ -205,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
 		// More than two items in this list indicates user error. If two replication-controller
 		// overlap, sort by creation timestamp, subsort by name, then pick
 		// the first.
-		glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
+		utilruntime.HandleError(fmt.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels))
 		sort.Sort(OverlappingControllers(controllers))
 	}
 
@@ -291,7 +291,7 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
 	}
 	rcKey, err := controller.KeyFunc(rc)
 	if err != nil {
-		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err))
 		return
 	}
 
@@ -371,12 +371,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*v1.Pod)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
 			return
 		}
 	}
@@ -384,7 +384,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
 	if rc := rm.getPodController(pod); rc != nil {
 		rcKey, err := controller.KeyFunc(rc)
 		if err != nil {
-			glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err))
 			return
 		}
 		rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
@@ -396,7 +396,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
 func (rm *ReplicationManager) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -413,7 +413,7 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
 func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -616,7 +616,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 		// anymore but has the stale controller ref.
 		pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
 		if err != nil {
-			glog.Errorf("Error getting pods for rc %q: %v", key, err)
+			utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
 			rm.queue.Add(key)
 			return err
 		}
@@ -657,7 +657,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 	} else {
 		pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
 		if err != nil {
-			glog.Errorf("Error getting pods for rc %q: %v", key, err)
+			utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
 			rm.queue.Add(key)
 			return err
 		}
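Most of the replication-manager hunks sit inside the enqueue helpers, which reduce an object to a namespace/name key before queueing it. The controller.KeyFunc they call is an alias for client-go's cache.DeletionHandlingMetaNamespaceKeyFunc; a sketch with a free function standing in for the ReplicationManager method:

package sketch

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func enqueue(queue workqueue.RateLimitingInterface, obj interface{}) {
	// Reduce the object to its "namespace/name" key; tombstones are unwrapped
	// by the key func itself, which is why delete handlers can call it directly.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	queue.Add(key)
}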
@@ -45,8 +45,6 @@ import (
 )
 
 const (
-	// Time to sleep before polling to see if the pod cache has synced.
-	PodStoreSyncedPollPeriod = 100 * time.Millisecond
 	// period to relist statefulsets and verify pets
 	statefulSetResyncPeriod = 30 * time.Second
 )
@@ -55,18 +53,18 @@ const (
 type StatefulSetController struct {
 	// client interface
 	kubeClient clientset.Interface
-	// newSyncer returns an interface capable of syncing a single pet.
+	// control returns an interface capable of syncing a stateful set.
 	// Abstracted out for testing.
 	control StatefulSetControlInterface
 	// podStore is a cache of watched pods.
 	podStore listers.StoreToPodLister
 	// podStoreSynced returns true if the pod store has synced at least once.
 	podStoreSynced cache.InformerSynced
-	// A store of StatefulSets, populated by the psController.
+	// A store of StatefulSets, populated by setController.
 	setStore listers.StoreToStatefulSetLister
 	// Watches changes to all StatefulSets.
 	setController cache.Controller
-	// Controllers that need to be synced.
+	// StatefulSets that need to be synced.
 	queue workqueue.RateLimitingInterface
 }
 
@@ -128,6 +126,7 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
 	defer ssc.queue.ShutDown()
 	glog.Infof("Starting statefulset controller")
 	if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
 		return
 	}
 	go ssc.setController.Run(stopCh)
@@ -267,7 +266,6 @@ func (ssc *StatefulSetController) processNextWorkItem() bool {
 // worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed
 func (ssc *StatefulSetController) worker() {
 	for ssc.processNextWorkItem() {
-
 	}
 }
 
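The one behavioral addition in the statefulset file is the HandleError call when WaitForCacheSync fails, which previously returned without a trace. A sketch of the surrounding Run shape using client-go primitives; the informer wiring and worker startup are elided:

package sketch

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

func run(stopCh <-chan struct{}, synced ...cache.InformerSynced) {
	defer utilruntime.HandleCrash()
	// Block until every informer has completed its initial list. A false
	// return means stopCh closed first; failing loudly here is exactly the
	// fix the hunk above makes, instead of returning silently.
	if !cache.WaitForCacheSync(stopCh, synced...) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	// ...start workers, then wait for shutdown...
	<-stopCh
}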