diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index a1182f6a9d..f4bacc0c93 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -17,7 +17,9 @@ limitations under the License. package ingress import ( + "fmt" "reflect" + "sync" "time" federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" @@ -25,12 +27,14 @@ import ( "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/cache" kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/conversion" pkg_runtime "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" @@ -40,24 +44,40 @@ import ( ) const ( - allClustersKey = "ALL_CLUSTERS" - staticIPAnnotationKey = "ingress.kubernetes.io/static-ip" // TODO: Get this directly from the Kubernetes Ingress Controller constant + // Special cluster name which denotes all clusters - only used internally. It's not a valid cluster name, so effectively reserved. + allClustersKey = ".ALL_CLUSTERS" + // TODO: Get the constants below directly from the Kubernetes Ingress Controller constants - but thats in a separate repo + staticIPNameKeyWritable = "kubernetes.io/ingress.global-static-ip-name" // The writable annotation on Ingress to tell the controller to use a specific, named, static IP + staticIPNameKeyReadonly = "static-ip" // The readonly key via which the cluster's Ingress Controller communicates which static IP it used. If staticIPNameKeyWritable above is specified, it is used. + uidAnnotationKey = "kubernetes.io/ingress.uid" // The annotation on federation clusters, where we store the ingress UID + uidConfigMapName = "ingress-uid" // Name of the config-map and key the ingress controller stores its uid in. + uidConfigMapNamespace = "kube-system" + uidKey = "uid" ) type IngressController struct { - // For triggering single ingress reconciliation. This is used when there is an + sync.Mutex // Lock used for leader election + // For triggering single ingress reconcilation. This is used when there is an // add/update/delete operation on an ingress in either federated API server or // in some member of the federation. ingressDeliverer *util.DelayingDeliverer - // For triggering reconciliation of all ingresses. This is used when - // a new cluster becomes available. + // For triggering reconcilation of cluster ingress controller configmap and + // all ingresses. This is used when a new cluster becomes available. clusterDeliverer *util.DelayingDeliverer + // For triggering reconcilation of cluster ingress controller configmap. + // This is used when a configmap is updated in the cluster. + configMapDeliverer *util.DelayingDeliverer + // Contains ingresses present in members of federation. ingressFederatedInformer util.FederatedInformer - // For updating members of federation. - federatedUpdater util.FederatedUpdater + // Contains ingress controller configmaps present in members of federation. + configMapFederatedInformer util.FederatedInformer + // For updating ingresses in members of federation. + federatedIngressUpdater util.FederatedUpdater + // For updating configmaps in members of federation. + federatedConfigMapUpdater util.FederatedUpdater // Definitions of ingresses that should be federated. ingressInformerStore cache.Store // Informer controller for ingresses that should be federated. @@ -68,11 +88,14 @@ type IngressController struct { // Backoff manager for ingresses ingressBackoff *flowcontrol.Backoff + // Backoff manager for configmaps + configMapBackoff *flowcontrol.Backoff // For events eventRecorder record.EventRecorder ingressReviewDelay time.Duration + configMapReviewDelay time.Duration clusterAvailableDelay time.Duration smallDelay time.Duration updateTimeout time.Duration @@ -80,23 +103,26 @@ type IngressController struct { // NewIngressController returns a new ingress controller func NewIngressController(client federation_release_1_4.Interface) *IngressController { + glog.V(4).Infof("->NewIngressController V(4)") broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-ingress-controller"}) - ic := &IngressController{ federatedApiClient: client, ingressReviewDelay: time.Second * 10, + configMapReviewDelay: time.Second * 10, clusterAvailableDelay: time.Second * 20, smallDelay: time.Second * 3, updateTimeout: time.Second * 30, ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), eventRecorder: recorder, + configMapBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), } // Build deliverers for triggering reconciliations. ic.ingressDeliverer = util.NewDelayingDeliverer() ic.clusterDeliverer = util.NewDelayingDeliverer() + ic.configMapDeliverer = util.NewDelayingDeliverer() // Start informer in federated API servers on ingresses that should be federated. ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer( @@ -142,24 +168,72 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *federation_api.Cluster) { - // When new cluster becomes available process all the ingresses again. - ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) + // When new cluster becomes available process all the ingresses again, and configure it's ingress controller's configmap with the correct UID + ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay) }, }, ) - // Federated updater along with Create/Update/Delete operations. - ic.federatedUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, + // Federated informer on configmaps for ingress controllers in members of the federation. + ic.configMapFederatedInformer = util.NewFederatedInformer( + client, + func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) { + glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name) + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + if targetClient == nil { + glog.Errorf("Internal error: targetClient is nil") + } + return targetClient.Core().ConfigMaps(uidConfigMapNamespace).List(options) // we only want to list one by name - unfortunately Kubernetes don't have a selector for that. + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + if targetClient == nil { + glog.Errorf("Internal error: targetClient is nil") + } + return targetClient.Core().ConfigMaps(uidConfigMapNamespace).Watch(options) // as above + }, + }, + &v1.ConfigMap{}, + controller.NoResyncPeriodFunc(), + // Trigger reconcilation whenever the ingress controller's configmap in a federated cluster is changed. In most cases it + // would be just confirmation that the configmap for the ingress controller is correct. + util.NewTriggerOnAllChanges( + func(obj pkg_runtime.Object) { + ic.deliverConfigMapObj(cluster.Name, obj, ic.configMapReviewDelay, false) + }, + )) + }, + + &util.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *federation_api.Cluster) { + ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay) + }, + }, + ) + + // Federated ingress updater along with Create/Update/Delete operations. + ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { ingress := obj.(*extensions_v1beta1.Ingress) glog.V(4).Infof("Attempting to create Ingress: %v", ingress) _, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress) + if err != nil { + glog.Errorf("Error creating ingress %q: %v", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, err) + } else { + glog.V(4).Infof("Successfully created ingress %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}) + } return err }, func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { ingress := obj.(*extensions_v1beta1.Ingress) glog.V(4).Infof("Attempting to update Ingress: %v", ingress) _, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress) + if err != nil { + glog.V(4).Infof("Failed to update Ingress: %v", err) + } else { + glog.V(4).Infof("Successfully updated Ingress: %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}) + } return err }, func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { @@ -168,6 +242,35 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{}) return err }) + + // Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called. + ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + configMap := obj.(*v1.ConfigMap) + configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} + glog.Errorf("Internal error: Incorrectly attempting to create ConfigMap: %q", configMapName) + _, err := client.Core().ConfigMaps(configMap.Namespace).Create(configMap) + return err + }, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + configMap := obj.(*v1.ConfigMap) + configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} + glog.V(4).Infof("Attempting to update ConfigMap: %v", configMap) + _, err := client.Core().ConfigMaps(configMap.Namespace).Update(configMap) + if err == nil { + glog.V(4).Infof("Successfully updated ConfigMap %q", configMapName) + } else { + glog.V(4).Infof("Failed to update ConfigMap %q: %v", configMapName, err) + } + return err + }, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + configMap := obj.(*v1.ConfigMap) + configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} + glog.Errorf("Internal error: Incorrectly attempting to delete ConfigMap: %q", configMapName) + err := client.Core().ConfigMaps(configMap.Namespace).Delete(configMap.Name, &api.DeleteOptions{}) + return err + }) return ic } @@ -176,25 +279,46 @@ func (ic *IngressController) Run(stopChan <-chan struct{}) { go ic.ingressInformerController.Run(stopChan) glog.Infof("... Starting Ingress Federated Informer") ic.ingressFederatedInformer.Start() + glog.Infof("... Starting ConfigMap Federated Informer") + ic.configMapFederatedInformer.Start() go func() { <-stopChan - glog.Infof("Stopping Ingress Controller") + glog.Infof("Stopping Ingress Federated Informer") ic.ingressFederatedInformer.Stop() + glog.Infof("Stopping ConfigMap Federated Informer") + ic.configMapFederatedInformer.Stop() }() ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { ingress := item.Value.(types.NamespacedName) glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress) ic.reconcileIngress(ingress) }) - ic.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - glog.V(4).Infof("Cluster change delivered, reconciling ingresses") - ic.reconcileIngressesOnClusterChange() + ic.clusterDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + clusterName := item.Key + if clusterName != allClustersKey { + glog.V(4).Infof("Cluster change delivered for cluster %q, reconciling configmap and ingress for that cluster", clusterName) + } else { + glog.V(4).Infof("Cluster change delivered for all clusters, reconciling configmaps and ingresses for all clusters") + } + ic.reconcileIngressesOnClusterChange(clusterName) + ic.reconcileConfigMapForCluster(clusterName) + }) + ic.configMapDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + clusterName := item.Key + if clusterName != allClustersKey { + glog.V(4).Infof("ConfigMap change delivered for cluster %q, reconciling configmap for that cluster", clusterName) + } else { + glog.V(4).Infof("ConfigMap change delivered for all clusters, reconciling configmaps for all clusters") + } + ic.reconcileConfigMapForCluster(clusterName) }) go func() { select { case <-time.After(time.Minute): glog.V(4).Infof("Ingress controller is garbage collecting") ic.ingressBackoff.GC() + ic.configMapBackoff.GC() + glog.V(4).Infof("Ingress controller garbage collection complete") case <-stopChan: return } @@ -218,40 +342,240 @@ func (ic *IngressController) deliverIngress(ingress types.NamespacedName, delay ic.ingressDeliverer.DeliverAfter(key, ingress, delay) } +func (ic *IngressController) deliverConfigMapObj(clusterName string, obj interface{}, delay time.Duration, failed bool) { + configMap := obj.(*v1.ConfigMap) + ic.deliverConfigMap(clusterName, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, delay, failed) +} + +func (ic *IngressController) deliverConfigMap(cluster string, configMap types.NamespacedName, delay time.Duration, failed bool) { + key := cluster + if failed { + ic.configMapBackoff.Next(key, time.Now()) + delay = delay + ic.configMapBackoff.Get(key) + } else { + ic.configMapBackoff.Reset(key) + } + glog.V(4).Infof("Delivering ConfigMap for cluster %q (delay %q): %s", cluster, delay, configMap) + ic.configMapDeliverer.DeliverAfter(key, configMap, delay) +} + // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // synced with the corresponding api server. func (ic *IngressController) isSynced() bool { if !ic.ingressFederatedInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") + glog.V(2).Infof("Cluster list not synced for ingress federated informer") return false } clusters, err := ic.ingressFederatedInformer.GetReadyClusters() if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) + glog.Errorf("Failed to get ready clusters for ingress federated informer: %v", err) return false } if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) { - glog.V(2).Infof("Target store not synced") + glog.V(2).Infof("Target store not synced for ingress federated informer") + return false + } + if !ic.configMapFederatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced for config map federated informer") + return false + } + clusters, err = ic.configMapFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters for configmap federated informer: %v", err) + return false + } + if !ic.configMapFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + glog.V(2).Infof("Target store not synced for configmap federated informer") return false } glog.V(4).Infof("Cluster list is synced") return true } -// The function triggers reconciliation of all federated ingresses. -func (ic *IngressController) reconcileIngressesOnClusterChange() { - glog.V(4).Infof("Reconciling ingresses on cluster change") +// The function triggers reconcilation of all federated ingresses. clusterName is the name of the cluster that changed +// but all ingresses in all clusters are reconciled +func (ic *IngressController) reconcileIngressesOnClusterChange(clusterName string) { + glog.V(4).Infof("Reconciling ingresses on cluster change for cluster %q", clusterName) if !ic.isSynced() { - ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) + glog.V(4).Infof("Not synced, will try again later to reconcile ingresses.") + ic.clusterDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay) } - for _, obj := range ic.ingressInformerStore.List() { + ingressList := ic.ingressInformerStore.List() + if len(ingressList) <= 0 { + glog.V(4).Infof("No federated ingresses to reconcile.") + } + + for _, obj := range ingressList { ingress := obj.(*extensions_v1beta1.Ingress) - ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, ic.smallDelay, false) + nsName := types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace} + glog.V(4).Infof("Delivering federated ingress %q for cluster %q", nsName, clusterName) + ic.deliverIngress(nsName, ic.smallDelay, false) + } +} + +/* + reconcileConfigMapForCluster ensures that the configmap for the ingress controller in the cluster has objectmeta.data.UID + consistent with all the other clusters in the federation. If clusterName == allClustersKey, then all avaliable clusters + configmaps are reconciled. +*/ +func (ic *IngressController) reconcileConfigMapForCluster(clusterName string) { + glog.V(4).Infof("Reconciling ConfigMap for cluster(s) %q", clusterName) + + if !ic.isSynced() { + ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay) + return + } + + if clusterName == allClustersKey { + clusters, err := ic.configMapFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters. redelivering %q: %v", clusterName, err) + ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay) + return + } + for _, cluster := range clusters { + glog.V(4).Infof("Delivering ConfigMap for cluster(s) %q", clusterName) + ic.configMapDeliverer.DeliverAt(cluster.Name, nil, time.Now()) + } + return + } else { + cluster, found, err := ic.configMapFederatedInformer.GetReadyCluster(clusterName) + if err != nil || !found { + glog.Errorf("Internal error: Cluster %q queued for configmap reconciliation, but not found. Will try again later: error = %v", clusterName, err) + ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay) + return + } + uidConfigMapNamespacedName := types.NamespacedName{Name: uidConfigMapName, Namespace: uidConfigMapNamespace} + configMapObj, found, err := ic.configMapFederatedInformer.GetTargetStore().GetByKey(cluster.Name, uidConfigMapNamespacedName.String()) + if !found || err != nil { + glog.Errorf("Failed to get ConfigMap %q for cluster %q. Will try again later: %v", uidConfigMapNamespacedName, cluster.Name, err) + ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.configMapReviewDelay) + return + } + glog.V(4).Infof("Successfully got ConfigMap %q for cluster %q.", uidConfigMapNamespacedName, clusterName) + configMap, ok := configMapObj.(*v1.ConfigMap) + if !ok { + glog.Errorf("Internal error: The object in the ConfigMap cache for cluster %q configmap %q is not a *ConfigMap", cluster.Name, uidConfigMapNamespacedName) + return + } + ic.reconcileConfigMap(cluster, configMap) + return + } +} + +/* + reconcileConfigMap ensures that the configmap in the cluster has a UID + consistent with the federation cluster's associated annotation. + + 1. If the UID in the configmap differs from the UID stored in the cluster's annotation, the configmap is updated. + 2. If the UID annotation is missing from the cluster, the cluster's UID annotation is updated to be consistent + with the master cluster. + 3. If there is no elected master cluster, this cluster attempts to elect itself as the master cluster. + + In cases 2 and 3, the configmaps will be updated in the next cycle, triggered by the federation cluster update(s) + +*/ +func (ic *IngressController) reconcileConfigMap(cluster *federation_api.Cluster, configMap *v1.ConfigMap) { + ic.Lock() // TODO: Reduce the scope of this master election lock. + defer ic.Unlock() + + configMapNsName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} + glog.V(4).Infof("Reconciling ConfigMap %q in cluster %q", configMapNsName, cluster.Name) + + clusterIngressUID, clusterIngressUIDExists := cluster.ObjectMeta.Annotations[uidAnnotationKey] + configMapUID, ok := configMap.Data[uidKey] + + if !ok { + glog.Errorf("Warning: ConfigMap %q in cluster %q does not contain data key %q. Therefore it cannot become the master.", configMapNsName, cluster.Name, uidKey) + } + + if !clusterIngressUIDExists || clusterIngressUID == "" { + ic.updateClusterIngressUIDToMasters(cluster, configMapUID) // Second argument is the fallback, in case this is the only cluster, in which case it becomes the master + return + } + if configMapUID != clusterIngressUID { // An update is required + glog.V(4).Infof("Ingress UID update is required: configMapUID %q not equal to clusterIngressUID %q", configMapUID, clusterIngressUID) + configMap.Data[uidKey] = clusterIngressUID + operations := []util.FederatedOperation{{ + Type: util.OperationTypeUpdate, + Obj: configMap, + ClusterName: cluster.Name, + }} + glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations) + err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout) + if err != nil { + configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} + glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err) + ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay) + } + } else { + glog.V(4).Infof("Ingress UID update is not required: configMapUID %q is equal to clusterIngressUID %q", configMapUID, clusterIngressUID) + } +} + +/* + getMasterCluster returns the cluster which is the elected master w.r.t. ingress UID, and it's ingress UID. + If there is no elected master cluster, an error is returned. + All other clusters must use the ingress UID of the elected master. +*/ +func (ic *IngressController) getMasterCluster() (master *federation_api.Cluster, ingressUID string, err error) { + clusters, err := ic.configMapFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get cluster list: %v", err) + return nil, "", err + } + + for _, c := range clusters { + UID, exists := c.ObjectMeta.Annotations[uidAnnotationKey] + if exists && UID != "" { // Found the master cluster + glog.V(4).Infof("Found master cluster %q with annotation %q=%q", c.Name, uidAnnotationKey, UID) + return c, UID, nil + } + } + return nil, "", fmt.Errorf("Failed to find master cluster with annotation %q", uidAnnotationKey) +} + +/* + updateClusterIngressUIDToMasters takes the ingress UID annotation on the master cluster and applies it to cluster. + If there is no master cluster, then fallbackUID is used (and hence this cluster becomes the master). +*/ +func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federation_api.Cluster, fallbackUID string) { + masterCluster, masterUID, err := ic.getMasterCluster() + clusterObj, clusterErr := conversion.NewCloner().DeepCopy(cluster) // Make a clone so that we don't clobber our input param + cluster, ok := clusterObj.(*federation_api.Cluster) + if clusterErr != nil || !ok { + glog.Errorf("Internal error: Failed clone cluster resource while attempting to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err) + return + } + if err == nil { + if masterCluster.Name != cluster.Name { // We're not the master, need to get in sync + cluster.ObjectMeta.Annotations[uidAnnotationKey] = masterUID + if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil { + glog.Errorf("Failed to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err) + return + } else { + glog.V(4).Infof("Successfully added master ingress UID annotation (%q = %q) from master cluster %q to cluster %q.", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name) + } + } else { + glog.V(4).Infof("Cluster %q with ingress UID is already the master with annotation (%q = %q), no need to update.", cluster.Name, uidAnnotationKey, cluster.ObjectMeta.Annotations[uidAnnotationKey]) + } + } else { + glog.V(2).Infof("No master cluster found to source an ingress UID from for cluster %q. Attempting to elect new master cluster %q with ingress UID %q = %q", cluster.Name, cluster.Name, uidAnnotationKey, fallbackUID) + if fallbackUID != "" { + cluster.ObjectMeta.Annotations[uidAnnotationKey] = fallbackUID + if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil { + glog.Errorf("Failed to add ingress UID annotation (%q = %q) to cluster %q. No master elected. Will try again later: %v", uidAnnotationKey, fallbackUID, cluster.Name, err) + } else { + glog.V(4).Infof("Successfully added ingress UID annotation (%q = %q) to cluster %q.", uidAnnotationKey, fallbackUID, cluster.Name) + } + } else { + glog.Errorf("No master cluster exists, and fallbackUID for cluster %q is invalid (%q). This probably means that no clusters have an ingress controller configmap with key %q. Federated Ingress currently supports clusters running Google Loadbalancer Controller (\"GLBC\")", cluster.Name, fallbackUID, uidKey) + } } } func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { - glog.V(4).Infof("Reconciling ingress %q", ingress) + glog.V(4).Infof("Reconciling ingress %q for all clusters", ingress) if !ic.isSynced() { ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) return @@ -269,22 +593,29 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { glog.V(4).Infof("Ingress %q is not federated. Ignoring.", ingress) return } - baseIngress := baseIngressObj.(*extensions_v1beta1.Ingress) + baseIngress, ok := baseIngressObj.(*extensions_v1beta1.Ingress) + if !ok { + glog.Errorf("Internal Error: Object retrieved from ingressInformerStore with key %q is not of correct type *extensions_v1beta1.Ingress: %v", key, baseIngressObj) + } else { + glog.V(4).Infof("Base (federated) ingress: %v", baseIngress) + } clusters, err := ic.ingressFederatedInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get cluster list: %v", err) ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) return + } else { + glog.V(4).Infof("Found %d ready clusters across which to reconcile ingress %q", len(clusters), ingress) } operations := make([]util.FederatedOperation, 0) for clusterIndex, cluster := range clusters { - _, baseIPExists := baseIngress.ObjectMeta.Annotations[staticIPAnnotationKey] - clusterIngressObj, found, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + baseIPName, baseIPAnnotationExists := baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] + clusterIngressObj, clusterIngressFound, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { - glog.Errorf("Failed to get %s from %s: %v", ingress, cluster.Name, err) + glog.Errorf("Failed to get cached ingress %s for cluster %s, will retry: %v", ingress, cluster.Name, err) ic.deliverIngress(ingress, 0, true) return } @@ -292,8 +623,26 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { ObjectMeta: baseIngress.ObjectMeta, Spec: baseIngress.Spec, } + objMeta, err := conversion.NewCloner().DeepCopy(baseIngress.ObjectMeta) + if err != nil { + glog.Errorf("Error deep copying ObjectMeta: %v", err) + } + objSpec, err := conversion.NewCloner().DeepCopy(baseIngress.Spec) + if err != nil { + glog.Errorf("Error deep copying Spec: %v", err) + } + desiredIngress.ObjectMeta, ok = objMeta.(v1.ObjectMeta) + if !ok { + glog.Errorf("Internal error: Failed to cast to v1.ObjectMeta: %v", objMeta) + } + desiredIngress.Spec = objSpec.(extensions_v1beta1.IngressSpec) + if !ok { + glog.Errorf("Internal error: Failed to cast to extensions_v1beta1.IngressSpec: %v", objSpec) + } + glog.V(4).Infof("Desired Ingress: %v", desiredIngress) - if !found { + if !clusterIngressFound { + glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name) // We can't supply server-created fields when creating a new object. desiredIngress.ObjectMeta.ResourceVersion = "" desiredIngress.ObjectMeta.UID = "" @@ -307,63 +656,87 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { // Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated // index 0, but eventually all ingresses will share the single global IP recorded in the annotation // of the federated ingress. - if baseIPExists || (clusterIndex == 0) { + if baseIPAnnotationExists || (clusterIndex == 0) { + glog.V(4).Infof("No existing Ingress %s in cluster %s (index %d) and static IP annotation (%q) on base ingress - queuing a create operation", ingress, cluster.Name, clusterIndex, staticIPNameKeyWritable) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, Obj: desiredIngress, ClusterName: cluster.Name, }) + } else { + glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex) } } else { clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress) - glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required", ingress, cluster.Name) - clusterIPName, clusterIPExists := clusterIngress.ObjectMeta.Annotations[staticIPAnnotationKey] - if !baseIPExists && clusterIPExists { - // Add annotation to federated ingress via API. - original, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Get(baseIngress.Name) - if err == nil { - original.ObjectMeta.Annotations[staticIPAnnotationKey] = clusterIPName - if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(original); err != nil { - glog.Errorf("Failed to add static IP annotation to federated ingress %q: %v", ingress, err) + glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required (in either direction)", ingress, cluster.Name) + clusterIPName, clusterIPNameExists := clusterIngress.ObjectMeta.Annotations[staticIPNameKeyReadonly] + baseLBStatusExists := len(baseIngress.Status.LoadBalancer.Ingress) > 0 + clusterLBStatusExists := len(clusterIngress.Status.LoadBalancer.Ingress) > 0 + logStr := fmt.Sprintf("Cluster ingress %q has annotation %q=%q, loadbalancer status exists? [%v], federated ingress has annotation %q=%q, loadbalancer status exists? [%v]. %%s annotation and/or loadbalancer status from cluster ingress to federated ingress.", ingress, staticIPNameKeyReadonly, clusterIPName, clusterLBStatusExists, staticIPNameKeyWritable, baseIPName, baseLBStatusExists) + if (!baseIPAnnotationExists && clusterIPNameExists) || (!baseLBStatusExists && clusterLBStatusExists) { // copy the IP name from the readonly annotation on the cluster ingress, to the writable annotation on the federated ingress + glog.V(4).Infof(logStr, "Transferring") + if !baseIPAnnotationExists && clusterIPNameExists { + baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName + } + if !baseLBStatusExists && clusterLBStatusExists { + lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer) + lbstatus, ok := lbstatusObj.(*v1.LoadBalancerStatus) + if lbErr != nil || !ok { + glog.Errorf("Internal error: Failed to clone LoadBalancerStatus of %q in cluster %q while attempting to update master loadbalancer ingress status, will try again later. error: %v, Object to be cloned: %v", ingress, cluster.Name, lbErr, lbstatusObj) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + return } + baseIngress.Status.LoadBalancer = *lbstatus + } + glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress) + if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil { + glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + return } else { - glog.Errorf("Failed to get federated ingress %q: %v", ingress, err) + glog.V(4).Infof("Successfully added static IP annotation to federated ingress: %q", ingress) + ic.deliverIngress(ingress, ic.smallDelay, false) + return } + } else { + glog.V(4).Infof(logStr, "Not transferring") } - // Update existing ingress, if needed. - if !util.ObjectMetaIsEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) || - !reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) { - // TODO: In some cases Ingress controllers in the clusters add annotations, so we ideally need to exclude those from - // the equivalence comparison to cut down on unnecessary updates. + // Update existing cluster ingress, if needed. + if util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) && + reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) && + reflect.DeepEqual(baseIngress.Status.LoadBalancer.Ingress, clusterIngress.Status.LoadBalancer.Ingress) { + glog.V(4).Infof("Ingress %q in cluster %q does not need an update: cluster ingress is equivalent to federated ingress", ingress, cluster.Name) + } else { glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress) - // We need to use server-created fields from the cluster, not the desired object when updating. - desiredIngress.ObjectMeta.ResourceVersion = clusterIngress.ObjectMeta.ResourceVersion - desiredIngress.ObjectMeta.UID = clusterIngress.ObjectMeta.UID - // Merge any annotations on the federated ingress onto the underlying cluster ingress, - // overwriting duplicates. - // TODO: We should probably use a PATCH operation for this instead. - for key, val := range baseIngress.ObjectMeta.Annotations { - desiredIngress.ObjectMeta.Annotations[key] = val - } - ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster", - "Updating ingress in cluster %s", cluster.Name) + if !util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) { + // Merge any annotations on the federated ingress onto the underlying cluster ingress, + // overwriting duplicates. + for key, val := range baseIngress.ObjectMeta.Annotations { + desiredIngress.ObjectMeta.Annotations[key] = val + } + ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster", + "Updating ingress in cluster %s", cluster.Name) - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredIngress, - ClusterName: cluster.Name, - }) + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeUpdate, + Obj: desiredIngress, + ClusterName: cluster.Name, + }) + // TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster. + // This is only for consistency, so that the federation ingress metadata matches the underlying clusters. It's not actually required. + } } } } if len(operations) == 0 { // Everything is in order + glog.V(4).Infof("Ingress %q is up-to-date in all clusters - no propagation to clusters required.", ingress) return } glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) - err = ic.federatedUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) { - ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedUpdateInCluster", + err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) { + ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedClusterUpdate", "Ingress update in cluster %s failed: %v", op.ClusterName, operror) }) if err != nil { @@ -371,7 +744,4 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { ic.deliverIngress(ingress, ic.ingressReviewDelay, true) return } - - // Evertyhing is in order but lets be double sure - TODO: quinton: Why? This seems like a hack. - ic.deliverIngress(ingress, ic.ingressReviewDelay, false) } diff --git a/federation/pkg/federation-controller/ingress/ingress_controller_test.go b/federation/pkg/federation-controller/ingress/ingress_controller_test.go index ab8a9ab1d3..c183abeb2a 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller_test.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller_test.go @@ -24,6 +24,7 @@ import ( federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" api_v1 "k8s.io/kubernetes/pkg/api/v1" extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" @@ -35,29 +36,40 @@ import ( ) func TestIngressController(t *testing.T) { + fakeClusterList := federation_api.ClusterList{Items: []federation_api.Cluster{}} + fakeConfigMapList1 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}} + fakeConfigMapList2 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}} cluster1 := NewCluster("cluster1", api_v1.ConditionTrue) cluster2 := NewCluster("cluster2", api_v1.ConditionTrue) + cfg1 := NewConfigMap("foo") + cfg2 := NewConfigMap("bar") // Different UID from cfg1, so that we can check that they get reconciled. - fakeClient := &fake_federation_release_1_4.Clientset{} - RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) - RegisterFakeList("ingresses", &fakeClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) - ingressWatch := RegisterFakeWatch("ingresses", &fakeClient.Fake) - clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake) + t.Log("Creating fake infrastructure") + fedClient := &fake_federation_release_1_4.Clientset{} + RegisterFakeList("clusters", &fedClient.Fake, &fakeClusterList) + RegisterFakeList("ingresses", &fedClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) + fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake) + clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake) + fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch) + fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch) cluster1Client := &fake_kube_release_1_4.Clientset{} - cluster1Watch := RegisterFakeWatch("ingresses", &cluster1Client.Fake) RegisterFakeList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) - cluster1CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1Watch) - cluster1UpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1Watch) + RegisterFakeList("configmaps", &cluster1Client.Fake, &fakeConfigMapList1) + cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake) + cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake) + cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch) + cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch) cluster2Client := &fake_kube_release_1_4.Clientset{} - cluster2Watch := RegisterFakeWatch("ingresses", &cluster2Client.Fake) RegisterFakeList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) - cluster2CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2Watch) + RegisterFakeList("configmaps", &cluster2Client.Fake, &fakeConfigMapList2) + cluster2IngressWatch := RegisterFakeWatch("ingresses", &cluster2Client.Fake) + cluster2ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster2Client.Fake) + cluster2IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2IngressWatch) + cluster2ConfigMapUpdateChan := RegisterFakeCopyOnUpdate("configmaps", &cluster2Client.Fake, cluster2ConfigMapWatch) - ingressController := NewIngressController(fakeClient) - informer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer) - informer.SetClientFactory(func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) { + clientFactoryFunc := func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) { switch cluster.Name { case cluster1.Name: return cluster1Client, nil @@ -66,50 +78,129 @@ func TestIngressController(t *testing.T) { default: return nil, fmt.Errorf("Unknown cluster") } - }) + } + ingressController := NewIngressController(fedClient) + ingressInformer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer) + ingressInformer.SetClientFactory(clientFactoryFunc) + configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer) + configMapInformer.SetClientFactory(clientFactoryFunc) ingressController.clusterAvailableDelay = time.Second ingressController.ingressReviewDelay = 50 * time.Millisecond + ingressController.configMapReviewDelay = 50 * time.Millisecond ingressController.smallDelay = 20 * time.Millisecond ingressController.updateTimeout = 5 * time.Second stop := make(chan struct{}) + t.Log("Running Ingress Controller") ingressController.Run(stop) ing1 := extensions_v1beta1.Ingress{ ObjectMeta: api_v1.ObjectMeta{ - Name: "test-ingress", - Namespace: "mynamespace", - SelfLink: "/api/v1/namespaces/mynamespaces/ingress/test-ingress", + Name: "test-ingress", + Namespace: "mynamespace", + SelfLink: "/api/v1/namespaces/mynamespace/ingress/test-ingress", + Annotations: map[string]string{}, + }, + Status: extensions_v1beta1.IngressStatus{ + LoadBalancer: api_v1.LoadBalancerStatus{ + Ingress: make([]api_v1.LoadBalancerIngress, 0, 0), + }, }, } + t.Log("Adding cluster 1") + clusterWatch.Add(cluster1) + + t.Log("Adding Ingress UID ConfigMap to cluster 1") + cluster1ConfigMapWatch.Add(cfg1) + + t.Log("Checking that UID annotation on Cluster 1 annotation was correctly updated") + cluster := GetClusterFromChan(fedClusterUpdateChan) + assert.NotNil(t, cluster) + assert.Equal(t, cluster.ObjectMeta.Annotations[uidAnnotationKey], cfg1.Data[uidKey]) + // Test add federated ingress. - ingressWatch.Add(&ing1) - createdIngress := GetIngressFromChan(cluster1CreateChan) + t.Log("Adding Federated Ingress") + fedIngressWatch.Add(&ing1) + t.Log("Checking that Ingress was correctly created in cluster 1") + createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan) assert.NotNil(t, createdIngress) - assert.True(t, reflect.DeepEqual(&ing1, createdIngress)) + assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal") + assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent") + + // Test that IP address gets transferred from cluster ingress to federated ingress. + t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress") + createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, api_v1.LoadBalancerIngress{IP: "1.2.3.4"}) + cluster1IngressWatch.Modify(createdIngress) + updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan) + assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress") + if updatedIngress != nil { + assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress)) + } // Test update federated ingress. - ing1.Annotations = map[string]string{ - "A": "B", - } - ingressWatch.Modify(&ing1) - updatedIngress := GetIngressFromChan(cluster1UpdateChan) - assert.NotNil(t, updatedIngress) - assert.True(t, reflect.DeepEqual(&ing1, updatedIngress)) - + updatedIngress.ObjectMeta.Annotations["A"] = "B" + t.Log("Modifying Federated Ingress") + fedIngressWatch.Modify(updatedIngress) + t.Log("Checking that Ingress was correctly updated in cluster 1") + updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan) + assert.NotNil(t, updatedIngress2) + assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal") + assert.True(t, util.ObjectMetaEquivalent(updatedIngress2.ObjectMeta, updatedIngress.ObjectMeta), "Metadata of updated object is not equivalent") // Test add cluster - ing1.Annotations[staticIPAnnotationKey] = "foo" // Make sure that the base object has a static IP name first. - ingressWatch.Modify(&ing1) + t.Log("Adding a second cluster") + ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first. + fedIngressWatch.Modify(&ing1) clusterWatch.Add(cluster2) - createdIngress2 := GetIngressFromChan(cluster2CreateChan) + // First check that the original values are not equal - see above comment + assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test")) + cluster2ConfigMapWatch.Add(cfg2) + t.Log("Checking that the ingress got created in cluster 2") + createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan) assert.NotNil(t, createdIngress2) - assert.True(t, reflect.DeepEqual(&ing1, createdIngress2)) + assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal") + assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent") + + t.Log("Checking that the configmap in cluster 2 got updated.") + updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan) + assert.NotNil(t, updatedConfigMap2, fmt.Sprintf("ConfigMap in cluster 2 was not updated (or more likely the test is broken and the API type written is wrong)")) + if updatedConfigMap2 != nil { + assert.Equal(t, cfg1.Data[uidKey], updatedConfigMap2.Data[uidKey], + fmt.Sprintf("UID's in configmaps in cluster's 1 and 2 are not equal (%q != %q)", cfg1.Data["uid"], updatedConfigMap2.Data["uid"])) + } close(stop) } -func GetIngressFromChan(c chan runtime.Object) *extensions_v1beta1.Ingress { - ingress := GetObjectFromChan(c).(*extensions_v1beta1.Ingress) +func GetIngressFromChan(t *testing.T, c chan runtime.Object) *extensions_v1beta1.Ingress { + obj := GetObjectFromChan(c) + ingress, ok := obj.(*extensions_v1beta1.Ingress) + if !ok { + t.Logf("Object on channel was not of type *extensions_v1beta1.Ingress: %v", obj) + } return ingress } + +func GetConfigMapFromChan(c chan runtime.Object) *api_v1.ConfigMap { + configMap, _ := GetObjectFromChan(c).(*api_v1.ConfigMap) + return configMap +} + +func GetClusterFromChan(c chan runtime.Object) *federation_api.Cluster { + cluster, _ := GetObjectFromChan(c).(*federation_api.Cluster) + return cluster +} + +func NewConfigMap(uid string) *api_v1.ConfigMap { + return &api_v1.ConfigMap{ + ObjectMeta: api_v1.ObjectMeta{ + Name: uidConfigMapName, + Namespace: uidConfigMapNamespace, + SelfLink: "/api/v1/namespaces/" + uidConfigMapNamespace + "/configmap/" + uidConfigMapName, + Annotations: map[string]string{}, + }, + Data: map[string]string{ + uidKey: uid, + }, + } +} diff --git a/federation/pkg/federation-controller/util/delaying_deliverer.go b/federation/pkg/federation-controller/util/delaying_deliverer.go index e63a620c03..e087ad6d97 100644 --- a/federation/pkg/federation-controller/util/delaying_deliverer.go +++ b/federation/pkg/federation-controller/util/delaying_deliverer.go @@ -162,7 +162,7 @@ func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay ti d.DeliverAt(key, value, time.Now().Add(delay)) } -// Gets target chanel of the deliverer. +// Gets target channel of the deliverer. func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem { return d.targetChannel } diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go index 201724ea1f..cbdbc7fe20 100644 --- a/federation/pkg/federation-controller/util/federated_informer.go +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -85,9 +85,9 @@ type FederationView interface { ClustersSynced() bool } -// A structure that combines an informer running agains federated api server and listening for cluster updates +// A structure that combines an informer running against federated api server and listening for cluster updates // with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new -// cluster is added to the federation an informer is created for it using TargetInformerFactory. Infomrers are stoped +// cluster is added to the federation an informer is created for it using TargetInformerFactory. Informers are stopped // when a cluster is either put offline of deleted. It is assumed that some controller keeps an eye on the cluster list // and thus the clusters in ETCD are up to date. type FederatedInformer interface { @@ -186,18 +186,22 @@ func NewFederatedInformer( if clusterLifecycle.ClusterAvailable != nil { clusterLifecycle.ClusterAvailable(curCluster) } + } else { + glog.Errorf("Cluster %v not added. Not of correct type, or cluster not ready.", cur) } }, UpdateFunc: func(old, cur interface{}) { oldCluster, ok := old.(*federation_api.Cluster) if !ok { + glog.Errorf("Internal error: Cluster %v not updated. Old cluster not of correct type.", old) return } curCluster, ok := cur.(*federation_api.Cluster) if !ok { + glog.Errorf("Internal error: Cluster %v not updated. New cluster not of correct type.", cur) return } - if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { + if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) || !reflect.DeepEqual(oldCluster.ObjectMeta.Annotations, curCluster.ObjectMeta.Annotations) { var data []interface{} if clusterLifecycle.ClusterUnavailable != nil { data = getClusterData(oldCluster.Name) @@ -213,6 +217,8 @@ func NewFederatedInformer( clusterLifecycle.ClusterAvailable(curCluster) } } + } else { + glog.V(4).Infof("Cluster %v not updated to %v as ready status and specs are identical", oldCluster, curCluster) } }, }, @@ -258,11 +264,14 @@ type federatedStoreImpl struct { } func (f *federatedInformerImpl) Stop() { + glog.V(4).Infof("Stopping federated informer.") f.Lock() defer f.Unlock() + glog.V(4).Infof("... Closing cluster informer channel.") close(f.clusterInformer.stopChan) - for _, informer := range f.targetInformers { + for key, informer := range f.targetInformers { + glog.V(4).Infof("... Closing informer channel for %q.", key) close(informer.stopChan) } } @@ -291,14 +300,16 @@ func (f *federatedInformerImpl) GetClientsetForCluster(clusterName string) (kube func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (kube_release_1_4.Interface, error) { // No locking needed. Will happen in f.GetCluster. + glog.V(4).Infof("Getting clientset for cluster %q", clusterName) if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil { + glog.V(4).Infof("Got clientset for cluster %q", clusterName) return f.clientFactory(cluster) } else { if err != nil { return nil, err } } - return nil, fmt.Errorf("cluster %s not found", clusterName) + return nil, fmt.Errorf("cluster %q not found", clusterName) } // GetReadyClusers returns all clusters for which the sub-informers are run. @@ -441,7 +452,7 @@ func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]FederatedObject, return result, nil } -// GetKey for returns the key under which the item would be put in the store. +// GetKeyFor returns the key under which the item would be put in the store. func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string { // TODO: support other keying functions. key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item) diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index bb0303d68b..df6c07fd9f 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -64,10 +64,14 @@ func (wd *WatcherDispatcher) Add(obj runtime.Object) { func (wd *WatcherDispatcher) Modify(obj runtime.Object) { wd.Lock() defer wd.Unlock() + glog.V(4).Infof("->WatcherDispatcher.Modify(%v)", obj) wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj}) - for _, watcher := range wd.watchers { + for i, watcher := range wd.watchers { if !watcher.IsStopped() { + glog.V(4).Infof("->Watcher(%d).Modify(%v)", i, obj) watcher.Modify(obj) + } else { + glog.V(4).Infof("->Watcher(%d) is stopped. Not calling Modify(%v)", i, obj) } } } @@ -173,7 +177,7 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object { select { case obj := <-c: return obj - case <-time.After(10 * time.Second): + case <-time.After(20 * time.Second): pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) return nil } diff --git a/federation/pkg/federation-controller/util/util.go b/federation/pkg/federation-controller/util/util.go deleted file mode 100644 index e0bebee309..0000000000 --- a/federation/pkg/federation-controller/util/util.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "reflect" - - "k8s.io/kubernetes/pkg/api/v1" -) - -/* -ObjectMetaIsEquivalent determines whether two ObjectMeta's (typically one from a federated API object, -and the other from a cluster object) are equivalent. -*/ -func ObjectMetaIsEquivalent(m1, m2 v1.ObjectMeta) bool { - // First make all of the read-only fields equal, then perform a deep equality comparison - m1.SelfLink = m2.SelfLink // Might be different in different cluster contexts. - m1.UID = m2.UID // Definitely different in different cluster contexts - m1.ResourceVersion = m2.ResourceVersion // Definitely different in different cluster contexts - m1.Generation = m2.Generation // Might be different in different cluster contexts. - m1.CreationTimestamp = m2.CreationTimestamp // Definitely different in different cluster contexts. - m1.DeletionTimestamp = m2.DeletionTimestamp // Might be different in different cluster contexts. - m1.OwnerReferences = nil // Might be different in different cluster contexts. - m2.OwnerReferences = nil - m1.Finalizers = nil // Might be different in different cluster contexts. - m2.Finalizers = nil - - return reflect.DeepEqual(m1, m2) -} diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 90125ac6dd..5c2e4b9150 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -20,6 +20,8 @@ import ( "sync" "k8s.io/kubernetes/pkg/runtime" + + "github.com/golang/glog" ) // Interface can be implemented by anything that knows how to watch and report changes. @@ -100,6 +102,7 @@ func (f *FakeWatcher) Stop() { f.Lock() defer f.Unlock() if !f.Stopped { + glog.V(4).Infof("Stopping fake watcher.") close(f.result) f.Stopped = true }