/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemon

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/integer"
	"k8s.io/client-go/util/workqueue"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/daemon/util"
	"k8s.io/kubernetes/pkg/features"
	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
	"k8s.io/kubernetes/pkg/util/metrics"

	"github.com/golang/glog"
)

const (
	// BurstReplicas is a rate limiter for booting pods on a lot of nodes.
	// The value of 250 is chosen b/c values that are too high can cause registry DoS issues.
	BurstReplicas = 250

	// StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
	StatusUpdateRetries = 1
)

// Reasons for DaemonSet events
const (
	// SelectingAllReason is added to an event when a DaemonSet selects all Pods.
	SelectingAllReason = "SelectingAll"
	// FailedPlacementReason is added to an event when a DaemonSet can't schedule a Pod to a specified node.
	FailedPlacementReason = "FailedPlacement"
	// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
	FailedDaemonPodReason = "FailedDaemonPod"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")

// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
// in the system with actual running pods.
type DaemonSetsController struct {
	kubeClient    clientset.Interface
	eventRecorder record.EventRecorder
	podControl    controller.PodControlInterface
	crControl     controller.ControllerRevisionControlInterface

	// A dsc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int

	// To allow injection of syncDaemonSet for testing.
	syncHandler func(dsKey string) error
	// used for unit testing
	enqueueDaemonSet            func(ds *apps.DaemonSet)
	enqueueDaemonSetRateLimited func(ds *apps.DaemonSet)
	// A TTLCache of pod creates/deletes each ds expects to see
	expectations controller.ControllerExpectationsInterface
	// dsLister can list/get daemonsets from the shared informer's store
	dsLister appslisters.DaemonSetLister
	// dsStoreSynced returns true if the daemonset store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dsStoreSynced cache.InformerSynced
	// historyLister can list/get ControllerRevision history from the shared informer's store
	historyLister appslisters.ControllerRevisionLister
	// historyStoreSynced returns true if the history store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	historyStoreSynced cache.InformerSynced
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// nodeLister can list/get nodes from the shared informer's store
	nodeLister corelisters.NodeLister
	// nodeStoreSynced returns true if the node store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	nodeStoreSynced cache.InformerSynced

	// DaemonSet keys that need to be synced.
	queue workqueue.RateLimitingInterface

	// The DaemonSets that have suspended pods on nodes; the key is the node name, the value
	// is the set of DaemonSets that want to run pods on that node but could not schedule them
	// in the latest sync cycle.
	suspendedDaemonPodsMutex sync.Mutex
	suspendedDaemonPods      map[string]sets.String
}

// NewDaemonSetsController creates a new DaemonSetsController.
func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every client has moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}

	dsc := &DaemonSetsController{
		kubeClient:    kubeClient,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		},
		crControl: controller.RealControllerRevisionControl{
			KubeClient: kubeClient,
		},
		burstReplicas:       BurstReplicas,
		expectations:        controller.NewControllerExpectations(),
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
		suspendedDaemonPods: map[string]sets.String{},
	}

	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ds := obj.(*apps.DaemonSet)
			glog.V(4).Infof("Adding daemon set %s", ds.Name)
			dsc.enqueueDaemonSet(ds)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldDS := old.(*apps.DaemonSet)
			curDS := cur.(*apps.DaemonSet)
			glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
			dsc.enqueueDaemonSet(curDS)
		},
		DeleteFunc: dsc.deleteDaemonset,
	})
	dsc.dsLister = daemonSetInformer.Lister()
	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addHistory,
		UpdateFunc: dsc.updateHistory,
		DeleteFunc: dsc.deleteHistory,
	})
	dsc.historyLister = historyInformer.Lister()
	dsc.historyStoreSynced = historyInformer.Informer().HasSynced

	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addPod,
		UpdateFunc: dsc.updatePod,
		DeleteFunc: dsc.deletePod,
	})
	dsc.podLister = podInformer.Lister()
	dsc.podStoreSynced = podInformer.Informer().HasSynced

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addNode,
		UpdateFunc: dsc.updateNode,
	})
	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
	dsc.nodeLister = nodeInformer.Lister()

	dsc.syncHandler = dsc.syncDaemonSet
	dsc.enqueueDaemonSet = dsc.enqueue
	dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
	return dsc, nil
}
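
// deleteDaemonset enqueues the DaemonSet that was deleted. obj may be a
// *apps.DaemonSet or a cache.DeletedFinalStateUnknown tombstone wrapping one.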
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
	ds, ok := obj.(*apps.DaemonSet)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		ds, ok = tombstone.Obj.(*apps.DaemonSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
			return
		}
	}
	glog.V(4).Infof("Deleting daemon set %s", ds.Name)
	dsc.enqueueDaemonSet(ds)
}

// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dsc.queue.ShutDown()

	glog.Infof("Starting daemon sets controller")
	defer glog.Infof("Shutting down daemon sets controller")

	if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dsc.runWorker, time.Second, stopCh)
	}

	<-stopCh
}

func (dsc *DaemonSetsController) runWorker() {
	for dsc.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
	dsKey, quit := dsc.queue.Get()
	if quit {
		return false
	}
	defer dsc.queue.Done(dsKey)

	err := dsc.syncHandler(dsKey.(string))
	if err == nil {
		dsc.queue.Forget(dsKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
	dsc.queue.AddRateLimited(dsKey)

	return true
}

func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.Add(key)
}

func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
		return
	}

	dsc.queue.AddRateLimited(key)
}

func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.AddAfter(key, after)
}

// getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
	if err != nil {
		return nil
	}
	if len(sets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
	}
	return sets
}

// getDaemonSetsForHistory returns a list of DaemonSets that potentially
// match a ControllerRevision.
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
	daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
	if err != nil || len(daemonSets) == 0 {
		return nil
	}
	if len(daemonSets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		glog.V(4).Infof("User error! more than one DaemonSet is selecting ControllerRevision %s/%s with labels: %#v",
			history.Namespace, history.Name, history.Labels)
	}
	return daemonSets
}

// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
// or when the controller manager is restarted.
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
	history := obj.(*apps.ControllerRevision)
	if history.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dsc.deleteHistory(history)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
		ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
		if ds == nil {
			return
		}
		glog.V(4).Infof("ControllerRevision %s added.", history.Name)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	daemonSets := dsc.getDaemonSetsForHistory(history)
	if len(daemonSets) == 0 {
		return
	}
	glog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
	for _, ds := range daemonSets {
		dsc.enqueueDaemonSet(ds)
	}
}

// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
// is updated and wakes them up. If anything of the ControllerRevision has changed, we need to awaken
// both the old and new DaemonSets.
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
	curHistory := cur.(*apps.ControllerRevision)
	oldHistory := old.(*apps.ControllerRevision)
	if curHistory.ResourceVersion == oldHistory.ResourceVersion {
		// Periodic resync will send update events for all known ControllerRevisions.
		return
	}

	curControllerRef := metav1.GetControllerOf(curHistory)
	oldControllerRef := metav1.GetControllerOf(oldHistory)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		glog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
	if labelChanged || controllerRefChanged {
		daemonSets := dsc.getDaemonSetsForHistory(curHistory)
		if len(daemonSets) == 0 {
			return
		}
		glog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
		for _, ds := range daemonSets {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
// the ControllerRevision is deleted. obj could be an *apps.ControllerRevision, or
// a DeletionFinalStateUnknown marker item.
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
	history, ok := obj.(*apps.ControllerRevision)
	// When a delete is dropped, the relist will notice a ControllerRevision in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ControllerRevision
	// changed labels the new DaemonSet will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		history, ok = tombstone.Obj.(*apps.ControllerRevision)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(history)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
	if ds == nil {
		return
	}
	glog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
	dsc.enqueueDaemonSet(ds)
}
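
// addPod enqueues the DaemonSet that manages a newly created Pod or, for an
// orphan Pod, every DaemonSet that might want to adopt it.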
func (dsc *DaemonSetsController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		dsc.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
		if ds == nil {
			return
		}
		dsKey, err := controller.KeyFunc(ds)
		if err != nil {
			return
		}
		glog.V(4).Infof("Pod %s added.", pod.Name)
		dsc.expectations.CreationObserved(dsKey)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	dss := dsc.getDaemonSetsForPod(pod)
	if len(dss) == 0 {
		return
	}
	glog.V(4).Infof("Orphan Pod %s added.", pod.Name)
	for _, ds := range dss {
		dsc.enqueueDaemonSet(ds)
	}
}

// When a pod is updated, figure out what sets manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new set. old and cur must be *v1.Pod types.
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		glog.V(4).Infof("Pod %s updated.", curPod.Name)
		dsc.enqueueDaemonSet(ds)
		changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
		if changedToReady && ds.Spec.MinReadySeconds > 0 {
			// Add a second to avoid milliseconds skew in AddAfter.
			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
			dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	dss := dsc.getDaemonSetsForPod(curPod)
	if len(dss) == 0 {
		return
	}
	glog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if labelChanged || controllerRefChanged {
		for _, ds := range dss {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// listSuspendedDaemonPods lists the keys of the DaemonSets that have daemon pods which
// 'want to run, but should not schedule' on the given node.
func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		return nil
	}

	for k := range dsc.suspendedDaemonPods[node] {
		dss = append(dss, k)
	}
	return
}

// requeueSuspendedDaemonPods enqueues all DaemonSets that have pods which 'want to run,
// but should not schedule' on the given node, so the DaemonSetsController will sync them again.
func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
	dss := dsc.listSuspendedDaemonPods(node)
	for _, dsKey := range dss {
		if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
			glog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
			continue
		} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
			glog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
			continue
		} else {
			dsc.enqueueDaemonSetRateLimited(ds)
		}
	}
}

// addSuspendedDaemonPods records that the given DaemonSet has pods which 'want to run,
// but should not schedule' on the given node, adding it to the suspended set.
func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		dsc.suspendedDaemonPods[node] = sets.NewString()
	}
	dsc.suspendedDaemonPods[node].Insert(ds)
}

// removeSuspendedDaemonPods removes the given DaemonSet from the node's suspended set;
// the entry for the node is dropped once the set becomes empty.
func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		return
	}
	dsc.suspendedDaemonPods[node].Delete(ds)

	if len(dsc.suspendedDaemonPods[node]) == 0 {
		delete(dsc.suspendedDaemonPods, node)
	}
}
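
// deletePod enqueues the DaemonSet that managed a deleted Pod and records the deletion
// in its expectations. For Pods without a resolvable controller that were scheduled to a
// node, it requeues any DaemonSets suspended on that node. obj may be a *v1.Pod or a
// cache.DeletedFinalStateUnknown tombstone wrapping one.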
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new daemonset will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		if len(pod.Spec.NodeName) != 0 {
			// If scheduled pods were deleted, requeue suspended daemon pods.
			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
		}
		return
	}
	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
	if ds == nil {
		if len(pod.Spec.NodeName) != 0 {
			// If scheduled pods were deleted, requeue suspended daemon pods.
			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
		}
		return
	}
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return
	}
	glog.V(4).Infof("Pod %s deleted.", pod.Name)
	dsc.expectations.DeletionObserved(dsKey)
	dsc.enqueueDaemonSet(ds)
}
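
// addNode enqueues every DaemonSet that should schedule a daemon pod onto the
// newly added node.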
func (dsc *DaemonSetsController) addNode(obj interface{}) {
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
		return
	}
	node := obj.(*v1.Node)
	for _, ds := range dsList {
		_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			continue
		}
		if shouldSchedule {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// nodeInSameCondition returns true if the two condition lists contain the same set of
// effective condition types (those whose Status is True); otherwise it returns false.
func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
	if len(old) == 0 && len(cur) == 0 {
		return true
	}

	c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
	for _, c := range old {
		if c.Status == v1.ConditionTrue {
			c1map[c.Type] = c.Status
		}
	}

	for _, c := range cur {
		if c.Status != v1.ConditionTrue {
			continue
		}

		if _, found := c1map[c.Type]; !found {
			return false
		}

		delete(c1map, c.Type)
	}

	return len(c1map) == 0
}
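
// updateNode enqueues every DaemonSet whose scheduling decision may change when a
// node's labels, taints, or conditions change.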
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)
	if reflect.DeepEqual(oldNode.Labels, curNode.Labels) &&
		reflect.DeepEqual(oldNode.Spec.Taints, curNode.Spec.Taints) &&
		nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
		// If node labels, taints and condition didn't change, we can ignore this update.
		return
	}

	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		glog.V(4).Infof("Error listing daemon sets: %v", err)
		return
	}
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the updated node (unless it has other work to do, too).
	for _, ds := range dsList {
		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
		if err != nil {
			continue
		}
		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
		if err != nil {
			continue
		}
		if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// getDaemonPods returns daemon pods owned by the given ds.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// List all pods to include those that don't match the selector anymore but
	// have a ControllerRef pointing to this controller.
	pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != ds.UID {
			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
		}
		return fresh, nil
	})

	// Use ControllerRefManager to adopt/orphan as needed.
	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
	return cm.ClaimPods(pods)
}

// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
	claimedPods, err := dsc.getDaemonPods(ds)
	if err != nil {
		return nil, err
	}
	// Group Pods by Node name.
	nodeToDaemonPods := make(map[string][]*v1.Pod)
	for _, pod := range claimedPods {
		nodeName := pod.Spec.NodeName
		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
	}
	return nodeToDaemonPods, nil
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if ds.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return ds
}

// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but are not yet running one and
// which nodes should not run a Pod of ds but are currently running one, it calls function
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
	// Find out which nodes are running the daemon pods controlled by ds.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
	nodeList, err := dsc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
	}
	var nodesNeedingDaemonPods, podsToDelete []string
	var failedPodsObserved int
	for _, node := range nodeList {
		wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			continue
		}

		daemonPods, exists := nodeToDaemonPods[node.Name]
		dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
		dsc.removeSuspendedDaemonPods(node.Name, dsKey)

		switch {
		case wantToRun && !shouldSchedule:
			// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
			dsc.addSuspendedDaemonPods(node.Name, dsKey)
		case shouldSchedule && !exists:
			// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
			nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
		case shouldContinueRunning:
			// If a daemon pod failed, delete it.
			// If there are no daemon pods left on this node, a new one will be created in the next sync loop.
			var daemonPodsRunning []*v1.Pod
			for _, pod := range daemonPods {
				if pod.DeletionTimestamp != nil {
					continue
				}
				if pod.Status.Phase == v1.PodFailed {
					msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
					glog.V(2).Infof(msg)
					// Emit an event so that it's discoverable to users.
					dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
					podsToDelete = append(podsToDelete, pod.Name)
					failedPodsObserved++
				} else {
					daemonPodsRunning = append(daemonPodsRunning, pod)
				}
			}
			// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
			// Sort the daemon pods by creation time, so the oldest is preserved.
			if len(daemonPodsRunning) > 1 {
				sort.Sort(podByCreationTimestamp(daemonPodsRunning))
				for i := 1; i < len(daemonPodsRunning); i++ {
					podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
				}
			}
		case !shouldContinueRunning && exists:
			// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
			for _, pod := range daemonPods {
				podsToDelete = append(podsToDelete, pod.Name)
			}
		}
	}

	// Label new pods using the hash label value of the current history when creating them.
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	// Return an error when failed daemon pods were deleted, so the rate limiter backs off
	// and prevents a kill-recreate hot loop.
	if failedPodsObserved > 0 {
		return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
	}

	return nil
}

// syncNodes deletes the given pods and creates new daemon set pods on the given nodes;
// it returns an aggregate of the errors encountered, if any.
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
	errCh := make(chan error, createDiff+deleteDiff)

	glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails. Conveniently, this also prevents the
	// event spam that those failures would generate.
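	// The loop below doubles batchSize after each batch (capping it at the number of pods
	// still to create) and advances pos past the pods just attempted; a batch that produces
	// errors causes the remaining creations to be skipped until the next sync.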
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()
				var err error

				podTemplate := &template

				if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
					podTemplate = template.DeepCopy()
					podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodHostnameNodeAffinity(
						podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
					err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				} else {
					err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				}

				if err != nil && errors.IsTimeout(err) {
					// Pod is created but its initialization has timed out.
					// If the initialization is successful eventually, the
					// controller will observe the creation via the informer.
					// If the initialization fails, or if the pod keeps
					// uninitialized for a long time, the informer will not
					// receive any update, and the controller will create a new
					// pod when the expectation expires.
					return
				}
				if err != nil {
					glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - batchSize
		if errorCount < len(errCh) && skippedPods > 0 {
			glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			for i := 0; i < skippedPods; i++ {
				dsc.expectations.CreationObserved(dsKey)
			}
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}
	glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// collect errors if any for proper reporting/retry logic in the controller
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}
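
// storeDaemonSetStatus writes the given pod counts to the DaemonSet's status if they
// differ from what is currently recorded, retrying up to StatusUpdateRetries times.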
func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int) error {
	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
		int(ds.Status.NumberReady) == numberReady &&
		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
		int(ds.Status.NumberAvailable) == numberAvailable &&
		int(ds.Status.NumberUnavailable) == numberUnavailable &&
		ds.Status.ObservedGeneration >= ds.Generation {
		return nil
	}

	toUpdate := ds.DeepCopy()

	var updateErr, getErr error
	for i := 0; i < StatusUpdateRetries; i++ {
		toUpdate.Status.ObservedGeneration = ds.Generation
		toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
		toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
		toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
		toUpdate.Status.NumberReady = int32(numberReady)
		toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
		toUpdate.Status.NumberAvailable = int32(numberAvailable)
		toUpdate.Status.NumberUnavailable = int32(numberUnavailable)

		if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
			return nil
		}

		// Update the set with the latest resource version for the next poll
		if toUpdate, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			return getErr
		}
	}
	return updateErr
}
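
// updateDaemonSetStatus recomputes, for every node, whether ds wants to run a daemon pod
// there and whether one is scheduled, ready, available, and up to date, then stores the
// resulting counts in the DaemonSet's status.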
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, hash string) error {
	glog.V(4).Infof("Updating daemon set status")
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	nodeList, err := dsc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
	}

	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
	for _, node := range nodeList {
		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			return err
		}

		scheduled := len(nodeToDaemonPods[node.Name]) > 0

		if wantToRun {
			desiredNumberScheduled++
			if scheduled {
				currentNumberScheduled++
				// Sort the daemon pods by creation time, so that the oldest is first.
				daemonPods, _ := nodeToDaemonPods[node.Name]
				sort.Sort(podByCreationTimestamp(daemonPods))
				pod := daemonPods[0]
				if podutil.IsPodReady(pod) {
					numberReady++
					if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
						numberAvailable++
					}
				}
				// If the returned error is not nil we have a parse error.
				// The controller handles this via the hash.
				generation, err := util.GetTemplateGeneration(ds)
				if err != nil {
					generation = nil
				}
				if util.IsPodUpdated(pod, hash, generation) {
					updatedNumberScheduled++
				}
			}
		} else {
			if scheduled {
				numberMisscheduled++
			}
		}
	}
	numberUnavailable := desiredNumberScheduled - numberAvailable

	err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable)
	if err != nil {
		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
	}

	return nil
}
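
// syncDaemonSet reconciles the DaemonSet identified by key: it builds the revision
// history, creates or deletes daemon pods on each node via manage, runs the configured
// update strategy, cleans up old revisions, and finally updates the DaemonSet's status.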
func ( dsc * DaemonSetsController ) syncDaemonSet ( key string ) error {
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing daemon set %q (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
2016-02-13 03:47:33 +00:00
2017-02-06 18:35:50 +00:00
namespace , name , err := cache . SplitMetaNamespaceKey ( key )
2015-09-08 18:50:39 +00:00
if err != nil {
2017-02-06 18:35:50 +00:00
return err
2015-09-08 18:50:39 +00:00
}
2017-02-06 18:35:50 +00:00
ds , err := dsc . dsLister . DaemonSets ( namespace ) . Get ( name )
if errors . IsNotFound ( err ) {
2015-09-08 18:50:39 +00:00
glog . V ( 3 ) . Infof ( "daemon set has been deleted %v" , key )
dsc . expectations . DeleteExpectations ( key )
return nil
}
2017-02-06 18:35:50 +00:00
if err != nil {
return fmt . Errorf ( "unable to retrieve ds %v from store: %v" , key , err )
}
2015-09-08 18:50:39 +00:00
2016-12-03 18:57:26 +00:00
everything := metav1 . LabelSelector { }
2016-03-18 23:14:07 +00:00
if reflect . DeepEqual ( ds . Spec . Selector , & everything ) {
2017-02-01 19:32:03 +00:00
dsc . eventRecorder . Eventf ( ds , v1 . EventTypeWarning , SelectingAllReason , "This daemon set is selecting all pods. A non-empty selector is required." )
2016-03-18 23:14:07 +00:00
return nil
}
2015-09-08 18:50:39 +00:00
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
dsKey , err := controller . KeyFunc ( ds )
if err != nil {
2016-09-07 11:39:49 +00:00
return fmt . Errorf ( "couldn't get key for object %#v: %v" , ds , err )
2015-09-08 18:50:39 +00:00
}
2017-06-09 18:03:38 +00:00
// Construct histories of the DaemonSet, and get the hash of current history
cur , old , err := dsc . constructHistory ( ds )
if err != nil {
return fmt . Errorf ( "failed to construct revisions of DaemonSet: %v" , err )
}
2018-02-14 18:35:38 +00:00
hash := cur . Labels [ apps . DefaultDaemonSetUniqueLabelKey ]
2017-06-09 18:03:38 +00:00
2017-03-07 23:01:11 +00:00
if ds . DeletionTimestamp != nil || ! dsc . expectations . SatisfiedExpectations ( dsKey ) {
// Only update status.
2017-06-09 18:03:38 +00:00
return dsc . updateDaemonSetStatus ( ds , hash )
2017-03-07 23:01:11 +00:00
}
2017-06-09 18:03:38 +00:00
err = dsc . manage ( ds , hash )
2017-05-17 23:53:46 +00:00
if err != nil {
2017-03-07 23:01:11 +00:00
return err
2015-09-08 18:50:39 +00:00
}
2017-03-07 23:01:11 +00:00
// Process rolling updates if we're ready.
if dsc . expectations . SatisfiedExpectations ( dsKey ) {
switch ds . Spec . UpdateStrategy . Type {
case apps . OnDeleteDaemonSetStrategyType :
case apps . RollingUpdateDaemonSetStrategyType :
err = dsc . rollingUpdate ( ds , hash )
}
if err != nil {
return err
}
}
err = dsc . cleanupHistory ( ds , old )
if err != nil {
return fmt . Errorf ( "failed to clean up revisions of DaemonSet: %v" , err )
}
return dsc . updateDaemonSetStatus ( ds , hash )
}
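// The sync above is typically driven by a worker loop that pops DaemonSet keys off
// the controller's rate-limited workqueue. A minimal sketch follows (illustrative
// only; the queue field name and the requeue policy are assumptions for this sketch,
// not necessarily what this controller does):
//
//	func (dsc *DaemonSetsController) processNextWorkItem() bool {
//		key, quit := dsc.queue.Get()
//		if quit {
//			return false
//		}
//		defer dsc.queue.Done(key)
//
//		if err := dsc.syncDaemonSet(key.(string)); err != nil {
//			// Requeue with backoff so transient failures are retried.
//			utilruntime.HandleError(fmt.Errorf("error syncing daemon set %v: %v", key, err))
//			dsc.queue.AddRateLimited(key)
//			return true
//		}
//		dsc.queue.Forget(key)
//		return true
//	}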
func ( dsc * DaemonSetsController ) simulate ( newPod * v1 . Pod , node * v1 . Node , ds * apps . DaemonSet ) ( [ ] algorithm . PredicateFailureReason , * schedulercache . NodeInfo , error ) {
// DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
// Add infinite toleration for taint notReady:NoExecute here
// to survive taint-based eviction enforced by NodeController
// when node turns not ready.
v1helper . AddOrUpdateTolerationInPod ( newPod , & v1 . Toleration {
Key : algorithm . TaintNodeNotReady ,
Operator : v1 . TolerationOpExists ,
Effect : v1 . TaintEffectNoExecute ,
} )
// DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
// Add infinite toleration for taint unreachable:NoExecute here
// to survive taint-based eviction enforced by NodeController
// when node turns unreachable.
v1helper . AddOrUpdateTolerationInPod ( newPod , & v1 . Toleration {
Key : algorithm . TaintNodeUnreachable ,
Operator : v1 . TolerationOpExists ,
Effect : v1 . TaintEffectNoExecute ,
} )
// According to TaintNodesByCondition, all DaemonSet pods should tolerate the
// MemoryPressure and DiskPressure taints, and critical pods should additionally
// tolerate the OutOfDisk taint.
v1helper . AddOrUpdateTolerationInPod ( newPod , & v1 . Toleration {
Key : algorithm . TaintNodeDiskPressure ,
Operator : v1 . TolerationOpExists ,
Effect : v1 . TaintEffectNoSchedule ,
} )
v1helper . AddOrUpdateTolerationInPod ( newPod , & v1 . Toleration {
Key : algorithm . TaintNodeMemoryPressure ,
Operator : v1 . TolerationOpExists ,
Effect : v1 . TaintEffectNoSchedule ,
} )
// TODO(#48843) OutOfDisk taints will be removed in 1.10
if utilfeature . DefaultFeatureGate . Enabled ( features . ExperimentalCriticalPodAnnotation ) &&
kubelettypes . IsCriticalPod ( newPod ) {
v1helper . AddOrUpdateTolerationInPod ( newPod , & v1 . Toleration {
Key : algorithm . TaintNodeOutOfDisk ,
Operator : v1 . TolerationOpExists ,
Effect : v1 . TaintEffectNoSchedule ,
} )
}
pods := [ ] * v1 . Pod { }
podList , err := dsc . podLister . List ( labels . Everything ( ) )
if err != nil {
return nil , nil , err
}
for _ , pod := range podList {
if pod . Spec . NodeName != node . Name {
continue
}
if pod . Status . Phase == v1 . PodSucceeded || pod . Status . Phase == v1 . PodFailed {
continue
}
// ignore pods that belong to the daemonset when taking into account whether
// a daemonset should bind to a node.
if metav1 . IsControlledBy ( pod , ds ) {
continue
}
pods = append ( pods , pod )
}
nodeInfo := schedulercache . NewNodeInfo ( pods ... )
nodeInfo . SetNode ( node )
_ , reasons , err := Predicates ( newPod , nodeInfo )
return reasons , nodeInfo , err
}
// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
// summary. Returned booleans are:
// * wantToRun:
// Returns true when a user would expect a pod to run on this node and ignores conditions
// such as DiskPressure or insufficient resources that would cause a daemonset pod not to schedule.
// This is primarily used to populate daemonset status.
// * shouldSchedule:
// Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
// running on that node.
// * shouldContinueRunning:
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
// running on that node.
func ( dsc * DaemonSetsController ) nodeShouldRunDaemonPod ( node * v1 . Node , ds * apps . DaemonSet ) ( wantToRun , shouldSchedule , shouldContinueRunning bool , err error ) {
newPod := NewPod ( ds , node . Name )
// Because these bools require an && of all their required conditions, we start
// with all bools set to true and set a bool to false if a condition is not met.
// A bool should probably not be set to true after this line.
wantToRun , shouldSchedule , shouldContinueRunning = true , true , true
// If the daemon set specifies a node name, check that it matches with node.Name.
if ! ( ds . Spec . Template . Spec . NodeName == "" || ds . Spec . Template . Spec . NodeName == node . Name ) {
return false , false , false , nil
}
reasons , nodeInfo , err := dsc . simulate ( newPod , node , ds )
if err != nil {
glog . Warningf ( "DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v" , node . Name , ds . ObjectMeta . Namespace , ds . ObjectMeta . Name , err )
return false , false , false , err
}
// TODO(k82cn): When 'ScheduleDaemonSetPods' graduates to beta or GA, remove the unnecessary checks on failure reasons,
// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
// into one result, e.g. selectedNode.
var insufficientResourceErr error
for _ , r := range reasons {
glog . V ( 4 ) . Infof ( "DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v" , node . Name , ds . ObjectMeta . Namespace , ds . ObjectMeta . Name , r . GetReason ( ) )
switch reason := r . ( type ) {
case * predicates . InsufficientResourceError :
insufficientResourceErr = reason
case * predicates . PredicateFailureError :
var emitEvent bool
// we try to partition predicate failures into two groups here: those that are
// intentional on the part of the operator and those that are not.
switch reason {
// intentional
case
predicates . ErrNodeSelectorNotMatch ,
predicates . ErrPodNotMatchHostName ,
predicates . ErrNodeLabelPresenceViolated ,
// this one is probably intentional since it's a workaround for not having
// pod hard anti affinity.
predicates . ErrPodNotFitsHostPorts :
return false , false , false , nil
case predicates . ErrTaintsTolerationsNotMatch :
// DaemonSet is expected to respect taints and tolerations
fitsNoExecute , _ , err := predicates . PodToleratesNodeNoExecuteTaints ( newPod , nil , nodeInfo )
if err != nil {
return false , false , false , err
}
if ! fitsNoExecute {
return false , false , false , nil
}
wantToRun , shouldSchedule = false , false
// unintentional
case
predicates . ErrDiskConflict ,
predicates . ErrVolumeZoneConflict ,
predicates . ErrMaxVolumeCountExceeded ,
predicates . ErrNodeUnderMemoryPressure ,
predicates . ErrNodeUnderDiskPressure :
// wantToRun and shouldContinueRunning are likely true here. They are
// absolutely true at the time of writing the comment. See first comment
// of this method.
shouldSchedule = false
emitEvent = true
// unexpected
case
predicates . ErrPodAffinityNotMatch ,
predicates . ErrServiceAffinityViolated :
glog . Warningf ( "unexpected predicate failure reason: %s" , reason . GetReason ( ) )
return false , false , false , fmt . Errorf ( "unexpected reason: DaemonSet Predicates should not return reason %s" , reason . GetReason ( ) )
default :
glog . V ( 4 ) . Infof ( "unknown predicate failure reason: %s" , reason . GetReason ( ) )
wantToRun , shouldSchedule , shouldContinueRunning = false , false , false
emitEvent = true
}
if emitEvent {
dsc . eventRecorder . Eventf ( ds , v1 . EventTypeWarning , FailedPlacementReason , "failed to place pod on %q: %s" , node . ObjectMeta . Name , reason . GetReason ( ) )
}
}
}
// only emit this event if insufficient resources are the only thing
// preventing the daemon pod from scheduling
if shouldSchedule && insufficientResourceErr != nil {
dsc . eventRecorder . Eventf ( ds , v1 . EventTypeWarning , FailedPlacementReason , "failed to place pod on %q: %s" , node . ObjectMeta . Name , insufficientResourceErr . Error ( ) )
shouldSchedule = false
}
return
}
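// A sketch of how a caller might consume the three booleans returned above when
// deciding what to do for a single node (illustrative only; variable names are
// hypothetical and the real decision logic lives in the controller's manage path):
//
//	wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
//	if err != nil {
//		return err
//	}
//	daemonPodsRunning := len(nodeToDaemonPods[node.Name]) > 0
//	switch {
//	case shouldSchedule && !daemonPodsRunning:
//		// The node needs a daemon pod but has none yet: create one.
//	case !shouldContinueRunning && daemonPodsRunning:
//		// The node must no longer run daemon pods: delete the existing ones.
//	}
//	// wantToRun (independently of shouldSchedule) feeds status counters such as
//	// desiredNumberScheduled.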
// NewPod creates a new daemon pod from the DaemonSet's pod template, targeted at the given node.
func NewPod ( ds * apps . DaemonSet , nodeName string ) * v1 . Pod {
newPod := & v1 . Pod { Spec : ds . Spec . Template . Spec , ObjectMeta : ds . Spec . Template . ObjectMeta }
newPod . Namespace = ds . Namespace
newPod . Spec . NodeName = nodeName
return newPod
}
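// Usage sketch (illustrative only; variable names are hypothetical): stamping out
// the daemon pod the controller would create for a particular node.
//
//	pod := NewPod(ds, "node-1")
//	// pod.Namespace == ds.Namespace, pod.Spec.NodeName == "node-1",
//	// and the rest of the spec comes from ds.Spec.Template.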
// nodeSelectionPredicates runs a set of predicates that select candidate nodes for the DaemonSet;
// the predicates include:
// - PodFitsHost: checks pod's NodeName against node
// - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node
func nodeSelectionPredicates ( pod * v1 . Pod , meta algorithm . PredicateMetadata , nodeInfo * schedulercache . NodeInfo ) ( bool , [ ] algorithm . PredicateFailureReason , error ) {
var predicateFails [ ] algorithm . PredicateFailureReason
fit , reasons , err := predicates . PodFitsHost ( pod , meta , nodeInfo )
if err != nil {
return false , predicateFails , err
}
if ! fit {
predicateFails = append ( predicateFails , reasons ... )
}
fit , reasons , err = predicates . PodMatchNodeSelector ( pod , meta , nodeInfo )
if err != nil {
return false , predicateFails , err
}
if ! fit {
predicateFails = append ( predicateFails , reasons ... )
}
return len ( predicateFails ) == 0 , predicateFails , nil
}
// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and the PodToleratesNodeTaints predicate
func Predicates ( pod * v1 . Pod , nodeInfo * schedulercache . NodeInfo ) ( bool , [ ] algorithm . PredicateFailureReason , error ) {
var predicateFails [ ] algorithm . PredicateFailureReason
// If ScheduleDaemonSetPods is enabled, only check nodeSelector and nodeAffinity.
if utilfeature . DefaultFeatureGate . Enabled ( features . ScheduleDaemonSetPods ) {
fit , reasons , err := nodeSelectionPredicates ( pod , nil , nodeInfo )
if err != nil {
return false , predicateFails , err
}
if ! fit {
predicateFails = append ( predicateFails , reasons ... )
}
return len ( predicateFails ) == 0 , predicateFails , nil
}
critical := utilfeature . DefaultFeatureGate . Enabled ( features . ExperimentalCriticalPodAnnotation ) &&
kubelettypes . IsCriticalPod ( pod )
fit , reasons , err := predicates . PodToleratesNodeTaints ( pod , nil , nodeInfo )
if err != nil {
return false , predicateFails , err
}
if ! fit {
predicateFails = append ( predicateFails , reasons ... )
}
if critical {
// If the pod is marked as critical and support for critical pod annotations is enabled,
// check predicates for critical pods only.
fit , reasons , err = predicates . EssentialPredicates ( pod , nil , nodeInfo )
} else {
fit , reasons , err = predicates . GeneralPredicates ( pod , nil , nodeInfo )
}
if err != nil {
return false , predicateFails , err
}
if ! fit {
predicateFails = append ( predicateFails , reasons ... )
}
return len ( predicateFails ) == 0 , predicateFails , nil
}
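// Usage sketch (illustrative only; variable names are hypothetical): evaluating
// Predicates against a node that already runs some pods, building the NodeInfo the
// same way simulate does above.
//
//	nodeInfo := schedulercache.NewNodeInfo(existingPods...)
//	nodeInfo.SetNode(node)
//	fit, reasons, err := Predicates(newPod, nodeInfo)
//	if err == nil && !fit {
//		for _, r := range reasons {
//			glog.V(4).Infof("pod does not fit: %v", r.GetReason())
//		}
//	}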
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp [ ] * apps . DaemonSet
func ( o byCreationTimestamp ) Len ( ) int { return len ( o ) }
func ( o byCreationTimestamp ) Swap ( i , j int ) { o [ i ] , o [ j ] = o [ j ] , o [ i ] }
func ( o byCreationTimestamp ) Less ( i , j int ) bool {
if o [ i ] . CreationTimestamp . Equal ( & o [ j ] . CreationTimestamp ) {
return o [ i ] . Name < o [ j ] . Name
}
return o [ i ] . CreationTimestamp . Before ( & o [ j ] . CreationTimestamp )
}
type podByCreationTimestamp [ ] * v1 . Pod
func ( o podByCreationTimestamp ) Len ( ) int { return len ( o ) }
func ( o podByCreationTimestamp ) Swap ( i , j int ) { o [ i ] , o [ j ] = o [ j ] , o [ i ] }
func ( o podByCreationTimestamp ) Less ( i , j int ) bool {
if o [ i ] . CreationTimestamp . Equal ( & o [ j ] . CreationTimestamp ) {
return o [ i ] . Name < o [ j ] . Name
}
return o [ i ] . CreationTimestamp . Before ( & o [ j ] . CreationTimestamp )
}
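// Usage sketch (illustrative only; daemonPods is a hypothetical slice of *v1.Pod):
// both sort helpers plug into the standard sort package, for example to pick the
// oldest daemon pod on a node first.
//
//	sort.Sort(podByCreationTimestamp(daemonPods))
//	oldest := daemonPods[0]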