Adding cascading deletion support to federated namespaces

pull/6/head
nikhiljindal 2016-10-11 12:48:38 -07:00
parent 0fdca3be19
commit f955d556f8
12 changed files with 473 additions and 79 deletions

View File

@ -18,6 +18,7 @@ go_library(
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
@ -28,7 +29,6 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
@ -42,6 +42,8 @@ go_test(
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5/fake:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/api/v1:go_default_library",

View File

@ -23,6 +23,7 @@ import (
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@ -31,9 +32,8 @@ import (
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
@ -71,6 +71,8 @@ type NamespaceController struct {
// For events
eventRecorder record.EventRecorder
deletionHelper *deletionhelper.DeletionHelper
namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
@ -100,7 +102,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
// Start informer in federated API servers on namespaces that should be federated.
nc.namespaceInformerStore, nc.namespaceInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return client.Core().Namespaces().List(versionedOptions)
},
@ -111,7 +113,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
},
&api_v1.Namespace{},
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) }))
util.NewTriggerOnAllChanges(func(obj runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) }))
// Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer(
@ -119,7 +121,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
func(cluster *federation_api.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return targetClient.Core().Namespaces().List(versionedOptions)
},
@ -133,10 +135,9 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace opration succeeded.
util.NewTriggerOnMetaAndSpecChanges(
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
func(obj runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the namespaces again.
@ -147,24 +148,118 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
// Federated updeater along with Create/Update/Delete operations.
nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer,
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*api_v1.Namespace)
_, err := client.Core().Namespaces().Create(namespace)
return err
},
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*api_v1.Namespace)
_, err := client.Core().Namespaces().Update(namespace)
return err
},
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*api_v1.Namespace)
err := client.Core().Namespaces().Delete(namespace.Name, &api_v1.DeleteOptions{})
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
return nil
}
return err
})
nc.deletionHelper = deletionhelper.NewDeletionHelper(
nc.hasFinalizerFunc,
nc.removeFinalizerFunc,
nc.addFinalizerFunc,
// objNameFunc
func(obj runtime.Object) string {
namespace := obj.(*api_v1.Namespace)
return namespace.Name
},
nc.updateTimeout,
nc.eventRecorder,
nc.namespaceFederatedInformer,
nc.federatedUpdater,
)
return nc
}
// Returns true if the given object has the given finalizer in its ObjectMeta.
func (nc *NamespaceController) hasFinalizerFunc(obj runtime.Object, finalizer string) bool {
namespace := obj.(*api_v1.Namespace)
for i := range namespace.ObjectMeta.Finalizers {
if string(namespace.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}
// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a namespace.
func (nc *NamespaceController) removeFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
namespace := obj.(*api_v1.Namespace)
newFinalizers := []string{}
hasFinalizer := false
for i := range namespace.ObjectMeta.Finalizers {
if string(namespace.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, namespace.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
namespace.ObjectMeta.Finalizers = newFinalizers
namespace, err := nc.federatedApiClient.Core().Namespaces().Update(namespace)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", finalizer, namespace.Name, err)
}
return namespace, nil
}
// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a namespace.
func (nc *NamespaceController) addFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
namespace := obj.(*api_v1.Namespace)
namespace.ObjectMeta.Finalizers = append(namespace.ObjectMeta.Finalizers, finalizer)
namespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to namespace %s: %v", finalizer, namespace.Name, err)
}
return namespace, nil
}
// Returns true if the given object has the given finalizer in its NamespaceSpec.
func (nc *NamespaceController) hasFinalizerFuncInSpec(obj runtime.Object, finalizer api_v1.FinalizerName) bool {
namespace := obj.(*api_v1.Namespace)
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] == finalizer {
return true
}
}
return false
}
// Removes the finalizer from the given objects NamespaceSpec.
func (nc *NamespaceController) removeFinalizerFromSpec(namespace *api_v1.Namespace, finalizer api_v1.FinalizerName) (*api_v1.Namespace, error) {
updatedFinalizers := []api_v1.FinalizerName{}
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != finalizer {
updatedFinalizers = append(updatedFinalizers, namespace.Spec.Finalizers[i])
}
}
namespace.Spec.Finalizers = updatedFinalizers
updatedNamespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", string(finalizer), namespace.Name, err)
}
return updatedNamespace, nil
}
func (nc *NamespaceController) Run(stopChan <-chan struct{}) {
go nc.namespaceInformerController.Run(stopChan)
nc.namespaceFederatedInformer.Start()
@ -255,6 +350,23 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
return
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for namespace: %s",
baseNamespace.Name)
// Add the DeleteFromUnderlyingClusters finalizer before creating a namespace in
// underlying clusters.
// This ensures that the dependent namespaces are deleted in underlying
// clusters when the federated namespace is deleted.
updatedNamespaceObj, err := nc.deletionHelper.EnsureDeleteFromUnderlyingClustersFinalizer(baseNamespace)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in namespace %s: %v",
baseNamespace.Name, err)
nc.deliverNamespace(namespace, 0, false)
return
}
baseNamespace = updatedNamespaceObj.(*api_v1.Namespace)
glog.V(3).Infof("Syncing namespace %s in underlying clusters", baseNamespace.Name)
// Sync the namespace in all underlying clusters.
clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
@ -274,6 +386,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
ObjectMeta: util.CopyObjectMeta(baseNamespace.ObjectMeta),
Spec: baseNamespace.Spec,
}
glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace)
if !found {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "CreateInCluster",
@ -290,7 +403,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
// Update existing namespace, if needed.
if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInCluster",
"Updating namespace in cluster %s", cluster.Name)
"Updating namespace in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredNamespace, clusterNamespace)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
@ -305,6 +418,8 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
// Everything is in order
return
}
glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations))
err = nc.federatedUpdater.UpdateWithOnError(operations, nc.updateTimeout, func(op util.FederatedOperation, operror error) {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInClusterFailed",
"Namespace update in cluster %s failed: %v", op.ClusterName, operror)
@ -329,66 +444,30 @@ func (nc *NamespaceController) delete(namespace *api_v1.Namespace) error {
Phase: api_v1.NamespaceTerminating,
},
}
var err error
if namespace.Status.Phase != api_v1.NamespaceTerminating {
nc.eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion"))
_, err := nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace)
_, err = nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace)
if err != nil {
return fmt.Errorf("failed to update namespace: %v", err)
}
}
// Right now there is just 5 types of objects: ReplicaSet, Secret, Ingress, Events and Service.
// Temporarly these items are simply deleted one by one to squeeze this code into 1.4.
// TODO: Make it generic (like in the regular namespace controller) and parallel.
err := nc.federatedApiClient.Core().Services(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if nc.hasFinalizerFuncInSpec(updatedNamespace, api_v1.FinalizerKubernetes) {
// Delete resources in this namespace.
updatedNamespace, err = nc.removeKubernetesFinalizer(updatedNamespace)
if err != nil {
return fmt.Errorf("failed to delete service list: %v", err)
}
err = nc.federatedApiClient.Extensions().ReplicaSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete replicaset list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Secrets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete secret list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Ingresses(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete ingresses list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().DaemonSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete daemonsets list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Deployments(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete deployments list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Events(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to delete events list from namespace: %v", err)
}
// Remove kube_api.FinalzerKubernetes
if len(updatedNamespace.Spec.Finalizers) != 0 {
finalizerSet := sets.NewString()
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api_v1.FinalizerKubernetes {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
}
}
updatedNamespace.Spec.Finalizers = make([]api_v1.FinalizerName, 0, len(finalizerSet))
for _, value := range finalizerSet.List() {
updatedNamespace.Spec.Finalizers = append(updatedNamespace.Spec.Finalizers, api_v1.FinalizerName(value))
}
_, err := nc.federatedApiClient.Core().Namespaces().Finalize(updatedNamespace)
if err != nil {
return fmt.Errorf("failed to finalize namespace: %v", err)
return fmt.Errorf("error in deleting resources in namespace %s: %v", namespace.Name, err)
}
}
// TODO: What about namespaces in subclusters ???
err = nc.federatedApiClient.Core().Namespaces().Delete(updatedNamespace.Name, &api_v1.DeleteOptions{})
// Delete the namespace from all underlying clusters.
_, err = nc.deletionHelper.HandleObjectInUnderlyingClusters(updatedNamespace)
if err != nil {
return err
}
err = nc.federatedApiClient.Core().Namespaces().Delete(namespace.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of namespace finalizer deletion.
@ -399,3 +478,44 @@ func (nc *NamespaceController) delete(namespace *api_v1.Namespace) error {
}
return nil
}
// Ensures that all resources in this namespace are deleted and then removes the kubernetes finalizer.
func (nc *NamespaceController) removeKubernetesFinalizer(namespace *api_v1.Namespace) (*api_v1.Namespace, error) {
// Right now there are just 7 types of objects: Deployments, DaemonSets, ReplicaSet, Secret, Ingress, Events and Service.
// Temporarly these items are simply deleted one by one to squeeze this code into 1.4.
// TODO: Make it generic (like in the regular namespace controller) and parallel.
err := nc.federatedApiClient.Core().Services(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete service list: %v", err)
}
err = nc.federatedApiClient.Extensions().ReplicaSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete replicaset list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Secrets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete secret list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Ingresses(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete ingresses list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().DaemonSets(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete daemonsets list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Deployments(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete deployments list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Events(namespace.Name).DeleteCollection(&api_v1.DeleteOptions{}, api_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete events list from namespace: %v", err)
}
// Remove kube_api.FinalizerKubernetes
if len(namespace.Spec.Finalizers) != 0 {
return nc.removeFinalizerFromSpec(namespace, api_v1.FinalizerKubernetes)
}
return namespace, nil
}

View File

@ -23,6 +23,8 @@ import (
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/unversioned"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
@ -44,12 +46,16 @@ func TestNamespaceController(t *testing.T) {
Name: "test-namespace",
SelfLink: "/api/v1/namespaces/test-namespace",
},
Spec: api_v1.NamespaceSpec{
Finalizers: []api_v1.FinalizerName{api_v1.FinalizerKubernetes},
},
}
fakeClient := &fake_fedclientset.Clientset{}
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterFakeList("namespaces", &fakeClient.Fake, &api_v1.NamespaceList{Items: []api_v1.Namespace{}})
namespaceWatch := RegisterFakeWatch("namespaces", &fakeClient.Fake)
namespaceCreateChan := RegisterFakeCopyOnCreate("namespaces", &fakeClient.Fake, namespaceWatch)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
cluster1Client := &fake_kubeclientset.Clientset{}
@ -87,8 +93,7 @@ func TestNamespaceController(t *testing.T) {
secretDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "secrets")
namespaceController := NewNamespaceController(fakeClient)
informer := ToFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
informerClientFactory := func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
@ -97,7 +102,8 @@ func TestNamespaceController(t *testing.T) {
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
}
setClientFactory(namespaceController.namespaceFederatedInformer, informerClientFactory)
namespaceController.clusterAvailableDelay = time.Second
namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond
@ -108,11 +114,19 @@ func TestNamespaceController(t *testing.T) {
// Test add federated namespace.
namespaceWatch.Add(&ns1)
// Verify that the DeleteFromUnderlyingClusters finalizer is added to the namespace.
// Note: finalize invokes the create action in Fake client.
// TODO: Seems like a bug. Should invoke update. Fix it.
updatedNamespace := GetNamespaceFromChan(namespaceCreateChan)
assert.True(t, namespaceController.hasFinalizerFunc(updatedNamespace, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
ns1 = *updatedNamespace
// Verify that the namespace is created in underlying cluster1.
createdNamespace := GetNamespaceFromChan(cluster1CreateChan)
assert.NotNil(t, createdNamespace)
assert.Equal(t, ns1.Name, createdNamespace.Name)
// Wait for the secret to appear in the informer store
// Wait for the namespace to appear in the informer store
err := WaitForStoreUpdate(
namespaceController.namespaceFederatedInformer.GetTargetStore(),
cluster1.Name, ns1.Name, wait.ForeverTestTimeout)
@ -123,7 +137,7 @@ func TestNamespaceController(t *testing.T) {
"A": "B",
}
namespaceWatch.Modify(&ns1)
updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan)
updatedNamespace = GetNamespaceFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedNamespace)
assert.Equal(t, ns1.Name, updatedNamespace.Name)
// assert.Contains(t, updatedNamespace.Annotations, "A")
@ -135,6 +149,10 @@ func TestNamespaceController(t *testing.T) {
assert.Equal(t, ns1.Name, createdNamespace2.Name)
// assert.Contains(t, createdNamespace2.Annotations, "A")
// Delete the namespace with orphan finalizer (let namespaces
// in underlying clusters be as is).
// TODO: Add a test without orphan finalizer.
ns1.ObjectMeta.Finalizers = append(ns1.ObjectMeta.Finalizers, api_v1.FinalizerOrphan)
ns1.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
namespaceWatch.Modify(&ns1)
assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan))
@ -145,6 +163,11 @@ func TestNamespaceController(t *testing.T) {
close(stop)
}
func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)) {
testInformer := ToFederatedInformerForTestOnly(informer)
testInformer.SetClientFactory(informerClientFactory)
}
func RegisterDeleteCollection(client *core.Fake, resource string) chan string {
deleteChan := make(chan string, 100)
client.AddReactor("delete-collection", resource, func(action core.Action) (bool, runtime.Object, error) {
@ -169,7 +192,7 @@ func GetStringFromChan(c chan string) string {
case str := <-c:
return str
case <-time.After(5 * time.Second):
return ""
return "timedout"
}
}

View File

@ -0,0 +1,25 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["deletion_helper.go"],
tags = ["automanaged"],
deps = [
"//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/runtime:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -0,0 +1,171 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package to help federation controllers to delete federated resources from
// underlying clusters when the resource is deleted from federation control
// plane.
package deletionhelper
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/runtime"
"github.com/golang/glog"
)
const (
// Add this finalizer to a federation resource if the resource should be
// deleted from all underlying clusters before being deleted from
// federation control plane.
// This is ignored if FinalizerOrphan is also present on the resource.
// In that case, both finalizers are removed from the resource and the
// resource is deleted from federation control plane without affecting
// the underlying clusters.
FinalizerDeleteFromUnderlyingClusters string = "federation.kubernetes.io/delete-from-underlying-clusters"
)
type HasFinalizerFunc func(runtime.Object, string) bool
type RemoveFinalizerFunc func(runtime.Object, string) (runtime.Object, error)
type AddFinalizerFunc func(runtime.Object, string) (runtime.Object, error)
type ObjNameFunc func(runtime.Object) string
type DeletionHelper struct {
hasFinalizerFunc HasFinalizerFunc
removeFinalizerFunc RemoveFinalizerFunc
addFinalizerFunc AddFinalizerFunc
objNameFunc ObjNameFunc
updateTimeout time.Duration
eventRecorder record.EventRecorder
informer util.FederatedInformer
updater util.FederatedUpdater
}
func NewDeletionHelper(
hasFinalizerFunc HasFinalizerFunc, removeFinalizerFunc RemoveFinalizerFunc,
addFinalizerFunc AddFinalizerFunc, objNameFunc ObjNameFunc,
updateTimeout time.Duration, eventRecorder record.EventRecorder,
informer util.FederatedInformer,
updater util.FederatedUpdater) *DeletionHelper {
return &DeletionHelper{
hasFinalizerFunc: hasFinalizerFunc,
removeFinalizerFunc: removeFinalizerFunc,
addFinalizerFunc: addFinalizerFunc,
objNameFunc: objNameFunc,
updateTimeout: updateTimeout,
eventRecorder: eventRecorder,
informer: informer,
updater: updater,
}
}
// Ensures that the given object has the required finalizer to ensure that
// objects are deleted in underlying clusters when this object is deleted
// from federation control plane.
// This method should be called before creating objects in underlying clusters.
func (dh *DeletionHelper) EnsureDeleteFromUnderlyingClustersFinalizer(obj runtime.Object) (
runtime.Object, error) {
if dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) {
return obj, nil
}
return dh.addFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}
// Deletes the resources corresponding to the given federated resource from
// all underlying clusters, unless it has the FinalizerOrphan finalizer.
// Removes FinalizerOrphan and FinalizerDeleteFromUnderlyingClusters finalizers
// when done.
// Callers are expected to keep calling this (with appropriate backoff) until
// it succeeds.
func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
runtime.Object, error) {
objName := dh.objNameFunc(obj)
glog.V(2).Infof("Handling deletion of federated dependents for object: %s", objName)
if !dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) {
glog.V(2).Infof("obj does not have %s finalizer. Nothing to do", FinalizerDeleteFromUnderlyingClusters)
return obj, nil
}
hasOrphanFinalizer := dh.hasFinalizerFunc(obj, api_v1.FinalizerOrphan)
if hasOrphanFinalizer {
glog.V(3).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
// If the obj has FinalizerOrphan finalizer, then we need to orphan the
// corresponding objects in underlying clusters.
// Just remove both the finalizers in that case.
obj, err := dh.removeFinalizerFunc(obj, api_v1.FinalizerOrphan)
if err != nil {
return obj, err
}
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}
// Else, we need to delete the obj from all underlying clusters.
unreadyClusters, err := dh.informer.GetUnreadyClusters()
if err != nil {
return nil, fmt.Errorf("failed to get a list of unready clusters: %v", err)
}
// TODO: Handle the case when cluster resource is watched after this is executed.
// This can happen if a namespace is deleted before its creation had been
// observed in all underlying clusters.
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(objName)
if err != nil {
return nil, fmt.Errorf("failed to get object %s from underlying clusters: %v", objName, err)
}
operations := make([]util.FederatedOperation, 0)
for _, clusterNsObj := range clusterNsObjs {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeDelete,
ClusterName: clusterNsObj.ClusterName,
Obj: clusterNsObj.Object.(runtime.Object),
})
}
err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) {
objName := dh.objNameFunc(op.Obj)
dh.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteInClusterFailed",
"Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror)
})
if err != nil {
return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err)
}
if len(operations) > 0 {
// We have deleted a bunch of resources.
// Wait for the store to observe all the deletions.
var clusterNames []string
for _, op := range operations {
clusterNames = append(clusterNames, op.ClusterName)
}
return nil, fmt.Errorf("waiting for object %s to be deleted from clusters: %s", objName, strings.Join(clusterNames, ", "))
}
// We have now deleted the object from all *ready* clusters.
// But still need to wait for clusters that are not ready to ensure that
// the object has been deleted from *all* clusters.
if len(unreadyClusters) != 0 {
var clusterNames []string
for _, cluster := range unreadyClusters {
clusterNames = append(clusterNames, cluster.Name)
}
return nil, fmt.Errorf("waiting for clusters %s to become ready to verify that obj %s has been deleted", strings.Join(clusterNames, ", "), objName)
}
// All done. Just remove the finalizer.
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}

