Merge pull request #63386 from roycaihw/gc-json-patch

Automatic merge from submit-queue (batch tested with PRs 63386, 64624, 62297, 64847). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Let the garbage collector use JSON merge patch when SMP is not supported

**What this PR does / why we need it**:
Let the garbage collector fall back to JSON merge patch when strategic merge patch returns 415 (Unsupported Media Type). This enables orphan deletion of custom resources.
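
For context, the two formats treat the `ownerReferences` array differently. Below is a minimal sketch of the patch bodies involved; the UIDs and `resourceVersion` value are illustrative, not taken from this PR:

```go
package main

import "fmt"

// Strategic merge patch deletes a single list entry in place via the
// "$patch":"delete" directive; built-in types support this, but the API
// server answers 415 (Unsupported Media Type) for custom resources.
const smp = `{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"owner-uid"}],"uid":"dependent-uid"}}`

// JSON merge patch (RFC 7386) replaces arrays wholesale, so the fallback
// must send the complete remaining ownerReferences list, with the observed
// resourceVersion acting as an optimistic-concurrency precondition.
const jmp = `{"metadata":{"resourceVersion":"42","ownerReferences":[]}}`

func main() {
	fmt.Println(smp)
	fmt.Println(jmp)
}
```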

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #56348

**Special notes for your reviewer**:
This PR builds on https://github.com/kubernetes/kubernetes/pull/56595. See https://github.com/kubernetes/kubernetes/pull/56606 for more context.

**Release note**:

```release-note
Orphan deletion is now supported for custom resources
```

/sig api-machinery
Kubernetes Submit Queue 2018-06-06 19:56:20 -07:00 committed by GitHub
commit ccb9590a3b
6 changed files with 240 additions and 20 deletions

pkg/controller/garbagecollector/garbagecollector.go

@@ -468,8 +468,11 @@ func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
// waitingForDependentsDeletion needs to be deleted from the
// ownerReferences, otherwise the referenced objects will be stuck with
// the FinalizerDeletingDependents and never get deleted.
patch := deleteOwnerRefPatch(item.identity.UID, append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)...)
_, err = gc.patchObject(item.identity, patch)
ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
patch := deleteOwnerRefStrategicMergePatch(item.identity.UID, ownerUIDs...)
_, err = gc.patch(item, patch, func(n *node) ([]byte, error) {
return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
})
return err
case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
deps := item.getDependents()
@@ -481,11 +484,11 @@ func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
// there are multiple workers running attemptToDeleteItem in
// parallel, the circle detection can fail in a race condition.
glog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
patch, err := item.patchToUnblockOwnerReferences()
patch, err := item.unblockOwnerReferencesStrategicMergePatch()
if err != nil {
return err
}
if _, err := gc.patchObject(item.identity, patch); err != nil {
if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
return err
}
break
@@ -545,8 +548,10 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
go func(dependent *node) {
defer wg.Done()
// the dependent.identity.UID is used as precondition
patch := deleteOwnerRefPatch(dependent.identity.UID, owner.UID)
_, err := gc.patchObject(dependent.identity, patch)
patch := deleteOwnerRefStrategicMergePatch(dependent.identity.UID, owner.UID)
_, err := gc.patch(dependent, patch, func(n *node) ([]byte, error) {
return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
})
// note that if the target ownerReference doesn't exist in the
// dependent, strategic merge patch will NOT return an error.
if err != nil && !errors.IsNotFound(err) {

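The call sites above hand `gc.patch` a closure rather than a pre-built JSON merge patch: constructing the fallback patch requires reading the object's current state, so that work is deferred until the server actually rejects the strategic merge patch. A self-contained sketch of that shape, where `tryPatch`, `sendPatch`, and `errUnsupportedMediaType` are stand-ins rather than Kubernetes APIs:

```go
package main

import (
	"errors"
	"fmt"
)

var errUnsupportedMediaType = errors.New("415 Unsupported Media Type")

// sendPatch stands in for the dynamic client's Patch call; here it pretends
// the server only accepts JSON merge patch, as it would for a custom resource.
func sendPatch(body []byte, contentType string) error {
	if contentType == "application/strategic-merge-patch+json" {
		return errUnsupportedMediaType
	}
	fmt.Printf("PATCH %s: %s\n", contentType, body)
	return nil
}

// tryPatch mirrors the shape of gc.patch: try the pre-built strategic merge
// patch first, and construct the JSON merge patch (which needs the object's
// current state) only when the server rejects SMP.
func tryPatch(smp []byte, buildJMP func() ([]byte, error)) error {
	err := sendPatch(smp, "application/strategic-merge-patch+json")
	if err == nil {
		return nil
	}
	if err != errUnsupportedMediaType {
		return err
	}
	jmp, err := buildJMP() // deferred: only paid on the fallback path
	if err != nil {
		return err
	}
	return sendPatch(jmp, "application/merge-patch+json")
}

func main() {
	smp := []byte(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"owner-uid"}],"uid":"dep-uid"}}`)
	if err := tryPatch(smp, func() ([]byte, error) {
		return []byte(`{"metadata":{"resourceVersion":"42","ownerReferences":[]}}`), nil
	}); err != nil {
		fmt.Println("patch failed:", err)
	}
}
```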
pkg/controller/garbagecollector/garbagecollector_test.go

@@ -593,7 +593,7 @@ func TestDeleteOwnerRefPatch(t *testing.T) {
},
},
}
patch := deleteOwnerRefPatch("100", "2", "3")
patch := deleteOwnerRefStrategicMergePatch("100", "2", "3")
patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{})
if err != nil {
t.Fatal(err)
@@ -638,7 +638,7 @@ func TestUnblockOwnerReference(t *testing.T) {
n := node{
owners: accessor.GetOwnerReferences(),
}
patch, err := n.patchToUnblockOwnerReferences()
patch, err := n.unblockOwnerReferencesStrategicMergePatch()
if err != nil {
t.Fatal(err)
}

pkg/controller/garbagecollector/graph_builder.go

@@ -78,7 +78,7 @@ type GraphBuilder struct {
// each monitor list/watches a resource, the results are funneled to the
// dependencyGraphBuilder
monitors monitors
monitorLock sync.Mutex
monitorLock sync.RWMutex
// informersStarted is closed after all of the controllers have been initialized and are running.
// After that it is safe to start them here; before that it is not.
informersStarted <-chan struct{}
@@ -111,6 +111,7 @@ type GraphBuilder struct {
// monitor runs a Controller with a local stop channel.
type monitor struct {
controller cache.Controller
store cache.Store
// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
// not yet started.
@@ -138,7 +139,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
}
}
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
@@ -178,21 +179,21 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), nil
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
} else {
glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
}
// TODO: consider store in one storage.
glog.V(5).Infof("create storage for resource %s", resource)
_, monitor := cache.NewInformer(
store, monitor := cache.NewInformer(
listWatcher(gb.dynamicClient, resource),
nil,
ResourceResyncTime,
// don't need to clone because it's not from shared cache
handlers,
)
return monitor, nil
return monitor, store, nil
}
// syncMonitors rebuilds the monitor set according to the supplied resources,
@@ -228,12 +229,12 @@ func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]s
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue
}
c, err := gb.controllerFor(resource, kind)
c, s, err := gb.controllerFor(resource, kind)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue
}
current[resource] = &monitor{controller: c}
current[resource] = &monitor{store: s, controller: c}
added++
}
gb.monitors = current

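Two changes matter in this file: each monitor now retains its informer's `cache.Store`, and `monitorLock` becomes a `sync.RWMutex` so readers such as `getMetadata` (in patch.go below) can take a read lock while `syncMonitors` still takes the write lock. A toy example of the store lookup pattern `getMetadata` relies on; the key function in `main` is illustrative only:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// lookup reads an object from an informer-backed cache.Store using the
// "<namespace>/<name>" key convention of cache.MetaNamespaceKeyFunc;
// cluster-scoped objects are keyed by bare name.
func lookup(store cache.Store, namespace, name string) (interface{}, bool, error) {
	key := name
	if len(namespace) != 0 {
		key = namespace + "/" + name
	}
	return store.GetByKey(key)
}

func main() {
	// Toy store whose objects are their own keys, for demonstration.
	store := cache.NewStore(func(obj interface{}) (string, error) {
		return obj.(string), nil
	})
	_ = store.Add("ns1/dep-a")
	obj, exists, err := lookup(store, "ns1", "dep-a")
	fmt.Println(obj, exists, err)
}
```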
pkg/controller/garbagecollector/operations.go

@@ -76,12 +76,12 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Update(obj)
}
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte, pt types.PatchType) (*unstructured.Unstructured, error) {
resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, pt, patch)
}
// TODO: Using Patch when strategicmerge supports deleting an entry from a

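The new `pt` parameter is one of apimachinery's patch-type constants, whose values double as the HTTP `Content-Type` media types; the 415 response to `types.StrategicMergePatchType` on a custom resource is exactly the signal `gc.patch` keys off. For reference:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	fmt.Println(types.StrategicMergePatchType) // application/strategic-merge-patch+json
	fmt.Println(types.MergePatchType)          // application/merge-patch+json (RFC 7386)
	fmt.Println(types.JSONPatchType)           // application/json-patch+json (RFC 6902)
}
```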
pkg/controller/garbagecollector/patch.go

@@ -21,12 +21,16 @@ import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
func deleteOwnerRefPatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
func deleteOwnerRefStrategicMergePatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
var pieces []string
for _, ownerUID := range ownerUIDs {
pieces = append(pieces, fmt.Sprintf(`{"$patch":"delete","uid":"%s"}`, ownerUID))
@@ -35,9 +39,97 @@ func deleteOwnerRefPatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte
return []byte(patch)
}
// generate a patch that unsets the BlockOwnerDeletion field of all
// getMetadata tries to get object metadata from the local cache, and sends a GET request to the
// apiserver when the local cache is not available or not up to date.
func (gc *GarbageCollector) getMetadata(apiVersion, kind, namespace, name string) (metav1.Object, error) {
apiResource, _, err := gc.apiResource(apiVersion, kind)
if err != nil {
return nil, err
}
gc.dependencyGraphBuilder.monitorLock.RLock()
defer gc.dependencyGraphBuilder.monitorLock.RUnlock()
m, ok := gc.dependencyGraphBuilder.monitors[apiResource]
if !ok || m == nil {
// If there is no local cache for apiResource, send a GET request to the API server
return gc.dynamicClient.Resource(apiResource).Namespace(namespace).Get(name, metav1.GetOptions{})
}
key := name
if len(namespace) != 0 {
key = namespace + "/" + name
}
raw, exist, err := m.store.GetByKey(key)
if err != nil {
return nil, err
}
if !exist {
// If local cache doesn't contain the object, send a GET request to API server
return gc.dynamicClient.Resource(apiResource).Namespace(namespace).Get(name, metav1.GetOptions{})
}
obj, ok := raw.(runtime.Object)
if !ok {
return nil, fmt.Errorf("expect a runtime.Object, got %v", raw)
}
return meta.Accessor(obj)
}
type objectForPatch struct {
ObjectMetaForPatch `json:"metadata"`
}
type ObjectMetaForPatch struct {
ResourceVersion string `json:"resourceVersion"`
OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
}
// jsonMergePatchFunc defines the interface for functions that construct JSON merge patches that
// manipulate the ownerReferences array.
type jsonMergePatchFunc func(*node) ([]byte, error)
// patch tries strategic merge patch on item first, and if SMP is not supported, it falls back to
// JSON merge patch.
func (gc *GarbageCollector) patch(item *node, smp []byte, jmp jsonMergePatchFunc) (*unstructured.Unstructured, error) {
smpResult, err := gc.patchObject(item.identity, smp, types.StrategicMergePatchType)
if err == nil {
return smpResult, nil
}
if !errors.IsUnsupportedMediaType(err) {
return nil, err
}
// StrategicMergePatch is not supported, use JSON merge patch instead
patch, err := jmp(item)
if err != nil {
return nil, err
}
return gc.patchObject(item.identity, patch, types.MergePatchType)
}
// Returns JSON merge patch that removes the ownerReferences matching ownerUIDs.
func (gc *GarbageCollector) deleteOwnerRefJSONMergePatch(item *node, ownerUIDs ...types.UID) ([]byte, error) {
accessor, err := gc.getMetadata(item.identity.APIVersion, item.identity.Kind, item.identity.Namespace, item.identity.Name)
if err != nil {
return nil, err
}
expectedObjectMeta := ObjectMetaForPatch{}
expectedObjectMeta.ResourceVersion = accessor.GetResourceVersion()
refs := accessor.GetOwnerReferences()
for _, ref := range refs {
var skip bool
for _, ownerUID := range ownerUIDs {
if ref.UID == ownerUID {
skip = true
break
}
}
if !skip {
expectedObjectMeta.OwnerReferences = append(expectedObjectMeta.OwnerReferences, ref)
}
}
return json.Marshal(objectForPatch{expectedObjectMeta})
}
// Generate a patch that unsets the BlockOwnerDeletion field of all
// ownerReferences of node.
func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
func (n *node) unblockOwnerReferencesStrategicMergePatch() ([]byte, error) {
var dummy metaonly.MetadataOnlyObject
var blockingRefs []metav1.OwnerReference
falseVar := false
@@ -52,3 +144,22 @@ func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
dummy.ObjectMeta.UID = n.identity.UID
return json.Marshal(dummy)
}
// Generate a JSON merge patch that unsets the BlockOwnerDeletion field of all
// ownerReferences of node.
func (gc *GarbageCollector) unblockOwnerReferencesJSONMergePatch(n *node) ([]byte, error) {
accessor, err := gc.getMetadata(n.identity.APIVersion, n.identity.Kind, n.identity.Namespace, n.identity.Name)
if err != nil {
return nil, err
}
expectedObjectMeta := ObjectMetaForPatch{}
expectedObjectMeta.ResourceVersion = accessor.GetResourceVersion()
var expectedOwners []metav1.OwnerReference
falseVar := false
for _, owner := range n.owners {
owner.BlockOwnerDeletion = &falseVar
expectedOwners = append(expectedOwners, owner)
}
expectedObjectMeta.OwnerReferences = expectedOwners
return json.Marshal(objectForPatch{expectedObjectMeta})
}

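To make the fallback concrete, marshaling `objectForPatch` produces a body like the one below. Because a JSON merge patch replaces the whole `ownerReferences` array, the patch embeds the `resourceVersion` observed when the metadata was read; if the object has changed since, the apiserver rejects the patch with a conflict instead of clobbering a concurrent update. A sketch with illustrative field values:

```go
package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Local mirrors of objectForPatch/ObjectMetaForPatch above, to show the
// wire format they produce.
type objectForPatch struct {
	Metadata objectMetaForPatch `json:"metadata"`
}

type objectMetaForPatch struct {
	ResourceVersion string                  `json:"resourceVersion"`
	OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
}

func main() {
	falseVar := false
	body, err := json.Marshal(objectForPatch{objectMetaForPatch{
		ResourceVersion: "42", // illustrative; taken from the cached object
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion:         "example.com/v1", // illustrative owner
			Kind:               "Example",
			Name:               "owner",
			UID:                "owner-uid",
			BlockOwnerDeletion: &falseVar,
		}},
	}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```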
test/e2e/apimachinery/garbage_collector.go

@@ -998,6 +998,109 @@ var _ = SIGDescribe("Garbage collector", func() {
}
})
It("should support orphan deletion of custom resources", func() {
config, err := framework.LoadConfig()
if err != nil {
framework.Failf("failed to load config: %v", err)
}
apiExtensionClient, err := apiextensionsclientset.NewForConfig(config)
if err != nil {
framework.Failf("failed to initialize apiExtensionClient: %v", err)
}
// Create a random custom resource definition and ensure it's available for
// use.
definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.ClusterScoped)
defer func() {
err = apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient)
if err != nil && !errors.IsNotFound(err) {
framework.Failf("failed to delete CustomResourceDefinition: %v", err)
}
}()
definition, err = apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, f.DynamicClient)
if err != nil {
framework.Failf("failed to create CustomResourceDefinition: %v", err)
}
// Get a client for the custom resource.
gvr := schema.GroupVersionResource{Group: definition.Spec.Group, Version: definition.Spec.Version, Resource: definition.Spec.Names.Plural}
resourceClient := f.DynamicClient.Resource(gvr)
apiVersion := definition.Spec.Group + "/" + definition.Spec.Version
// Create a custom owner resource.
ownerName := names.SimpleNameGenerator.GenerateName("owner")
owner := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": definition.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": ownerName,
},
},
}
persistedOwner, err := resourceClient.Create(owner)
if err != nil {
framework.Failf("failed to create owner resource %q: %v", ownerName, err)
}
framework.Logf("created owner resource %q", ownerName)
// Create a custom dependent resource.
dependentName := names.SimpleNameGenerator.GenerateName("dependent")
dependent := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": definition.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": dependentName,
"ownerReferences": []map[string]string{
{
"uid": string(persistedOwner.GetUID()),
"apiVersion": apiVersion,
"kind": definition.Spec.Names.Kind,
"name": ownerName,
},
},
},
},
}
_, err = resourceClient.Create(dependent)
if err != nil {
framework.Failf("failed to create dependent resource %q: %v", dependentName, err)
}
framework.Logf("created dependent resource %q", dependentName)
// Delete the owner and orphan the dependent.
err = resourceClient.Delete(ownerName, getOrphanOptions())
if err != nil {
framework.Failf("failed to delete owner resource %q: %v", ownerName, err)
}
By("wait for the owner to be deleted")
if err := wait.Poll(5*time.Second, 120*time.Second, func() (bool, error) {
_, err = resourceClient.Get(ownerName, metav1.GetOptions{})
if err == nil {
return false, nil
}
if err != nil && !errors.IsNotFound(err) {
return false, fmt.Errorf("Failed to get owner: %v", err)
}
return true, nil
}); err != nil {
framework.Failf("timeout in waiting for the owner to be deleted: %v", err)
}
// Wait 30s and ensure the dependent is not deleted.
By("wait for 30 seconds to see if the garbage collector mistakenly deletes the dependent custom resource")
if err := wait.Poll(5*time.Second, 30*time.Second, func() (bool, error) {
_, err := resourceClient.Get(dependentName, metav1.GetOptions{})
return false, err
}); err != nil && err != wait.ErrWaitTimeout {
framework.Failf("failed to ensure the dependent is not deleted: %v", err)
}
})
It("should delete jobs and pods created by cronjob", func() {
framework.SkipIfMissingResource(f.DynamicClient, CronJobGroupVersionResource, f.Namespace.Name)