2014-07-15 14:52:39 +00:00
/ *
2016-06-03 00:25:58 +00:00
Copyright 2014 The Kubernetes Authors .
2014-07-15 14:52:39 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package config
import (
"fmt"
"reflect"
"sync"
2017-06-22 17:25:57 +00:00
"k8s.io/api/core/v1"
2017-03-20 22:52:29 +00:00
"k8s.io/apimachinery/pkg/types"
2017-01-11 14:09:48 +00:00
"k8s.io/apimachinery/pkg/util/sets"
2017-01-30 18:39:54 +00:00
"k8s.io/client-go/tools/record"
2018-11-09 18:49:10 +00:00
"k8s.io/klog"
2017-10-29 23:51:06 +00:00
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
2017-11-29 18:19:39 +00:00
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2015-08-05 22:03:47 +00:00
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
2016-07-13 00:32:24 +00:00
"k8s.io/kubernetes/pkg/kubelet/events"
2015-10-09 17:24:31 +00:00
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
2015-11-20 17:54:37 +00:00
"k8s.io/kubernetes/pkg/kubelet/util/format"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/util/config"
2014-07-15 14:52:39 +00:00
)
2014-09-02 10:00:28 +00:00
// PodConfigNotificationMode describes how changes are sent to the update channel.
2014-07-15 14:52:39 +00:00
type PodConfigNotificationMode int
const (
2015-09-14 11:57:47 +00:00
// PodConfigNotificationUnknown is the default value for
// PodConfigNotificationMode when uninitialized.
PodConfigNotificationUnknown = iota
2014-07-15 14:52:39 +00:00
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
2014-09-02 10:00:28 +00:00
// any change occurs.
2015-09-14 11:57:47 +00:00
PodConfigNotificationSnapshot
2016-06-14 09:29:18 +00:00
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
2014-07-15 14:52:39 +00:00
// changed, and a SET message if there are any additions or removals.
PodConfigNotificationSnapshotAndUpdates
2016-06-14 09:29:18 +00:00
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
2014-07-15 14:52:39 +00:00
PodConfigNotificationIncremental
)
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
pods * podStorage
mux * config . Mux
// the channel of denormalized changes passed to listeners
2015-10-09 17:24:31 +00:00
updates chan kubetypes . PodUpdate
2015-03-05 18:49:36 +00:00
// contains the list of all configured sources
2017-10-29 23:51:06 +00:00
sourcesLock sync . Mutex
sources sets . String
2017-11-29 18:19:39 +00:00
checkpointManager checkpointmanager . CheckpointManager
2014-07-15 14:52:39 +00:00
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
2015-03-03 06:06:20 +00:00
func NewPodConfig ( mode PodConfigNotificationMode , recorder record . EventRecorder ) * PodConfig {
2015-10-09 17:24:31 +00:00
updates := make ( chan kubetypes . PodUpdate , 50 )
2015-03-03 06:06:20 +00:00
storage := newPodStorage ( updates , mode , recorder )
2014-07-15 14:52:39 +00:00
podConfig := & PodConfig {
pods : storage ,
mux : config . NewMux ( storage ) ,
updates : updates ,
2015-09-09 17:45:01 +00:00
sources : sets . String { } ,
2014-07-15 14:52:39 +00:00
}
return podConfig
}
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func ( c * PodConfig ) Channel ( source string ) chan <- interface { } {
2015-03-05 18:49:36 +00:00
c . sourcesLock . Lock ( )
defer c . sourcesLock . Unlock ( )
c . sources . Insert ( source )
2014-07-15 14:52:39 +00:00
return c . mux . Channel ( source )
}
2015-10-06 01:20:57 +00:00
// SeenAllSources returns true if seenSources contains all sources in the
// config, and also this config has received a SET message from each source.
func ( c * PodConfig ) SeenAllSources ( seenSources sets . String ) bool {
2014-12-17 05:11:27 +00:00
if c . pods == nil {
return false
}
2018-11-09 18:49:10 +00:00
klog . V ( 5 ) . Infof ( "Looking for %v, have seen %v" , c . sources . List ( ) , seenSources )
2015-10-06 01:20:57 +00:00
return seenSources . HasAll ( c . sources . List ( ) ... ) && c . pods . seenSources ( c . sources . List ( ) ... )
2014-12-17 05:11:27 +00:00
}
2014-07-15 14:52:39 +00:00
// Updates returns a channel of updates to the configuration, properly denormalized.
2015-10-09 17:24:31 +00:00
func ( c * PodConfig ) Updates ( ) <- chan kubetypes . PodUpdate {
2014-07-15 14:52:39 +00:00
return c . updates
}
// Sync requests the full configuration be delivered to the update channel.
func ( c * PodConfig ) Sync ( ) {
c . pods . Sync ( )
}
2017-10-29 23:51:06 +00:00
// Restore restores pods from the checkpoint path, *once*
func ( c * PodConfig ) Restore ( path string , updates chan <- interface { } ) error {
2018-05-08 19:12:20 +00:00
if c . checkpointManager != nil {
return nil
}
2017-10-29 23:51:06 +00:00
var err error
2018-05-08 19:12:20 +00:00
c . checkpointManager , err = checkpointmanager . NewCheckpointManager ( path )
if err != nil {
return err
}
pods , err := checkpoint . LoadPods ( c . checkpointManager )
if err != nil {
return err
2017-10-29 23:51:06 +00:00
}
2018-05-08 19:12:20 +00:00
updates <- kubetypes . PodUpdate { Pods : pods , Op : kubetypes . RESTORE , Source : kubetypes . ApiserverSource }
return nil
2017-10-29 23:51:06 +00:00
}
2014-07-15 14:52:39 +00:00
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
podLock sync . RWMutex
2017-03-20 22:52:29 +00:00
// map of source name to pod uid to pod reference
pods map [ string ] map [ types . UID ] * v1 . Pod
2014-07-15 14:52:39 +00:00
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
// on the updates channel
updateLock sync . Mutex
2015-10-09 17:24:31 +00:00
updates chan <- kubetypes . PodUpdate
2014-12-17 05:11:27 +00:00
// contains the set of all sources that have sent at least one SET
2016-08-31 01:10:14 +00:00
sourcesSeenLock sync . RWMutex
2015-09-09 17:45:01 +00:00
sourcesSeen sets . String
2015-03-03 06:06:20 +00:00
// the EventRecorder to use
recorder record . EventRecorder
2014-07-15 14:52:39 +00:00
}
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
2015-10-09 17:24:31 +00:00
func newPodStorage ( updates chan <- kubetypes . PodUpdate , mode PodConfigNotificationMode , recorder record . EventRecorder ) * podStorage {
2014-07-15 14:52:39 +00:00
return & podStorage {
2017-03-20 22:52:29 +00:00
pods : make ( map [ string ] map [ types . UID ] * v1 . Pod ) ,
2014-12-17 05:11:27 +00:00
mode : mode ,
updates : updates ,
2015-09-09 17:45:01 +00:00
sourcesSeen : sets . String { } ,
2015-03-03 06:06:20 +00:00
recorder : recorder ,
2014-07-15 14:52:39 +00:00
}
}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func ( s * podStorage ) Merge ( source string , change interface { } ) error {
s . updateLock . Lock ( )
defer s . updateLock . Unlock ( )
2015-10-14 12:22:13 +00:00
seenBefore := s . sourcesSeen . Has ( source )
2017-10-29 23:51:06 +00:00
adds , updates , deletes , removes , reconciles , restores := s . merge ( source , change )
2015-10-14 12:22:13 +00:00
firstSet := ! seenBefore && s . sourcesSeen . Has ( source )
2014-07-15 14:52:39 +00:00
// deliver update notifications
switch s . mode {
case PodConfigNotificationIncremental :
2016-06-14 09:29:18 +00:00
if len ( removes . Pods ) > 0 {
s . updates <- * removes
2014-07-15 14:52:39 +00:00
}
2016-01-21 18:59:11 +00:00
if len ( adds . Pods ) > 0 {
2014-07-15 14:52:39 +00:00
s . updates <- * adds
}
if len ( updates . Pods ) > 0 {
s . updates <- * updates
}
2016-06-14 09:29:18 +00:00
if len ( deletes . Pods ) > 0 {
s . updates <- * deletes
}
2017-10-29 23:51:06 +00:00
if len ( restores . Pods ) > 0 {
s . updates <- * restores
}
2016-06-14 09:29:18 +00:00
if firstSet && len ( adds . Pods ) == 0 && len ( updates . Pods ) == 0 && len ( deletes . Pods ) == 0 {
2016-01-21 18:59:11 +00:00
// Send an empty update when first seeing the source and there are
2016-06-14 09:29:18 +00:00
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
2016-01-21 18:59:11 +00:00
// the source is ready.
s . updates <- * adds
}
2015-12-09 03:13:09 +00:00
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len ( reconciles . Pods ) > 0 {
s . updates <- * reconciles
}
2014-07-15 14:52:39 +00:00
case PodConfigNotificationSnapshotAndUpdates :
2016-06-14 09:29:18 +00:00
if len ( removes . Pods ) > 0 || len ( adds . Pods ) > 0 || firstSet {
2016-11-18 20:50:58 +00:00
s . updates <- kubetypes . PodUpdate { Pods : s . MergedState ( ) . ( [ ] * v1 . Pod ) , Op : kubetypes . SET , Source : source }
2015-10-14 12:22:13 +00:00
}
2014-07-15 14:52:39 +00:00
if len ( updates . Pods ) > 0 {
s . updates <- * updates
}
2016-06-14 09:29:18 +00:00
if len ( deletes . Pods ) > 0 {
s . updates <- * deletes
}
2014-07-15 14:52:39 +00:00
case PodConfigNotificationSnapshot :
2016-06-14 09:29:18 +00:00
if len ( updates . Pods ) > 0 || len ( deletes . Pods ) > 0 || len ( adds . Pods ) > 0 || len ( removes . Pods ) > 0 || firstSet {
2016-11-18 20:50:58 +00:00
s . updates <- kubetypes . PodUpdate { Pods : s . MergedState ( ) . ( [ ] * v1 . Pod ) , Op : kubetypes . SET , Source : source }
2014-07-15 14:52:39 +00:00
}
2015-09-14 11:57:47 +00:00
case PodConfigNotificationUnknown :
fallthrough
2014-07-15 14:52:39 +00:00
default :
panic ( fmt . Sprintf ( "unsupported PodConfigNotificationMode: %#v" , s . mode ) )
}
return nil
}
2017-10-29 23:51:06 +00:00
func ( s * podStorage ) merge ( source string , change interface { } ) ( adds , updates , deletes , removes , reconciles , restores * kubetypes . PodUpdate ) {
2014-07-15 14:52:39 +00:00
s . podLock . Lock ( )
defer s . podLock . Unlock ( )
2016-11-18 20:50:58 +00:00
addPods := [ ] * v1 . Pod { }
updatePods := [ ] * v1 . Pod { }
deletePods := [ ] * v1 . Pod { }
removePods := [ ] * v1 . Pod { }
reconcilePods := [ ] * v1 . Pod { }
2017-10-29 23:51:06 +00:00
restorePods := [ ] * v1 . Pod { }
2014-07-15 14:52:39 +00:00
pods := s . pods [ source ]
if pods == nil {
2017-03-20 22:52:29 +00:00
pods = make ( map [ types . UID ] * v1 . Pod )
2014-07-15 14:52:39 +00:00
}
2016-01-28 09:04:35 +00:00
// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
2017-03-20 22:52:29 +00:00
updatePodsFunc := func ( newPods [ ] * v1 . Pod , oldPods , pods map [ types . UID ] * v1 . Pod ) {
2016-01-28 09:04:35 +00:00
filtered := filterInvalidPods ( newPods , source , s . recorder )
2014-07-15 14:52:39 +00:00
for _ , ref := range filtered {
2015-09-11 19:52:22 +00:00
// Annotate the pod with the source before any comparison.
if ref . Annotations == nil {
ref . Annotations = make ( map [ string ] string )
}
2015-10-09 17:24:31 +00:00
ref . Annotations [ kubetypes . ConfigSourceAnnotationKey ] = source
2017-03-20 22:52:29 +00:00
if existing , found := oldPods [ ref . UID ] ; found {
pods [ ref . UID ] = existing
2016-06-14 09:29:18 +00:00
needUpdate , needReconcile , needGracefulDelete := checkAndUpdatePod ( existing , ref )
2015-12-09 03:13:09 +00:00
if needUpdate {
2015-12-10 18:35:02 +00:00
updatePods = append ( updatePods , existing )
2015-12-09 03:13:09 +00:00
} else if needReconcile {
reconcilePods = append ( reconcilePods , existing )
2016-06-14 09:29:18 +00:00
} else if needGracefulDelete {
deletePods = append ( deletePods , existing )
2014-07-15 14:52:39 +00:00
}
continue
}
2015-06-05 19:42:23 +00:00
recordFirstSeenTime ( ref )
2017-03-20 22:52:29 +00:00
pods [ ref . UID ] = ref
2016-03-17 18:36:18 +00:00
addPods = append ( addPods , ref )
2014-07-15 14:52:39 +00:00
}
2016-01-28 09:04:35 +00:00
}
update := change . ( kubetypes . PodUpdate )
switch update . Op {
2016-06-14 09:29:18 +00:00
case kubetypes . ADD , kubetypes . UPDATE , kubetypes . DELETE :
2016-01-28 09:04:35 +00:00
if update . Op == kubetypes . ADD {
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Adding new pods from source %s : %v" , source , update . Pods )
2016-06-14 09:29:18 +00:00
} else if update . Op == kubetypes . DELETE {
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Graceful deleting pods from source %s : %v" , source , update . Pods )
2016-01-28 09:04:35 +00:00
} else {
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Updating pods from source %s : %v" , source , update . Pods )
2016-01-28 09:04:35 +00:00
}
updatePodsFunc ( update . Pods , pods , pods )
2014-07-15 14:52:39 +00:00
2015-10-09 17:24:31 +00:00
case kubetypes . REMOVE :
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Removing pods from source %s : %v" , source , update . Pods )
2014-07-15 14:52:39 +00:00
for _ , value := range update . Pods {
2017-03-20 22:52:29 +00:00
if existing , found := pods [ value . UID ] ; found {
2014-07-15 14:52:39 +00:00
// this is a delete
2017-03-20 22:52:29 +00:00
delete ( pods , value . UID )
2016-06-14 09:29:18 +00:00
removePods = append ( removePods , existing )
2014-07-15 14:52:39 +00:00
continue
}
// this is a no-op
}
2015-10-09 17:24:31 +00:00
case kubetypes . SET :
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Setting pods for source %s" , source )
2014-12-17 05:11:27 +00:00
s . markSourceSet ( source )
2014-07-15 14:52:39 +00:00
// Clear the old map entries by just creating a new map
oldPods := pods
2017-03-20 22:52:29 +00:00
pods = make ( map [ types . UID ] * v1 . Pod )
2016-01-28 09:04:35 +00:00
updatePodsFunc ( update . Pods , oldPods , pods )
2017-03-20 22:52:29 +00:00
for uid , existing := range oldPods {
if _ , found := pods [ uid ] ; ! found {
2014-07-15 14:52:39 +00:00
// this is a delete
2016-06-14 09:29:18 +00:00
removePods = append ( removePods , existing )
2014-07-15 14:52:39 +00:00
}
}
2017-10-29 23:51:06 +00:00
case kubetypes . RESTORE :
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Restoring pods for source %s" , source )
2018-05-08 19:12:20 +00:00
for _ , value := range update . Pods {
restorePods = append ( restorePods , value )
}
2014-07-15 14:52:39 +00:00
default :
2018-11-09 18:49:10 +00:00
klog . Warningf ( "Received invalid update type: %v" , update )
2014-07-15 14:52:39 +00:00
}
s . pods [ source ] = pods
2015-12-10 18:35:02 +00:00
adds = & kubetypes . PodUpdate { Op : kubetypes . ADD , Pods : copyPods ( addPods ) , Source : source }
updates = & kubetypes . PodUpdate { Op : kubetypes . UPDATE , Pods : copyPods ( updatePods ) , Source : source }
2016-06-14 09:29:18 +00:00
deletes = & kubetypes . PodUpdate { Op : kubetypes . DELETE , Pods : copyPods ( deletePods ) , Source : source }
removes = & kubetypes . PodUpdate { Op : kubetypes . REMOVE , Pods : copyPods ( removePods ) , Source : source }
2015-12-09 03:13:09 +00:00
reconciles = & kubetypes . PodUpdate { Op : kubetypes . RECONCILE , Pods : copyPods ( reconcilePods ) , Source : source }
2017-10-29 23:51:06 +00:00
restores = & kubetypes . PodUpdate { Op : kubetypes . RESTORE , Pods : copyPods ( restorePods ) , Source : source }
2015-12-10 18:35:02 +00:00
2017-10-29 23:51:06 +00:00
return adds , updates , deletes , removes , reconciles , restores
2014-07-15 14:52:39 +00:00
}
2014-12-17 05:11:27 +00:00
func ( s * podStorage ) markSourceSet ( source string ) {
s . sourcesSeenLock . Lock ( )
defer s . sourcesSeenLock . Unlock ( )
s . sourcesSeen . Insert ( source )
}
func ( s * podStorage ) seenSources ( sources ... string ) bool {
2016-08-31 01:10:14 +00:00
s . sourcesSeenLock . RLock ( )
defer s . sourcesSeenLock . RUnlock ( )
2014-12-17 05:11:27 +00:00
return s . sourcesSeen . HasAll ( sources ... )
}
2016-11-18 20:50:58 +00:00
func filterInvalidPods ( pods [ ] * v1 . Pod , source string , recorder record . EventRecorder ) ( filtered [ ] * v1 . Pod ) {
2015-09-09 17:45:01 +00:00
names := sets . String { }
2015-04-03 22:51:50 +00:00
for i , pod := range pods {
2017-09-28 09:14:29 +00:00
// Pods from each source are assumed to have passed validation individually.
// This function only checks if there is any naming conflict.
name := kubecontainer . GetPodFullName ( pod )
if names . Has ( name ) {
2018-11-09 18:49:10 +00:00
klog . Warningf ( "Pod[%d] (%s) from %s failed validation due to duplicate pod name %q, ignoring" , i + 1 , format . Pod ( pod ) , source , pod . Name )
2017-09-28 09:14:29 +00:00
recorder . Eventf ( pod , v1 . EventTypeWarning , events . FailedValidation , "Error validating pod %s from %s due to duplicate pod name %q, ignoring" , format . Pod ( pod ) , source , pod . Name )
2016-11-18 20:50:58 +00:00
continue
2014-07-15 14:52:39 +00:00
} else {
2017-09-28 09:14:29 +00:00
names . Insert ( name )
2014-07-15 14:52:39 +00:00
}
2017-09-28 09:14:29 +00:00
2015-01-05 01:30:30 +00:00
filtered = append ( filtered , pod )
2014-07-15 14:52:39 +00:00
}
return
}
2015-08-28 22:34:22 +00:00
// Annotations that the kubelet adds to the pod.
var localAnnotations = [ ] string {
2015-10-09 17:24:31 +00:00
kubetypes . ConfigSourceAnnotationKey ,
kubetypes . ConfigMirrorAnnotationKey ,
kubetypes . ConfigFirstSeenAnnotationKey ,
2015-08-28 22:34:22 +00:00
}
func isLocalAnnotationKey ( key string ) bool {
for _ , localKey := range localAnnotations {
if key == localKey {
return true
}
}
return false
}
// isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except
// for local annotations.
func isAnnotationMapEqual ( existingMap , candidateMap map [ string ] string ) bool {
if candidateMap == nil {
2015-09-12 00:49:33 +00:00
candidateMap = make ( map [ string ] string )
2015-08-28 22:34:22 +00:00
}
for k , v := range candidateMap {
2015-09-12 00:49:33 +00:00
if isLocalAnnotationKey ( k ) {
continue
}
2015-08-28 22:34:22 +00:00
if existingValue , ok := existingMap [ k ] ; ok && existingValue == v {
continue
}
return false
}
for k := range existingMap {
if isLocalAnnotationKey ( k ) {
continue
}
// stale entry in existing map.
if _ , exists := candidateMap [ k ] ; ! exists {
return false
}
}
return true
}
2015-09-12 00:49:33 +00:00
// recordFirstSeenTime records the first seen time of this pod.
2016-11-18 20:50:58 +00:00
func recordFirstSeenTime ( pod * v1 . Pod ) {
2018-11-09 18:49:10 +00:00
klog . V ( 4 ) . Infof ( "Receiving a new pod %q" , format . Pod ( pod ) )
2015-10-09 17:24:31 +00:00
pod . Annotations [ kubetypes . ConfigFirstSeenAnnotationKey ] = kubetypes . NewTimestamp ( ) . GetString ( )
2015-09-12 00:49:33 +00:00
}
2015-08-28 22:34:22 +00:00
// updateAnnotations returns an Annotation map containing the api annotation map plus
// locally managed annotations
2016-11-18 20:50:58 +00:00
func updateAnnotations ( existing , ref * v1 . Pod ) {
2015-08-28 22:34:22 +00:00
annotations := make ( map [ string ] string , len ( ref . Annotations ) + len ( localAnnotations ) )
for k , v := range ref . Annotations {
annotations [ k ] = v
}
for _ , k := range localAnnotations {
if v , ok := existing . Annotations [ k ] ; ok {
annotations [ k ] = v
}
}
existing . Annotations = annotations
}
2016-11-18 20:50:58 +00:00
func podsDifferSemantically ( existing , ref * v1 . Pod ) bool {
2015-09-12 00:49:33 +00:00
if reflect . DeepEqual ( existing . Spec , ref . Spec ) &&
2015-09-25 13:29:00 +00:00
reflect . DeepEqual ( existing . Labels , ref . Labels ) &&
2015-09-12 00:49:33 +00:00
reflect . DeepEqual ( existing . DeletionTimestamp , ref . DeletionTimestamp ) &&
reflect . DeepEqual ( existing . DeletionGracePeriodSeconds , ref . DeletionGracePeriodSeconds ) &&
isAnnotationMapEqual ( existing . Annotations , ref . Annotations ) {
return false
}
return true
}
2015-12-09 03:13:09 +00:00
// checkAndUpdatePod updates existing, and:
// * if ref makes a meaningful change, returns needUpdate=true
2016-06-14 09:29:18 +00:00
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
2015-12-09 03:13:09 +00:00
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
2016-06-14 09:29:18 +00:00
// * else return all false
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
2016-11-18 20:50:58 +00:00
func checkAndUpdatePod ( existing , ref * v1 . Pod ) ( needUpdate , needReconcile , needGracefulDelete bool ) {
2016-06-14 09:29:18 +00:00
// 1. this is a reconcile
2015-08-20 01:57:58 +00:00
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
2015-09-12 00:49:33 +00:00
if ! podsDifferSemantically ( existing , ref ) {
2015-12-09 03:13:09 +00:00
// this is not an update
// Only check reconcile when it is not an update, because if the pod is going to
// be updated, an extra reconcile is unnecessary
if ! reflect . DeepEqual ( existing . Status , ref . Status ) {
// Pod with changed pod status needs reconcile, because kubelet should
// be the source of truth of pod status.
existing . Status = ref . Status
needReconcile = true
}
return
2015-08-20 01:57:58 +00:00
}
2015-09-12 00:49:33 +00:00
// Overwrite the first-seen time with the existing one. This is our own
// internal annotation, there is no need to update.
2015-10-09 17:24:31 +00:00
ref . Annotations [ kubetypes . ConfigFirstSeenAnnotationKey ] = existing . Annotations [ kubetypes . ConfigFirstSeenAnnotationKey ]
2015-09-12 00:49:33 +00:00
2015-08-20 01:57:58 +00:00
existing . Spec = ref . Spec
2015-09-25 13:29:00 +00:00
existing . Labels = ref . Labels
2015-08-20 01:57:58 +00:00
existing . DeletionTimestamp = ref . DeletionTimestamp
existing . DeletionGracePeriodSeconds = ref . DeletionGracePeriodSeconds
2015-12-09 03:13:09 +00:00
existing . Status = ref . Status
2015-08-28 22:34:22 +00:00
updateAnnotations ( existing , ref )
2016-06-14 09:29:18 +00:00
// 2. this is an graceful delete
if ref . DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}
2015-12-09 03:13:09 +00:00
return
2015-08-20 01:57:58 +00:00
}
2014-09-02 10:00:28 +00:00
// Sync sends a copy of the current state through the update channel.
2014-07-15 14:52:39 +00:00
func ( s * podStorage ) Sync ( ) {
s . updateLock . Lock ( )
defer s . updateLock . Unlock ( )
2016-11-18 20:50:58 +00:00
s . updates <- kubetypes . PodUpdate { Pods : s . MergedState ( ) . ( [ ] * v1 . Pod ) , Op : kubetypes . SET , Source : kubetypes . AllSource }
2014-07-15 14:52:39 +00:00
}
// Object implements config.Accessor
func ( s * podStorage ) MergedState ( ) interface { } {
s . podLock . RLock ( )
defer s . podLock . RUnlock ( )
2016-11-18 20:50:58 +00:00
pods := make ( [ ] * v1 . Pod , 0 )
2014-10-08 19:56:02 +00:00
for _ , sourcePods := range s . pods {
2014-07-15 14:52:39 +00:00
for _ , podRef := range sourcePods {
2017-10-06 11:02:35 +00:00
pods = append ( pods , podRef . DeepCopy ( ) )
2014-07-15 14:52:39 +00:00
}
}
return pods
}
2014-10-08 19:56:02 +00:00
2016-11-18 20:50:58 +00:00
func copyPods ( sourcePods [ ] * v1 . Pod ) [ ] * v1 . Pod {
pods := [ ] * v1 . Pod { }
2015-12-10 18:35:02 +00:00
for _ , source := range sourcePods {
// Use a deep copy here just in case
2017-10-06 11:02:35 +00:00
pods = append ( pods , source . DeepCopy ( ) )
2015-12-10 18:35:02 +00:00
}
return pods
}