diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index cc4173a457..c4fb622893 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -443,6 +443,8 @@ type GarbageCollector struct { clock clock.Clock registeredRateLimiter *RegisteredRateLimiter registeredRateLimiterForMonitors *RegisteredRateLimiter + // GC caches the owners that do not exist according to the API server. + absentOwnerCache *UIDCache } func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch { @@ -543,6 +545,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam orphanQueue: workqueue.NewTimedWorkQueue(), registeredRateLimiter: NewRegisteredRateLimiter(resources), registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources), + absentOwnerCache: NewUIDCache(100), } gc.propagator = &Propagator{ eventQueue: workqueue.NewTimedWorkQueue(), @@ -708,6 +711,10 @@ func (gc *GarbageCollector) processItem(item *node) error { // TODO: we need to remove dangling references if the object is not to be // deleted. for _, reference := range ownerReferences { + if gc.absentOwnerCache.Has(reference.UID) { + glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + continue + } // TODO: we need to verify the reference resource is supported by the // system. If it's not a valid resource, the garbage collector should i) // ignore the reference when decide if the object should be deleted, and @@ -727,11 +734,13 @@ func (gc *GarbageCollector) processItem(item *node) error { if err == nil { if owner.GetUID() != reference.UID { glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + gc.absentOwnerCache.Add(reference.UID) continue } glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID) return nil } else if errors.IsNotFound(err) { + gc.absentOwnerCache.Add(reference.UID) glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) } else { return err diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 9ed1da9b02..4909eac8c0 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -105,34 +105,51 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) return srv, config } -func newDanglingPod() *v1.Pod { +func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector { + config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} + metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc) + config.ContentConfig.NegotiatedSerializer = nil + clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc) + podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource) + if err != nil { + t.Fatal(err) + } + return gc +} + +func getPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod { return &v1.Pod{ TypeMeta: unversioned.TypeMeta{ Kind: "Pod", APIVersion: "v1", }, ObjectMeta: v1.ObjectMeta{ - Name: "ToBeDeletedPod", - Namespace: "ns1", - OwnerReferences: []v1.OwnerReference{ - { - Kind: "ReplicationController", - Name: "owner1", - UID: "123", - APIVersion: "v1", - }, - }, + Name: podName, + Namespace: "ns1", + OwnerReferences: ownerReferences, }, } } -// test the processItem function making the expected actions. -func TestProcessItem(t *testing.T) { - pod := newDanglingPod() - podBytes, err := json.Marshal(pod) +func serilizeOrDie(t *testing.T, object interface{}) []byte { + data, err := json.Marshal(object) if err != nil { t.Fatal(err) } + return data +} + +// test the processItem function making the expected actions. +func TestProcessItem(t *testing.T) { + pod := getPod("ToBeDeletedPod", []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "owner1", + UID: "123", + APIVersion: "v1", + }, + }) testHandler := &fakeActionHandler{ response: map[string]FakeResponse{ "GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": { @@ -141,21 +158,13 @@ func TestProcessItem(t *testing.T) { }, "GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": { 200, - podBytes, + serilizeOrDie(t, pod), }, }, } - podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) defer srv.Close() - clientConfig.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc) - clientConfig.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource) - if err != nil { - t.Fatal(err) - } + gc := setupGC(t, clientConfig) item := &node{ identity: objectReference{ OwnerReference: metatypes.OwnerReference{ @@ -169,7 +178,7 @@ func TestProcessItem(t *testing.T) { // owners are intentionally left empty. The processItem routine should get the latest item from the server. owners: nil, } - err = gc.processItem(item) + err := gc.processItem(item) if err != nil { t.Errorf("Unexpected Error: %v", err) } @@ -304,16 +313,7 @@ func TestProcessEvent(t *testing.T) { // TestDependentsRace relies on golang's data race detector to check if there is // data race among in the dependents field. func TestDependentsRace(t *testing.T) { - config := &restclient.Config{} - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc) - config.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc) - podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource) - if err != nil { - t.Fatal(err) - } + gc := setupGC(t, &restclient.Config{}) const updates = 100 owner := &node{dependents: make(map[*node]struct{})} @@ -358,3 +358,116 @@ func TestGCListWatcher(t *testing.T) { t.Errorf("expect %s, got %s", e, a) } } + +func podToGCNode(pod *v1.Pod) *node { + return &node{ + identity: objectReference{ + OwnerReference: metatypes.OwnerReference{ + Kind: pod.Kind, + APIVersion: pod.APIVersion, + Name: pod.Name, + UID: pod.UID, + }, + Namespace: pod.Namespace, + }, + // owners are intentionally left empty. The processItem routine should get the latest item from the server. + owners: nil, + } +} + +func TestAbsentUIDCache(t *testing.T) { + rc1Pod1 := getPod("rc1Pod1", []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "rc1", + UID: "1", + APIVersion: "v1", + }, + }) + rc1Pod2 := getPod("rc1Pod2", []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "rc1", + UID: "1", + APIVersion: "v1", + }, + }) + rc2Pod1 := getPod("rc2Pod1", []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "rc2", + UID: "2", + APIVersion: "v1", + }, + }) + rc3Pod1 := getPod("rc3Pod1", []v1.OwnerReference{ + { + Kind: "ReplicationController", + Name: "rc3", + UID: "3", + APIVersion: "v1", + }, + }) + testHandler := &fakeActionHandler{ + response: map[string]FakeResponse{ + "GET" + "/api/v1/namespaces/ns1/pods/rc1Pod1": { + 200, + serilizeOrDie(t, rc1Pod1), + }, + "GET" + "/api/v1/namespaces/ns1/pods/rc1Pod2": { + 200, + serilizeOrDie(t, rc1Pod2), + }, + "GET" + "/api/v1/namespaces/ns1/pods/rc2Pod1": { + 200, + serilizeOrDie(t, rc2Pod1), + }, + "GET" + "/api/v1/namespaces/ns1/pods/rc3Pod1": { + 200, + serilizeOrDie(t, rc3Pod1), + }, + "GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc1": { + 404, + []byte{}, + }, + "GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc2": { + 404, + []byte{}, + }, + "GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc3": { + 404, + []byte{}, + }, + }, + } + srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + defer srv.Close() + gc := setupGC(t, clientConfig) + gc.absentOwnerCache = NewUIDCache(2) + gc.processItem(podToGCNode(rc1Pod1)) + gc.processItem(podToGCNode(rc2Pod1)) + // rc1 should already be in the cache, no request should be sent. rc1 should be promoted in the UIDCache + gc.processItem(podToGCNode(rc1Pod2)) + // after this call, rc2 should be evicted from the UIDCache + gc.processItem(podToGCNode(rc3Pod1)) + // check cache + if !gc.absentOwnerCache.Has(types.UID("1")) { + t.Errorf("expected rc1 to be in the cache") + } + if gc.absentOwnerCache.Has(types.UID("2")) { + t.Errorf("expected rc2 to not exist in the cache") + } + if !gc.absentOwnerCache.Has(types.UID("3")) { + t.Errorf("expected rc3 to be in the cache") + } + // check the request sent to the server + count := 0 + for _, action := range testHandler.actions { + if action.String() == "GET=/api/v1/namespaces/ns1/replicationcontrollers/rc1" { + count++ + } + } + if count != 1 { + t.Errorf("expected only 1 GET rc1 request, got %d", count) + } +} diff --git a/pkg/controller/garbagecollector/uid_cache.go b/pkg/controller/garbagecollector/uid_cache.go new file mode 100644 index 0000000000..70c066c866 --- /dev/null +++ b/pkg/controller/garbagecollector/uid_cache.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package garbagecollector + +import ( + "sync" + + "github.com/golang/groupcache/lru" + "k8s.io/kubernetes/pkg/types" +) + +// UIDCache is an LRU cache for uid. +type UIDCache struct { + mutex sync.Mutex + cache *lru.Cache +} + +// NewUIDCache returns a UIDCache. +func NewUIDCache(maxCacheEntries int) *UIDCache { + return &UIDCache{ + cache: lru.New(maxCacheEntries), + } +} + +// Add adds a uid to the cache. +func (c *UIDCache) Add(uid types.UID) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.cache.Add(uid, nil) +} + +// Has returns if a uid is in the cache. +func (c *UIDCache) Has(uid types.UID) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + _, found := c.cache.Get(uid) + return found +}