diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index 1069570bbd..a4fffac2a2 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -24,7 +24,6 @@ go_library( "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", - "//federation/pkg/federation-controller/namespace:go_default_library", "//federation/pkg/federation-controller/service:go_default_library", "//federation/pkg/federation-controller/service/dns:go_default_library", "//federation/pkg/federation-controller/sync:go_default_library", @@ -40,7 +39,6 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library", - "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", ], diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 1f8b41ec9b..9144eca97f 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" utilflag "k8s.io/apiserver/pkg/util/flag" - "k8s.io/client-go/dynamic" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" @@ -38,7 +37,6 @@ import ( "k8s.io/kubernetes/federation/pkg/federatedtypes" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" - namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns" synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync" @@ -151,14 +149,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop) } - if controllerEnabled(s.Controllers, serverResources, namespacecontroller.ControllerName, namespacecontroller.RequiredResources, true) { - glog.V(3).Infof("Loading client config for namespace controller %q", namespacecontroller.UserAgentName) - nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, namespacecontroller.UserAgentName)) - namespaceController := namespacecontroller.NewNamespaceController(nsClientset, dynamic.NewDynamicClientPool(restclient.AddUserAgent(restClientCfg, namespacecontroller.UserAgentName))) - glog.V(3).Infof("Running namespace controller") - namespaceController.Run(wait.NeverStop) - } - for kind, federatedType := range federatedtypes.FederatedTypes() { if controllerEnabled(s.Controllers, serverResources, federatedType.ControllerName, federatedType.RequiredResources, true) { synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 8e1f8e2fbc..7bb01c29d6 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -15,6 +15,8 @@ go_library( "configmap.go", "daemonset.go", "deployment.go", + "namespace.go", + "qualifiedname.go", "registry.go", "replicaset.go", "scheduling.go", @@ -29,7 +31,9 @@ go_library( "//federation/pkg/federation-controller/util/planner:go_default_library", "//federation/pkg/federation-controller/util/podanalyzer:go_default_library", "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", + "//pkg/api:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/controller/namespace/deletion:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", @@ -37,8 +41,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/federation/pkg/federatedtypes/adapter.go b/federation/pkg/federatedtypes/adapter.go index 1da54a7554..2e59bb9060 100644 --- a/federation/pkg/federatedtypes/adapter.go +++ b/federation/pkg/federatedtypes/adapter.go @@ -19,8 +19,8 @@ package federatedtypes import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) @@ -34,21 +34,21 @@ type FederatedTypeAdapter interface { IsExpectedType(obj interface{}) bool Copy(obj pkgruntime.Object) pkgruntime.Object Equivalent(obj1, obj2 pkgruntime.Object) bool - NamespacedName(obj pkgruntime.Object) types.NamespacedName + QualifiedName(obj pkgruntime.Object) QualifiedName ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta // Fed* operations target the federation control plane FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) - FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error - FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) + FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error + FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) // The following operations are intended to target a cluster that is a member of a federation ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) - ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error - ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) + ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error + ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) @@ -62,7 +62,7 @@ type FederatedTypeAdapter interface { // that create instances of FederatedTypeAdapter. Such methods should // be registered with RegisterAdapterFactory to ensure the type // adapter is discoverable. -type AdapterFactory func(client federationclientset.Interface) FederatedTypeAdapter +type AdapterFactory func(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter // SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) { @@ -75,5 +75,5 @@ func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, val // ObjectKey returns a cluster-unique key for the given object func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string { - return adapter.NamespacedName(obj).String() + return adapter.QualifiedName(obj).String() } diff --git a/federation/pkg/federatedtypes/configmap.go b/federation/pkg/federatedtypes/configmap.go index a0de1888b7..bc2c2ac3ef 100644 --- a/federation/pkg/federatedtypes/configmap.go +++ b/federation/pkg/federatedtypes/configmap.go @@ -21,8 +21,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federation-controller/util" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -41,7 +41,7 @@ type ConfigMapAdapter struct { client federationclientset.Interface } -func NewConfigMapAdapter(client federationclientset.Interface) FederatedTypeAdapter { +func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { return &ConfigMapAdapter{client: client} } @@ -72,9 +72,9 @@ func (a *ConfigMapAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { return util.ConfigMapEquivalent(configmap1, configmap2) } -func (a *ConfigMapAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { +func (a *ConfigMapAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { configmap := obj.(*apiv1.ConfigMap) - return types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name} + return QualifiedName{Namespace: configmap.Namespace, Name: configmap.Name} } func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { @@ -86,12 +86,12 @@ func (a *ConfigMapAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) } -func (a *ConfigMapAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *ConfigMapAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *ConfigMapAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *ConfigMapAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { @@ -112,12 +112,12 @@ func (a *ConfigMapAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) } -func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { - return client.CoreV1().ConfigMaps(nsName.Namespace).Delete(nsName.Name, options) +func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federatedtypes/crudtester/crudtester.go b/federation/pkg/federatedtypes/crudtester/crudtester.go index 45a7c5ea6f..f11768054c 100644 --- a/federation/pkg/federatedtypes/crudtester/crudtester.go +++ b/federation/pkg/federatedtypes/crudtester/crudtester.go @@ -17,6 +17,7 @@ limitations under the License. package crudtester import ( + "fmt" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -76,15 +77,20 @@ func (c *FederatedTypeCRUDTester) CheckLifecycle(desiredObject pkgruntime.Object func (c *FederatedTypeCRUDTester) Create(desiredObject pkgruntime.Object) pkgruntime.Object { namespace := c.adapter.ObjectMeta(desiredObject).Namespace - c.tl.Logf("Creating new federated %s in namespace %q", c.kind, namespace) + resourceMsg := fmt.Sprintf("federated %s", c.kind) + if len(namespace) > 0 { + resourceMsg = fmt.Sprintf("%s in namespace %q", resourceMsg, namespace) + } + + c.tl.Logf("Creating new %s", resourceMsg) obj, err := c.adapter.FedCreate(desiredObject) if err != nil { - c.tl.Fatalf("Error creating federated %s in namespace %q : %v", c.kind, namespace, err) + c.tl.Fatalf("Error creating %s: %v", resourceMsg, err) } - namespacedName := c.adapter.NamespacedName(obj) - c.tl.Logf("Created new federated %s %q", c.kind, namespacedName) + qualifiedName := c.adapter.QualifiedName(obj) + c.tl.Logf("Created new federated %s %q", c.kind, qualifiedName) return obj } @@ -98,7 +104,7 @@ func (c *FederatedTypeCRUDTester) CheckCreate(desiredObject pkgruntime.Object) p } func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) { - namespacedName := c.adapter.NamespacedName(obj) + qualifiedName := c.adapter.QualifiedName(obj) var initialAnnotation string meta := c.adapter.ObjectMeta(obj) @@ -106,29 +112,29 @@ func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) { initialAnnotation = meta.Annotations[AnnotationTestFederationCRUDUpdate] } - c.tl.Logf("Updating federated %s %q", c.kind, namespacedName) + c.tl.Logf("Updating federated %s %q", c.kind, qualifiedName) updatedObj, err := c.updateFedObject(obj) if err != nil { - c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, qualifiedName, err) } // updateFedObject is expected to have changed the value of the annotation meta = c.adapter.ObjectMeta(updatedObj) updatedAnnotation := meta.Annotations[AnnotationTestFederationCRUDUpdate] if updatedAnnotation == initialAnnotation { - c.tl.Fatalf("Federated %s %q not mutated", c.kind, namespacedName) + c.tl.Fatalf("Federated %s %q not mutated", c.kind, qualifiedName) } c.CheckPropagation(updatedObj) } func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDependents *bool) { - namespacedName := c.adapter.NamespacedName(obj) + qualifiedName := c.adapter.QualifiedName(obj) - c.tl.Logf("Deleting federated %s %q", c.kind, namespacedName) - err := c.adapter.FedDelete(namespacedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents}) + c.tl.Logf("Deleting federated %s %q", c.kind, qualifiedName) + err := c.adapter.FedDelete(qualifiedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents}) if err != nil { - c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err) } deletingInCluster := (orphanDependents != nil && *orphanDependents == false) @@ -142,14 +148,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen // Wait for deletion. The federation resource will only be removed once orphan deletion has been // completed or deemed unnecessary. err = wait.PollImmediate(c.waitInterval, waitTimeout, func() (bool, error) { - _, err := c.adapter.FedGet(namespacedName) + _, err := c.adapter.FedGet(qualifiedName) if errors.IsNotFound(err) { return true, nil } return false, err }) if err != nil { - c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err) } var stateMsg string = "present" @@ -157,14 +163,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen stateMsg = "not present" } for _, client := range c.clusterClients { - _, err := c.adapter.ClusterGet(client, namespacedName) + _, err := c.adapter.ClusterGet(client, qualifiedName) switch { case !deletingInCluster && errors.IsNotFound(err): - c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, namespacedName) + c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, qualifiedName) case deletingInCluster && err == nil: - c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, namespacedName) + c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, qualifiedName) case err != nil && !errors.IsNotFound(err): - c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, namespacedName, stateMsg, err) + c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, qualifiedName, stateMsg, err) } } } @@ -176,26 +182,26 @@ func (c *FederatedTypeCRUDTester) CheckPropagation(obj pkgruntime.Object) { // CheckPropagationForClients checks propagation for the provided clients func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Object, clusterClients []clientset.Interface, objExpected bool) { - namespacedName := c.adapter.NamespacedName(obj) + qualifiedName := c.adapter.QualifiedName(obj) - c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, namespacedName, len(clusterClients)) + c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, qualifiedName, len(clusterClients)) for _, client := range clusterClients { err := c.waitForResource(client, obj) switch { case err == wait.ErrWaitTimeout: if objExpected { - c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, qualifiedName, err) } case err != nil: - c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, qualifiedName, err) case err == nil && !objExpected: - c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, namespacedName, err) + c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, qualifiedName, err) } } } func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error { - namespacedName := c.adapter.NamespacedName(obj) + qualifiedName := c.adapter.QualifiedName(obj) err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) { equivalenceFunc := c.adapter.Equivalent if c.adapter.IsSchedulingAdapter() { @@ -206,7 +212,7 @@ func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, ob equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule } - clusterObj, err := c.adapter.ClusterGet(client, namespacedName) + clusterObj, err := c.adapter.ClusterGet(client, qualifiedName) if err == nil && equivalenceFunc(clusterObj, obj) { return true, nil } @@ -227,8 +233,8 @@ func (c *FederatedTypeCRUDTester) updateFedObject(obj pkgruntime.Object) (pkgrun if errors.IsConflict(err) { // The resource was updated by the federation controller. // Get the latest version and retry. - namespacedName := c.adapter.NamespacedName(obj) - obj, err = c.adapter.FedGet(namespacedName) + qualifiedName := c.adapter.QualifiedName(obj) + obj, err = c.adapter.FedGet(qualifiedName) return false, err } // Be tolerant of a slow server diff --git a/federation/pkg/federatedtypes/daemonset.go b/federation/pkg/federatedtypes/daemonset.go index 3094afd299..6216141f62 100644 --- a/federation/pkg/federatedtypes/daemonset.go +++ b/federation/pkg/federatedtypes/daemonset.go @@ -24,8 +24,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federation-controller/util" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -44,7 +44,7 @@ type DaemonSetAdapter struct { client federationclientset.Interface } -func NewDaemonSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { +func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { return &DaemonSetAdapter{client: client} } @@ -75,9 +75,9 @@ func (a *DaemonSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { return util.ObjectMetaEquivalent(daemonset1.ObjectMeta, daemonset2.ObjectMeta) && reflect.DeepEqual(daemonset1.Spec, daemonset2.Spec) } -func (a *DaemonSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { +func (a *DaemonSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { daemonset := obj.(*extensionsv1.DaemonSet) - return types.NamespacedName{Namespace: daemonset.Namespace, Name: daemonset.Name} + return QualifiedName{Namespace: daemonset.Namespace, Name: daemonset.Name} } func (a *DaemonSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { @@ -89,12 +89,12 @@ func (a *DaemonSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, return a.client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) } -func (a *DaemonSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.Extensions().DaemonSets(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *DaemonSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *DaemonSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return a.client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *DaemonSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *DaemonSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { @@ -115,12 +115,12 @@ func (a *DaemonSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg return client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) } -func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { - return client.Extensions().DaemonSets(nsName.Namespace).Delete(nsName.Name, options) +func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *DaemonSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federatedtypes/deployment.go b/federation/pkg/federatedtypes/deployment.go index 72a007126a..cea1923921 100644 --- a/federation/pkg/federatedtypes/deployment.go +++ b/federation/pkg/federatedtypes/deployment.go @@ -22,8 +22,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -44,7 +44,7 @@ type DeploymentAdapter struct { client federationclientset.Interface } -func NewDeploymentAdapter(client federationclientset.Interface) FederatedTypeAdapter { +func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { schedulingAdapter := schedulingAdapter{ preferencesAnnotationName: FedDeploymentPreferencesAnnotation, updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { @@ -91,9 +91,9 @@ func (a *DeploymentAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { return fedutil.DeploymentEquivalent(deployment1, deployment2) } -func (a *DeploymentAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { +func (a *DeploymentAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { deployment := obj.(*extensionsv1.Deployment) - return types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name} + return QualifiedName{Namespace: deployment.Namespace, Name: deployment.Name} } func (a *DeploymentAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { @@ -105,12 +105,12 @@ func (a *DeploymentAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, return a.client.Extensions().Deployments(deployment.Namespace).Create(deployment) } -func (a *DeploymentAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.Extensions().Deployments(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *DeploymentAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().Deployments(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *DeploymentAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return a.client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *DeploymentAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.Extensions().Deployments(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *DeploymentAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { @@ -131,12 +131,12 @@ func (a *DeploymentAdapter) ClusterCreate(client kubeclientset.Interface, obj pk return client.Extensions().Deployments(deployment.Namespace).Create(deployment) } -func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { - return client.Extensions().Deployments(nsName.Namespace).Delete(nsName.Name, options) +func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.Extensions().Deployments(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.Extensions().Deployments(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *DeploymentAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federatedtypes/namespace.go b/federation/pkg/federatedtypes/namespace.go new file mode 100644 index 0000000000..43eadfac14 --- /dev/null +++ b/federation/pkg/federatedtypes/namespace.go @@ -0,0 +1,215 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/controller/namespace/deletion" + + "github.com/golang/glog" +) + +const ( + NamespaceKind = "namespace" + NamespaceControllerName = "namespaces" +) + +func init() { + RegisterFederatedType(NamespaceKind, NamespaceControllerName, []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource(NamespaceControllerName)}, NewNamespaceAdapter) +} + +type NamespaceAdapter struct { + client federationclientset.Interface + deleter deletion.NamespacedResourcesDeleterInterface +} + +func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { + dynamicClientPool := dynamic.NewDynamicClientPool(config) + discoverResourcesFunc := client.Discovery().ServerPreferredNamespacedResources + deleter := deletion.NewNamespacedResourcesDeleter( + client.Core().Namespaces(), + dynamicClientPool, + nil, + discoverResourcesFunc, + apiv1.FinalizerKubernetes, + false) + return &NamespaceAdapter{client: client, deleter: deleter} +} + +func (a *NamespaceAdapter) Kind() string { + return NamespaceKind +} + +func (a *NamespaceAdapter) ObjectType() pkgruntime.Object { + return &apiv1.Namespace{} +} + +func (a *NamespaceAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*apiv1.Namespace) + return ok +} + +func (a *NamespaceAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + namespace := obj.(*apiv1.Namespace) + return &apiv1.Namespace{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(namespace.ObjectMeta), + Spec: *(util.DeepCopyApiTypeOrPanic(&namespace.Spec).(*apiv1.NamespaceSpec)), + } +} + +func (a *NamespaceAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + return util.ObjectMetaAndSpecEquivalent(obj1, obj2) +} + +func (a *NamespaceAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { + namespace := obj.(*apiv1.Namespace) + return QualifiedName{Name: namespace.Name} +} + +func (a *NamespaceAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*apiv1.Namespace).ObjectMeta +} + +func (a *NamespaceAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + namespace := obj.(*apiv1.Namespace) + return a.client.CoreV1().Namespaces().Create(namespace) +} + +func (a *NamespaceAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().Namespaces().Delete(qualifiedName.Name, options) +} + +func (a *NamespaceAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.CoreV1().Namespaces().Get(qualifiedName.Name, metav1.GetOptions{}) +} + +func (a *NamespaceAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.CoreV1().Namespaces().List(options) +} + +func (a *NamespaceAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + namespace := obj.(*apiv1.Namespace) + return a.client.CoreV1().Namespaces().Update(namespace) +} + +func (a *NamespaceAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.CoreV1().Namespaces().Watch(options) +} + +func (a *NamespaceAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + namespace := obj.(*apiv1.Namespace) + return client.CoreV1().Namespaces().Create(namespace) +} + +func (a *NamespaceAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.CoreV1().Namespaces().Delete(qualifiedName.Name, options) +} + +func (a *NamespaceAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.CoreV1().Namespaces().Get(qualifiedName.Name, metav1.GetOptions{}) +} + +func (a *NamespaceAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.CoreV1().Namespaces().List(options) +} + +func (a *NamespaceAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + namespace := obj.(*apiv1.Namespace) + return client.CoreV1().Namespaces().Update(namespace) +} + +func (a *NamespaceAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Namespaces().Watch(options) +} + +func (a *NamespaceAdapter) IsSchedulingAdapter() bool { + return false +} + +func (a *NamespaceAdapter) NewTestObject(namespace string) pkgruntime.Object { + return &apiv1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-namespace-", + }, + Spec: apiv1.NamespaceSpec{ + Finalizers: []apiv1.FinalizerName{apiv1.FinalizerKubernetes}, + }, + } +} + +// CleanUpNamespace deletes all resources in a given namespace. +func (a *NamespaceAdapter) CleanUpNamespace(obj pkgruntime.Object, eventRecorder record.EventRecorder) (pkgruntime.Object, error) { + namespace := obj.(*apiv1.Namespace) + name := namespace.Name + + // Set Terminating status. + updatedNamespace := &apiv1.Namespace{ + ObjectMeta: namespace.ObjectMeta, + Spec: namespace.Spec, + Status: apiv1.NamespaceStatus{ + Phase: apiv1.NamespaceTerminating, + }, + } + var err error + if namespace.Status.Phase != apiv1.NamespaceTerminating { + glog.V(2).Infof("Marking ns %s as terminating", name) + eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion")) + _, err = a.FedUpdate(updatedNamespace) + if err != nil { + return nil, fmt.Errorf("failed to update namespace: %v", err) + } + } + + if hasFinalizerInSpec(updatedNamespace, apiv1.FinalizerKubernetes) { + // Delete resources in this namespace. + err = a.deleter.Delete(name) + if err != nil { + return nil, fmt.Errorf("error in deleting resources in namespace %s: %v", name, err) + } + glog.V(2).Infof("Removed kubernetes finalizer from ns %s", name) + // Fetch the updated Namespace. + obj, err = a.FedGet(QualifiedName{Name: name}) + updatedNamespace = obj.(*apiv1.Namespace) + if err != nil { + return nil, fmt.Errorf("error in fetching updated namespace %s: %s", name, err) + } + } + + return updatedNamespace, nil +} + +func hasFinalizerInSpec(namespace *apiv1.Namespace, finalizer apiv1.FinalizerName) bool { + for i := range namespace.Spec.Finalizers { + if namespace.Spec.Finalizers[i] == finalizer { + return true + } + } + return false +} diff --git a/federation/pkg/federatedtypes/qualifiedname.go b/federation/pkg/federatedtypes/qualifiedname.go new file mode 100644 index 0000000000..95f0df1104 --- /dev/null +++ b/federation/pkg/federatedtypes/qualifiedname.go @@ -0,0 +1,41 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "fmt" +) + +// QualifiedName comprises a resource name with an optional namespace. +// If namespace is provided, a QualifiedName will be rendered as +// "/". If not, it will be rendered as "name". This +// is intended to allow the FederatedTypeAdapter interface and its +// consumers to operate on both namespaces and namespace-qualified +// resources. + +type QualifiedName struct { + Namespace string + Name string +} + +// String returns the general purpose string representation +func (n QualifiedName) String() string { + if len(n.Namespace) == 0 { + return n.Name + } + return fmt.Sprintf("%s/%s", n.Namespace, n.Name) +} diff --git a/federation/pkg/federatedtypes/replicaset.go b/federation/pkg/federatedtypes/replicaset.go index 35c53306ab..6f0b3a909a 100644 --- a/federation/pkg/federatedtypes/replicaset.go +++ b/federation/pkg/federatedtypes/replicaset.go @@ -22,8 +22,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -44,7 +44,7 @@ type ReplicaSetAdapter struct { client federationclientset.Interface } -func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { +func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { schedulingAdapter := schedulingAdapter{ preferencesAnnotationName: FedReplicaSetPreferencesAnnotation, updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { @@ -91,9 +91,9 @@ func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2) } -func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { +func (a *ReplicaSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { replicaset := obj.(*extensionsv1.ReplicaSet) - return types.NamespacedName{Namespace: replicaset.Namespace, Name: replicaset.Name} + return QualifiedName{Namespace: replicaset.Namespace, Name: replicaset.Name} } func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { @@ -105,12 +105,12 @@ func (a *ReplicaSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) } -func (a *ReplicaSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *ReplicaSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *ReplicaSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *ReplicaSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { @@ -131,12 +131,12 @@ func (a *ReplicaSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pk return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) } -func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { - return client.Extensions().ReplicaSets(nsName.Namespace).Delete(nsName.Name, options) +func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federatedtypes/secret.go b/federation/pkg/federatedtypes/secret.go index 937b8c65fc..9208c96e29 100644 --- a/federation/pkg/federatedtypes/secret.go +++ b/federation/pkg/federatedtypes/secret.go @@ -21,8 +21,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + restclient "k8s.io/client-go/rest" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/pkg/federation-controller/util" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -41,7 +41,7 @@ type SecretAdapter struct { client federationclientset.Interface } -func NewSecretAdapter(client federationclientset.Interface) FederatedTypeAdapter { +func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { return &SecretAdapter{client: client} } @@ -73,9 +73,9 @@ func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { return util.SecretEquivalent(*secret1, *secret2) } -func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { +func (a *SecretAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName { secret := obj.(*apiv1.Secret) - return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name} + return QualifiedName{Namespace: secret.Namespace, Name: secret.Name} } func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { @@ -87,12 +87,12 @@ func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, err return a.client.CoreV1().Secrets(secret.Namespace).Create(secret) } -func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { - return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) +func (a *SecretAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *SecretAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) { + return a.client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { @@ -113,12 +113,12 @@ func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgrun return client.CoreV1().Secrets(secret.Namespace).Create(secret) } -func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { - return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options) +func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error { + return client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options) } -func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { - return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) { + return client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{}) } func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 1c5fd2d39b..28057fc52e 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -26,7 +26,6 @@ filegroup( ":package-srcs", "//federation/pkg/federation-controller/cluster:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", - "//federation/pkg/federation-controller/namespace:all-srcs", "//federation/pkg/federation-controller/service:all-srcs", "//federation/pkg/federation-controller/sync:all-srcs", "//federation/pkg/federation-controller/util:all-srcs", diff --git a/federation/pkg/federation-controller/namespace/BUILD b/federation/pkg/federation-controller/namespace/BUILD deleted file mode 100644 index 1216d70720..0000000000 --- a/federation/pkg/federation-controller/namespace/BUILD +++ /dev/null @@ -1,75 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["namespace_controller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//pkg/api:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/namespace/deletion:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/dynamic:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["namespace_controller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/dynamic:go_default_library", - "//vendor/k8s.io/client-go/rest:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go deleted file mode 100644 index 9979ac8c1d..0000000000 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ /dev/null @@ -1,460 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package namespace - -import ( - "fmt" - "time" - - apiv1 "k8s.io/api/core/v1" - clientv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/pkg/api" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/namespace/deletion" - - "github.com/golang/glog" -) - -const ( - allClustersKey = "ALL_CLUSTERS" - ControllerName = "namespaces" - UserAgentName = "federation-namespace-controller" -) - -var ( - RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("namespaces")} -) - -type NamespaceController struct { - // For triggering single namespace reconciliation. This is used when there is an - // add/update/delete operation on a namespace in either federated API server or - // in some member of the federation. - namespaceDeliverer *util.DelayingDeliverer - - // For triggering all namespaces reconciliation. This is used when - // a new cluster becomes available. - clusterDeliverer *util.DelayingDeliverer - - // Contains namespaces present in members of federation. - namespaceFederatedInformer util.FederatedInformer - // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of namespaces that should be federated. - namespaceInformerStore cache.Store - // Informer controller for namespaces that should be federated. - namespaceInformerController cache.Controller - - // Client to federated api server. - federatedApiClient federationclientset.Interface - - // Backoff manager for namespaces - namespaceBackoff *flowcontrol.Backoff - - // For events - eventRecorder record.EventRecorder - - deletionHelper *deletionhelper.DeletionHelper - - // Helper to delete all resources in a namespace. - namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface - - namespaceReviewDelay time.Duration - clusterAvailableDelay time.Duration - smallDelay time.Duration - updateTimeout time.Duration -} - -// NewNamespaceController returns a new namespace controller -func NewNamespaceController(client federationclientset.Interface, dynamicClientPool dynamic.ClientPool) *NamespaceController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) - - nc := &NamespaceController{ - federatedApiClient: client, - namespaceReviewDelay: time.Second * 10, - clusterAvailableDelay: time.Second * 20, - smallDelay: time.Second * 3, - updateTimeout: time.Second * 30, - namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - eventRecorder: recorder, - } - - // Build deliverers for triggering reconciliations. - nc.namespaceDeliverer = util.NewDelayingDeliverer() - nc.clusterDeliverer = util.NewDelayingDeliverer() - - // Start informer in federated API servers on namespaces that should be federated. - nc.namespaceInformerStore, nc.namespaceInformerController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.Core().Namespaces().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().Namespaces().Watch(options) - }, - }, - &apiv1.Namespace{}, - controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) })) - - // Federated informer on namespaces in members of federation. - nc.namespaceFederatedInformer = util.NewFederatedInformer( - client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return targetClient.Core().Namespaces().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Core().Namespaces().Watch(options) - }, - }, - &apiv1.Namespace{}, - controller.NoResyncPeriodFunc(), - // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some namespace operation succeeded. - util.NewTriggerOnMetaAndSpecChanges( - func(obj runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) }, - )) - }, - &util.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the namespaces again. - nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) - }, - }, - ) - - // Federated updater along with Create/Update/Delete operations. - nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.updateTimeout, nc.eventRecorder, - func(client kubeclientset.Interface, obj runtime.Object) error { - namespace := obj.(*apiv1.Namespace) - _, err := client.Core().Namespaces().Create(namespace) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - namespace := obj.(*apiv1.Namespace) - _, err := client.Core().Namespaces().Update(namespace) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - namespace := obj.(*apiv1.Namespace) - orphanDependents := false - err := client.Core().Namespaces().Delete(namespace.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - // IsNotFound error is fine since that means the object is deleted already. - if errors.IsNotFound(err) { - return nil - } - return err - }) - - nc.deletionHelper = deletionhelper.NewDeletionHelper( - nc.updateNamespace, - // objNameFunc - func(obj runtime.Object) string { - namespace := obj.(*apiv1.Namespace) - return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name) - }, - nc.namespaceFederatedInformer, - nc.federatedUpdater, - ) - - discoverResourcesFn := nc.federatedApiClient.Discovery().ServerPreferredNamespacedResources - nc.namespacedResourcesDeleter = deletion.NewNamespacedResourcesDeleter( - client.Core().Namespaces(), dynamicClientPool, nil, - discoverResourcesFn, apiv1.FinalizerKubernetes, false) - return nc -} - -// Sends the given update object to apiserver. -// Assumes that the given object is a namespace. -func (nc *NamespaceController) updateNamespace(obj runtime.Object) (runtime.Object, error) { - namespace := obj.(*apiv1.Namespace) - return nc.federatedApiClient.Core().Namespaces().Update(namespace) -} - -// Returns true if the given object has the given finalizer in its NamespaceSpec. -func (nc *NamespaceController) hasFinalizerFuncInSpec(obj runtime.Object, finalizer apiv1.FinalizerName) bool { - namespace := obj.(*apiv1.Namespace) - for i := range namespace.Spec.Finalizers { - if namespace.Spec.Finalizers[i] == finalizer { - return true - } - } - return false -} - -// Removes the finalizer from the given objects NamespaceSpec. -func (nc *NamespaceController) removeFinalizerFromSpec(namespace *apiv1.Namespace, finalizer apiv1.FinalizerName) (*apiv1.Namespace, error) { - updatedFinalizers := []apiv1.FinalizerName{} - for i := range namespace.Spec.Finalizers { - if namespace.Spec.Finalizers[i] != finalizer { - updatedFinalizers = append(updatedFinalizers, namespace.Spec.Finalizers[i]) - } - } - namespace.Spec.Finalizers = updatedFinalizers - updatedNamespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace) - if err != nil { - return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", string(finalizer), namespace.Name, err) - } - return updatedNamespace, nil -} - -func (nc *NamespaceController) Run(stopChan <-chan struct{}) { - go nc.namespaceInformerController.Run(stopChan) - nc.namespaceFederatedInformer.Start() - go func() { - <-stopChan - nc.namespaceFederatedInformer.Stop() - }() - nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - namespace := item.Value.(string) - nc.reconcileNamespace(namespace) - }) - nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - nc.reconcileNamespacesOnClusterChange() - }) - util.StartBackoffGC(nc.namespaceBackoff, stopChan) -} - -func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) { - namespace := obj.(*apiv1.Namespace) - nc.deliverNamespace(namespace.Name, delay, failed) -} - -// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) { - if failed { - nc.namespaceBackoff.Next(namespace, time.Now()) - delay = delay + nc.namespaceBackoff.Get(namespace) - } else { - nc.namespaceBackoff.Reset(namespace) - } - nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay) -} - -// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet -// synced with the corresponding api server. -func (nc *NamespaceController) isSynced() bool { - if !nc.namespaceFederatedInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !nc.namespaceFederatedInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - return true -} - -// The function triggers reconciliation of all federated namespaces. -func (nc *NamespaceController) reconcileNamespacesOnClusterChange() { - if !nc.isSynced() { - nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay) - } - for _, obj := range nc.namespaceInformerStore.List() { - namespace := obj.(*apiv1.Namespace) - nc.deliverNamespace(namespace.Name, nc.smallDelay, false) - } -} - -func (nc *NamespaceController) reconcileNamespace(namespace string) { - if !nc.isSynced() { - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) - return - } - - namespaceObjFromStore, exist, err := nc.namespaceInformerStore.GetByKey(namespace) - if err != nil { - glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err) - nc.deliverNamespace(namespace, 0, true) - return - } - - if !exist { - // Not federated namespace, ignoring. - return - } - // Create a copy before modifying the namespace to prevent race condition with - // other readers of namespace from store. - namespaceObj, err := api.Scheme.DeepCopy(namespaceObjFromStore) - baseNamespace, ok := namespaceObj.(*apiv1.Namespace) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - nc.deliverNamespace(namespace, 0, true) - return - } - if baseNamespace.DeletionTimestamp != nil { - if err := nc.delete(baseNamespace); err != nil { - glog.Errorf("Failed to delete %s: %v", namespace, err) - nc.eventRecorder.Eventf(baseNamespace, api.EventTypeWarning, "DeleteFailed", - "Namespace delete failed: %v", err) - nc.deliverNamespace(namespace, 0, true) - } - return - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for namespace: %s", - baseNamespace.Name) - // Add the required finalizers before creating a namespace in - // underlying clusters. - // This ensures that the dependent namespaces are deleted in underlying - // clusters when the federated namespace is deleted. - updatedNamespaceObj, err := nc.deletionHelper.EnsureFinalizers(baseNamespace) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in namespace %s: %v", - baseNamespace.Name, err) - nc.deliverNamespace(namespace, 0, false) - return - } - baseNamespace = updatedNamespaceObj.(*apiv1.Namespace) - - glog.V(3).Infof("Syncing namespace %s in underlying clusters", baseNamespace.Name) - // Sync the namespace in all underlying clusters. - clusters, err := nc.namespaceFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get cluster list: %v", err) - nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false) - return - } - - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace) - if err != nil { - glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err) - nc.deliverNamespace(namespace, 0, true) - return - } - // The object should not be modified. - desiredNamespace := &apiv1.Namespace{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(baseNamespace.ObjectMeta), - Spec: *(util.DeepCopyApiTypeOrPanic(&baseNamespace.Spec).(*apiv1.NamespaceSpec)), - } - glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace) - - if !found { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredNamespace, - ClusterName: cluster.Name, - Key: namespace, - }) - } else { - clusterNamespace := clusterNamespaceObj.(*apiv1.Namespace) - - // Update existing namespace, if needed. - if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) { - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredNamespace, - ClusterName: cluster.Name, - Key: namespace, - }) - } - } - } - - if len(operations) == 0 { - // Everything is in order - return - } - glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations)) - - err = nc.federatedUpdater.Update(operations) - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v", namespace, err) - nc.deliverNamespace(namespace, 0, true) - return - } - - // Everything is in order but lets be double sure - nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false) -} - -// delete deletes the given namespace or returns error if the deletion was not complete. -func (nc *NamespaceController) delete(namespace *apiv1.Namespace) error { - // Set Terminating status. - updatedNamespace := &apiv1.Namespace{ - ObjectMeta: namespace.ObjectMeta, - Spec: namespace.Spec, - Status: apiv1.NamespaceStatus{ - Phase: apiv1.NamespaceTerminating, - }, - } - var err error - if namespace.Status.Phase != apiv1.NamespaceTerminating { - glog.V(2).Infof("Marking ns %s as terminating", namespace.Name) - nc.eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion")) - _, err = nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace) - if err != nil { - return fmt.Errorf("failed to update namespace: %v", err) - } - } - - if nc.hasFinalizerFuncInSpec(updatedNamespace, apiv1.FinalizerKubernetes) { - // Delete resources in this namespace. - err = nc.namespacedResourcesDeleter.Delete(updatedNamespace.Name) - if err != nil { - return fmt.Errorf("error in deleting resources in namespace %s: %v", namespace.Name, err) - } - glog.V(2).Infof("Removed kubernetes finalizer from ns %s", namespace.Name) - // Fetch the updated Namespace. - updatedNamespace, err = nc.federatedApiClient.Core().Namespaces().Get(updatedNamespace.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("error in fetching updated namespace %s: %s", updatedNamespace.Name, err) - } - } - - // Delete the namespace from all underlying clusters. - _, err = nc.deletionHelper.HandleObjectInUnderlyingClusters(updatedNamespace) - if err != nil { - return err - } - - err = nc.federatedApiClient.Core().Namespaces().Delete(namespace.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of namespace finalizer deletion. - // The process that deleted the last finalizer is also going to delete the namespace and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete namespace: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/namespace/namespace_controller_test.go b/federation/pkg/federation-controller/namespace/namespace_controller_test.go deleted file mode 100644 index 461f84c65c..0000000000 --- a/federation/pkg/federation-controller/namespace/namespace_controller_test.go +++ /dev/null @@ -1,188 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package namespace - -import ( - "fmt" - "testing" - "time" - - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/dynamic" - restclient "k8s.io/client-go/rest" - core "k8s.io/client-go/testing" - federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" - fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - namespaces string = "namespaces" - clusters string = "clusters" -) - -func TestNamespaceController(t *testing.T) { - cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) - cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) - ns1 := apiv1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-namespace", - SelfLink: "/api/v1/namespaces/test-namespace", - }, - Spec: apiv1.NamespaceSpec{ - Finalizers: []apiv1.FinalizerName{apiv1.FinalizerKubernetes}, - }, - } - - fakeClient := &fakefedclientset.Clientset{} - RegisterFakeList(clusters, &fakeClient.Fake, &federationapi.ClusterList{Items: []federationapi.Cluster{*cluster1}}) - RegisterFakeList(namespaces, &fakeClient.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}}) - namespaceWatch := RegisterFakeWatch(namespaces, &fakeClient.Fake) - namespaceUpdateChan := RegisterFakeCopyOnUpdate(namespaces, &fakeClient.Fake, namespaceWatch) - clusterWatch := RegisterFakeWatch(clusters, &fakeClient.Fake) - - cluster1Client := &fakekubeclientset.Clientset{} - cluster1Watch := RegisterFakeWatch(namespaces, &cluster1Client.Fake) - RegisterFakeList(namespaces, &cluster1Client.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}}) - cluster1CreateChan := RegisterFakeCopyOnCreate(namespaces, &cluster1Client.Fake, cluster1Watch) - cluster1UpdateChan := RegisterFakeCopyOnUpdate(namespaces, &cluster1Client.Fake, cluster1Watch) - - cluster2Client := &fakekubeclientset.Clientset{} - cluster2Watch := RegisterFakeWatch(namespaces, &cluster2Client.Fake) - RegisterFakeList(namespaces, &cluster2Client.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}}) - cluster2CreateChan := RegisterFakeCopyOnCreate(namespaces, &cluster2Client.Fake, cluster2Watch) - - nsDeleteChan := RegisterDelete(&fakeClient.Fake, namespaces) - namespaceController := NewNamespaceController(fakeClient, dynamic.NewDynamicClientPool(&restclient.Config{})) - informerClientFactory := func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) { - switch cluster.Name { - case cluster1.Name: - return cluster1Client, nil - case cluster2.Name: - return cluster2Client, nil - default: - return nil, fmt.Errorf("Unknown cluster") - } - } - setClientFactory(namespaceController.namespaceFederatedInformer, informerClientFactory) - namespaceController.clusterAvailableDelay = time.Second - namespaceController.namespaceReviewDelay = 50 * time.Millisecond - namespaceController.smallDelay = 20 * time.Millisecond - namespaceController.updateTimeout = 5 * time.Second - - stop := make(chan struct{}) - namespaceController.Run(stop) - - // Test add federated namespace. - namespaceWatch.Add(&ns1) - // Verify that the DeleteFromUnderlyingClusters finalizer is added to the namespace. - updatedNamespace := GetNamespaceFromChan(namespaceUpdateChan) - require.NotNil(t, updatedNamespace) - AssertHasFinalizer(t, updatedNamespace, deletionhelper.FinalizerDeleteFromUnderlyingClusters) - ns1 = *updatedNamespace - - // Verify that the namespace is created in underlying cluster1. - createdNamespace := GetNamespaceFromChan(cluster1CreateChan) - require.NotNil(t, createdNamespace) - assert.Equal(t, ns1.Name, createdNamespace.Name) - - // Wait for the namespace to appear in the informer store - err := WaitForStoreUpdate( - namespaceController.namespaceFederatedInformer.GetTargetStore(), - cluster1.Name, ns1.Name, wait.ForeverTestTimeout) - assert.Nil(t, err, "namespace should have appeared in the informer store") - - // Test update federated namespace. - ns1.Annotations = map[string]string{ - "A": "B", - } - namespaceWatch.Modify(&ns1) - assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, MetaAndSpecCheckingFunction(&ns1))) - - // Test add cluster - clusterWatch.Add(cluster2) - createdNamespace2 := GetNamespaceFromChan(cluster2CreateChan) - require.NotNil(t, createdNamespace2) - assert.Equal(t, ns1.Name, createdNamespace2.Name) - assert.Contains(t, createdNamespace2.Annotations, "A") - - // Delete the namespace with orphan finalizer (let namespaces - // in underlying clusters be as is). - // TODO: Add a test without orphan finalizer. - ns1.ObjectMeta.Finalizers = append(ns1.ObjectMeta.Finalizers, metav1.FinalizerOrphanDependents) - ns1.DeletionTimestamp = &metav1.Time{Time: time.Now()} - namespaceWatch.Modify(&ns1) - assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan)) - // TODO: Add a test for verifying that resources in the namespace are deleted - // when the namespace is deleted. - // Need a fake dynamic client to mock list and delete actions to be able to test this. - // TODO: Add a fake dynamic client and test this. - // In the meantime, e2e test verify that the resources in a namespace are - // deleted when the namespace is deleted. - close(stop) -} - -func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federationapi.Cluster) (kubeclientset.Interface, error)) { - testInformer := ToFederatedInformerForTestOnly(informer) - testInformer.SetClientFactory(informerClientFactory) -} - -func RegisterDeleteCollection(client *core.Fake, resource string) chan string { - deleteChan := make(chan string, 100) - client.AddReactor("delete-collection", resource, func(action core.Action) (bool, runtime.Object, error) { - deleteChan <- "all" - return true, nil, nil - }) - return deleteChan -} - -func RegisterDelete(client *core.Fake, resource string) chan string { - deleteChan := make(chan string, 100) - client.AddReactor("delete", resource, func(action core.Action) (bool, runtime.Object, error) { - deleteAction := action.(core.DeleteAction) - deleteChan <- deleteAction.GetName() - return true, nil, nil - }) - return deleteChan -} - -func GetStringFromChan(c chan string) string { - select { - case str := <-c: - return str - case <-time.After(5 * time.Second): - return "timedout" - } -} - -func GetNamespaceFromChan(c chan runtime.Object) *apiv1.Namespace { - if namespace := GetObjectFromChan(c); namespace == nil { - return nil - } else { - return namespace.(*apiv1.Namespace) - } -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index e4742769fa..3e02fc9bc7 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -28,7 +28,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", @@ -42,28 +41,18 @@ go_library( go_test( name = "go_default_test", - srcs = [ - "controller_test.go", - "deploymentcontroller_test.go", - ], + srcs = ["controller_test.go"], library = ":go_default_library", tags = ["automanaged"], deps = [ "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 38eb0fc65e..f5a8331bfb 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -24,7 +24,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -97,7 +96,7 @@ type FederationSyncController struct { func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { restclient.AddUserAgent(config, fmt.Sprintf("federation-%s-controller", kind)) client := federationclientset.NewForConfigOrDie(config) - adapter := adapterFactory(client) + adapter := adapterFactory(client, config) controller := newFederationSyncController(client, adapter) if minimizeLatency { controller.minimizeLatency() @@ -189,9 +188,9 @@ func newFederationSyncController(client federationclientset.Interface, adapter f return err }, func(client kubeclientset.Interface, obj pkgruntime.Object) error { - namespacedName := adapter.NamespacedName(obj) + qualifiedName := adapter.QualifiedName(obj) orphanDependents := false - err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) + err := adapter.ClusterDelete(client, qualifiedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) return err }) @@ -199,7 +198,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f s.updateObject, // objNameFunc func(obj pkgruntime.Object) string { - return adapter.NamespacedName(obj).String() + return adapter.QualifiedName(obj).String() }, s.informer, s.updater, @@ -264,38 +263,38 @@ func (s *FederationSyncController) worker() { } item := obj.(*util.DelayingDelivererItem) - namespacedName := item.Value.(*types.NamespacedName) - status := s.reconcile(*namespacedName) + qualifiedName := item.Value.(*federatedtypes.QualifiedName) + status := s.reconcile(*qualifiedName) s.workQueue.Done(item) switch status { case statusAllOK: break case statusError: - s.deliver(*namespacedName, 0, true) + s.deliver(*qualifiedName, 0, true) case statusNeedsRecheck: - s.deliver(*namespacedName, s.reviewDelay, false) + s.deliver(*qualifiedName, s.reviewDelay, false) case statusNotSynced: - s.deliver(*namespacedName, s.clusterAvailableDelay, false) + s.deliver(*qualifiedName, s.clusterAvailableDelay, false) } } } func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) { - namespacedName := s.adapter.NamespacedName(obj) - s.deliver(namespacedName, delay, failed) + qualifiedName := s.adapter.QualifiedName(obj) + s.deliver(qualifiedName, delay, failed) } // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) { - key := namespacedName.String() +func (s *FederationSyncController) deliver(qualifiedName federatedtypes.QualifiedName, delay time.Duration, failed bool) { + key := qualifiedName.String() if failed { s.backoff.Next(key, time.Now()) delay = delay + s.backoff.Get(key) } else { s.backoff.Reset(key) } - s.deliverer.DeliverAfter(key, &namespacedName, delay) + s.deliverer.DeliverAfter(key, &qualifiedName, delay) } // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet @@ -322,18 +321,18 @@ func (s *FederationSyncController) reconcileOnClusterChange() { s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) } for _, obj := range s.store.List() { - namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object)) - s.deliver(namespacedName, s.smallDelay, false) + qualifiedName := s.adapter.QualifiedName(obj.(pkgruntime.Object)) + s.deliver(qualifiedName, s.smallDelay, false) } } -func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) reconciliationStatus { +func (s *FederationSyncController) reconcile(qualifiedName federatedtypes.QualifiedName) reconciliationStatus { if !s.isSynced() { return statusNotSynced } kind := s.adapter.Kind() - key := namespacedName.String() + key := qualifiedName.String() glog.V(4).Infof("Starting to reconcile %v %v", kind, key) startTime := time.Now() @@ -349,10 +348,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName meta := s.adapter.ObjectMeta(obj) if meta.DeletionTimestamp != nil { - err := s.delete(obj, kind, namespacedName) + err := s.delete(obj, kind, qualifiedName) if err != nil { msg := "Failed to delete %s %q: %v" - args := []interface{}{kind, namespacedName, err} + args := []interface{}{kind, qualifiedName, err} runtime.HandleError(fmt.Errorf(msg, args...)) s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...) return statusError @@ -415,14 +414,25 @@ func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Ob } // delete deletes the given resource or returns error if the deletion was not complete. -func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error { - glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName) +func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, qualifiedName federatedtypes.QualifiedName) error { + glog.V(3).Infof("Handling deletion of %s %q", kind, qualifiedName) + + // Perform pre-deletion cleanup for the namespace adapter + namespaceAdapter, ok := s.adapter.(*federatedtypes.NamespaceAdapter) + if ok { + var err error + obj, err = namespaceAdapter.CleanUpNamespace(obj, s.eventRecorder) + if err != nil { + return err + } + } + _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) if err != nil { return err } - err = s.adapter.FedDelete(namespacedName, nil) + err = s.adapter.FedDelete(qualifiedName, nil) if err != nil { // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. // This is expected when we are processing an update as a result of finalizer deletion. diff --git a/federation/pkg/federation-controller/sync/deploymentcontroller_test.go b/federation/pkg/federation-controller/sync/deploymentcontroller_test.go deleted file mode 100644 index cae13c2c32..0000000000 --- a/federation/pkg/federation-controller/sync/deploymentcontroller_test.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sync - -import ( - "flag" - "fmt" - "testing" - - apiv1 "k8s.io/api/core/v1" - extensionsv1 "k8s.io/api/extensions/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" - fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" - "k8s.io/kubernetes/federation/pkg/federatedtypes" - . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - - "github.com/stretchr/testify/assert" -) - -const ( - deployments = "deployments" -) - -func TestDeploymentController(t *testing.T) { - flag.Set("logtostderr", "true") - flag.Set("v", "5") - flag.Parse() - - cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) - cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) - - fakeClient := &fakefedclientset.Clientset{} - // Add an update reactor on fake client to return the desired updated object. - // This is a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939. - AddFakeUpdateReactor(deployments, &fakeClient.Fake) - RegisterFakeList("clusters", &fakeClient.Fake, &fedv1.ClusterList{Items: []fedv1.Cluster{*cluster1}}) - deploymentsWatch := RegisterFakeWatch(deployments, &fakeClient.Fake) - clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake) - - cluster1Client := &fakekubeclientset.Clientset{} - cluster1Watch := RegisterFakeWatch(deployments, &cluster1Client.Fake) - cluster1CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster1Client.Fake, cluster1Watch) - cluster1UpdateChan := RegisterFakeCopyOnUpdate(deployments, &cluster1Client.Fake, cluster1Watch) - - cluster2Client := &fakekubeclientset.Clientset{} - cluster2Watch := RegisterFakeWatch(deployments, &cluster2Client.Fake) - cluster2CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster2Client.Fake, cluster2Watch) - - deploymentController := newFederationSyncController(fakeClient, federatedtypes.NewDeploymentAdapter(fakeClient)) - deploymentController.minimizeLatency() - clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) { - switch cluster.Name { - case cluster1.Name: - return cluster1Client, nil - case cluster2.Name: - return cluster2Client, nil - default: - return nil, fmt.Errorf("Unknown cluster") - } - } - ToFederatedInformerForTestOnly(deploymentController.informer).SetClientFactory(clientFactory) - - stop := make(chan struct{}) - go deploymentController.Run(stop) - - // Create deployment. Expect to see it in cluster1. - dep1 := newDeploymentWithReplicas("depA", 6) - deploymentsWatch.Add(dep1) - checkDeployment := func(base *extensionsv1.Deployment, replicas int32) CheckingFunction { - return func(obj runtime.Object) error { - if obj == nil { - return fmt.Errorf("Observed object is nil") - } - d := obj.(*extensionsv1.Deployment) - if err := CompareObjectMeta(base.ObjectMeta, d.ObjectMeta); err != nil { - return err - } - if replicas != *d.Spec.Replicas { - return fmt.Errorf("Replica count is different expected:%d observed:%d", replicas, *d.Spec.Replicas) - } - return nil - } - } - assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas))) - err := WaitForStoreUpdate( - deploymentController.informer.GetTargetStore(), - cluster1.Name, types.NamespacedName{Namespace: dep1.Namespace, Name: dep1.Name}.String(), wait.ForeverTestTimeout) - assert.Nil(t, err, "deployment should have appeared in the informer store") - - // Increase replica count. Expect to see the update in cluster1. - newRep := int32(8) - dep1.Spec.Replicas = &newRep - deploymentsWatch.Modify(dep1) - assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas))) - - // Add new cluster. Although rebalance = false, no pods have been created yet so it should - // rebalance anyway. - clusterWatch.Add(cluster2) - assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2))) - assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2))) - - // Add new deployment with non-default replica placement preferences. - dep2 := newDeploymentWithReplicas("deployment2", 9) - dep2.Annotations = make(map[string]string) - dep2.Annotations[federatedtypes.FedDeploymentPreferencesAnnotation] = `{"rebalance": true, - "clusters": { - "cluster1": {"weight": 2}, - "cluster2": {"weight": 1} - }}` - deploymentsWatch.Add(dep2) - assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep2, 6))) - assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep2, 3))) -} - -func newDeploymentWithReplicas(name string, replicas int32) *extensionsv1.Deployment { - return &extensionsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: metav1.NamespaceDefault, - SelfLink: "/api/v1/namespaces/default/deployments/name", - }, - Spec: extensionsv1.DeploymentSpec{ - Replicas: &replicas, - }, - } -} diff --git a/test/e2e_federation/crud.go b/test/e2e_federation/crud.go index 7536f0c294..a4960d1877 100644 --- a/test/e2e_federation/crud.go +++ b/test/e2e_federation/crud.go @@ -45,7 +45,7 @@ var _ = framework.KubeDescribe("Federated types [Feature:Federation][Experimenta if clusterClients == nil { clusterClients = f.GetClusterClients() } - adapter := fedType.AdapterFactory(f.FederationClientset) + adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig) crudTester := fedframework.NewFederatedTypeCRUDTester(adapter, clusterClients) obj := adapter.NewTestObject(f.FederationNamespace.Name) crudTester.CheckLifecycle(obj) diff --git a/test/e2e_federation/framework/framework.go b/test/e2e_federation/framework/framework.go index 04c12c18c9..35f8fd22eb 100644 --- a/test/e2e_federation/framework/framework.go +++ b/test/e2e_federation/framework/framework.go @@ -26,6 +26,8 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/test/e2e/framework" @@ -44,7 +46,10 @@ type Framework struct { // should abort, the AfterSuite hook should run all Cleanup actions. cleanupHandle framework.CleanupActionHandle + FederationConfig *restclient.Config + FederationClientset *federation_clientset.Clientset + FederationNamespace *v1.Namespace } @@ -73,10 +78,16 @@ func (f *Framework) FederationBeforeEach() { // https://github.com/onsi/ginkgo/issues/222 f.cleanupHandle = framework.AddCleanupAction(f.FederationAfterEach) + if f.FederationConfig == nil { + By("Reading the federation configuration") + var err error + f.FederationConfig, err = LoadFederatedConfig(&clientcmd.ConfigOverrides{}) + Expect(err).NotTo(HaveOccurred()) + } if f.FederationClientset == nil { By("Creating a release 1.5 federation Clientset") var err error - f.FederationClientset, err = LoadFederationClientset() + f.FederationClientset, err = LoadFederationClientset(f.FederationConfig) Expect(err).NotTo(HaveOccurred()) } By("Waiting for federation-apiserver to be ready") diff --git a/test/e2e_federation/framework/util.go b/test/e2e_federation/framework/util.go index 0d9b167386..fbf0b708c1 100644 --- a/test/e2e_federation/framework/util.go +++ b/test/e2e_federation/framework/util.go @@ -65,12 +65,7 @@ func WaitForFederationApiserverReady(c *federation_clientset.Clientset) error { }) } -func LoadFederationClientset() (*federation_clientset.Clientset, error) { - config, err := LoadFederatedConfig(&clientcmd.ConfigOverrides{}) - if err != nil { - return nil, err - } - +func LoadFederationClientset(config *restclient.Config) (*federation_clientset.Clientset, error) { c, err := federation_clientset.NewForConfig(config) if err != nil { return nil, fmt.Errorf("error creating federation clientset: %v", err.Error()) @@ -83,7 +78,7 @@ func LoadFederatedConfig(overrides *clientcmd.ConfigOverrides) (*restclient.Conf if err != nil { return nil, fmt.Errorf("error creating federation client config: %v", err.Error()) } - cfg, err := clientcmd.NewDefaultClientConfig(*c, overrides).ClientConfig() + cfg, err := clientcmd.NewDefaultClientConfig(*c, &clientcmd.ConfigOverrides{}).ClientConfig() if cfg != nil { //TODO(colhom): this is only here because https://github.com/kubernetes/kubernetes/issues/25422 cfg.NegotiatedSerializer = api.Codecs diff --git a/test/e2e_federation/namespace.go b/test/e2e_federation/namespace.go index 01a17bc663..0a98185193 100644 --- a/test/e2e_federation/namespace.go +++ b/test/e2e_federation/namespace.go @@ -66,39 +66,6 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func } }) - It("should be created and deleted successfully", func() { - fedframework.SkipUnlessFederated(f.ClientSet) - - nsName = createNamespace(f.FederationClientset.Core().Namespaces()) - - By(fmt.Sprintf("Deleting namespace %s", nsName)) - deleteNamespace(nil, nsName, - f.FederationClientset.Core().Namespaces().Get, - f.FederationClientset.Core().Namespaces().Delete) - By(fmt.Sprintf("Verified that deletion succeeded")) - }) - - It("should be deleted from underlying clusters when OrphanDependents is false", func() { - fedframework.SkipUnlessFederated(f.ClientSet) - orphanDependents := false - nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, &orphanDependents) - By(fmt.Sprintf("Verified that namespaces were deleted from underlying clusters")) - }) - - It("should not be deleted from underlying clusters when OrphanDependents is true", func() { - fedframework.SkipUnlessFederated(f.ClientSet) - orphanDependents := true - nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, &orphanDependents) - By(fmt.Sprintf("Verified that namespaces were not deleted from underlying clusters")) - }) - - It("should not be deleted from underlying clusters when OrphanDependents is nil", func() { - fedframework.SkipUnlessFederated(f.ClientSet) - - nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, nil) - By(fmt.Sprintf("Verified that namespaces were not deleted from underlying clusters")) - }) - // See https://github.com/kubernetes/kubernetes/issues/38225 It("deletes replicasets in the namespace when the namespace is deleted", func() { fedframework.SkipUnlessFederated(f.ClientSet) diff --git a/test/e2e_federation/upgrades/simple.go b/test/e2e_federation/upgrades/simple.go index 8d51c59abe..75854ed297 100644 --- a/test/e2e_federation/upgrades/simple.go +++ b/test/e2e_federation/upgrades/simple.go @@ -38,7 +38,7 @@ type SimpleUpgradeTest struct { // Setup creates a resource and validates its propagation to member clusters func (ut *SimpleUpgradeTest) Setup(f *fedframework.Framework) { - adapter := ut.adapterFactory(f.FederationClientset) + adapter := ut.adapterFactory(f.FederationClientset, f.FederationConfig) clients := f.GetClusterClients() ut.crudTester = fedframework.NewFederatedTypeCRUDTester(adapter, clients) diff --git a/test/e2e_node/services/internal_services.go b/test/e2e_node/services/internal_services.go index 15f943220c..499b624ae3 100644 --- a/test/e2e_node/services/internal_services.go +++ b/test/e2e_node/services/internal_services.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "os" + "k8s.io/kubernetes/test/e2e/framework" + "github.com/golang/glog" ) @@ -128,7 +130,7 @@ func (es *e2eServices) startApiServer() error { // startNamespaceController starts the embedded namespace controller or returns an error. func (es *e2eServices) startNamespaceController() error { glog.Info("Starting namespace controller") - es.nsController = NewNamespaceController() + es.nsController = NewNamespaceController(framework.TestContext.Host) return es.nsController.Start() } diff --git a/test/e2e_node/services/namespace_controller.go b/test/e2e_node/services/namespace_controller.go index aca771301f..4a35779e82 100644 --- a/test/e2e_node/services/namespace_controller.go +++ b/test/e2e_node/services/namespace_controller.go @@ -26,7 +26,6 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" - "k8s.io/kubernetes/test/e2e/framework" ) const ( @@ -40,18 +39,19 @@ const ( // NamespaceController is a server which manages namespace controller. type NamespaceController struct { + host string stopCh chan struct{} } // NewNamespaceController creates a new namespace controller. -func NewNamespaceController() *NamespaceController { - return &NamespaceController{stopCh: make(chan struct{})} +func NewNamespaceController(host string) *NamespaceController { + return &NamespaceController{host: host, stopCh: make(chan struct{})} } // Start starts the namespace controller. func (n *NamespaceController) Start() error { // Use the default QPS - config := restclient.AddUserAgent(&restclient.Config{Host: framework.TestContext.Host}, ncName) + config := restclient.AddUserAgent(&restclient.Config{Host: n.host}, ncName) client, err := clientset.NewForConfig(config) if err != nil { return err diff --git a/test/integration/federation/crud_test.go b/test/integration/federation/crud_test.go index e6d4e54387..703eac9dd8 100644 --- a/test/integration/federation/crud_test.go +++ b/test/integration/federation/crud_test.go @@ -104,7 +104,7 @@ func initCRUDTest(t *testing.T, fedFixture *framework.FederationFixture, adapter fixture := framework.NewControllerFixture(t, kind, adapterFactory, config) client := fedFixture.APIFixture.NewClient(fmt.Sprintf("crud-test-%s", kind)) - adapter := adapterFactory(client) + adapter := adapterFactory(client, config) crudTester := framework.NewFederatedTypeCRUDTester(t, adapter, fedFixture.ClusterClients) diff --git a/test/integration/federation/framework/BUILD b/test/integration/federation/framework/BUILD index 775a77499e..edc8a86ba9 100644 --- a/test/integration/federation/framework/BUILD +++ b/test/integration/federation/framework/BUILD @@ -28,6 +28,7 @@ go_library( "//federation/pkg/federation-controller/sync:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/master:go_default_library", + "//test/e2e_node/services:go_default_library", "//test/integration/framework:go_default_library", "//vendor/github.com/pborman/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/test/integration/federation/framework/federation.go b/test/integration/federation/framework/federation.go index cf371bb09f..0242298b08 100644 --- a/test/integration/federation/framework/federation.go +++ b/test/integration/federation/framework/federation.go @@ -27,14 +27,16 @@ import ( clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/test/e2e_node/services" "k8s.io/kubernetes/test/integration/framework" ) type MemberCluster struct { - CloseFn framework.CloseFunc - Config *master.Config - Client clientset.Interface - Host string + CloseFn framework.CloseFunc + Config *master.Config + Client clientset.Interface + Host string + namespaceController *services.NamespaceController } // FederationFixture manages a federation api server and a set of member clusters @@ -42,10 +44,11 @@ type FederationFixture struct { APIFixture *FederationAPIFixture DesiredClusterCount int Clusters []*MemberCluster - ClusterClients []clientset.Interface - ClusterController *clustercontroller.ClusterController - fedClient federationclientset.Interface - stopChan chan struct{} + + ClusterClients []clientset.Interface + ClusterController *clustercontroller.ClusterController + fedClient federationclientset.Interface + stopChan chan struct{} } func (f *FederationFixture) SetUp(t *testing.T) { @@ -79,12 +82,18 @@ func (f *FederationFixture) StartCluster(t *testing.T) { clusterClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) f.ClusterClients = append(f.ClusterClients, clusterClient) - f.Clusters = append(f.Clusters, &MemberCluster{ - CloseFn: closeFn, - Config: config, - Client: clusterClient, - Host: host, - }) + memberCluster := &MemberCluster{ + CloseFn: closeFn, + Config: config, + Client: clusterClient, + Host: host, + namespaceController: services.NewNamespaceController(host), + } + f.Clusters = append(f.Clusters, memberCluster) + err := memberCluster.namespaceController.Start() + if err != nil { + t.Fatal(err) + } clusterId := len(f.ClusterClients) @@ -115,6 +124,10 @@ func (f *FederationFixture) TearDown(t *testing.T) { f.stopChan = nil } for _, cluster := range f.Clusters { + // Need to close controllers with active connections to the + // cluster api before stopping the api or the connections will + // hang until tcp timeout. + cluster.namespaceController.Stop() cluster.CloseFn() } f.Clusters = nil diff --git a/test/test_owners.csv b/test/test_owners.csv index 674a8dac03..09be7003a1 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -599,7 +599,6 @@ k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns,madhusuda k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0, k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1, k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1, -k8s.io/kubernetes/federation/pkg/federation-controller/namespace,rrati,0, k8s.io/kubernetes/federation/pkg/federation-controller/replicaset,roberthbailey,1, k8s.io/kubernetes/federation/pkg/federation-controller/secret,apelisse,1, k8s.io/kubernetes/federation/pkg/federation-controller/service,pmorie,1,