/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
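
// eventType classifies an informer notification as an add, update, or delete.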
type eventType int

func (e eventType) String() string {
	switch e {
	case addEvent:
		return "add"
	case updateEvent:
		return "update"
	case deleteEvent:
		return "delete"
	default:
		return fmt.Sprintf("unknown(%d)", int(e))
	}
}

const (
	addEvent eventType = iota
	updateEvent
	deleteEvent
)
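
// event is a single graph change, produced by the monitors onto the
// graphChanges queue and consumed by processGraphChanges.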
type event struct {
	eventType eventType
	obj       interface{}
	// the update event comes with an old object, but it's not used by the
	// garbage collector.
	oldObj interface{}
	gvk    schema.GroupVersionKind
}

// GraphBuilder: based on the events supplied by the informers, GraphBuilder
// updates uidToNode, a graph that caches the dependencies as we know them,
// and enqueues items to the attemptToDelete and attemptToOrphan queues.
type GraphBuilder struct {
	restMapper meta.RESTMapper

	// each monitor list/watches a resource; the results are funneled to the
	// dependencyGraphBuilder
	monitors    monitors
	monitorLock sync.RWMutex
	// informersStarted is closed after all of the controllers have been initialized and are running.
	// After that, it is safe to start the monitors here; before that, it is not.
	informersStarted <-chan struct{}

	// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
	// This channel is also protected by monitorLock.
	stopCh <-chan struct{}

	// running tracks whether Run() has been called.
	// It is protected by monitorLock.
	running bool

	dynamicClient dynamic.Interface
	// monitors are the producers of the graphChanges queue; graphBuilder alters
	// the in-memory graph according to the changes.
	graphChanges workqueue.RateLimitingInterface
	// uidToNode doesn't require a lock, because only the
	// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
	uidToNode *concurrentUIDToNode
	// GraphBuilder is the producer of attemptToDelete and attemptToOrphan; GC is the consumer.
	attemptToDelete workqueue.RateLimitingInterface
	attemptToOrphan workqueue.RateLimitingInterface
	// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
	// be non-existent are added to the cache.
	absentOwnerCache *UIDCache
	sharedInformers  informers.SharedInformerFactory
	ignoredResources map[schema.GroupResource]struct{}
}
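
// Roughly, the data flow is: monitors (informers) feed events into
// graphChanges; the single-threaded processGraphChanges() applies them to the
// uidToNode graph and pushes work onto attemptToDelete/attemptToOrphan, which
// the garbage collector workers consume. (Illustrative sketch; the consumers
// are defined elsewhere in this package.)
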
// monitor runs a Controller with a local stop channel.
type monitor struct {
	controller cache.Controller
	store      cache.Store

	// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
	// not yet started.
	stopCh chan struct{}
}

// Run is intended to be called in a goroutine. Calling it more than once is
// an error.
func (m *monitor) Run() {
	m.controller.Run(m.stopCh)
}
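
// monitors is the set of monitors, keyed by the resource each one watches.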
type monitors map[schema.GroupVersionResource]*monitor
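
// listWatcher returns a cache.ListWatch that lists and watches the given
// resource using the dynamic client.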
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
	return &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			// We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
			return client.Resource(resource).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// We want to watch this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
			return client.Resource(resource).Watch(options)
		},
	}
}
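
// controllerFor returns a controller and store that feed events for the given
// resource into graphChanges, preferring a shared informer when one is
// available and falling back to a dedicated informer built on the dynamic
// client otherwise.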
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
	handlers := cache.ResourceEventHandlerFuncs{
		// add the event to the dependencyGraphBuilder's graphChanges.
		AddFunc: func(obj interface{}) {
			event := &event{
				eventType: addEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// TODO: check if there are differences in the ownerRefs,
			// finalizers, and DeletionTimestamp; if not, ignore the update.
			event := &event{
				eventType: updateEvent,
				obj:       newObj,
				oldObj:    oldObj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		DeleteFunc: func(obj interface{}) {
			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = deletedFinalStateUnknown.Obj
			}
			event := &event{
				eventType: deleteEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
	}
	shared, err := gb.sharedInformers.ForResource(resource)
	if err == nil {
		glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
		// need to clone because it's from a shared cache
		shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
		return shared.Informer().GetController(), shared.Informer().GetStore(), nil
	} else {
		glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
	}

	// TODO: consider store in one storage.
	glog.V(5).Infof("create storage for resource %s", resource)
	store, monitor := cache.NewInformer(
		listWatcher(gb.dynamicClient, resource),
		nil,
		ResourceResyncTime,
		// don't need to clone because it's not from shared cache
		handlers,
	)
	return monitor, store, nil
}

// syncMonitors rebuilds the monitor set according to the supplied resources,
// creating or deleting monitors as necessary. It will return any error
// encountered, but will make an attempt to create a monitor for each resource
// instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors.
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()

	toRemove := gb.monitors
	if toRemove == nil {
		toRemove = monitors{}
	}
	current := monitors{}
	errs := []error{}
	kept := 0
	added := 0
	for resource := range resources {
		if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
			continue
		}
		if m, ok := toRemove[resource]; ok {
			current[resource] = m
			delete(toRemove, resource)
			kept++
			continue
		}
		kind, err := gb.restMapper.KindFor(resource)
		if err != nil {
			errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
			continue
		}
		c, s, err := gb.controllerFor(resource, kind)
		if err != nil {
			errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
			continue
		}
		current[resource] = &monitor{store: s, controller: c}
		added++
	}
	gb.monitors = current

	for _, monitor := range toRemove {
		if monitor.stopCh != nil {
			close(monitor.stopCh)
		}
	}

	glog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
	// NewAggregate returns nil if errs is 0-length
	return utilerrors.NewAggregate(errs)
}

// startMonitors ensures the current set of monitors are running. Any newly
// started monitors will also cause shared informers to be started.
//
// If called before Run, startMonitors does nothing (as there is no stop channel
// to support monitor/informer execution).
func (gb *GraphBuilder) startMonitors() {
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()

	if !gb.running {
		return
	}

	// we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
	// that they don't get unexpected events on their work queues.
	<-gb.informersStarted

	monitors := gb.monitors
	started := 0
	for _, monitor := range monitors {
		if monitor.stopCh == nil {
			monitor.stopCh = make(chan struct{})
			gb.sharedInformers.Start(gb.stopCh)
			go monitor.Run()
			started++
		}
	}
	glog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
}

// IsSynced returns true if any monitors exist AND all those monitors'
// controllers HasSynced functions return true. This means IsSynced could return
// true at one time, and then later return false if all monitors were
// reconstructed.
func (gb *GraphBuilder) IsSynced() bool {
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()

	if len(gb.monitors) == 0 {
		glog.V(4).Info("garbage collector monitor not synced: no monitors")
		return false
	}

	for resource, monitor := range gb.monitors {
		if !monitor.controller.HasSynced() {
			glog.V(4).Infof("garbage collector monitor not yet synced: %+v", resource)
			return false
		}
	}
	return true
}

// Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
	glog.Infof("GraphBuilder running")
	defer glog.Infof("GraphBuilder stopping")

	// Set up the stop channel.
	gb.monitorLock.Lock()
	gb.stopCh = stopCh
	gb.running = true
	gb.monitorLock.Unlock()

	// Start monitors and begin change processing until the stop channel is
	// closed.
	gb.startMonitors()
	wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)

	// Stop any running monitors.
	gb.monitorLock.Lock()
	defer gb.monitorLock.Unlock()
	monitors := gb.monitors
	stopped := 0
	for _, monitor := range monitors {
		if monitor.stopCh != nil {
			stopped++
			close(monitor.stopCh)
		}
	}

	// reset monitors so that the graph builder can be safely re-run/synced.
	gb.monitors = nil
	glog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}

var ignoredResources = map[schema.GroupResource]struct{}{
	{Group: "", Resource: "events"}: {},
}

// DefaultIgnoredResources returns the default set of resources that the garbage collector controller
// should ignore. This is exposed so downstream integrators can have access to the defaults, and add
// to them as necessary when constructing the controller.
func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
	return ignoredResources
}

// enqueueVirtualDeleteEvent is used to add a virtual delete event to be processed for virtual nodes
// once it is determined they do not have backing objects in storage
func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
	gb.graphChanges.Add(&event{
		eventType: deleteEvent,
		obj: &metaonly.MetadataOnlyObject{
			TypeMeta:   metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
			ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
		},
	})
}

// addDependentToOwners adds n to each owner's dependents list. If an owner does not
// exist in gb.uidToNode yet, a "virtual" node will be created to represent
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
// attemptToDeleteItem() will verify if the owner exists according to the API server.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
	for _, owner := range owners {
		ownerNode, ok := gb.uidToNode.Read(owner.UID)
		if !ok {
			// Create a "virtual" node in the graph for the owner if it doesn't
			// exist in the graph yet.
			ownerNode = &node{
				identity: objectReference{
					OwnerReference: owner,
					Namespace:      n.identity.Namespace,
				},
				dependents: make(map[*node]struct{}),
				virtual:    true,
			}
			glog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
			gb.uidToNode.Write(ownerNode)
		}
		ownerNode.addDependent(n)
		if !ok {
			// Enqueue the virtual node into attemptToDelete.
			// The garbage processor will enqueue a virtual delete
			// event to delete it from the graph if API server confirms this
			// owner doesn't exist.
			gb.attemptToDelete.Add(ownerNode)
		}
	}
}

// insertNode inserts the node n into gb.uidToNode, then finds all owners as
// listed in n.owners and adds n to their dependents lists.
func (gb *GraphBuilder) insertNode(n *node) {
	gb.uidToNode.Write(n)
	gb.addDependentToOwners(n, n.owners)
}

// removeDependentFromOwners removes n from each owner's dependents list.
func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
	for _, owner := range owners {
		ownerNode, ok := gb.uidToNode.Read(owner.UID)
		if !ok {
			continue
		}
		ownerNode.deleteDependent(n)
	}
}

// removeNode removes the node from gb.uidToNode, then finds all
// owners as listed in n.owners, and removes n from their dependents lists.
func (gb *GraphBuilder) removeNode(n *node) {
	gb.uidToNode.Delete(n.identity.UID)
	gb.removeDependentFromOwners(n, n.owners)
}
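
// ownerRefPair holds the old and new versions of an owner reference whose UID
// appears in both the old and new reference lists but whose contents differ.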
type ownerRefPair struct {
	oldRef metav1.OwnerReference
	newRef metav1.OwnerReference
}

// referencesDiffs compares the old and new owner reference lists by UID and
// returns the references that were added, the references that were removed,
// and the pairs that share a UID but differ in content.
// TODO: profile this function to see if a naive N^2 algorithm performs better
// when the number of references is small.
func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
	oldUIDToRef := make(map[string]metav1.OwnerReference)
	for _, value := range old {
		oldUIDToRef[string(value.UID)] = value
	}
	oldUIDSet := sets.StringKeySet(oldUIDToRef)
	newUIDToRef := make(map[string]metav1.OwnerReference)
	for _, value := range new {
		newUIDToRef[string(value.UID)] = value
	}
	newUIDSet := sets.StringKeySet(newUIDToRef)

	addedUID := newUIDSet.Difference(oldUIDSet)
	removedUID := oldUIDSet.Difference(newUIDSet)
	intersection := oldUIDSet.Intersection(newUIDSet)

	for uid := range addedUID {
		added = append(added, newUIDToRef[uid])
	}
	for uid := range removedUID {
		removed = append(removed, oldUIDToRef[uid])
	}
	for uid := range intersection {
		if !reflect.DeepEqual(oldUIDToRef[uid], newUIDToRef[uid]) {
			changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[uid], newRef: newUIDToRef[uid]})
		}
	}
	return added, removed, changed
}
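
// For illustration: if the old list holds refs {a, b} and the new list holds
// {a', c}, where a' shares a's UID but flips BlockOwnerDeletion, then
// referencesDiffs returns added=[c], removed=[b], changed=[{a, a'}]. The
// removed and changed results are what let addUnblockedOwnersToDeleteQueue
// spot owners whose deletion just became unblocked.
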
// deletionStarts returns true if the object in the event just transitioned to
// "being deleted".
func deletionStarts(oldObj interface{}, newAccessor metav1.Object) bool {
	// The delta_fifo may combine the creation and update of the object into one
	// event, so if there is no oldObj, we just return whether the newObj (via
	// newAccessor) is being deleted.
	if oldObj == nil {
		if newAccessor.GetDeletionTimestamp() == nil {
			return false
		}
		return true
	}
	oldAccessor, err := meta.Accessor(oldObj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
		return false
	}
	return beingDeleted(newAccessor) && !beingDeleted(oldAccessor)
}

func beingDeleted(accessor metav1.Object) bool {
	return accessor.GetDeletionTimestamp() != nil
}

func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
	finalizers := accessor.GetFinalizers()
	for _, finalizer := range finalizers {
		if finalizer == metav1.FinalizerDeleteDependents {
			return true
		}
	}
	return false
}

func hasOrphanFinalizer(accessor metav1.Object) bool {
	finalizers := accessor.GetFinalizers()
	for _, finalizer := range finalizers {
		if finalizer == metav1.FinalizerOrphanDependents {
			return true
		}
	}
	return false
}

// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
	return deletionStarts(oldObj, newAccessor) && hasDeleteDependentsFinalizer(newAccessor)
}

// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
	return deletionStarts(oldObj, newAccessor) && hasOrphanFinalizer(newAccessor)
}

// addUnblockedOwnersToDeleteQueue enqueues an owner to the attemptToDelete
// queue when a blocking ownerReference pointing to it is removed, or when the
// reference's BlockOwnerDeletion is changed to false, since the owner's
// deletion may now be unblocked.
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
	for _, ref := range removed {
		if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
			node, found := gb.uidToNode.Read(ref.UID)
			if !found {
				glog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
				continue
			}
			gb.attemptToDelete.Add(node)
		}
	}
	for _, c := range changed {
		wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
		isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
		if wasBlocked && isUnblocked {
			node, found := gb.uidToNode.Read(c.newRef.UID)
			if !found {
				glog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
				continue
			}
			gb.attemptToDelete.Add(node)
		}
	}
}
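
// processTransitions enqueues n to attemptToOrphan if the object has just
// begun waiting for its dependents to be orphaned, or to attemptToDelete
// (along with its dependents) if it has just begun waiting for its dependents
// to be deleted.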
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
	if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
		glog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
		gb.attemptToOrphan.Add(n)
		return
	}
	if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
		glog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
		// if n was added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
		n.markDeletingDependents()
		for dep := range n.dependents {
			gb.attemptToDelete.Add(dep)
		}
		gb.attemptToDelete.Add(n)
	}
}
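
// runProcessGraphChanges loops over processGraphChanges until the
// graphChanges queue is shut down.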
func (gb *GraphBuilder) runProcessGraphChanges() {
	for gb.processGraphChanges() {
	}
}

// processGraphChanges dequeues an event from graphChanges, updates the graph,
// and populates the attemptToDelete and attemptToOrphan queues.
func (gb *GraphBuilder) processGraphChanges() bool {
	item, quit := gb.graphChanges.Get()
	if quit {
		return false
	}
	defer gb.graphChanges.Done(item)
	event, ok := item.(*event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
		return true
	}
	obj := event.obj
	accessor, err := meta.Accessor(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
		return true
	}
	glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
	// Check if the node already exists
	existingNode, found := gb.uidToNode.Read(accessor.GetUID())
	if found {
		// this marks the node as having been observed via an informer event
		// 1. this depends on graphChanges only containing add/update events from the actual informer
		// 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
		existingNode.markObserved()
	}
	switch {
	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
		newNode := &node{
			identity: objectReference{
				OwnerReference: metav1.OwnerReference{
					APIVersion: event.gvk.GroupVersion().String(),
					Kind:       event.gvk.Kind,
					UID:        accessor.GetUID(),
					Name:       accessor.GetName(),
				},
				Namespace: accessor.GetNamespace(),
			},
			dependents:         make(map[*node]struct{}),
			owners:             accessor.GetOwnerReferences(),
			deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
			beingDeleted:       beingDeleted(accessor),
		}
		gb.insertNode(newNode)
		// the underlying delta_fifo may combine a creation and a deletion into
		// one event, so we need to further process the event.
		gb.processTransitions(event.oldObj, accessor, newNode)
	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
		// handle changes in ownerReferences
		added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
		if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
			// check if the changed dependency graph unblocks owners that are
			// waiting for the deletion of their dependents.
			gb.addUnblockedOwnersToDeleteQueue(removed, changed)
			// update the node itself
			existingNode.owners = accessor.GetOwnerReferences()
			// Add the node to its new owners' dependents lists.
			gb.addDependentToOwners(existingNode, added)
			// remove the node from the dependents lists of the nodes that are no
			// longer in the node's owners list.
			gb.removeDependentFromOwners(existingNode, removed)
		}
		if beingDeleted(accessor) {
			existingNode.markBeingDeleted()
		}
		gb.processTransitions(event.oldObj, accessor, existingNode)
	case event.eventType == deleteEvent:
		if !found {
			glog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
			return true
		}
		// removeNode updates the graph
		gb.removeNode(existingNode)
		existingNode.dependentsLock.RLock()
		defer existingNode.dependentsLock.RUnlock()
		if len(existingNode.dependents) > 0 {
			gb.absentOwnerCache.Add(accessor.GetUID())
		}
		for dep := range existingNode.dependents {
			gb.attemptToDelete.Add(dep)
		}
		for _, owner := range existingNode.owners {
			ownerNode, found := gb.uidToNode.Read(owner.UID)
			if !found || !ownerNode.isDeletingDependents() {
				continue
			}
			// this is to let attemptToDeleteItem check if all the owner's
			// dependents are deleted; if so, the owner will be deleted.
			gb.attemptToDelete.Add(ownerNode)
		}
	}
	return true
}