View File

@ -74,6 +74,9 @@ type FederationView interface {
// GetClientsetForCluster returns a clientset for the cluster, if present.
GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error)
// GetUnreadyClusters returns a list of all clusters that are not ready yet.
GetUnreadyClusters() ([]*federation_api.Cluster, error)
// GetReadyClusers returns all clusters for which the sub-informers are run.
GetReadyClusters() ([]*federation_api.Cluster, error)
@ -260,6 +263,9 @@ type federatedInformerImpl struct {
clientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)
}
// *federatedInformerImpl implements FederatedInformer interface.
var _ FederatedInformer = &federatedInformerImpl{}
type federatedStoreImpl struct {
federatedInformer *federatedInformerImpl
}
@ -313,6 +319,24 @@ func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName strin
return nil, fmt.Errorf("cluster %q not found", clusterName)
}
func (f *federatedInformerImpl) GetUnreadyClusters() ([]*federation_api.Cluster, error) {
f.Lock()
defer f.Unlock()
items := f.clusterInformer.store.List()
result := make([]*federation_api.Cluster, 0, len(items))
for _, item := range items {
if cluster, ok := item.(*federation_api.Cluster); ok {
if !isClusterReady(cluster) {
result = append(result, cluster)
}
} else {
return nil, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item)
}
}
return result, nil
}
// GetReadyClusers returns all clusters for which the sub-informers are run.
func (f *federatedInformerImpl) GetReadyClusters() ([]*federation_api.Cluster, error) {
f.Lock()

View File

@ -34,7 +34,10 @@ import (
type fakeFederationView struct {
}
func (f fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) {
// Verify that fakeFederationView implements FederationView interface
var _ FederationView = &fakeFederationView{}
func (f *fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) {
return &fake_kubeclientset.Clientset{}, nil
}
@ -42,6 +45,10 @@ func (f *fakeFederationView) GetReadyClusters() ([]*federation_api.Cluster, erro
return []*federation_api.Cluster{}, nil
}
func (f *fakeFederationView) GetUnreadyClusters() ([]*federation_api.Cluster, error) {
return []*federation_api.Cluster{}, nil
}
func (f *fakeFederationView) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) {
return nil, false, nil
}

View File

@ -163,30 +163,35 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
objChan := make(chan runtime.Object, 100)
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
obj := createAction.GetObject()
originalObj := createAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj)
go func() {
glog.V(4).Infof("Object created. Writing to channel: %v", obj)
watcher.Add(obj)
objChan <- copy(obj)
objChan <- obj
}()
return true, obj, nil
return true, originalObj, nil
})
return objChan
}
// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
// RegisterFakeCopyOnUpdate registers a reactor in the given fake client that passes
// all updated objects to the given watcher and also copies them to a channel for
// in-test inspection.
func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object {
objChan := make(chan runtime.Object, 100)
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
updateAction := action.(core.UpdateAction)
obj := updateAction.GetObject()
originalObj := updateAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj)
go func() {
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
watcher.Modify(obj)
objChan <- copy(obj)
objChan <- obj
}()
return true, obj, nil
return true, originalObj, nil
})
return objChan
}

