add absent owner cache

pull/6/head
Chao Xu 2016-08-22 14:36:23 -07:00
parent a6d37f7ead
commit f2d0f1e3f0
3 changed files with 210 additions and 36 deletions


@@ -443,6 +443,8 @@ type GarbageCollector struct {
clock clock.Clock
registeredRateLimiter *RegisteredRateLimiter
registeredRateLimiterForMonitors *RegisteredRateLimiter
+ // GC caches the owners that do not exist according to the API server.
+ absentOwnerCache *UIDCache
}
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
@@ -543,6 +545,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
orphanQueue: workqueue.NewTimedWorkQueue(),
registeredRateLimiter: NewRegisteredRateLimiter(resources),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
+ absentOwnerCache: NewUIDCache(100),
}
gc.propagator = &Propagator{
eventQueue: workqueue.NewTimedWorkQueue(),
@@ -708,6 +711,10 @@ func (gc *GarbageCollector) processItem(item *node) error {
// TODO: we need to remove dangling references if the object is not to be
// deleted.
for _, reference := range ownerReferences {
+ if gc.absentOwnerCache.Has(reference.UID) {
+ glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
+ continue
+ }
// TODO: we need to verify the reference resource is supported by the
// system. If it's not a valid resource, the garbage collector should i)
// ignore the reference when decide if the object should be deleted, and
@@ -727,11 +734,13 @@ func (gc *GarbageCollector) processItem(item *node) error {
if err == nil {
if owner.GetUID() != reference.UID {
glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
+ gc.absentOwnerCache.Add(reference.UID)
continue
}
glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
return nil
} else if errors.IsNotFound(err) {
+ gc.absentOwnerCache.Add(reference.UID)
glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
} else {
return err
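The processItem change above boils down to a check-before-fetch pattern: consult absentOwnerCache before issuing a GET for an owner reference, and populate it whenever the API server reports the owner missing (404) or returns an object whose UID no longer matches the reference. The following is a minimal, self-contained sketch of that pattern; the uid, absentCache, and lookupOwner names are stand-ins invented for the example, not part of the commit.

package main

import (
	"errors"
	"fmt"
)

// uid and absentCache are local stand-ins for types.UID and the UIDCache
// introduced by this commit, so the sketch compiles on its own.
type uid string

type absentCache map[uid]bool

func (c absentCache) Has(id uid) bool { return c[id] }
func (c absentCache) Add(id uid)      { c[id] = true }

var errNotFound = errors.New("not found")

// lookupOwner stands in for the dynamic-client GET that processItem issues
// for each owner reference; here every owner is already gone.
func lookupOwner(id uid) (uid, error) {
	return "", errNotFound
}

// ownerExists mirrors the processItem flow: consult the cache before calling
// the API server, and record the owner as absent on a 404 or when the
// returned object carries a different UID.
func ownerExists(cache absentCache, ref uid) (bool, error) {
	if cache.Has(ref) {
		return false, nil // cached miss, no API call
	}
	got, err := lookupOwner(ref)
	if err == nil {
		if got != ref {
			cache.Add(ref) // owner was recreated under a new UID
			return false, nil
		}
		return true, nil
	}
	if err == errNotFound {
		cache.Add(ref)
		return false, nil
	}
	return false, err
}

func main() {
	cache := absentCache{}
	fmt.Println(ownerExists(cache, "123")) // false <nil>: the lookup hits the server once
	fmt.Println(ownerExists(cache, "123")) // false <nil>: answered from the cache
}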


@@ -105,34 +105,51 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request))
return srv, config
}
- func newDanglingPod() *v1.Pod {
+ func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
+ config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
+ metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
+ config.ContentConfig.NegotiatedSerializer = nil
+ clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
+ podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
+ gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return gc
+ }
+ func getPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod {
return &v1.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
- Name: "ToBeDeletedPod",
+ Name: podName,
Namespace: "ns1",
- OwnerReferences: []v1.OwnerReference{
- {
- Kind: "ReplicationController",
- Name: "owner1",
- UID: "123",
- APIVersion: "v1",
- },
- },
+ OwnerReferences: ownerReferences,
},
}
}
- // test the processItem function making the expected actions.
- func TestProcessItem(t *testing.T) {
- pod := newDanglingPod()
- podBytes, err := json.Marshal(pod)
+ func serilizeOrDie(t *testing.T, object interface{}) []byte {
+ data, err := json.Marshal(object)
if err != nil {
t.Fatal(err)
}
+ return data
+ }
+ // test the processItem function making the expected actions.
+ func TestProcessItem(t *testing.T) {
+ pod := getPod("ToBeDeletedPod", []v1.OwnerReference{
+ {
+ Kind: "ReplicationController",
+ Name: "owner1",
+ UID: "123",
+ APIVersion: "v1",
+ },
+ })
testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": {
@@ -141,21 +158,13 @@ func TestProcessItem(t *testing.T) {
},
"GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": {
200,
- podBytes,
+ serilizeOrDie(t, pod),
},
},
}
- podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
- clientConfig.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
- metaOnlyClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
- clientConfig.ContentConfig.NegotiatedSerializer = nil
- clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
- gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
- if err != nil {
- t.Fatal(err)
- }
+ gc := setupGC(t, clientConfig)
item := &node{
identity: objectReference{
OwnerReference: metatypes.OwnerReference{
@@ -169,7 +178,7 @@ func TestProcessItem(t *testing.T) {
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
owners: nil,
}
- err = gc.processItem(item)
+ err := gc.processItem(item)
if err != nil {
t.Errorf("Unexpected Error: %v", err)
}
@@ -304,16 +313,7 @@ func TestProcessEvent(t *testing.T) {
// TestDependentsRace relies on golang's data race detector to check if there is
// data race among in the dependents field.
func TestDependentsRace(t *testing.T) {
- config := &restclient.Config{}
- config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
- metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
- config.ContentConfig.NegotiatedSerializer = nil
- clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
- podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
- gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
- if err != nil {
- t.Fatal(err)
- }
+ gc := setupGC(t, &restclient.Config{})
const updates = 100
owner := &node{dependents: make(map[*node]struct{})}
@@ -358,3 +358,116 @@ func TestGCListWatcher(t *testing.T) {
t.Errorf("expect %s, got %s", e, a)
}
}
func podToGCNode(pod *v1.Pod) *node {
return &node{
identity: objectReference{
OwnerReference: metatypes.OwnerReference{
Kind: pod.Kind,
APIVersion: pod.APIVersion,
Name: pod.Name,
UID: pod.UID,
},
Namespace: pod.Namespace,
},
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
owners: nil,
}
}
func TestAbsentUIDCache(t *testing.T) {
rc1Pod1 := getPod("rc1Pod1", []v1.OwnerReference{
{
Kind: "ReplicationController",
Name: "rc1",
UID: "1",
APIVersion: "v1",
},
})
rc1Pod2 := getPod("rc1Pod2", []v1.OwnerReference{
{
Kind: "ReplicationController",
Name: "rc1",
UID: "1",
APIVersion: "v1",
},
})
rc2Pod1 := getPod("rc2Pod1", []v1.OwnerReference{
{
Kind: "ReplicationController",
Name: "rc2",
UID: "2",
APIVersion: "v1",
},
})
rc3Pod1 := getPod("rc3Pod1", []v1.OwnerReference{
{
Kind: "ReplicationController",
Name: "rc3",
UID: "3",
APIVersion: "v1",
},
})
testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/namespaces/ns1/pods/rc1Pod1": {
200,
serilizeOrDie(t, rc1Pod1),
},
"GET" + "/api/v1/namespaces/ns1/pods/rc1Pod2": {
200,
serilizeOrDie(t, rc1Pod2),
},
"GET" + "/api/v1/namespaces/ns1/pods/rc2Pod1": {
200,
serilizeOrDie(t, rc2Pod1),
},
"GET" + "/api/v1/namespaces/ns1/pods/rc3Pod1": {
200,
serilizeOrDie(t, rc3Pod1),
},
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc1": {
404,
[]byte{},
},
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc2": {
404,
[]byte{},
},
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc3": {
404,
[]byte{},
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
gc := setupGC(t, clientConfig)
gc.absentOwnerCache = NewUIDCache(2)
gc.processItem(podToGCNode(rc1Pod1))
gc.processItem(podToGCNode(rc2Pod1))
// rc1 should already be in the cache, no request should be sent. rc1 should be promoted in the UIDCache
gc.processItem(podToGCNode(rc1Pod2))
// after this call, rc2 should be evicted from the UIDCache
gc.processItem(podToGCNode(rc3Pod1))
// check cache
if !gc.absentOwnerCache.Has(types.UID("1")) {
t.Errorf("expected rc1 to be in the cache")
}
if gc.absentOwnerCache.Has(types.UID("2")) {
t.Errorf("expected rc2 to not exist in the cache")
}
if !gc.absentOwnerCache.Has(types.UID("3")) {
t.Errorf("expected rc3 to be in the cache")
}
// check the request sent to the server
count := 0
for _, action := range testHandler.actions {
if action.String() == "GET=/api/v1/namespaces/ns1/replicationcontrollers/rc1" {
count++
}
}
if count != 1 {
t.Errorf("expected only 1 GET rc1 request, got %d", count)
}
}


@@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"sync"
"github.com/golang/groupcache/lru"
"k8s.io/kubernetes/pkg/types"
)
// UIDCache is an LRU cache for uid.
type UIDCache struct {
mutex sync.Mutex
cache *lru.Cache
}
// NewUIDCache returns a UIDCache.
func NewUIDCache(maxCacheEntries int) *UIDCache {
return &UIDCache{
cache: lru.New(maxCacheEntries),
}
}
// Add adds a uid to the cache.
func (c *UIDCache) Add(uid types.UID) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.cache.Add(uid, nil)
}
// Has returns if a uid is in the cache.
func (c *UIDCache) Has(uid types.UID) bool {
c.mutex.Lock()
defer c.mutex.Unlock()
_, found := c.cache.Get(uid)
return found
}
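For reference, a minimal usage sketch of the eviction behavior TestAbsentUIDCache relies on, written directly against github.com/golang/groupcache/lru, the package backing UIDCache: Get counts as a use, so re-checking an entry keeps it resident while the least recently used entry is evicted once the size limit is hit.

package main

import (
	"fmt"

	"github.com/golang/groupcache/lru"
)

func main() {
	// Capacity of two, matching gc.absentOwnerCache = NewUIDCache(2) in the test.
	c := lru.New(2)
	c.Add("1", nil)
	c.Add("2", nil)
	c.Get("1")      // promotes "1" to most recently used
	c.Add("3", nil) // evicts "2", the least recently used entry
	_, has1 := c.Get("1")
	_, has2 := c.Get("2")
	_, has3 := c.Get("3")
	fmt.Println(has1, has2, has3) // true false true
}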