From febe9adcf0ebd079f2afd59bb89e30a6208b0b0e Mon Sep 17 00:00:00 2001 From: kshafiee Date: Tue, 16 Aug 2016 03:04:56 +0000 Subject: [PATCH] Federated secret controller --- .../app/controllermanager.go | 5 + .../secret/secret_controller.go | 295 ++++++++++++++++++ .../secret/secret_controller_test.go | 174 +++++++++++ .../federation-controller/util/handlers.go | 29 ++ 4 files changed, 503 insertions(+) create mode 100644 federation/pkg/federation-controller/secret/secret_controller.go create mode 100644 federation/pkg/federation-controller/secret/secret_controller_test.go diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 187abd7159..74a58d0a67 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" + secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/client/restclient" @@ -150,5 +151,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err namespaceController := namespacecontroller.NewNamespaceController(nsClientset) namespaceController.Run(wait.NeverStop) + secretcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "secret-controller")) + secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset) + secretcontroller.Run(wait.NeverStop) + select {} } diff --git a/federation/pkg/federation-controller/secret/secret_controller.go b/federation/pkg/federation-controller/secret/secret_controller.go new file mode 100644 index 0000000000..771b9914f1 --- /dev/null +++ b/federation/pkg/federation-controller/secret/secret_controller.go @@ -0,0 +1,295 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "reflect" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + allClustersKey = "ALL_CLUSTERS" +) + +type SecretController struct { + // For triggering single secret reconcilation. This is used when there is an + // add/update/delete operation on a secret in either federated API server or + // in some member of the federation. + secretDeliverer *util.DelayingDeliverer + + // For triggering all secrets reconcilation. This is used when + // a new cluster becomes available. + clusterDeliverer *util.DelayingDeliverer + + // Contains secrets present in members of federation. + secretFederatedInformer util.FederatedInformer + // For updating members of federation. + federatedUpdater util.FederatedUpdater + // Definitions of secrets that should be federated. + secretInformerStore cache.Store + // Informer controller for secrets that should be federated. + secretInformerController framework.ControllerInterface + + // Client to federated api server. + federatedApiClient federation_release_1_4.Interface + + // Backoff manager for secrets + secretBackoff *flowcontrol.Backoff + + secretReviewDelay time.Duration + clusterAvailableDelay time.Duration + smallDelay time.Duration + updateTimeout time.Duration +} + +// NewSecretController returns a new secret controller +func NewSecretController(client federation_release_1_4.Interface) *SecretController { + secretcontroller := &SecretController{ + federatedApiClient: client, + secretReviewDelay: time.Second * 10, + clusterAvailableDelay: time.Second * 20, + smallDelay: time.Second * 3, + updateTimeout: time.Second * 30, + secretBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + } + + // Build delivereres for triggering reconcilations. + secretcontroller.secretDeliverer = util.NewDelayingDeliverer() + secretcontroller.clusterDeliverer = util.NewDelayingDeliverer() + + // Start informer in federated API servers on secrets that should be federated. + secretcontroller.secretInformerStore, secretcontroller.secretInformerController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return client.Core().Secrets(api_v1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Secrets(api_v1.NamespaceAll).Watch(options) + }, + }, + &api_v1.Secret{}, + controller.NoResyncPeriodFunc(), + util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { secretcontroller.deliverSecretObj(obj, 0, false) })) + + // Federated informer on secrets in members of federation. + secretcontroller.secretFederatedInformer = util.NewFederatedInformer( + client, + func(cluster *federation_api.Cluster, targetClient federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return targetClient.Core().Secrets(api_v1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return targetClient.Core().Secrets(api_v1.NamespaceAll).Watch(options) + }, + }, + &api_v1.Secret{}, + controller.NoResyncPeriodFunc(), + // Trigger reconcilation whenever something in federated cluster is changed. In most cases it + // would be just confirmation that some secret opration suceeded. + util.NewTriggerOnChanges( + func(obj pkg_runtime.Object) { + secretcontroller.deliverSecretObj(obj, secretcontroller.secretReviewDelay, false) + }, + )) + }, + + &util.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *federation_api.Cluster) { + // When new cluster becomes available process all the secrets again. + secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) + }, + }, + ) + + // Federated updeater along with Create/Update/Delete operations. + secretcontroller.federatedUpdater = util.NewFederatedUpdater(secretcontroller.secretFederatedInformer, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + secret := obj.(*api_v1.Secret) + _, err := client.Core().Secrets(secret.Namespace).Create(secret) + return err + }, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + secret := obj.(*api_v1.Secret) + _, err := client.Core().Secrets(secret.Namespace).Update(secret) + return err + }, + func(client federation_release_1_4.Interface, obj pkg_runtime.Object) error { + secret := obj.(*api_v1.Secret) + err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &api.DeleteOptions{}) + return err + }) + return secretcontroller +} + +func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) { + go secretcontroller.secretInformerController.Run(stopChan) + secretcontroller.secretFederatedInformer.Start() + go func() { + <-stopChan + secretcontroller.secretFederatedInformer.Stop() + }() + secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + secret := item.Value.(string) + secretcontroller.reconcileSecret(secret) + }) + secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + secretcontroller.reconcileSecretsOnClusterChange() + }) + go func() { + select { + case <-time.After(time.Minute): + secretcontroller.secretBackoff.GC() + case <-stopChan: + return + } + }() +} + +func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) { + secret := obj.(*api_v1.Secret) + secretcontroller.deliverSecret(secret.Name, delay, failed) +} + +// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. +func (secretcontroller *SecretController) deliverSecret(secret string, delay time.Duration, failed bool) { + if failed { + secretcontroller.secretBackoff.Next(secret, time.Now()) + delay = delay + secretcontroller.secretBackoff.Get(secret) + } else { + secretcontroller.secretBackoff.Reset(secret) + } + secretcontroller.secretDeliverer.DeliverAfter(secret, secret, delay) +} + +// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet +// synced with the coresponding api server. +func (secretcontroller *SecretController) isSynced() bool { + if !secretcontroller.secretFederatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !secretcontroller.secretFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + return true +} + +// The function triggers reconcilation of all federated secrets. +func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() { + if !secretcontroller.isSynced() { + secretcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(secretcontroller.clusterAvailableDelay)) + } + for _, obj := range secretcontroller.secretInformerStore.List() { + secret := obj.(*api_v1.Secret) + secretcontroller.deliverSecret(secret.Name, secretcontroller.smallDelay, false) + } +} + +func (secretcontroller *SecretController) reconcileSecret(secret string) { + if !secretcontroller.isSynced() { + secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) + return + } + + baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(secret) + if err != nil { + glog.Errorf("Failed to query main secret store for %v: %v", secret, err) + secretcontroller.deliverSecret(secret, 0, true) + return + } + + if !exist { + // Not federated secret, ignoring. + return + } + baseSecret := baseSecretObj.(*api_v1.Secret) + + clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get cluster list: %v", err) + secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false) + return + } + + operations := make([]util.FederatedOperation, 0) + for _, cluster := range clusters { + clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, secret) + if err != nil { + glog.Errorf("Failed to get %s from %s: %v", secret, cluster.Name, err) + secretcontroller.deliverSecret(secret, 0, true) + return + } + + desiredSecret := &api_v1.Secret{ + ObjectMeta: baseSecret.ObjectMeta, + } + + if !found { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeAdd, + Obj: desiredSecret, + ClusterName: cluster.Name, + }) + } else { + clusterSecret := clusterSecretObj.(*api_v1.Secret) + + // Update existing secret, if needed. + if !reflect.DeepEqual(desiredSecret.ObjectMeta, clusterSecret.ObjectMeta) { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeUpdate, + Obj: desiredSecret, + ClusterName: cluster.Name, + }) + } + } + } + + if len(operations) == 0 { + // Everything is in order + return + } + err = secretcontroller.federatedUpdater.Update(operations, secretcontroller.updateTimeout) + if err != nil { + glog.Errorf("Failed to execute updates for %s: %v", secret, err) + secretcontroller.deliverSecret(secret, 0, true) + return + } + + // Evertyhing is in order but lets be double sure + secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false) +} diff --git a/federation/pkg/federation-controller/secret/secret_controller_test.go b/federation/pkg/federation-controller/secret/secret_controller_test.go new file mode 100644 index 0000000000..7acb575401 --- /dev/null +++ b/federation/pkg/federation-controller/secret/secret_controller_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package secret + +import ( + "fmt" + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/stretchr/testify/assert" +) + +func TestSecretController(t *testing.T) { + cluster1 := mkCluster("cluster1", api_v1.ConditionTrue) + cluster2 := mkCluster("cluster2", api_v1.ConditionTrue) + + fakeClient := &fake_federation_release_1_4.Clientset{} + RegisterList("clusters", fakeClient, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) + RegisterList("secrets", fakeClient, &api_v1.SecretList{Items: []api_v1.Secret{}}) + secretWatch := RegisterWatch("secrets", fakeClient) + clusterWatch := RegisterWatch("clusters", fakeClient) + + cluster1Client := &fake_federation_release_1_4.Clientset{} + cluster1Watch := RegisterWatch("secrets", cluster1Client) + RegisterList("secrets", cluster1Client, &api_v1.SecretList{Items: []api_v1.Secret{}}) + cluster1CreateChan := RegisterCopyOnCreate("secrets", cluster1Client, cluster1Watch) + cluster1UpdateChan := RegisterCopyOnUpdate("secrets", cluster1Client, cluster1Watch) + + cluster2Client := &fake_federation_release_1_4.Clientset{} + cluster2Watch := RegisterWatch("secrets", cluster2Client) + RegisterList("secrets", cluster2Client, &api_v1.SecretList{Items: []api_v1.Secret{}}) + cluster2CreateChan := RegisterCopyOnCreate("secrets", cluster2Client, cluster2Watch) + + secretController := NewSecretController(fakeClient) + informer := toFederatedInformerForTestOnly(secretController.secretFederatedInformer) + informer.SetClientFactory(func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster") + } + }) + + secretController.clusterAvailableDelay = time.Second + secretController.secretReviewDelay = 50 * time.Millisecond + secretController.smallDelay = 20 * time.Millisecond + secretController.updateTimeout = 5 * time.Second + + stop := make(chan struct{}) + secretController.Run(stop) + + secret1 := api_v1.Secret{ + ObjectMeta: api_v1.ObjectMeta{ + Name: "test-secret", + }, + } + + // Test add federated secret. + secretWatch.Add(&secret1) + createdSecret := GetSecretFromChan(cluster1CreateChan) + assert.NotNil(t, createdSecret) + assert.Equal(t, secret1.Name, createdSecret.Name) + + // Test update federated secret. + secret1.Annotations = map[string]string{ + "A": "B", + } + secretWatch.Modify(&secret1) + updatedSecret := GetSecretFromChan(cluster1UpdateChan) + assert.NotNil(t, updatedSecret) + assert.Equal(t, secret1.Name, updatedSecret.Name) + + // Test add cluster + clusterWatch.Add(cluster2) + createdSecret2 := GetSecretFromChan(cluster2CreateChan) + assert.NotNil(t, createdSecret2) + assert.Equal(t, secret1.Name, createdSecret2.Name) + + close(stop) +} + +func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(util.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { + return &federation_api.Cluster{ + ObjectMeta: api_v1.ObjectMeta{ + Name: name, + }, + Status: federation_api.ClusterStatus{ + Conditions: []federation_api.ClusterCondition{ + {Type: federation_api.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func RegisterWatch(resource string, client *fake_federation_release_1_4.Clientset) *watch.FakeWatcher { + watcher := watch.NewFake() + client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) + return watcher +} + +func RegisterList(resource string, client *fake_federation_release_1_4.Clientset, obj runtime.Object) { + client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) { + return true, obj, nil + }) +} + +func RegisterCopyOnCreate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + obj := createAction.GetObject() + go func() { + watcher.Add(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func RegisterCopyOnUpdate(resource string, client *fake_federation_release_1_4.Clientset, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + obj := updateAction.GetObject() + go func() { + watcher.Modify(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func GetSecretFromChan(c chan runtime.Object) *api_v1.Secret { + select { + case obj := <-c: + secret := obj.(*api_v1.Secret) + return secret + case <-time.After(time.Minute): + return nil + } +} diff --git a/federation/pkg/federation-controller/util/handlers.go b/federation/pkg/federation-controller/util/handlers.go index 5a61518435..17977d116c 100644 --- a/federation/pkg/federation-controller/util/handlers.go +++ b/federation/pkg/federation-controller/util/handlers.go @@ -74,3 +74,32 @@ func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkg_runtime.Object)) *frame }, } } + +// Returns framework.ResourceEventHandlerFuncs that trigger the given function +// on object add and delete. +func NewTriggerOnChanges(triggerFunc func(pkg_runtime.Object)) *framework.ResourceEventHandlerFuncs { + getFieldOrPanic := func(obj interface{}, fieldName string) interface{} { + val := reflect.ValueOf(obj).Elem().FieldByName(fieldName) + if val.IsValid() { + return val.Interface() + } else { + panic(fmt.Errorf("field not found: %s", fieldName)) + } + } + return &framework.ResourceEventHandlerFuncs{ + DeleteFunc: func(old interface{}) { + oldObj := old.(pkg_runtime.Object) + triggerFunc(oldObj) + }, + AddFunc: func(cur interface{}) { + curObj := cur.(pkg_runtime.Object) + triggerFunc(curObj) + }, + UpdateFunc: func(old, cur interface{}) { + curObj := cur.(pkg_runtime.Object) + if !reflect.DeepEqual(getFieldOrPanic(old, "ObjectMeta"), getFieldOrPanic(cur, "ObjectMeta")) { + triggerFunc(curObj) + } + }, + } +}