diff --git a/pkg/client/typed/dynamic/client.go b/pkg/client/typed/dynamic/client.go
index dba2af0262..72c460b255 100644
--- a/pkg/client/typed/dynamic/client.go
+++ b/pkg/client/typed/dynamic/client.go
@@ -33,6 +33,7 @@ import (
     "k8s.io/kubernetes/pkg/conversion/queryparams"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/runtime/serializer"
+    "k8s.io/kubernetes/pkg/util/flowcontrol"
     "k8s.io/kubernetes/pkg/watch"
 )
 
@@ -78,6 +79,11 @@ func NewClient(conf *restclient.Config) (*Client, error) {
     return &Client{cl: cl}, nil
 }
 
+// GetRateLimiter returns the rate limiter.
+func (c *Client) GetRateLimiter() flowcontrol.RateLimiter {
+    return c.cl.GetRateLimiter()
+}
+
 // Resource returns an API interface to the specified resource for this client's
 // group and version. If resource is not a namespaced resource, then namespace
 // is ignored.
diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 4cc4a854d4..3c922e04f9 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -36,6 +36,7 @@ import (
     "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/types"
+    "k8s.io/kubernetes/pkg/util/clock"
     utilerrors "k8s.io/kubernetes/pkg/util/errors"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
     "k8s.io/kubernetes/pkg/util/sets"
@@ -127,7 +128,7 @@ func (m *concurrentUIDToNode) Delete(uid types.UID) {
 }
 
 type Propagator struct {
-    eventQueue *workqueue.Type
+    eventQueue *workqueue.TimedWorkQueue
     // uidToNode doesn't require a lock to protect, because only the
     // single-threaded Propagator.processEvent() reads/writes it.
     uidToNode *concurrentUIDToNode
@@ -311,7 +312,7 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
 // the "Orphan" finalizer. The node is add back into the orphanQueue if any of
 // these steps fail.
 func (gc *GarbageCollector) orphanFinalizer() {
-    key, quit := gc.orphanQueue.Get()
+    key, start, quit := gc.orphanQueue.Get()
     if quit {
         return
     }
@@ -331,20 +332,21 @@ func (gc *GarbageCollector) orphanFinalizer() {
     err := gc.orhpanDependents(owner.identity, dependents)
     if err != nil {
         glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
-        gc.orphanQueue.Add(owner)
+        gc.orphanQueue.AddWithTimestamp(owner, start)
         return
     }
     // update the owner, remove "orphaningFinalizer" from its finalizers list
     err = gc.removeOrphanFinalizer(owner)
     if err != nil {
         glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
-        gc.orphanQueue.Add(owner)
+        gc.orphanQueue.AddWithTimestamp(owner, start)
     }
+    OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
 }
 
 // Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
 func (p *Propagator) processEvent() {
-    key, quit := p.eventQueue.Get()
+    key, start, quit := p.eventQueue.Get()
     if quit {
         return
     }
@@ -422,6 +424,7 @@ func (p *Propagator) processEvent() {
             p.gc.dirtyQueue.Add(dep)
         }
     }
+    EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start))
 }
 
 // GarbageCollector is responsible for carrying out cascading deletion, and
@@ -434,11 +437,14 @@ type GarbageCollector struct {
     metaOnlyClientPool dynamic.ClientPool
     // clientPool uses the regular dynamicCodec. We need it to update
     // finalizers. It can be removed if we support patching finalizers.
-    clientPool  dynamic.ClientPool
-    dirtyQueue  *workqueue.Type
-    orphanQueue *workqueue.Type
-    monitors    []monitor
-    propagator  *Propagator
+    clientPool                       dynamic.ClientPool
+    dirtyQueue                       *workqueue.TimedWorkQueue
+    orphanQueue                      *workqueue.TimedWorkQueue
+    monitors                         []monitor
+    propagator                       *Propagator
+    clock                            clock.Clock
+    registeredRateLimiter            *RegisteredRateLimiter
+    registeredRateLimiterForMonitors *RegisteredRateLimiter
 }
 
 func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
@@ -466,14 +472,15 @@ func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionReso
     }
 }
 
-func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
+func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
     // TODO: consider store in one storage.
     glog.V(6).Infof("create storage for resource %s", resource)
     var monitor monitor
-    client, err := p.gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
+    client, err := gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
     if err != nil {
         return monitor, err
     }
+    gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
     setObjectTypeMeta := func(obj interface{}) {
         runtimeObject, ok := obj.(runtime.Object)
         if !ok {
@@ -493,13 +500,13 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
                 eventType: addEvent,
                 obj:       obj,
             }
-            p.eventQueue.Add(event)
+            gc.propagator.eventQueue.Add(event)
         },
         UpdateFunc: func(oldObj, newObj interface{}) {
             setObjectTypeMeta(newObj)
             setObjectTypeMeta(oldObj)
             event := event{updateEvent, newObj, oldObj}
-            p.eventQueue.Add(event)
+            gc.propagator.eventQueue.Add(event)
         },
         DeleteFunc: func(obj interface{}) {
             // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
@@ -511,7 +518,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
                 eventType: deleteEvent,
                 obj:       obj,
             }
-            p.eventQueue.Add(event)
+            gc.propagator.eventQueue.Add(event)
         },
     },
     )
@@ -528,16 +535,20 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
 }
 
 func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
+    clock := clock.RealClock{}
     gc := &GarbageCollector{
         metaOnlyClientPool: metaOnlyClientPool,
         clientPool:         clientPool,
-        dirtyQueue:         workqueue.New(),
-        orphanQueue:        workqueue.New(),
         // TODO: should use a dynamic RESTMapper built from the discovery results.
-        restMapper: registered.RESTMapper(),
+        restMapper:                       registered.RESTMapper(),
+        clock:                            clock,
+        dirtyQueue:                       workqueue.NewTimedWorkQueue(clock),
+        orphanQueue:                      workqueue.NewTimedWorkQueue(clock),
+        registeredRateLimiter:            NewRegisteredRateLimiter(),
+        registeredRateLimiterForMonitors: NewRegisteredRateLimiter(),
     }
     gc.propagator = &Propagator{
-        eventQueue: workqueue.New(),
+        eventQueue: workqueue.NewTimedWorkQueue(gc.clock),
         uidToNode: &concurrentUIDToNode{
             RWMutex:   &sync.RWMutex{},
             uidToNode: make(map[types.UID]*node),
@@ -553,7 +564,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
         if err != nil {
             return nil, err
         }
-        monitor, err := monitorFor(gc.propagator, gc.clientPool, resource, kind)
+        monitor, err := gc.monitorFor(resource, kind)
         if err != nil {
             return nil, err
         }
@@ -563,7 +574,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
 }
 
 func (gc *GarbageCollector) worker() {
-    key, quit := gc.dirtyQueue.Get()
+    key, start, quit := gc.dirtyQueue.Get()
     if quit {
         return
     }
@@ -572,6 +583,7 @@ func (gc *GarbageCollector) worker() {
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
     }
+    DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
 }
 
 // apiResource consults the REST mapper to translate an
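Note: the hunks above lean on two pieces this patch does not show in full: workqueue.TimedWorkQueue, whose Get() also returns the time the item was enqueued, and sinceInMicroseconds(), which feeds the OrphanProcessingLatency / EventProcessingLatency / DirtyProcessingLatency summaries. The sketch below is only a minimal illustration of that shape, assuming a Clock interface with Now(); it is not the actual k8s.io/kubernetes/pkg/util/workqueue implementation, and the channel-based queue here is a simplification that drops the de-duplication behavior of the regular workqueue.

// Illustrative sketch only; not the real pkg/util/workqueue implementation.
package workqueue

import "time"

// Clock is assumed here to mirror the Now() method of pkg/util/clock.Clock.
type Clock interface {
    Now() time.Time
}

type timedItem struct {
    obj   interface{}
    start time.Time
}

// TimedWorkQueue stamps every item with its enqueue time so that workers can
// observe end-to-end processing latency once they finish handling it.
type TimedWorkQueue struct {
    clock Clock
    items chan timedItem
}

func NewTimedWorkQueue(c Clock) *TimedWorkQueue {
    return &TimedWorkQueue{clock: c, items: make(chan timedItem, 1024)}
}

// Add stamps the item with the current time before enqueueing it.
func (q *TimedWorkQueue) Add(obj interface{}) {
    q.items <- timedItem{obj: obj, start: q.clock.Now()}
}

// AddWithTimestamp re-enqueues an item with its original timestamp, so a
// retry (as in orphanFinalizer above) does not reset the measured latency.
func (q *TimedWorkQueue) AddWithTimestamp(obj interface{}, start time.Time) {
    q.items <- timedItem{obj: obj, start: start}
}

// Get returns the next item together with the time it was first enqueued.
func (q *TimedWorkQueue) Get() (obj interface{}, start time.Time, shutdown bool) {
    it, ok := <-q.items
    if !ok {
        return nil, time.Time{}, true
    }
    return it.obj, it.start, false
}

// sinceInMicroseconds converts the elapsed time into microseconds, the unit
// the latency summaries in this patch are assumed to record.
func sinceInMicroseconds(c Clock, start time.Time) float64 {
    return float64(c.Now().Sub(start).Nanoseconds() / 1000)
}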