From f955d556f8ed22d392a1ab2ff3891db2156b0591 Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Tue, 11 Oct 2016 12:48:38 -0700 Subject: [PATCH] Adding cascading deletion support to federated namespaces --- .../pkg/federation-controller/namespace/BUILD | 4 +- .../namespace/namespace_controller.go | 240 +++++++++++++----- .../namespace/namespace_controller_test.go | 35 ++- .../util/deletionhelper/BUILD | 25 ++ .../util/deletionhelper/deletion_helper.go | 171 +++++++++++++ .../util/federated_informer.go | 24 ++ .../util/federated_updater_test.go | 9 +- .../util/test/test_helper.go | 19 +- pkg/api/v1/types.go | 1 + test/e2e/federated-namespace.go | 17 +- test/e2e/framework/framework.go | 6 +- test/e2e/framework/util.go | 1 + 12 files changed, 473 insertions(+), 79 deletions(-) create mode 100644 federation/pkg/federation-controller/util/deletionhelper/BUILD create mode 100644 federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go diff --git a/federation/pkg/federation-controller/namespace/BUILD b/federation/pkg/federation-controller/namespace/BUILD index 49cf6b6189..b47975c58a 100644 --- a/federation/pkg/federation-controller/namespace/BUILD +++ b/federation/pkg/federation-controller/namespace/BUILD @@ -18,6 +18,7 @@ go_library( "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_release_1_5:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/eventsink:go_default_library", "//pkg/api:go_default_library", "//pkg/api/errors:go_default_library", @@ -28,7 +29,6 @@ go_library( "//pkg/controller:go_default_library", "//pkg/runtime:go_default_library", "//pkg/util/flowcontrol:go_default_library", - "//pkg/util/sets:go_default_library", "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", ], @@ -42,6 +42,8 @@ go_test( deps = [ "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_release_1_5/fake:go_default_library", + "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/unversioned:go_default_library", "//pkg/api/v1:go_default_library", diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index 7562f4f500..b0dedaef65 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -23,6 +23,7 @@ import ( federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5" "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -31,9 +32,8 @@ import ( kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" - pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/flowcontrol" - "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -71,6 +71,8 @@ type NamespaceController struct { // For events eventRecorder record.EventRecorder + deletionHelper *deletionhelper.DeletionHelper + namespaceReviewDelay time.Duration clusterAvailableDelay time.Duration smallDelay time.Duration @@ -100,7 +102,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont // Start informer in federated API servers on namespaces that should be federated. nc.namespaceInformerStore, nc.namespaceInformerController = cache.NewInformer( &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + ListFunc: func(options api.ListOptions) (runtime.Object, error) { versionedOptions := util.VersionizeV1ListOptions(options) return client.Core().Namespaces().List(versionedOptions) }, @@ -111,7 +113,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont }, &api_v1.Namespace{}, controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) })) + util.NewTriggerOnAllChanges(func(obj runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) })) // Federated informer on namespaces in members of federation. nc.namespaceFederatedInformer = util.NewFederatedInformer( @@ -119,7 +121,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont func(cluster *federation_api.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { return cache.NewInformer( &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + ListFunc: func(options api.ListOptions) (runtime.Object, error) { versionedOptions := util.VersionizeV1ListOptions(options) return targetClient.Core().Namespaces().List(versionedOptions) }, @@ -133,10 +135,9 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont // Trigger reconciliation whenever something in federated cluster is changed. In most cases it // would be just confirmation that some namespace opration succeeded. util.NewTriggerOnMetaAndSpecChanges( - func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, + func(obj runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, )) }, - &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { // When new cluster becomes available process all the namespaces again. @@ -147,24 +148,118 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont // Federated updeater along with Create/Update/Delete operations. nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, - func(client kubeclientset.Interface, obj pkg_runtime.Object) error { + func(client kubeclientset.Interface, obj runtime.Object) error { namespace := obj.(*api_v1.Namespace) _, err := client.Core().Namespaces().Create(namespace) return err }, - func(client kubeclientset.Interface, obj pkg_runtime.Object) error { + func(client kubeclientset.Interface, obj runtime.Object) error { namespace := obj.(*api_v1.Namespace) _, err := client.Core().Namespaces().Update(namespace) return err }, - func(client kubeclientset.Interface, obj pkg_runtime.Object) error { + func(client kubeclientset.Interface, obj runtime.Object) error { namespace := obj.(*api_v1.Namespace) err := client.Core().Namespaces().Delete(namespace.Name, &api_v1.DeleteOptions{}) + // IsNotFound error is fine since that means the object is deleted already. + if errors.IsNotFound(err) { + return nil + } return err }) + + nc.deletionHelper = deletionhelper.NewDeletionHelper( + nc.hasFinalizerFunc, + nc.removeFinalizerFunc, + nc.addFinalizerFunc, + // objNameFunc + func(obj runtime.Object) string { + namespace := obj.(*api_v1.Namespace) + return namespace.Name + }, + nc.updateTimeout, + nc.eventRecorder, + nc.namespaceFederatedInformer, + nc.federatedUpdater, + ) return nc } +// Returns true if the given object has the given finalizer in its ObjectMeta. +func (nc *NamespaceController) hasFinalizerFunc(obj runtime.Object, finalizer string) bool { + namespace := obj.(*api_v1.Namespace) + for i := range namespace.ObjectMeta.Finalizers { + if string(namespace.ObjectMeta.Finalizers[i]) == finalizer { + return true + } + } + return false +} + +// Removes the finalizer from the given objects ObjectMeta. +// Assumes that the given object is a namespace. +func (nc *NamespaceController) removeFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) { + namespace := obj.(*api_v1.Namespace) + newFinalizers := []string{} + hasFinalizer := false + for i := range namespace.ObjectMeta.Finalizers { + if string(namespace.ObjectMeta.Finalizers[i]) != finalizer { + newFinalizers = append(newFinalizers, namespace.ObjectMeta.Finalizers[i]) + } else { + hasFinalizer = true + } + } + if !hasFinalizer { + // Nothing to do. + return obj, nil + } + namespace.ObjectMeta.Finalizers = newFinalizers + namespace, err := nc.federatedApiClient.Core().Namespaces().Update(namespace) + if err != nil { + return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", finalizer, namespace.Name, err) + } + return namespace, nil +} + +// Adds the given finalizer to the given objects ObjectMeta. +// Assumes that the given object is a namespace. +func (nc *NamespaceController) addFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) { + namespace := obj.(*api_v1.Namespace) + namespace.ObjectMeta.Finalizers = append(namespace.ObjectMeta.Finalizers, finalizer) + namespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace) + if err != nil { + return nil, fmt.Errorf("failed to add finalizer %s to namespace %s: %v", finalizer, namespace.Name, err) + } + return namespace, nil +} + +// Returns true if the given object has the given finalizer in its NamespaceSpec. +func (nc *NamespaceController) hasFinalizerFuncInSpec(obj runtime.Object, finalizer api_v1.FinalizerName) bool { + namespace := obj.(*api_v1.Namespace) + for i := range namespace.Spec.Finalizers { + if namespace.Spec.Finalizers[i] == finalizer { + return true + } + } + return false +} + +// Removes the finalizer from the given objects NamespaceSpec. +func (nc *NamespaceController) removeFinalizerFromSpec(namespace *api_v1.Namespace, finalizer api_v1.FinalizerName) (*api_v1.Namespace, error) { + updatedFinalizers := []api_v1.FinalizerName{} + for i := range namespace.Spec.Finalizers { + if namespace.Spec.Finalizers[i] != finalizer { + updatedFinalizers = append(updatedFinalizers, namespace.Spec.Finalizers[i]) + } + } + namespace.Spec.Finalizers = updatedFinalizers + updatedNamespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace) + if err != nil { + return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", string(finalizer), namespace.Name, err) + } + return updatedNamespace, nil +} + func (nc *NamespaceController) Run(stopChan <-chan struct{}) { go nc.namespaceInformerController.Run(stopChan) nc.namespaceFederatedInformer.Start() @@ -255,6 +350,23 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { return } + glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for namespace: %s", + baseNamespace.Name) + // Add the DeleteFromUnderlyingClusters finalizer before creating a namespace in + // underlying clusters. + // This ensures that the dependent namespaces are deleted in underlying + // clusters when the federated namespace is deleted. + updatedNamespaceObj, err := nc.deletionHelper.EnsureDeleteFromUnderlyingClustersFinalizer(baseNamespace) + if err != nil { + glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in namespace %s: %v", + baseNamespace.Name, err) + nc.deliverNamespace(namespace, 0, false) + return + } + baseNamespace = updatedNamespaceObj.(*api_v1.Namespace) + + glog.V(3).Infof("Syncing namespace %s in underlying clusters", baseNamespace.Name) + // Sync the namespace in all underlying clusters. clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) @@ -274,6 +386,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { ObjectMeta: util.CopyObjectMeta(baseNamespace.ObjectMeta), Spec: baseNamespace.Spec, } + glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace) if !found { nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "CreateInCluster", @@ -290,7 +403,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { // Update existing namespace, if needed. if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) { nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInCluster", - "Updating namespace in cluster %s", cluster.Name) + "Updating namespace in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredNamespace, clusterNamespace) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, @@ -305,6 +418,8 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { // Everything is in order return } + glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations)) + err = nc.federatedUpdater.UpdateWithOnError(operations, nc.updateTimeout, func(op util.FederatedOperation, operror error) { nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInClusterFailed", "Namespace update in cluster %s failed: %v", op.ClusterName, operror) @@ -329,66 +444,30 @@ func (nc *NamespaceController) delete(namespace *api_v1.Namespace) error { Phase: api_v1.NamespaceTerminating, }, } + var err error if namespace.Status.Phase != api_v1.NamespaceTerminating { nc.eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion")) - _, err := nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace) + _, err = nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace) if err != nil { return fmt.Errorf("failed to update namespace: %v", err) } } - // Right now there is just 5 types of objects: ReplicaSet, Secret, Ingress, Events and Service. - // Temporarly these items are simply deleted one by one to squeeze this code into 1.4. - // TODO: Make it generic (like in the regular namespace controller) and parallel. - err := nc.federatedApiClient.Core().Services(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete service list: %v", err) - } - err = nc.federatedApiClient.Extensions().ReplicaSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete replicaset list from namespace: %v", err) - } - err = nc.federatedApiClient.Core().Secrets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete secret list from namespace: %v", err) - } - err = nc.federatedApiClient.Extensions().Ingresses(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete ingresses list from namespace: %v", err) - } - err = nc.federatedApiClient.Extensions().DaemonSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete daemonsets list from namespace: %v", err) - } - err = nc.federatedApiClient.Extensions().Deployments(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete deployments list from namespace: %v", err) - } - err = nc.federatedApiClient.Core().Events(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to delete events list from namespace: %v", err) - } - - // Remove kube_api.FinalzerKubernetes - if len(updatedNamespace.Spec.Finalizers) != 0 { - finalizerSet := sets.NewString() - for i := range namespace.Spec.Finalizers { - if namespace.Spec.Finalizers[i] != api_v1.FinalizerKubernetes { - finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) - } - } - updatedNamespace.Spec.Finalizers = make([]api_v1.FinalizerName, 0, len(finalizerSet)) - for _, value := range finalizerSet.List() { - updatedNamespace.Spec.Finalizers = append(updatedNamespace.Spec.Finalizers, api_v1.FinalizerName(value)) - } - _, err := nc.federatedApiClient.Core().Namespaces().Finalize(updatedNamespace) + if nc.hasFinalizerFuncInSpec(updatedNamespace, api_v1.FinalizerKubernetes) { + // Delete resources in this namespace. + updatedNamespace, err = nc.removeKubernetesFinalizer(updatedNamespace) if err != nil { - return fmt.Errorf("failed to finalize namespace: %v", err) + return fmt.Errorf("error in deleting resources in namespace %s: %v", namespace.Name, err) } } - // TODO: What about namespaces in subclusters ??? - err = nc.federatedApiClient.Core().Namespaces().Delete(updatedNamespace.Name, &api_v1.DeleteOptions{}) + // Delete the namespace from all underlying clusters. + _, err = nc.deletionHelper.HandleObjectInUnderlyingClusters(updatedNamespace) + if err != nil { + return err + } + + err = nc.federatedApiClient.Core().Namespaces().Delete(namespace.Name, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. // This is expected when we are processing an update as a result of namespace finalizer deletion. @@ -399,3 +478,44 @@ func (nc *NamespaceController) delete(namespace *api_v1.Namespace) error { } return nil } + +// Ensures that all resources in this namespace are deleted and then removes the kubernetes finalizer. +func (nc *NamespaceController) removeKubernetesFinalizer(namespace *api_v1.Namespace) (*api_v1.Namespace, error) { + // Right now there are just 7 types of objects: Deployments, DaemonSets, ReplicaSet, Secret, Ingress, Events and Service. + // Temporarly these items are simply deleted one by one to squeeze this code into 1.4. + // TODO: Make it generic (like in the regular namespace controller) and parallel. + err := nc.federatedApiClient.Core().Services(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete service list: %v", err) + } + err = nc.federatedApiClient.Extensions().ReplicaSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete replicaset list from namespace: %v", err) + } + err = nc.federatedApiClient.Core().Secrets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete secret list from namespace: %v", err) + } + err = nc.federatedApiClient.Extensions().Ingresses(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete ingresses list from namespace: %v", err) + } + err = nc.federatedApiClient.Extensions().DaemonSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete daemonsets list from namespace: %v", err) + } + err = nc.federatedApiClient.Extensions().Deployments(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete deployments list from namespace: %v", err) + } + err = nc.federatedApiClient.Core().Events(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to delete events list from namespace: %v", err) + } + + // Remove kube_api.FinalizerKubernetes + if len(namespace.Spec.Finalizers) != 0 { + return nc.removeFinalizerFromSpec(namespace, api_v1.FinalizerKubernetes) + } + return namespace, nil +} diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go index 29c1aa92aa..7857bf36f0 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller_test.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller_test.go @@ -23,6 +23,8 @@ import ( federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/unversioned" api_v1 "k8s.io/kubernetes/pkg/api/v1" @@ -44,12 +46,16 @@ func TestNamespaceController(t *testing.T) { Name: "test-namespace", SelfLink: "/api/v1/namespaces/test-namespace", }, + Spec: api_v1.NamespaceSpec{ + Finalizers: []api_v1.FinalizerName{api_v1.FinalizerKubernetes}, + }, } fakeClient := &fake_fedclientset.Clientset{} RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) RegisterFakeList("namespaces", &fakeClient.Fake, &api_v1.NamespaceList{Items: []api_v1.Namespace{}}) namespaceWatch := RegisterFakeWatch("namespaces", &fakeClient.Fake) + namespaceCreateChan := RegisterFakeCopyOnCreate("namespaces", &fakeClient.Fake, namespaceWatch) clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake) cluster1Client := &fake_kubeclientset.Clientset{} @@ -87,8 +93,7 @@ func TestNamespaceController(t *testing.T) { secretDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "secrets") namespaceController := NewNamespaceController(fakeClient) - informer := ToFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer) - informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) { + informerClientFactory := func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) { switch cluster.Name { case cluster1.Name: return cluster1Client, nil @@ -97,7 +102,8 @@ func TestNamespaceController(t *testing.T) { default: return nil, fmt.Errorf("Unknown cluster") } - }) + } + setClientFactory(namespaceController.namespaceFederatedInformer, informerClientFactory) namespaceController.clusterAvailableDelay = time.Second namespaceController.namespaceReviewDelay = 50 * time.Millisecond namespaceController.smallDelay = 20 * time.Millisecond @@ -108,11 +114,19 @@ func TestNamespaceController(t *testing.T) { // Test add federated namespace. namespaceWatch.Add(&ns1) + // Verify that the DeleteFromUnderlyingClusters finalizer is added to the namespace. + // Note: finalize invokes the create action in Fake client. + // TODO: Seems like a bug. Should invoke update. Fix it. + updatedNamespace := GetNamespaceFromChan(namespaceCreateChan) + assert.True(t, namespaceController.hasFinalizerFunc(updatedNamespace, deletionhelper.FinalizerDeleteFromUnderlyingClusters)) + ns1 = *updatedNamespace + + // Verify that the namespace is created in underlying cluster1. createdNamespace := GetNamespaceFromChan(cluster1CreateChan) assert.NotNil(t, createdNamespace) assert.Equal(t, ns1.Name, createdNamespace.Name) - // Wait for the secret to appear in the informer store + // Wait for the namespace to appear in the informer store err := WaitForStoreUpdate( namespaceController.namespaceFederatedInformer.GetTargetStore(), cluster1.Name, ns1.Name, wait.ForeverTestTimeout) @@ -123,7 +137,7 @@ func TestNamespaceController(t *testing.T) { "A": "B", } namespaceWatch.Modify(&ns1) - updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan) + updatedNamespace = GetNamespaceFromChan(cluster1UpdateChan) assert.NotNil(t, updatedNamespace) assert.Equal(t, ns1.Name, updatedNamespace.Name) // assert.Contains(t, updatedNamespace.Annotations, "A") @@ -135,6 +149,10 @@ func TestNamespaceController(t *testing.T) { assert.Equal(t, ns1.Name, createdNamespace2.Name) // assert.Contains(t, createdNamespace2.Annotations, "A") + // Delete the namespace with orphan finalizer (let namespaces + // in underlying clusters be as is). + // TODO: Add a test without orphan finalizer. + ns1.ObjectMeta.Finalizers = append(ns1.ObjectMeta.Finalizers, api_v1.FinalizerOrphan) ns1.DeletionTimestamp = &unversioned.Time{Time: time.Now()} namespaceWatch.Modify(&ns1) assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan)) @@ -145,6 +163,11 @@ func TestNamespaceController(t *testing.T) { close(stop) } +func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)) { + testInformer := ToFederatedInformerForTestOnly(informer) + testInformer.SetClientFactory(informerClientFactory) +} + func RegisterDeleteCollection(client *core.Fake, resource string) chan string { deleteChan := make(chan string, 100) client.AddReactor("delete-collection", resource, func(action core.Action) (bool, runtime.Object, error) { @@ -169,7 +192,7 @@ func GetStringFromChan(c chan string) string { case str := <-c: return str case <-time.After(5 * time.Second): - return "" + return "timedout" } } diff --git a/federation/pkg/federation-controller/util/deletionhelper/BUILD b/federation/pkg/federation-controller/util/deletionhelper/BUILD new file mode 100644 index 0000000000..ecae25747b --- /dev/null +++ b/federation/pkg/federation-controller/util/deletionhelper/BUILD @@ -0,0 +1,25 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["deletion_helper.go"], + tags = ["automanaged"], + deps = [ + "//federation/pkg/federation-controller/util:go_default_library", + "//pkg/api:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/client/record:go_default_library", + "//pkg/runtime:go_default_library", + "//vendor:github.com/golang/glog", + ], +) diff --git a/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go new file mode 100644 index 0000000000..9728bf1d1f --- /dev/null +++ b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go @@ -0,0 +1,171 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package to help federation controllers to delete federated resources from +// underlying clusters when the resource is deleted from federation control +// plane. +package deletionhelper + +import ( + "fmt" + "strings" + "time" + + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/runtime" + + "github.com/golang/glog" +) + +const ( + // Add this finalizer to a federation resource if the resource should be + // deleted from all underlying clusters before being deleted from + // federation control plane. + // This is ignored if FinalizerOrphan is also present on the resource. + // In that case, both finalizers are removed from the resource and the + // resource is deleted from federation control plane without affecting + // the underlying clusters. + FinalizerDeleteFromUnderlyingClusters string = "federation.kubernetes.io/delete-from-underlying-clusters" +) + +type HasFinalizerFunc func(runtime.Object, string) bool +type RemoveFinalizerFunc func(runtime.Object, string) (runtime.Object, error) +type AddFinalizerFunc func(runtime.Object, string) (runtime.Object, error) +type ObjNameFunc func(runtime.Object) string + +type DeletionHelper struct { + hasFinalizerFunc HasFinalizerFunc + removeFinalizerFunc RemoveFinalizerFunc + addFinalizerFunc AddFinalizerFunc + objNameFunc ObjNameFunc + updateTimeout time.Duration + eventRecorder record.EventRecorder + informer util.FederatedInformer + updater util.FederatedUpdater +} + +func NewDeletionHelper( + hasFinalizerFunc HasFinalizerFunc, removeFinalizerFunc RemoveFinalizerFunc, + addFinalizerFunc AddFinalizerFunc, objNameFunc ObjNameFunc, + updateTimeout time.Duration, eventRecorder record.EventRecorder, + informer util.FederatedInformer, + updater util.FederatedUpdater) *DeletionHelper { + return &DeletionHelper{ + hasFinalizerFunc: hasFinalizerFunc, + removeFinalizerFunc: removeFinalizerFunc, + addFinalizerFunc: addFinalizerFunc, + objNameFunc: objNameFunc, + updateTimeout: updateTimeout, + eventRecorder: eventRecorder, + informer: informer, + updater: updater, + } +} + +// Ensures that the given object has the required finalizer to ensure that +// objects are deleted in underlying clusters when this object is deleted +// from federation control plane. +// This method should be called before creating objects in underlying clusters. +func (dh *DeletionHelper) EnsureDeleteFromUnderlyingClustersFinalizer(obj runtime.Object) ( + runtime.Object, error) { + if dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) { + return obj, nil + } + return dh.addFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) +} + +// Deletes the resources corresponding to the given federated resource from +// all underlying clusters, unless it has the FinalizerOrphan finalizer. +// Removes FinalizerOrphan and FinalizerDeleteFromUnderlyingClusters finalizers +// when done. +// Callers are expected to keep calling this (with appropriate backoff) until +// it succeeds. +func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) ( + runtime.Object, error) { + objName := dh.objNameFunc(obj) + glog.V(2).Infof("Handling deletion of federated dependents for object: %s", objName) + if !dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) { + glog.V(2).Infof("obj does not have %s finalizer. Nothing to do", FinalizerDeleteFromUnderlyingClusters) + return obj, nil + } + hasOrphanFinalizer := dh.hasFinalizerFunc(obj, api_v1.FinalizerOrphan) + if hasOrphanFinalizer { + glog.V(3).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer") + // If the obj has FinalizerOrphan finalizer, then we need to orphan the + // corresponding objects in underlying clusters. + // Just remove both the finalizers in that case. + obj, err := dh.removeFinalizerFunc(obj, api_v1.FinalizerOrphan) + if err != nil { + return obj, err + } + return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) + } + + // Else, we need to delete the obj from all underlying clusters. + unreadyClusters, err := dh.informer.GetUnreadyClusters() + if err != nil { + return nil, fmt.Errorf("failed to get a list of unready clusters: %v", err) + } + // TODO: Handle the case when cluster resource is watched after this is executed. + // This can happen if a namespace is deleted before its creation had been + // observed in all underlying clusters. + clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(objName) + if err != nil { + return nil, fmt.Errorf("failed to get object %s from underlying clusters: %v", objName, err) + } + operations := make([]util.FederatedOperation, 0) + for _, clusterNsObj := range clusterNsObjs { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeDelete, + ClusterName: clusterNsObj.ClusterName, + Obj: clusterNsObj.Object.(runtime.Object), + }) + } + err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) { + objName := dh.objNameFunc(op.Obj) + dh.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteInClusterFailed", + "Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror) + }) + if err != nil { + return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err) + } + if len(operations) > 0 { + // We have deleted a bunch of resources. + // Wait for the store to observe all the deletions. + var clusterNames []string + for _, op := range operations { + clusterNames = append(clusterNames, op.ClusterName) + } + return nil, fmt.Errorf("waiting for object %s to be deleted from clusters: %s", objName, strings.Join(clusterNames, ", ")) + } + + // We have now deleted the object from all *ready* clusters. + // But still need to wait for clusters that are not ready to ensure that + // the object has been deleted from *all* clusters. + if len(unreadyClusters) != 0 { + var clusterNames []string + for _, cluster := range unreadyClusters { + clusterNames = append(clusterNames, cluster.Name) + } + return nil, fmt.Errorf("waiting for clusters %s to become ready to verify that obj %s has been deleted", strings.Join(clusterNames, ", "), objName) + } + + // All done. Just remove the finalizer. + return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) +} diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go index 55c091861a..0082337ea7 100644 --- a/federation/pkg/federation-controller/util/federated_informer.go +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -74,6 +74,9 @@ type FederationView interface { // GetClientsetForCluster returns a clientset for the cluster, if present. GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) + // GetUnreadyClusters returns a list of all clusters that are not ready yet. + GetUnreadyClusters() ([]*federation_api.Cluster, error) + // GetReadyClusers returns all clusters for which the sub-informers are run. GetReadyClusters() ([]*federation_api.Cluster, error) @@ -260,6 +263,9 @@ type federatedInformerImpl struct { clientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error) } +// *federatedInformerImpl implements FederatedInformer interface. +var _ FederatedInformer = &federatedInformerImpl{} + type federatedStoreImpl struct { federatedInformer *federatedInformerImpl } @@ -313,6 +319,24 @@ func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName strin return nil, fmt.Errorf("cluster %q not found", clusterName) } +func (f *federatedInformerImpl) GetUnreadyClusters() ([]*federation_api.Cluster, error) { + f.Lock() + defer f.Unlock() + + items := f.clusterInformer.store.List() + result := make([]*federation_api.Cluster, 0, len(items)) + for _, item := range items { + if cluster, ok := item.(*federation_api.Cluster); ok { + if !isClusterReady(cluster) { + result = append(result, cluster) + } + } else { + return nil, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item) + } + } + return result, nil +} + // GetReadyClusers returns all clusters for which the sub-informers are run. func (f *federatedInformerImpl) GetReadyClusters() ([]*federation_api.Cluster, error) { f.Lock() diff --git a/federation/pkg/federation-controller/util/federated_updater_test.go b/federation/pkg/federation-controller/util/federated_updater_test.go index aeb7f20825..1e4d0616d7 100644 --- a/federation/pkg/federation-controller/util/federated_updater_test.go +++ b/federation/pkg/federation-controller/util/federated_updater_test.go @@ -34,7 +34,10 @@ import ( type fakeFederationView struct { } -func (f fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) { +// Verify that fakeFederationView implements FederationView interface +var _ FederationView = &fakeFederationView{} + +func (f *fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) { return &fake_kubeclientset.Clientset{}, nil } @@ -42,6 +45,10 @@ func (f *fakeFederationView) GetReadyClusters() ([]*federation_api.Cluster, erro return []*federation_api.Cluster{}, nil } +func (f *fakeFederationView) GetUnreadyClusters() ([]*federation_api.Cluster, error) { + return []*federation_api.Cluster{}, nil +} + func (f *fakeFederationView) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) { return nil, false, nil } diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index 011cd97d26..dcc36114de 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -163,30 +163,35 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch objChan := make(chan runtime.Object, 100) client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { createAction := action.(core.CreateAction) - obj := createAction.GetObject() + originalObj := createAction.GetObject() + // Create a copy of the object here to prevent data races while reading the object in go routine. + obj := copy(originalObj) go func() { + glog.V(4).Infof("Object created. Writing to channel: %v", obj) watcher.Add(obj) - objChan <- copy(obj) + objChan <- obj }() - return true, obj, nil + return true, originalObj, nil }) return objChan } -// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes +// RegisterFakeCopyOnUpdate registers a reactor in the given fake client that passes // all updated objects to the given watcher and also copies them to a channel for // in-test inspection. func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object { objChan := make(chan runtime.Object, 100) client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { updateAction := action.(core.UpdateAction) - obj := updateAction.GetObject() + originalObj := updateAction.GetObject() + // Create a copy of the object here to prevent data races while reading the object in go routine. + obj := copy(originalObj) go func() { glog.V(4).Infof("Object updated. Writing to channel: %v", obj) watcher.Modify(obj) - objChan <- copy(obj) + objChan <- obj }() - return true, obj, nil + return true, originalObj, nil }) return objChan } diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 81483742b8..7958bafdf5 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -3079,6 +3079,7 @@ type FinalizerName string // These are internal finalizer values to Kubernetes, must be qualified name unless defined here const ( FinalizerKubernetes FinalizerName = "kubernetes" + FinalizerOrphan string = "orphan" ) // NamespaceSpec describes the attributes on a Namespace. diff --git a/test/e2e/federated-namespace.go b/test/e2e/federated-namespace.go index 1117a0a77e..f92d1ae189 100644 --- a/test/e2e/federated-namespace.go +++ b/test/e2e/federated-namespace.go @@ -42,6 +42,7 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func Describe("Namespace objects", func() { var federationName string var clusters map[string]*cluster // All clusters, keyed by cluster name + var nsName string BeforeEach(func() { framework.SkipUnlessFederated(f.ClientSet) @@ -76,11 +77,13 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func Name: api.SimpleNameGenerator.GenerateName(namespacePrefix), }, } + nsName = ns.Name By(fmt.Sprintf("Creating namespace %s", ns.Name)) _, err := f.FederationClientset_1_5.Core().Namespaces().Create(&ns) framework.ExpectNoError(err, "Failed to create namespace %s", ns.Name) // Check subclusters if the namespace was created there. + By(fmt.Sprintf("Waiting for namespace %s to be created in all underlying clusters", ns.Name)) err = wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) { for _, cluster := range clusters { _, err := cluster.Core().Namespaces().Get(ns.Name) @@ -95,9 +98,19 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func }) framework.ExpectNoError(err, "Not all namespaces created") + By(fmt.Sprintf("Deleting namespace %s", ns.Name)) deleteAllTestNamespaces( f.FederationClientset_1_5.Core().Namespaces().List, f.FederationClientset_1_5.Core().Namespaces().Delete) + By(fmt.Sprintf("Verifying that namespace %s was deleted from all underlying clusters", ns.Name)) + // Verify that the namespace was deleted from all underlying clusters as well. + for clusterName, clusterClientset := range clusters { + _, err := clusterClientset.Core().Namespaces().Get(ns.Name) + if err == nil || !errors.IsNotFound(err) { + framework.Failf("expected NotFound error for namespace %s in cluster %s, got error: %v", ns.Name, clusterName, err) + } + } + By(fmt.Sprintf("Verified that deletion succeeded")) }) }) }) @@ -110,7 +123,9 @@ func deleteAllTestNamespaces(lister func(api_v1.ListOptions) (*api_v1.NamespaceL } for _, namespace := range list.Items { if strings.HasPrefix(namespace.Name, namespacePrefix) { - err := deleter(namespace.Name, &api_v1.DeleteOptions{}) + // Do not orphan dependents (corresponding namespaces in underlying clusters). + orphanDependents := false + err := deleter(namespace.Name, &api_v1.DeleteOptions{OrphanDependents: &orphanDependents}) if err != nil { framework.Failf("Failed to set %s for deletion: %v", namespace.Name, err) } diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index f9384af1fb..d557d2d961 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -276,7 +276,9 @@ func (f *Framework) deleteFederationNs() { clientset := f.FederationClientset_1_5 // First delete the namespace from federation apiserver. - if err := clientset.Core().Namespaces().Delete(ns.Name, &v1.DeleteOptions{}); err != nil { + // Also delete the corresponding namespaces from underlying clusters. + orphanDependents := false + if err := clientset.Core().Namespaces().Delete(ns.Name, &v1.DeleteOptions{OrphanDependents: &orphanDependents}); err != nil { Failf("Error while deleting federation namespace %s: %s", ns.Name, err) } // Verify that it got deleted. @@ -297,8 +299,6 @@ func (f *Framework) deleteFederationNs() { Logf("Namespace %v was already deleted", ns.Name) } } - - // TODO: Delete the namespace from underlying clusters. } // AfterEach deletes the namespace, after reading its events. diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index e12a5a606f..28d5e7a47f 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2218,6 +2218,7 @@ func DumpEventsInNamespace(eventsLister EventsLister, namespace string) { events, err := eventsLister(v1.ListOptions{}, namespace) Expect(err).NotTo(HaveOccurred()) + By(fmt.Sprintf("Found %d events.", len(events.Items))) // Sort events by their first timestamp sortedEvents := events.Items if len(sortedEvents) > 1 {