Enable garbage collection of custom resources
Enhance the garbage collector to periodically refresh the resources it monitors (via discovery) so that custom resource definitions can be garbage collected. This implementation caches Unstructured structs for any kinds not covered by a shared informer. The existing meta-only codec supports only compiled types; an improved codec that handles arbitrary types could later be introduced so the cache stores only metadata for all non-informer types.
parent 3d3d3922c2
commit d08dfb92c7
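The mechanism described above boils down to a periodic loop: invalidate cached discovery data, recompute the set of deletable resources, and reconcile the running monitors against that set. Before the diff, here is a minimal, self-contained Go sketch of that control flow. It is not the actual Kubernetes code: the DiscoveryClient and MonitorManager interfaces, the local GroupVersionResource type, the Sync function, and the fake implementations are invented stand-ins for the client-go and garbage collector types that appear in the diff below.

// Minimal sketch of a discovery-driven monitor resync loop (illustration only;
// the interface and type names below are invented, not client-go API).
package main

import (
	"fmt"
	"reflect"
	"time"
)

// GroupVersionResource identifies an API resource reported by discovery.
type GroupVersionResource struct{ Group, Version, Resource string }

// DiscoveryClient is a stand-in for a cached discovery client that can be
// invalidated (Reset) so newly registered custom resources become visible.
type DiscoveryClient interface {
	DeletableResources() (map[GroupVersionResource]struct{}, error)
	Reset()
}

// MonitorManager starts or stops per-resource monitors so that exactly the
// supplied set is watched (the role of syncMonitors/startMonitors in the diff).
type MonitorManager interface {
	ResyncMonitors(resources map[GroupVersionResource]struct{}) error
}

// Sync polls discovery every period and resyncs the monitors when the set of
// deletable resources changes, until stopCh is closed. The real controller
// resyncs every period regardless of change and pauses its workers while the
// informers catch up; the change check here is only a simplification.
func Sync(d DiscoveryClient, m MonitorManager, period time.Duration, stopCh <-chan struct{}) {
	var last map[GroupVersionResource]struct{}
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
		}
		d.Reset() // drop cached discovery data so new CRDs are noticed
		current, err := d.DeletableResources()
		if err != nil {
			fmt.Printf("discovery failed, keeping existing monitors: %v\n", err)
			continue
		}
		if reflect.DeepEqual(last, current) {
			continue
		}
		if err := m.ResyncMonitors(current); err != nil {
			fmt.Printf("failed to resync monitors: %v\n", err)
			continue
		}
		last = current
	}
}

// Fake implementations showing how Sync would be wired up.
type fakeDiscovery struct{ resources map[GroupVersionResource]struct{} }

func (f *fakeDiscovery) DeletableResources() (map[GroupVersionResource]struct{}, error) {
	return f.resources, nil
}
func (f *fakeDiscovery) Reset() {}

type fakeMonitors struct{}

func (fakeMonitors) ResyncMonitors(r map[GroupVersionResource]struct{}) error {
	fmt.Printf("now monitoring %d resource(s)\n", len(r))
	return nil
}

func main() {
	stop := make(chan struct{})
	d := &fakeDiscovery{resources: map[GroupVersionResource]struct{}{
		{Group: "example.com", Version: "v1", Resource: "widgets"}: {},
	}}
	go Sync(d, fakeMonitors{}, 500*time.Millisecond, stop)
	time.Sleep(2 * time.Second) // let the loop observe the initial resource set
	close(stop)
}

In the actual change, this role is filled by GarbageCollector.Sync together with GraphBuilder.syncMonitors and startMonitors: the deferred RESTMapper (backed by the same memory-cached discovery client) is Reset() on every pass, the collector pauses its workers while monitors are resynced, and the resync runs every period rather than only when the resource set changes.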
@@ -60,7 +60,6 @@ go_library(
"//pkg/controller/disruption:go_default_library",
|
||||
"//pkg/controller/endpoint:go_default_library",
|
||||
"//pkg/controller/garbagecollector:go_default_library",
|
||||
"//pkg/controller/garbagecollector/metaonly:go_default_library",
|
||||
"//pkg/controller/job:go_default_library",
|
||||
"//pkg/controller/namespace:go_default_library",
|
||||
"//pkg/controller/node:go_default_library",
|
||||
|
@@ -109,14 +108,15 @@ go_library(
|
|||
"//vendor/github.com/spf13/cobra:go_default_library",
|
||||
"//vendor/github.com/spf13/pflag:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery/cached:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
|
|
|
@@ -29,18 +29,17 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/discovery"
|
||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
|
||||
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
|
||||
"k8s.io/kubernetes/pkg/controller/podgc"
|
||||
|
@@ -297,47 +296,50 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
// TODO: should use a dynamic RESTMapper built from the discovery results.
|
||||
restMapper := api.Registry.RESTMapper()
|
||||
|
||||
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||
preferredResources, err := gcClientset.Discovery().ServerPreferredResources()
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err))
|
||||
}
|
||||
if len(preferredResources) == 0 {
|
||||
return true, fmt.Errorf("unable to get any supported resources from server: %v", err)
|
||||
}
|
||||
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"get", "list", "watch", "patch", "update", "delete"}}, preferredResources)
|
||||
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Failed to parse resources from server: %v", err)
|
||||
}
|
||||
|
||||
// Use a discovery client capable of being refreshed.
|
||||
discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
|
||||
restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured)
|
||||
restMapper.Reset()
|
||||
|
||||
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig = dynamic.ContentConfig()
|
||||
// TODO: Make NewMetadataCodecFactory support arbitrary (non-compiled)
|
||||
// resource types. Otherwise we'll be storing full Unstructured data in our
|
||||
// caches for custom resources. Consider porting it to work with
|
||||
// metav1alpha1.PartialObjectMetadata.
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
|
||||
// Get an initial set of deletable resources to prime the garbage collector.
|
||||
deletableResources, err := garbagecollector.GetDeletableResources(discoveryClient)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
ignoredResources := make(map[schema.GroupResource]struct{})
|
||||
for _, r := range ctx.Options.GCIgnoredResources {
|
||||
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
|
||||
}
|
||||
|
||||
garbageCollector, err := garbagecollector.NewGarbageCollector(
|
||||
metaOnlyClientPool,
|
||||
clientPool,
|
||||
restMapper,
|
||||
deletableGroupVersionResources,
|
||||
deletableResources,
|
||||
ignoredResources,
|
||||
ctx.InformerFactory,
|
||||
)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
|
||||
}
|
||||
|
||||
// Start the garbage collector.
|
||||
workers := int(ctx.Options.ConcurrentGCSyncs)
|
||||
go garbageCollector.Run(workers, ctx.Stop)
|
||||
|
||||
// Periodically refresh the RESTMapper with new discovery information and sync
|
||||
// the garbage collector.
|
||||
go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop)
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@@ -1647,7 +1647,7 @@ run_non_native_resource_tests() {
|
|||
kubectl "${kube_flags[@]}" delete bars test --cascade=false
|
||||
|
||||
# Make sure it's gone
|
||||
kube::test::get_object_assert bars "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||
kube::test::wait_object_assert bars "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||
|
||||
# Test that we can create single item via apply
|
||||
kubectl "${kube_flags[@]}" apply -f hack/testdata/TPR/foo.yaml
|
||||
|
|
|
@@ -71,7 +71,7 @@ runTests() {
|
|||
make -C "${KUBE_ROOT}" test \
|
||||
WHAT="${WHAT:-$(kube::test::find_integration_test_dirs | paste -sd' ' -)}" \
|
||||
GOFLAGS="${GOFLAGS:-}" \
|
||||
KUBE_TEST_ARGS="${KUBE_TEST_ARGS:-} ${SHORT:--short=true} --vmodule=garbage*collector*=6 --alsologtostderr=true" \
|
||||
KUBE_TEST_ARGS="${KUBE_TEST_ARGS:-} ${SHORT:--short=true} --alsologtostderr=true" \
|
||||
KUBE_RACE="" \
|
||||
KUBE_TIMEOUT="${KUBE_TIMEOUT}" \
|
||||
KUBE_TEST_API_VERSIONS="$1"
|
||||
|
|
|
@@ -42,6 +42,7 @@ go_library(
|
|||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
|
|
|
@@ -20,8 +20,6 @@ import (
|
|||
"fmt"
|
||||
)
|
||||
|
||||
const nonCoreMessage = `If %s is a non-core resource (e.g. thirdparty resource, custom resource from aggregated apiserver), please note that the garbage collector doesn't support non-core resources yet. Once they are supported, object with ownerReferences referring non-existing non-core objects will be deleted by the garbage collector.`
|
||||
|
||||
type restMappingError struct {
|
||||
kind string
|
||||
version string
|
||||
|
@@ -36,7 +34,6 @@ func (r *restMappingError) Error() string {
|
|||
func (r *restMappingError) Message() string {
|
||||
versionKind := fmt.Sprintf("%s/%s", r.version, r.kind)
|
||||
errMsg := fmt.Sprintf("unable to get REST mapping for %s. ", versionKind)
|
||||
errMsg += fmt.Sprintf(nonCoreMessage, versionKind)
|
||||
errMsg += fmt.Sprintf(" If %s is an invalid resource, then you should manually remove ownerReferences that refer %s objects.", versionKind, versionKind)
|
||||
return errMsg
|
||||
}
|
||||
|
|
|
@@ -32,6 +32,7 @@ import (
|
|||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
@@ -73,6 +74,8 @@ type GarbageCollector struct {
|
|||
// GC caches the owners that do not exist according to the API server.
|
||||
absentOwnerCache *UIDCache
|
||||
sharedInformers informers.SharedInformerFactory
|
||||
|
||||
workerLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewGarbageCollector(
|
||||
|
@@ -108,14 +111,24 @@ func NewGarbageCollector(
|
|||
sharedInformers: sharedInformers,
|
||||
ignoredResources: ignoredResources,
|
||||
}
|
||||
if err := gb.monitorsForResources(deletableResources); err != nil {
|
||||
return nil, err
|
||||
if err := gb.syncMonitors(deletableResources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
|
||||
}
|
||||
gc.dependencyGraphBuilder = gb
|
||||
|
||||
return gc, nil
|
||||
}
|
||||
|
||||
// resyncMonitors starts or stops resource monitors as needed to ensure that all
|
||||
// (and only) those resources present in the map are monitored.
|
||||
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
|
||||
if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
|
||||
return err
|
||||
}
|
||||
gc.dependencyGraphBuilder.startMonitors()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer gc.attemptToDelete.ShutDown()
|
||||
|
@@ -125,9 +138,9 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
|
|||
glog.Infof("Starting garbage collector controller")
|
||||
defer glog.Infof("Shutting down garbage collector controller")
|
||||
|
||||
gc.dependencyGraphBuilder.Run(stopCh)
|
||||
go gc.dependencyGraphBuilder.Run(stopCh)
|
||||
|
||||
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) {
|
||||
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -142,8 +155,43 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
|
|||
<-stopCh
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) HasSynced() bool {
|
||||
return gc.dependencyGraphBuilder.HasSynced()
|
||||
// resettableRESTMapper is a RESTMapper which is capable of resetting itself
|
||||
// from discovery.
|
||||
type resettableRESTMapper interface {
|
||||
meta.RESTMapper
|
||||
Reset()
|
||||
}
|
||||
|
||||
// Sync periodically resyncs the garbage collector monitors with resources
|
||||
// returned found via the discoveryClient. Sync blocks, continuing to sync until
|
||||
// a message is received on stopCh.
|
||||
//
|
||||
// The discoveryClient should be the same client which underlies restMapper.
|
||||
func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
|
||||
wait.Until(func() {
|
||||
// Ensure workers are paused to avoid processing events before informers
|
||||
// have resynced.
|
||||
gc.workerLock.Lock()
|
||||
defer gc.workerLock.Unlock()
|
||||
|
||||
restMapper.Reset()
|
||||
deletableResources, err := GetDeletableResources(discoveryClient)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
if err := gc.resyncMonitors(deletableResources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
||||
return
|
||||
}
|
||||
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
|
||||
}
|
||||
}, period, stopCh)
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) IsSynced() bool {
|
||||
return gc.dependencyGraphBuilder.IsSynced()
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) runAttemptToDeleteWorker() {
|
||||
|
@@ -153,6 +201,8 @@ func (gc *GarbageCollector) runAttemptToDeleteWorker() {
|
|||
|
||||
func (gc *GarbageCollector) attemptToDeleteWorker() bool {
|
||||
item, quit := gc.attemptToDelete.Get()
|
||||
gc.workerLock.RLock()
|
||||
defer gc.workerLock.RUnlock()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
@@ -164,13 +214,18 @@ func (gc *GarbageCollector) attemptToDeleteWorker() bool {
|
|||
}
|
||||
err := gc.attemptToDeleteItem(n)
|
||||
if err != nil {
|
||||
// TODO: remove this block when gc starts using dynamic RESTMapper.
|
||||
if restMappingError, ok := err.(*restMappingError); ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Ignore syncing item %#v: %s", n, restMappingError.Message()))
|
||||
// The RESTMapper is static, so no need to retry, otherwise we'll get the same error.
|
||||
return true
|
||||
if _, ok := err.(*restMappingError); ok {
|
||||
// There are at least two ways this can happen:
|
||||
// 1. The reference is to an object of a custom type that has not yet been
|
||||
// recognized by gc.restMapper (this is a transient error).
|
||||
// 2. The reference is to an invalid group/version. We don't currently
|
||||
// have a way to distinguish this from a valid type we will recognize
|
||||
// after the next discovery sync.
|
||||
// For now, record the error and retry.
|
||||
glog.V(5).Infof("error syncing item %s: %v", n, err)
|
||||
} else {
|
||||
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
|
||||
}
|
||||
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", n, err))
|
||||
// retry if garbage collection of an object failed.
|
||||
gc.attemptToDelete.AddRateLimited(item)
|
||||
}
|
||||
|
@@ -454,6 +509,8 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker() {
|
|||
// these steps fail.
|
||||
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
|
||||
item, quit := gc.attemptToOrphan.Get()
|
||||
gc.workerLock.RLock()
|
||||
defer gc.workerLock.RUnlock()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
@@ -498,3 +555,19 @@ func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetDeletableResources returns all resources from discoveryClient that the
|
||||
// garbage collector should recognize and work with. More specifically, all
|
||||
// preferred resources which support the 'delete' verb.
|
||||
func GetDeletableResources(discoveryClient discovery.DiscoveryInterface) (map[schema.GroupVersionResource]struct{}, error) {
|
||||
preferredResources, err := discoveryClient.ServerPreferredResources()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get supported resources from server: %v", err)
|
||||
}
|
||||
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
|
||||
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to parse resources from server: %v", err)
|
||||
}
|
||||
return deletableGroupVersionResources, nil
|
||||
}
|
||||
|
|
|
@@ -46,25 +46,62 @@ import (
|
|||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
)
|
||||
|
||||
func TestNewGarbageCollector(t *testing.T) {
|
||||
func TestGarbageCollectorConstruction(t *testing.T) {
|
||||
config := &restclient.Config{}
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
tweakableRM := meta.NewDefaultRESTMapper(nil, nil)
|
||||
rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := map[schema.GroupVersionResource]struct{}{
|
||||
{Version: "v1", Resource: "pods"}: {},
|
||||
// no monitor will be constructed for non-core resource, the GC construction will not fail.
|
||||
}
|
||||
twoResources := map[schema.GroupVersionResource]struct{}{
|
||||
{Version: "v1", Resource: "pods"}: {},
|
||||
{Group: "tpr.io", Version: "v1", Resource: "unknown"}: {},
|
||||
}
|
||||
|
||||
client := fake.NewSimpleClientset()
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers)
|
||||
|
||||
// No monitor will be constructed for the non-core resource, but the GC
|
||||
// construction will not fail.
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
// Make sure resource monitor syncing creates and stops resource monitors.
|
||||
tweakableRM.Add(schema.GroupVersionKind{Group: "tpr.io", Version: "v1", Kind: "unknown"}, nil)
|
||||
err = gc.resyncMonitors(twoResources)
|
||||
if err != nil {
|
||||
t.Errorf("Failed adding a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
err = gc.resyncMonitors(podResource)
|
||||
if err != nil {
|
||||
t.Errorf("Failed removing a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
// Make sure the syncing mechanism also works after Run() has been called
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go gc.Run(1, stopCh)
|
||||
|
||||
err = gc.resyncMonitors(twoResources)
|
||||
if err != nil {
|
||||
t.Errorf("Failed adding a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
|
||||
|
||||
err = gc.resyncMonitors(podResource)
|
||||
if err != nil {
|
||||
t.Errorf("Failed removing a monitor: %v", err)
|
||||
}
|
||||
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
|
||||
}
|
||||
|
||||
// fakeAction records information about requests to aid in testing.
|
||||
|
|
|
@@ -134,6 +134,14 @@ func (n *node) blockingDependents() []*node {
|
|||
return ret
|
||||
}
|
||||
|
||||
// String renders node as a string using fmt. Acquires a read lock to ensure the
|
||||
// reflective dump of dependents doesn't race with any concurrent writes.
|
||||
func (n *node) String() string {
|
||||
n.dependentsLock.RLock()
|
||||
defer n.dependentsLock.RUnlock()
|
||||
return fmt.Sprintf("%#v", n)
|
||||
}
|
||||
|
||||
type concurrentUIDToNode struct {
|
||||
uidToNodeLock sync.RWMutex
|
||||
uidToNode map[types.UID]*node
|
||||
|
|
|
@@ -19,6 +19,7 @@ package garbagecollector
|
|||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@@ -27,6 +28,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
@@ -71,9 +73,18 @@ type event struct {
|
|||
// items to the attemptToDelete and attemptToOrphan.
|
||||
type GraphBuilder struct {
|
||||
restMapper meta.RESTMapper
|
||||
|
||||
// each monitor list/watches a resource, the results are funneled to the
|
||||
// dependencyGraphBuilder
|
||||
monitors []cache.Controller
|
||||
monitors monitors
|
||||
monitorLock sync.Mutex
|
||||
// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
|
||||
// called yet. If it is non-nil, then when closed it indicates everything
|
||||
// should shut down.
|
||||
//
|
||||
// This channel is also protected by monitorLock.
|
||||
stopCh <-chan struct{}
|
||||
|
||||
// metaOnlyClientPool uses a special codec, which removes fields except for
|
||||
// apiVersion, kind, and metadata during decoding.
|
||||
metaOnlyClientPool dynamic.ClientPool
|
||||
|
@@ -93,10 +104,26 @@ type GraphBuilder struct {
|
|||
// be non-existent are added to the cached.
|
||||
absentOwnerCache *UIDCache
|
||||
sharedInformers informers.SharedInformerFactory
|
||||
stopCh <-chan struct{}
|
||||
ignoredResources map[schema.GroupResource]struct{}
|
||||
}
|
||||
|
||||
// monitor runs a Controller with a local stop channel.
|
||||
type monitor struct {
|
||||
controller cache.Controller
|
||||
|
||||
// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
|
||||
// not yet started.
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// Run is intended to be called in a goroutine. Multiple calls of this is an
|
||||
// error.
|
||||
func (m *monitor) Run() {
|
||||
m.controller.Run(m.stopCh)
|
||||
}
|
||||
|
||||
type monitors map[schema.GroupVersionResource]*monitor
|
||||
|
||||
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
|
@@ -157,19 +184,11 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
|||
gb.graphChanges.Add(event)
|
||||
},
|
||||
}
|
||||
|
||||
shared, err := gb.sharedInformers.ForResource(resource)
|
||||
if err == nil {
|
||||
glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
|
||||
// need to clone because it's from a shared cache
|
||||
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
|
||||
if gb.stopCh != nil {
|
||||
// if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this
|
||||
// means we've re-loaded and re-read discovery and we are adding a new monitor for a
|
||||
// previously unseen resource, so we need to call Start on the shared informers again (this
|
||||
// will only start those shared informers that have not yet been started).
|
||||
go gb.sharedInformers.Start(gb.stopCh)
|
||||
}
|
||||
return shared.Informer().GetController(), nil
|
||||
} else {
|
||||
glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
|
||||
|
@@ -181,6 +200,7 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: since the gv is never unregistered, isn't this a memory leak?
|
||||
gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
|
||||
_, monitor := cache.NewInformer(
|
||||
listWatcher(client, resource),
|
||||
|
@@ -192,44 +212,134 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
|||
return monitor, nil
|
||||
}
|
||||
|
||||
func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
// syncMonitors rebuilds the monitor set according to the supplied resources,
|
||||
// creating or deleting monitors as necessary. It will return any error
|
||||
// encountered, but will make an attempt to create a monitor for each resource
|
||||
// instead of immediately exiting on an error. It may be called before or after
|
||||
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
||||
// monitors are started, call startMonitors.
|
||||
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
toRemove := gb.monitors
|
||||
if toRemove == nil {
|
||||
toRemove = monitors{}
|
||||
}
|
||||
current := monitors{}
|
||||
errs := []error{}
|
||||
kept := 0
|
||||
added := 0
|
||||
for resource := range resources {
|
||||
if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
|
||||
glog.V(5).Infof("ignore resource %#v", resource)
|
||||
if _, ok := ignoredResources[resource.GroupResource()]; ok {
|
||||
continue
|
||||
}
|
||||
if m, ok := toRemove[resource]; ok {
|
||||
current[resource] = m
|
||||
delete(toRemove, resource)
|
||||
kept++
|
||||
continue
|
||||
}
|
||||
kind, err := gb.restMapper.KindFor(resource)
|
||||
if err != nil {
|
||||
nonCoreMsg := fmt.Sprintf(nonCoreMessage, resource)
|
||||
utilruntime.HandleError(fmt.Errorf("%v. %s", err, nonCoreMsg))
|
||||
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
|
||||
continue
|
||||
}
|
||||
monitor, err := gb.controllerFor(resource, kind)
|
||||
c, err := gb.controllerFor(resource, kind)
|
||||
if err != nil {
|
||||
return err
|
||||
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
|
||||
continue
|
||||
}
|
||||
gb.monitors = append(gb.monitors, monitor)
|
||||
current[resource] = &monitor{controller: c}
|
||||
added++
|
||||
}
|
||||
return nil
|
||||
gb.monitors = current
|
||||
|
||||
for _, monitor := range toRemove {
|
||||
if monitor.stopCh != nil {
|
||||
close(monitor.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
|
||||
// NewAggregate returns nil if errs is 0-length
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
func (gb *GraphBuilder) HasSynced() bool {
|
||||
// startMonitors ensures the current set of monitors are running. Any newly
|
||||
// started monitors will also cause shared informers to be started.
|
||||
//
|
||||
// If called before Run, startMonitors does nothing (as there is no stop channel
|
||||
// to support monitor/informer execution).
|
||||
func (gb *GraphBuilder) startMonitors() {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
if gb.stopCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
monitors := gb.monitors
|
||||
started := 0
|
||||
for _, monitor := range monitors {
|
||||
if monitor.stopCh == nil {
|
||||
monitor.stopCh = make(chan struct{})
|
||||
gb.sharedInformers.Start(gb.stopCh)
|
||||
go monitor.Run()
|
||||
started++
|
||||
}
|
||||
}
|
||||
glog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
|
||||
}
|
||||
|
||||
// IsSynced returns true if any monitors exist AND all those monitors'
|
||||
// controllers HasSynced functions return true. This means IsSynced could return
|
||||
// true at one time, and then later return false if all monitors were
|
||||
// reconstructed.
|
||||
func (gb *GraphBuilder) IsSynced() bool {
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
|
||||
if len(gb.monitors) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, monitor := range gb.monitors {
|
||||
if !monitor.HasSynced() {
|
||||
if !monitor.controller.HasSynced() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Run sets the stop channel and starts monitor execution until stopCh is
|
||||
// closed. Any running monitors will be stopped before Run returns.
|
||||
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
|
||||
for _, monitor := range gb.monitors {
|
||||
go monitor.Run(stopCh)
|
||||
}
|
||||
go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
|
||||
glog.Infof("GraphBuilder running")
|
||||
defer glog.Infof("GraphBuilder stopping")
|
||||
|
||||
// set this so that we can use it if we need to start new shared informers
|
||||
// Set up the stop channel.
|
||||
gb.monitorLock.Lock()
|
||||
gb.stopCh = stopCh
|
||||
gb.monitorLock.Unlock()
|
||||
|
||||
// Start monitors and begin change processing until the stop channel is
|
||||
// closed.
|
||||
gb.startMonitors()
|
||||
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
|
||||
|
||||
// Stop any running monitors.
|
||||
gb.monitorLock.Lock()
|
||||
defer gb.monitorLock.Unlock()
|
||||
monitors := gb.monitors
|
||||
stopped := 0
|
||||
for _, monitor := range monitors {
|
||||
if monitor.stopCh != nil {
|
||||
stopped++
|
||||
close(monitor.stopCh)
|
||||
}
|
||||
}
|
||||
glog.Infof("stopped %d of %d monitors", stopped, len(monitors))
|
||||
}
|
||||
|
||||
var ignoredResources = map[schema.GroupResource]struct{}{
|
||||
|
|
|
@@ -105,13 +105,11 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
|
|||
|
||||
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter {
|
||||
ret := apiserver.CRDRESTOptionsGetter{
|
||||
StorageConfig: etcdOptions.StorageConfig,
|
||||
StoragePrefix: etcdOptions.StorageConfig.Prefix,
|
||||
EnableWatchCache: etcdOptions.EnableWatchCache,
|
||||
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
|
||||
// garbage collection for custom resources is forced off until GC works with CRs.
|
||||
// When GC is enabled, this turns back into etcdOptions.EnableGarbageCollection
|
||||
EnableGarbageCollection: false,
|
||||
StorageConfig: etcdOptions.StorageConfig,
|
||||
StoragePrefix: etcdOptions.StorageConfig.Prefix,
|
||||
EnableWatchCache: etcdOptions.EnableWatchCache,
|
||||
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
|
||||
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
|
||||
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
|
||||
}
|
||||
ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
|
||||
|
|
|
@@ -20,6 +20,7 @@ go_library(
|
|||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/client/retry:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
|
@@ -39,11 +40,13 @@ go_library(
|
|||
"//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1alpha1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@@ -22,11 +22,17 @@ import (
|
|||
|
||||
"k8s.io/api/core/v1"
|
||||
v1beta1 "k8s.io/api/extensions/v1beta1"
|
||||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
apiextensionstestserver "k8s.io/apiextensions-apiserver/test/integration/testserver"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/names"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
"k8s.io/kubernetes/test/e2e/metrics"
|
||||
|
@@ -535,7 +541,11 @@ var _ = SIGDescribe("Garbage collector", func() {
|
|||
}
|
||||
By("wait for the rc to be deleted")
|
||||
// default client QPS is 20, deleting each pod requires 2 requests, so 30s should be enough
|
||||
if err := wait.Poll(1*time.Second, 30*time.Second, func() (bool, error) {
|
||||
// TODO: 30s is enough assuming immediate processing of dependents following
|
||||
// owner deletion, but in practice there can be a long delay between owner
|
||||
// deletion and dependent deletion processing. For now, increase the timeout
|
||||
// and investigate the processing delay.
|
||||
if err := wait.Poll(1*time.Second, 60*time.Second, func() (bool, error) {
|
||||
_, err := rcClient.Get(rc.Name, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
pods, _ := podClient.List(metav1.ListOptions{})
|
||||
|
@@ -737,4 +747,106 @@ var _ = SIGDescribe("Garbage collector", func() {
|
|||
framework.Failf("failed to wait for all pods to be deleted: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
It("should support cascading deletion of custom resources", func() {
|
||||
config, err := framework.LoadConfig()
|
||||
if err != nil {
|
||||
framework.Failf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
apiExtensionClient, err := apiextensionsclientset.NewForConfig(config)
|
||||
if err != nil {
|
||||
framework.Failf("failed to initialize apiExtensionClient: %v", err)
|
||||
}
|
||||
|
||||
// Create a random custom resource definition and ensure it's available for
|
||||
// use.
|
||||
definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.ClusterScoped)
|
||||
defer func() {
|
||||
err = apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
framework.Failf("failed to delete CustomResourceDefinition: %v", err)
|
||||
}
|
||||
}()
|
||||
client, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, f.ClientPool)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create CustomResourceDefinition: %v", err)
|
||||
}
|
||||
|
||||
// Get a client for the custom resource.
|
||||
resourceClient := client.Resource(&metav1.APIResource{
|
||||
Name: definition.Spec.Names.Plural,
|
||||
Namespaced: false,
|
||||
}, api.NamespaceNone)
|
||||
apiVersion := definition.Spec.Group + "/" + definition.Spec.Version
|
||||
|
||||
// Create a custom owner resource.
|
||||
ownerName := names.SimpleNameGenerator.GenerateName("owner")
|
||||
owner := &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": apiVersion,
|
||||
"kind": definition.Spec.Names.Kind,
|
||||
"metadata": map[string]interface{}{
|
||||
"name": ownerName,
|
||||
},
|
||||
},
|
||||
}
|
||||
persistedOwner, err := resourceClient.Create(owner)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create owner resource %q: %v", ownerName, err)
|
||||
}
|
||||
framework.Logf("created owner resource %q", ownerName)
|
||||
|
||||
// Create a custom dependent resource.
|
||||
dependentName := names.SimpleNameGenerator.GenerateName("dependent")
|
||||
dependent := &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": apiVersion,
|
||||
"kind": definition.Spec.Names.Kind,
|
||||
"metadata": map[string]interface{}{
|
||||
"name": dependentName,
|
||||
"ownerReferences": []map[string]string{
|
||||
{
|
||||
"uid": string(persistedOwner.GetUID()),
|
||||
"apiVersion": apiVersion,
|
||||
"kind": definition.Spec.Names.Kind,
|
||||
"name": ownerName,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
persistedDependent, err := resourceClient.Create(dependent)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create dependent resource %q: %v", dependentName, err)
|
||||
}
|
||||
framework.Logf("created dependent resource %q", dependentName)
|
||||
|
||||
// Delete the owner.
|
||||
background := metav1.DeletePropagationBackground
|
||||
err = resourceClient.Delete(ownerName, &metav1.DeleteOptions{PropagationPolicy: &background})
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete owner resource %q: %v", ownerName, err)
|
||||
}
|
||||
|
||||
// Ensure the dependent is deleted.
|
||||
if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
|
||||
_, err := resourceClient.Get(dependentName, metav1.GetOptions{})
|
||||
return errors.IsNotFound(err), nil
|
||||
}); err != nil {
|
||||
framework.Logf("owner: %#v", persistedOwner)
|
||||
framework.Logf("dependent: %#v", persistedDependent)
|
||||
framework.Failf("failed waiting for dependent resource %q to be deleted", dependentName)
|
||||
}
|
||||
|
||||
// Ensure the owner is deleted.
|
||||
_, err = resourceClient.Get(ownerName, metav1.GetOptions{})
|
||||
if err == nil {
|
||||
framework.Failf("expected owner resource %q to be deleted", ownerName)
|
||||
} else {
|
||||
if !errors.IsNotFound(err) {
|
||||
framework.Failf("unexpected error getting owner resource %q: %v", ownerName, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
|
@@ -10,32 +10,32 @@ load(
|
|||
go_test(
|
||||
name = "go_default_test",
|
||||
size = "large",
|
||||
srcs = [
|
||||
"garbage_collector_test.go",
|
||||
"main_test.go",
|
||||
],
|
||||
srcs = ["garbage_collector_test.go"],
|
||||
tags = [
|
||||
"automanaged",
|
||||
"integration",
|
||||
],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//cmd/kube-apiserver/app/testing:go_default_library",
|
||||
"//pkg/controller/garbagecollector:go_default_library",
|
||||
"//pkg/controller/garbagecollector/metaonly:go_default_library",
|
||||
"//test/integration:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery/cached:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@@ -18,32 +18,34 @@ package garbagecollector
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
apiextensionstestserver "k8s.io/apiextensions-apiserver/test/integration/testserver"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/names"
|
||||
"k8s.io/client-go/discovery"
|
||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
apitesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
"k8s.io/kubernetes/test/integration"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
func getForegroundOptions() *metav1.DeleteOptions {
|
||||
|
@@ -124,63 +126,197 @@ func newOwnerRC(name, namespace string) *v1.ReplicationController {
|
|||
}
|
||||
}
|
||||
|
||||
func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) {
|
||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||
masterConfig.EnableCoreControllers = false
|
||||
_, s, closeFn := framework.RunAMaster(masterConfig)
|
||||
func newCRDInstance(definition *apiextensionsv1beta1.CustomResourceDefinition, namespace, name string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"kind": definition.Spec.Names.Kind,
|
||||
"apiVersion": definition.Spec.Group + "/" + definition.Spec.Version,
|
||||
"metadata": map[string]interface{}{
|
||||
"name": name,
|
||||
"namespace": namespace,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL})
|
||||
if err != nil {
|
||||
t.Fatalf("Error in create clientset: %v", err)
|
||||
func newConfigMap(namespace, name string) *v1.ConfigMap {
|
||||
return &v1.ConfigMap{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "ConfigMap",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
},
|
||||
}
|
||||
preferredResources, err := clientSet.Discovery().ServerPreferredResources()
|
||||
}
|
||||
|
||||
func link(t *testing.T, owner, dependent metav1.Object) {
|
||||
ownerType, err := meta.TypeAccessor(owner)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get supported resources from server: %v", err)
|
||||
t.Fatalf("failed to get type info for %#v: %v", owner, err)
|
||||
}
|
||||
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
|
||||
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
|
||||
ref := metav1.OwnerReference{
|
||||
Kind: ownerType.GetKind(),
|
||||
APIVersion: ownerType.GetAPIVersion(),
|
||||
Name: owner.GetName(),
|
||||
UID: owner.GetUID(),
|
||||
}
|
||||
dependent.SetOwnerReferences(append(dependent.GetOwnerReferences(), ref))
|
||||
}
|
||||
|
||||
func createRandomCustomResourceDefinition(
|
||||
t *testing.T, apiExtensionClient apiextensionsclientset.Interface,
|
||||
clientPool dynamic.ClientPool,
|
||||
namespace string,
|
||||
) (*apiextensionsv1beta1.CustomResourceDefinition, dynamic.ResourceInterface) {
|
||||
// Create a random custom resource definition and ensure it's available for
|
||||
// use.
|
||||
definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped)
|
||||
|
||||
client, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, clientPool)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse supported resources from server: %v", err)
|
||||
t.Fatalf("failed to create CustomResourceDefinition: %v", err)
|
||||
}
|
||||
config := &restclient.Config{Host: s.URL}
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
|
||||
// Get a client for the custom resource.
|
||||
resourceClient := client.Resource(&metav1.APIResource{
|
||||
Name: definition.Spec.Names.Plural,
|
||||
Namespaced: true,
|
||||
}, namespace)
|
||||
return definition, resourceClient
|
||||
}
|
||||
|
||||
type testContext struct {
|
||||
tearDown func()
|
||||
gc *garbagecollector.GarbageCollector
|
||||
clientSet clientset.Interface
|
||||
apiExtensionClient apiextensionsclientset.Interface
|
||||
clientPool dynamic.ClientPool
|
||||
startGC func(workers int)
|
||||
// syncPeriod is how often the GC started with startGC will be resynced.
|
||||
syncPeriod time.Duration
|
||||
}
|
||||
|
||||
// if workerCount > 0, will start the GC, otherwise it's up to the caller to Run() the GC.
|
||||
func setup(t *testing.T, workerCount int) *testContext {
|
||||
masterConfig, tearDownMaster := apitesting.StartTestServerOrDie(t)
|
||||
|
||||
// TODO: Disable logging here until we resolve teardown issues which result in
|
||||
// massive log spam. Another path forward would be to refactor
|
||||
// StartTestServerOrDie to work with the etcd instance already started by the
|
||||
// integration test scripts.
|
||||
// See https://github.com/kubernetes/kubernetes/issues/49489.
|
||||
repo, err := capnslog.GetRepoLogger("github.com/coreos/etcd")
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't configure logging: %v", err)
|
||||
}
|
||||
repo.SetLogLevel(map[string]capnslog.LogLevel{
|
||||
"etcdserver/api/v3rpc": capnslog.CRITICAL,
|
||||
})
|
||||
|
||||
clientSet, err := clientset.NewForConfig(masterConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating clientset: %v", err)
|
||||
}
|
||||
|
||||
// Helpful stuff for testing CRD.
|
||||
apiExtensionClient, err := apiextensionsclientset.NewForConfig(masterConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating extension clientset: %v", err)
|
||||
}
|
||||
// CreateNewCustomResourceDefinition wants to use this namespace for verifying
|
||||
// namespace-scoped CRD creation.
|
||||
createNamespaceOrDie("aval", clientSet, t)
|
||||
|
||||
discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
|
||||
restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured)
|
||||
restMapper.Reset()
|
||||
deletableResources, err := garbagecollector.GetDeletableResources(discoveryClient)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to get deletable resources: %v", err)
|
||||
}
|
||||
config := *masterConfig
|
||||
config.ContentConfig = dynamic.ContentConfig()
|
||||
metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
|
||||
gc, err := garbagecollector.NewGarbageCollector(
|
||||
metaOnlyClientPool,
|
||||
clientPool,
|
||||
api.Registry.RESTMapper(),
|
||||
deletableGroupVersionResources,
|
||||
restMapper,
|
||||
deletableResources,
|
||||
garbagecollector.DefaultIgnoredResources(),
|
||||
sharedInformers,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create garbage collector")
|
||||
t.Fatalf("failed to create garbage collector: %v", err)
|
||||
}
|
||||
|
||||
go sharedInformers.Start(stop)
|
||||
stopCh := make(chan struct{})
|
||||
tearDown := func() {
|
||||
close(stopCh)
|
||||
tearDownMaster()
|
||||
repo.SetLogLevel(map[string]capnslog.LogLevel{
|
||||
"etcdserver/api/v3rpc": capnslog.ERROR,
|
||||
})
|
||||
}
|
||||
syncPeriod := 5 * time.Second
|
||||
startGC := func(workers int) {
|
||||
go gc.Run(workers, stopCh)
|
||||
go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh)
|
||||
}
|
||||
|
||||
return s, closeFn, gc, clientSet
|
||||
if workerCount > 0 {
|
||||
startGC(workerCount)
|
||||
}
|
||||
|
||||
return &testContext{
|
||||
tearDown: tearDown,
|
||||
gc: gc,
|
||||
clientSet: clientSet,
|
||||
apiExtensionClient: apiExtensionClient,
|
||||
clientPool: clientPool,
|
||||
startGC: startGC,
|
||||
syncPeriod: syncPeriod,
|
||||
}
|
||||
}
|
||||
|
||||
func createNamespaceOrDie(name string, c clientset.Interface, t *testing.T) *v1.Namespace {
|
||||
ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
||||
if _, err := c.Core().Namespaces().Create(ns); err != nil {
|
||||
t.Fatalf("failed to create namespace: %v", err)
|
||||
}
|
||||
falseVar := false
|
||||
_, err := c.Core().ServiceAccounts(ns.Name).Create(&v1.ServiceAccount{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "default"},
|
||||
AutomountServiceAccountToken: &falseVar,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create service account: %v", err)
|
||||
}
|
||||
return ns
|
||||
}
|
||||
|
||||
func deleteNamespaceOrDie(name string, c clientset.Interface, t *testing.T) {
|
||||
zero := int64(0)
|
||||
background := metav1.DeletePropagationBackground
|
||||
err := c.Core().Namespaces().Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &zero, PropagationPolicy: &background})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete namespace %q: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// This test simulates the cascading deletion.
|
||||
func TestCascadingDeletion(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
ctx := setup(t, 5)
|
||||
defer ctx.tearDown()
|
||||
|
||||
glog.V(6).Infof("TestCascadingDeletion starts")
|
||||
defer glog.V(6).Infof("TestCascadingDeletion ends")
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
gc, clientSet := ctx.gc, ctx.clientSet
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-cascading-deletion", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
ns := createNamespaceOrDie("gc-cascading-deletion", clientSet, t)
|
||||
defer deleteNamespaceOrDie(ns.Name, clientSet, t)
|
||||
|
||||
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
|
||||
podClient := clientSet.Core().Pods(ns.Name)
|
||||
|
@@ -234,7 +370,6 @@ func TestCascadingDeletion(t *testing.T) {
|
|||
if len(pods.Items) != 3 {
|
||||
t.Fatalf("Expect only 3 pods")
|
||||
}
|
||||
go gc.Run(5, stopCh)
|
||||
// delete one of the replication controller
|
||||
if err := rcClient.Delete(toBeDeletedRCName, getNonOrphanOptions()); err != nil {
|
||||
t.Fatalf("failed to delete replication controller: %v", err)
|
||||
|
@@ -262,17 +397,13 @@ func TestCascadingDeletion(t *testing.T) {
|
|||
// This test simulates the case where an object is created with an owner that
|
||||
// doesn't exist. It verifies the GC will delete such an object.
|
||||
func TestCreateWithNonExistentOwner(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
ctx := setup(t, 5)
|
||||
defer ctx.tearDown()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-non-existing-owner", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
clientSet := ctx.clientSet
|
||||
|
||||
ns := createNamespaceOrDie("gc-non-existing-owner", clientSet, t)
|
||||
defer deleteNamespaceOrDie(ns.Name, clientSet, t)
|
||||
|
||||
podClient := clientSet.Core().Pods(ns.Name)
|
||||
|
||||
|
@@ -290,7 +421,6 @@ func TestCreateWithNonExistentOwner(t *testing.T) {
|
|||
if len(pods.Items) != 1 {
|
||||
t.Fatalf("Expect only 1 pod")
|
||||
}
|
||||
go gc.Run(5, stopCh)
|
||||
// wait for the garbage collector to delete the pod
|
||||
if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 5*time.Second, 30*time.Second); err != nil {
|
||||
t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err)
|
||||
|
@@ -361,21 +491,13 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
|
|||
// time of our pre-submit tests to increase submit-queue throughput. We'll add
|
||||
// e2e tests that put more stress.
|
||||
func TestStressingCascadingDeletion(t *testing.T) {
|
||||
t.Logf("starts garbage collector stress test")
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
ctx := setup(t, 5)
|
||||
defer ctx.tearDown()
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
gc, clientSet := ctx.gc, ctx.clientSet
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-stressing-cascading-deletion", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
go gc.Run(5, stopCh)
|
||||
ns := createNamespaceOrDie("gc-stressing-cascading-deletion", clientSet, t)
|
||||
defer deleteNamespaceOrDie(ns.Name, clientSet, t)
|
||||
|
||||
const collections = 10
|
||||
var wg sync.WaitGroup
|
||||
|
@@ -428,18 +550,13 @@ func TestStressingCascadingDeletion(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOrphaning(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
ctx := setup(t, 5)
|
||||
defer ctx.tearDown()
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
gc, clientSet := ctx.gc, ctx.clientSet
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-orphaning", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
ns := createNamespaceOrDie("gc-orphaning", clientSet, t)
|
||||
defer deleteNamespaceOrDie(ns.Name, clientSet, t)
|
||||
|
||||
podClient := clientSet.Core().Pods(ns.Name)
|
||||
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
|
||||
|
@@ -462,7 +579,6 @@ func TestOrphaning(t *testing.T) {
|
|||
}
|
||||
podUIDs = append(podUIDs, pod.ObjectMeta.UID)
|
||||
}
|
||||
go gc.Run(5, stopCh)
|
||||
|
||||
// we need wait for the gc to observe the creation of the pods, otherwise if
|
||||
// the deletion of RC is observed before the creation of the pods, the pods
|
||||
|
@@ -504,18 +620,13 @@ func TestOrphaning(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
ctx := setup(t, 5)
|
||||
defer ctx.tearDown()
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
clientSet := ctx.clientSet
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-foreground1", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
ns := createNamespaceOrDie("gc-foreground1", clientSet, t)
|
||||
defer deleteNamespaceOrDie(ns.Name, clientSet, t)
|
||||
|
||||
podClient := clientSet.Core().Pods(ns.Name)
|
||||
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
|
||||
|
@ -538,8 +649,6 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
|
|||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
go gc.Run(5, stopCh)
|
||||
|
||||
err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to delete the rc: %v", err)
|
||||
|
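The RC deletions above and below go through a getForegroundOptions helper whose definition is not shown in these hunks. As an illustrative sketch only (the real helper may differ), it presumably wraps the same foreground-propagation pattern that the custom resource tests later in this file spell out inline:

// Sketch of a foreground-propagation DeleteOptions helper; assumed shape only.
func getForegroundOptions() *metav1.DeleteOptions {
	policy := metav1.DeletePropagationForeground
	return &metav1.DeleteOptions{PropagationPolicy: &policy}
}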
@@ -571,18 +680,13 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
}

func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
	stopCh := make(chan struct{})
	s, closeFn, gc, clientSet := setup(t, stopCh)
	ctx := setup(t, 5)
	defer ctx.tearDown()

	defer func() {
		// We have to close the stop channel first, so the shared informers can terminate their watches;
		// otherwise closeFn() will hang waiting for active client connections to finish.
		close(stopCh)
		closeFn()
	}()
	clientSet := ctx.clientSet

	ns := framework.CreateTestingNamespace("gc-foreground2", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	ns := createNamespaceOrDie("gc-foreground2", clientSet, t)
	defer deleteNamespaceOrDie(ns.Name, clientSet, t)

	podClient := clientSet.Core().Pods(ns.Name)
	rcClient := clientSet.Core().ReplicationControllers(ns.Name)

@@ -613,8 +717,6 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
		t.Fatalf("Failed to create Pod: %v", err)
	}

	go gc.Run(5, stopCh)

	err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
	if err != nil {
		t.Fatalf("Failed to delete the rc: %v", err)
@@ -644,18 +746,12 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
}

func TestBlockingOwnerRefDoesBlock(t *testing.T) {
	stopCh := make(chan struct{})
	s, closeFn, gc, clientSet := setup(t, stopCh)
	ctx := setup(t, 0)
	defer ctx.tearDown()
	gc, clientSet := ctx.gc, ctx.clientSet

	defer func() {
		// We have to close the stop channel first, so the shared informers can terminate their watches;
		// otherwise closeFn() will hang waiting for active client connections to finish.
		close(stopCh)
		closeFn()
	}()

	ns := framework.CreateTestingNamespace("gc-foreground3", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	ns := createNamespaceOrDie("foo", clientSet, t)
	defer deleteNamespaceOrDie(ns.Name, clientSet, t)

	podClient := clientSet.Core().Pods(ns.Name)
	rcClient := clientSet.Core().ReplicationControllers(ns.Name)

@@ -675,10 +771,9 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
		t.Fatalf("Failed to create Pod: %v", err)
	}

	go gc.Run(5, stopCh)

	// this makes sure the garbage collector will have added the pod to its
	// dependency graph before handling the foreground deletion of the rc.
	ctx.startGC(5)
	timeout := make(chan struct{})
	go func() {
		select {

@@ -686,7 +781,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
			close(timeout)
		}
	}()
	if !cache.WaitForCacheSync(timeout, gc.HasSynced) {
	if !cache.WaitForCacheSync(timeout, gc.IsSynced) {
		t.Fatalf("failed to wait for garbage collector to be synced")
	}

@@ -694,7 +789,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
	if err != nil {
		t.Fatalf("Failed to delete the rc: %v", err)
	}
	time.Sleep(30 * time.Second)
	time.Sleep(15 * time.Second)
	// verify the toBeDeletedRC is NOT deleted
	_, err = rcClient.Get(toBeDeletedRC.Name, metav1.GetOptions{})
	if err != nil {

@@ -710,3 +805,215 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
		t.Errorf("expect there to be 1 pods, got %#v", pods.Items)
	}
}

// TestCustomResourceCascadingDeletion ensures the basic cascading delete
// behavior supports custom resources.
func TestCustomResourceCascadingDeletion(t *testing.T) {
	ctx := setup(t, 5)
	defer ctx.tearDown()

	clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool

	ns := createNamespaceOrDie("crd-cascading", clientSet, t)

	definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name)

	// Create a custom owner resource.
	owner := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner"))
	owner, err := resourceClient.Create(owner)
	if err != nil {
		t.Fatalf("failed to create owner resource %q: %v", owner.GetName(), err)
	}
	t.Logf("created owner resource %q", owner.GetName())

	// Create a custom dependent resource.
	dependent := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
	link(t, owner, dependent)

	dependent, err = resourceClient.Create(dependent)
	if err != nil {
		t.Fatalf("failed to create dependent resource %q: %v", dependent.GetName(), err)
	}
	t.Logf("created dependent resource %q", dependent.GetName())

	// Delete the owner.
	foreground := metav1.DeletePropagationForeground
	err = resourceClient.Delete(owner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
	if err != nil {
		t.Fatalf("failed to delete owner resource %q: %v", owner.GetName(), err)
	}

	// Ensure the owner is deleted.
	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := resourceClient.Get(owner.GetName(), metav1.GetOptions{})
		return errors.IsNotFound(err), nil
	}); err != nil {
		t.Fatalf("failed waiting for owner resource %q to be deleted", owner.GetName())
	}

	// Ensure the dependent is deleted.
	_, err = resourceClient.Get(dependent.GetName(), metav1.GetOptions{})
	if err == nil {
		t.Fatalf("expected dependent %q to be deleted", dependent.GetName())
	} else {
		if !errors.IsNotFound(err) {
			t.Fatalf("unexpected error getting dependent %q: %v", dependent.GetName(), err)
		}
	}
}
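The link helper used above attaches an owner reference from an owner object to a dependent. Its actual implementation lives elsewhere in this change; the following is only a plausible sketch, assuming the helper works through the apimachinery meta accessors (note the tests set TypeMeta on typed owners before calling it, which fits this shape):

// Hypothetical sketch of link; the real helper may differ in its details.
func link(t *testing.T, owner, dependent metav1.Object) {
	ownerType, err := meta.TypeAccessor(owner)
	if err != nil {
		t.Fatalf("failed to read owner type info: %v", err)
	}
	ref := metav1.OwnerReference{
		APIVersion: ownerType.GetAPIVersion(),
		Kind:       ownerType.GetKind(),
		Name:       owner.GetName(),
		UID:        owner.GetUID(),
	}
	dependent.SetOwnerReferences(append(dependent.GetOwnerReferences(), ref))
}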
// TestMixedRelationships ensures that owner/dependent relationships work
// between core and custom resources.
//
// TODO: Consider how this could be represented with table-style tests (e.g. a
// before/after expected object graph given a delete operation targeting a
// specific node in the before graph with certain delete options).
func TestMixedRelationships(t *testing.T) {
	ctx := setup(t, 5)
	defer ctx.tearDown()

	clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool

	ns := createNamespaceOrDie("crd-mixed", clientSet, t)

	configMapClient := clientSet.Core().ConfigMaps(ns.Name)

	definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name)

	// Create a custom owner resource.
	customOwner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
	if err != nil {
		t.Fatalf("failed to create owner: %v", err)
	}
	t.Logf("created custom owner %q", customOwner.GetName())

	// Create a core dependent resource.
	coreDependent := newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
	link(t, customOwner, coreDependent)
	coreDependent, err = configMapClient.Create(coreDependent)
	if err != nil {
		t.Fatalf("failed to create dependent: %v", err)
	}
	t.Logf("created core dependent %q", coreDependent.GetName())

	// Create a core owner resource.
	coreOwner, err := configMapClient.Create(newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
	if err != nil {
		t.Fatalf("failed to create owner: %v", err)
	}
	t.Logf("created core owner %q: %#v", coreOwner.GetName(), coreOwner)

	// Create a custom dependent resource.
	customDependent := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
	coreOwner.TypeMeta.Kind = "ConfigMap"
	coreOwner.TypeMeta.APIVersion = "v1"
	link(t, coreOwner, customDependent)
	customDependent, err = resourceClient.Create(customDependent)
	if err != nil {
		t.Fatalf("failed to create dependent: %v", err)
	}
	t.Logf("created custom dependent %q", customDependent.GetName())

	// Delete the custom owner.
	foreground := metav1.DeletePropagationForeground
	err = resourceClient.Delete(customOwner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
	if err != nil {
		t.Fatalf("failed to delete owner resource %q: %v", customOwner.GetName(), err)
	}

	// Ensure the owner is deleted.
	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := resourceClient.Get(customOwner.GetName(), metav1.GetOptions{})
		return errors.IsNotFound(err), nil
	}); err != nil {
		t.Fatalf("failed waiting for owner resource %q to be deleted", customOwner.GetName())
	}
	// Ensure the dependent is deleted.
	_, err = configMapClient.Get(coreDependent.GetName(), metav1.GetOptions{})
	if err == nil {
		t.Fatalf("expected dependent %q to be deleted", coreDependent.GetName())
	} else {
		if !errors.IsNotFound(err) {
			t.Fatalf("unexpected error getting dependent %q: %v", coreDependent.GetName(), err)
		}
	}

	// Delete the core owner.
	err = configMapClient.Delete(coreOwner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
	if err != nil {
		t.Fatalf("failed to delete owner resource %q: %v", coreOwner.GetName(), err)
	}

	// Ensure the owner is deleted.
	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := configMapClient.Get(coreOwner.GetName(), metav1.GetOptions{})
		return errors.IsNotFound(err), nil
	}); err != nil {
		t.Fatalf("failed waiting for owner resource %q to be deleted", coreOwner.GetName())
	}

	// Ensure the dependent is deleted.
	_, err = resourceClient.Get(customDependent.GetName(), metav1.GetOptions{})
	if err == nil {
		t.Fatalf("expected dependent %q to be deleted", customDependent.GetName())
	} else {
		if !errors.IsNotFound(err) {
			t.Fatalf("unexpected error getting dependent %q: %v", customDependent.GetName(), err)
		}
	}
}

// TestCRDDeletionCascading ensures propagating deletion of a custom resource
// definition with an instance that owns a core resource.
func TestCRDDeletionCascading(t *testing.T) {
	ctx := setup(t, 5)
	defer ctx.tearDown()

	clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool

	ns := createNamespaceOrDie("crd-mixed", clientSet, t)

	configMapClient := clientSet.Core().ConfigMaps(ns.Name)

	definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name)

	// Create a custom owner resource.
	owner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
	if err != nil {
		t.Fatalf("failed to create owner: %v", err)
	}
	t.Logf("created owner %q", owner.GetName())

	// Create a core dependent resource.
	dependent := newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
	link(t, owner, dependent)
	dependent, err = configMapClient.Create(dependent)
	if err != nil {
		t.Fatalf("failed to create dependent: %v", err)
	}
	t.Logf("created dependent %q", dependent.GetName())

	time.Sleep(ctx.syncPeriod + 5*time.Second)

	// Delete the definition, which should cascade to the owner and ultimately its dependents.
	if err := apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient); err != nil {
		t.Fatalf("failed to delete %q: %v", definition.Name, err)
	}

	// Ensure the owner is deleted.
	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := resourceClient.Get(owner.GetName(), metav1.GetOptions{})
		return errors.IsNotFound(err), nil
	}); err != nil {
		t.Fatalf("failed waiting for owner %q to be deleted", owner.GetName())
	}

	// Ensure the dependent is deleted.
	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := configMapClient.Get(dependent.GetName(), metav1.GetOptions{})
		return errors.IsNotFound(err), nil
	}); err != nil {
		t.Fatalf("failed waiting for dependent %q (owned by %q) to be deleted", dependent.GetName(), owner.GetName())
	}
}
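The newCRDInstance and newConfigMap helpers used by the three custom-resource tests above are defined elsewhere in this change. As a rough, assumed sketch of newCRDInstance only (grounded in how the tests call it and in the unstructured API, not in the real helper):

// Hypothetical sketch: builds an unstructured instance of the CRD's kind.
func newCRDInstance(definition *apiextensionsv1beta1.CustomResourceDefinition, namespace, name string) *unstructured.Unstructured {
	return &unstructured.Unstructured{
		Object: map[string]interface{}{
			"kind":       definition.Spec.Names.Kind,
			"apiVersion": definition.Spec.Group + "/" + definition.Spec.Version,
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
		},
	}
}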

@@ -1,27 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
	"testing"

	"k8s.io/kubernetes/test/integration/framework"
)

func TestMain(m *testing.M) {
	framework.EtcdMain(m.Run)
}