View File

@ -3079,6 +3079,7 @@ type FinalizerName string
// These are internal finalizer values to Kubernetes, must be qualified name unless defined here
const (
FinalizerKubernetes FinalizerName = "kubernetes"
FinalizerOrphan string = "orphan"
)
// NamespaceSpec describes the attributes on a Namespace.

View File

@ -42,6 +42,7 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func
Describe("Namespace objects", func() {
var federationName string
var clusters map[string]*cluster // All clusters, keyed by cluster name
var nsName string
BeforeEach(func() {
framework.SkipUnlessFederated(f.ClientSet)
@ -76,11 +77,13 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func
Name: api.SimpleNameGenerator.GenerateName(namespacePrefix),
},
}
nsName = ns.Name
By(fmt.Sprintf("Creating namespace %s", ns.Name))
_, err := f.FederationClientset_1_5.Core().Namespaces().Create(&ns)
framework.ExpectNoError(err, "Failed to create namespace %s", ns.Name)
// Check subclusters if the namespace was created there.
By(fmt.Sprintf("Waiting for namespace %s to be created in all underlying clusters", ns.Name))
err = wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
for _, cluster := range clusters {
_, err := cluster.Core().Namespaces().Get(ns.Name)
@ -95,9 +98,19 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func
})
framework.ExpectNoError(err, "Not all namespaces created")
By(fmt.Sprintf("Deleting namespace %s", ns.Name))
deleteAllTestNamespaces(
f.FederationClientset_1_5.Core().Namespaces().List,
f.FederationClientset_1_5.Core().Namespaces().Delete)
By(fmt.Sprintf("Verifying that namespace %s was deleted from all underlying clusters", ns.Name))
// Verify that the namespace was deleted from all underlying clusters as well.
for clusterName, clusterClientset := range clusters {
_, err := clusterClientset.Core().Namespaces().Get(ns.Name)
if err == nil || !errors.IsNotFound(err) {
framework.Failf("expected NotFound error for namespace %s in cluster %s, got error: %v", ns.Name, clusterName, err)
}
}
By(fmt.Sprintf("Verified that deletion succeeded"))
})
})
})
@ -110,7 +123,9 @@ func deleteAllTestNamespaces(lister func(api_v1.ListOptions) (*api_v1.NamespaceL
}
for _, namespace := range list.Items {
if strings.HasPrefix(namespace.Name, namespacePrefix) {
err := deleter(namespace.Name, &api_v1.DeleteOptions{})
// Do not orphan dependents (corresponding namespaces in underlying clusters).
orphanDependents := false
err := deleter(namespace.Name, &api_v1.DeleteOptions{OrphanDependents: &orphanDependents})
if err != nil {
framework.Failf("Failed to set %s for deletion: %v", namespace.Name, err)
}

View File

@ -276,7 +276,9 @@ func (f *Framework) deleteFederationNs() {
clientset := f.FederationClientset_1_5
// First delete the namespace from federation apiserver.
if err := clientset.Core().Namespaces().Delete(ns.Name, &v1.DeleteOptions{}); err != nil {
// Also delete the corresponding namespaces from underlying clusters.
orphanDependents := false
if err := clientset.Core().Namespaces().Delete(ns.Name, &v1.DeleteOptions{OrphanDependents: &orphanDependents}); err != nil {
Failf("Error while deleting federation namespace %s: %s", ns.Name, err)
}
// Verify that it got deleted.
@ -297,8 +299,6 @@ func (f *Framework) deleteFederationNs() {
Logf("Namespace %v was already deleted", ns.Name)
}
}
// TODO: Delete the namespace from underlying clusters.
}
// AfterEach deletes the namespace, after reading its events.

View File

@ -2218,6 +2218,7 @@ func DumpEventsInNamespace(eventsLister EventsLister, namespace string) {
events, err := eventsLister(v1.ListOptions{}, namespace)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Found %d events.", len(events.Items)))
// Sort events by their first timestamp
sortedEvents := events.Items
if len(sortedEvents) > 1 {