Federated Ingress: unify UIDs across Cluster Ingress Controllers

Fixes #31180
pull/6/head
Quinton Hoole 2016-08-29 00:59:14 -07:00
parent ac8aae584d
commit fac6318c57
7 changed files with 590 additions and 154 deletions
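
For orientation, below is a minimal, self-contained sketch (not part of this commit) of the idea the controller changes implement: each cluster's ingress controller records its UID in the kube-system/ingress-uid ConfigMap under the key "uid", and the federation propagates a single elected UID to every member cluster so that all cluster ingress controllers agree. The unifyIngressUIDs helper and its "first cluster with a UID wins" election are illustrative simplifications of the annotation-based master election in the controller below; the release_1_4 clientset Get/Update signatures are assumed to match the vendored code in this branch.

package uidsketch

import (
	"fmt"

	kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
)

const (
	uidConfigMapName      = "ingress-uid" // ConfigMap written by each cluster's ingress controller
	uidConfigMapNamespace = "kube-system"
	uidKey                = "uid"
)

// unifyIngressUIDs makes every cluster's ingress-uid ConfigMap carry the same UID,
// taking the first cluster that already has one as the (arbitrarily elected) master.
func unifyIngressUIDs(clusters map[string]kube_release_1_4.Interface) error {
	masterUID := ""
	for name, client := range clusters {
		cm, err := client.Core().ConfigMaps(uidConfigMapNamespace).Get(uidConfigMapName)
		if err != nil {
			return fmt.Errorf("cluster %q: %v", name, err)
		}
		if uid := cm.Data[uidKey]; uid != "" {
			masterUID = uid // simplified election: first cluster with a UID wins
			break
		}
	}
	if masterUID == "" {
		return fmt.Errorf("no cluster has an ingress UID yet")
	}
	for name, client := range clusters {
		cm, err := client.Core().ConfigMaps(uidConfigMapNamespace).Get(uidConfigMapName)
		if err != nil {
			return fmt.Errorf("cluster %q: %v", name, err)
		}
		if cm.Data[uidKey] == masterUID {
			continue // already consistent with the elected master UID
		}
		if cm.Data == nil {
			cm.Data = map[string]string{}
		}
		cm.Data[uidKey] = masterUID
		if _, err := client.Core().ConfigMaps(uidConfigMapNamespace).Update(cm); err != nil {
			return fmt.Errorf("cluster %q: %v", name, err)
		}
	}
	return nil
}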


@ -17,7 +17,9 @@ limitations under the License.
package ingress
import (
"fmt"
"reflect"
"sync"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
@ -25,12 +27,14 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
@ -40,24 +44,40 @@ import (
)
const (
allClustersKey = "ALL_CLUSTERS"
staticIPAnnotationKey = "ingress.kubernetes.io/static-ip" // TODO: Get this directly from the Kubernetes Ingress Controller constant
// Special cluster name which denotes all clusters - only used internally. It's not a valid cluster name, so effectively reserved.
allClustersKey = ".ALL_CLUSTERS"
// TODO: Get the constants below directly from the Kubernetes Ingress Controller constants - but that's in a separate repo
staticIPNameKeyWritable = "kubernetes.io/ingress.global-static-ip-name" // The writable annotation on Ingress to tell the controller to use a specific, named, static IP
staticIPNameKeyReadonly = "static-ip" // The readonly key via which the cluster's Ingress Controller communicates which static IP it used. If staticIPNameKeyWritable above is specified, it is used.
uidAnnotationKey = "kubernetes.io/ingress.uid" // The annotation on federation clusters, where we store the ingress UID
uidConfigMapName = "ingress-uid" // Name of the config-map and key the ingress controller stores its uid in.
uidConfigMapNamespace = "kube-system"
uidKey = "uid"
)
type IngressController struct {
// For triggering single ingress reconciliation. This is used when there is an
sync.Mutex // Lock used for leader election
// For triggering single ingress reconciliation. This is used when there is an
// add/update/delete operation on an ingress in either federated API server or
// in some member of the federation.
ingressDeliverer *util.DelayingDeliverer
// For triggering reconciliation of all ingresses. This is used when
// a new cluster becomes available.
// For triggering reconciliation of cluster ingress controller configmap and
// all ingresses. This is used when a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer
// For triggering reconciliation of cluster ingress controller configmap.
// This is used when a configmap is updated in the cluster.
configMapDeliverer *util.DelayingDeliverer
// Contains ingresses present in members of federation.
ingressFederatedInformer util.FederatedInformer
// For updating members of federation.
federatedUpdater util.FederatedUpdater
// Contains ingress controller configmaps present in members of federation.
configMapFederatedInformer util.FederatedInformer
// For updating ingresses in members of federation.
federatedIngressUpdater util.FederatedUpdater
// For updating configmaps in members of federation.
federatedConfigMapUpdater util.FederatedUpdater
// Definitions of ingresses that should be federated.
ingressInformerStore cache.Store
// Informer controller for ingresses that should be federated.
@ -68,11 +88,14 @@ type IngressController struct {
// Backoff manager for ingresses
ingressBackoff *flowcontrol.Backoff
// Backoff manager for configmaps
configMapBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
ingressReviewDelay time.Duration
configMapReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
@ -80,23 +103,26 @@ type IngressController struct {
// NewIngressController returns a new ingress controller
func NewIngressController(client federation_release_1_4.Interface) *IngressController {
glog.V(4).Infof("->NewIngressController V(4)")
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-ingress-controller"})
ic := &IngressController{
federatedApiClient: client,
ingressReviewDelay: time.Second * 10,
configMapReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
configMapBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
}
// Build deliverers for triggering reconciliations.
ic.ingressDeliverer = util.NewDelayingDeliverer()
ic.clusterDeliverer = util.NewDelayingDeliverer()
ic.configMapDeliverer = util.NewDelayingDeliverer()
// Start informer in federated API servers on ingresses that should be federated.
ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer(
@ -142,24 +168,72 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the ingresses again.
ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay))
// When a new cluster becomes available, process all the ingresses again, and configure its ingress controller's configmap with the correct UID.
ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay)
},
},
)
// Federated updater along with Create/Update/Delete operations.
ic.federatedUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
// Federated informer on configmaps for ingress controllers in members of the federation.
ic.configMapFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name)
return framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
if targetClient == nil {
glog.Errorf("Internal error: targetClient is nil")
}
return targetClient.Core().ConfigMaps(uidConfigMapNamespace).List(options) // we only want to list one by name - unfortunately Kubernetes doesn't have a selector for that.
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
if targetClient == nil {
glog.Errorf("Internal error: targetClient is nil")
}
return targetClient.Core().ConfigMaps(uidConfigMapNamespace).Watch(options) // as above
},
},
&v1.ConfigMap{},
controller.NoResyncPeriodFunc(),
// Trigger reconciliation whenever the ingress controller's configmap in a federated cluster is changed. In most cases it
// would be just confirmation that the configmap for the ingress controller is correct.
util.NewTriggerOnAllChanges(
func(obj pkg_runtime.Object) {
ic.deliverConfigMapObj(cluster.Name, obj, ic.configMapReviewDelay, false)
},
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay)
},
},
)
// Federated ingress updater along with Create/Update/Delete operations.
ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
ingress := obj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
_, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress)
if err != nil {
glog.Errorf("Error creating ingress %q: %v", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, err)
} else {
glog.V(4).Infof("Successfully created ingress %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace})
}
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
ingress := obj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Attempting to update Ingress: %v", ingress)
_, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
glog.V(4).Infof("Failed to update Ingress: %v", err)
} else {
glog.V(4).Infof("Successfully updated Ingress: %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace})
}
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
@ -168,6 +242,35 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{})
return err
})
// Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called.
ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer,
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Internal error: Incorrectly attempting to create ConfigMap: %q", configMapName)
_, err := client.Core().ConfigMaps(configMap.Namespace).Create(configMap)
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.V(4).Infof("Attempting to update ConfigMap: %v", configMap)
_, err := client.Core().ConfigMaps(configMap.Namespace).Update(configMap)
if err == nil {
glog.V(4).Infof("Successfully updated ConfigMap %q", configMapName)
} else {
glog.V(4).Infof("Failed to update ConfigMap %q: %v", configMapName, err)
}
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Internal error: Incorrectly attempting to delete ConfigMap: %q", configMapName)
err := client.Core().ConfigMaps(configMap.Namespace).Delete(configMap.Name, &api.DeleteOptions{})
return err
})
return ic
}
@ -176,25 +279,46 @@ func (ic *IngressController) Run(stopChan <-chan struct{}) {
go ic.ingressInformerController.Run(stopChan)
glog.Infof("... Starting Ingress Federated Informer")
ic.ingressFederatedInformer.Start()
glog.Infof("... Starting ConfigMap Federated Informer")
ic.configMapFederatedInformer.Start()
go func() {
<-stopChan
glog.Infof("Stopping Ingress Controller")
glog.Infof("Stopping Ingress Federated Informer")
ic.ingressFederatedInformer.Stop()
glog.Infof("Stopping ConfigMap Federated Informer")
ic.configMapFederatedInformer.Stop()
}()
ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
ingress := item.Value.(types.NamespacedName)
glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress)
ic.reconcileIngress(ingress)
})
ic.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
glog.V(4).Infof("Cluster change delivered, reconciling ingresses")
ic.reconcileIngressesOnClusterChange()
ic.clusterDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
clusterName := item.Key
if clusterName != allClustersKey {
glog.V(4).Infof("Cluster change delivered for cluster %q, reconciling configmap and ingress for that cluster", clusterName)
} else {
glog.V(4).Infof("Cluster change delivered for all clusters, reconciling configmaps and ingresses for all clusters")
}
ic.reconcileIngressesOnClusterChange(clusterName)
ic.reconcileConfigMapForCluster(clusterName)
})
ic.configMapDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
clusterName := item.Key
if clusterName != allClustersKey {
glog.V(4).Infof("ConfigMap change delivered for cluster %q, reconciling configmap for that cluster", clusterName)
} else {
glog.V(4).Infof("ConfigMap change delivered for all clusters, reconciling configmaps for all clusters")
}
ic.reconcileConfigMapForCluster(clusterName)
})
go func() {
select {
case <-time.After(time.Minute):
glog.V(4).Infof("Ingress controller is garbage collecting")
ic.ingressBackoff.GC()
ic.configMapBackoff.GC()
glog.V(4).Infof("Ingress controller garbage collection complete")
case <-stopChan:
return
}
@ -218,40 +342,240 @@ func (ic *IngressController) deliverIngress(ingress types.NamespacedName, delay
ic.ingressDeliverer.DeliverAfter(key, ingress, delay)
}
func (ic *IngressController) deliverConfigMapObj(clusterName string, obj interface{}, delay time.Duration, failed bool) {
configMap := obj.(*v1.ConfigMap)
ic.deliverConfigMap(clusterName, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, delay, failed)
}
func (ic *IngressController) deliverConfigMap(cluster string, configMap types.NamespacedName, delay time.Duration, failed bool) {
key := cluster
if failed {
ic.configMapBackoff.Next(key, time.Now())
delay = delay + ic.configMapBackoff.Get(key)
} else {
ic.configMapBackoff.Reset(key)
}
glog.V(4).Infof("Delivering ConfigMap for cluster %q (delay %q): %s", cluster, delay, configMap)
ic.configMapDeliverer.DeliverAfter(key, configMap, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (ic *IngressController) isSynced() bool {
if !ic.ingressFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
glog.V(2).Infof("Cluster list not synced for ingress federated informer")
return false
}
clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
glog.Errorf("Failed to get ready clusters for ingress federated informer: %v", err)
return false
}
if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
glog.V(2).Infof("Target store not synced")
glog.V(2).Infof("Target store not synced for ingress federated informer")
return false
}
if !ic.configMapFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced for config map federated informer")
return false
}
clusters, err = ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters for configmap federated informer: %v", err)
return false
}
if !ic.configMapFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
glog.V(2).Infof("Target store not synced for configmap federated informer")
return false
}
glog.V(4).Infof("Cluster list is synced")
return true
}
// The function triggers reconciliation of all federated ingresses.
func (ic *IngressController) reconcileIngressesOnClusterChange() {
glog.V(4).Infof("Reconciling ingresses on cluster change")
// The function triggers reconciliation of all federated ingresses. clusterName is the name of the cluster that changed,
// but all ingresses in all clusters are reconciled.
func (ic *IngressController) reconcileIngressesOnClusterChange(clusterName string) {
glog.V(4).Infof("Reconciling ingresses on cluster change for cluster %q", clusterName)
if !ic.isSynced() {
ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay))
glog.V(4).Infof("Not synced, will try again later to reconcile ingresses.")
ic.clusterDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
}
for _, obj := range ic.ingressInformerStore.List() {
ingressList := ic.ingressInformerStore.List()
if len(ingressList) <= 0 {
glog.V(4).Infof("No federated ingresses to reconcile.")
}
for _, obj := range ingressList {
ingress := obj.(*extensions_v1beta1.Ingress)
ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, ic.smallDelay, false)
nsName := types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}
glog.V(4).Infof("Delivering federated ingress %q for cluster %q", nsName, clusterName)
ic.deliverIngress(nsName, ic.smallDelay, false)
}
}
/*
reconcileConfigMapForCluster ensures that the configmap for the ingress controller in the cluster has a UID (under the data key "uid")
consistent with all the other clusters in the federation. If clusterName == allClustersKey, then the configmaps of all available
clusters are reconciled.
*/
func (ic *IngressController) reconcileConfigMapForCluster(clusterName string) {
glog.V(4).Infof("Reconciling ConfigMap for cluster(s) %q", clusterName)
if !ic.isSynced() {
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
if clusterName == allClustersKey {
clusters, err := ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters. redelivering %q: %v", clusterName, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
for _, cluster := range clusters {
glog.V(4).Infof("Delivering ConfigMap for cluster(s) %q", clusterName)
ic.configMapDeliverer.DeliverAt(cluster.Name, nil, time.Now())
}
return
} else {
cluster, found, err := ic.configMapFederatedInformer.GetReadyCluster(clusterName)
if err != nil || !found {
glog.Errorf("Internal error: Cluster %q queued for configmap reconciliation, but not found. Will try again later: error = %v", clusterName, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
uidConfigMapNamespacedName := types.NamespacedName{Name: uidConfigMapName, Namespace: uidConfigMapNamespace}
configMapObj, found, err := ic.configMapFederatedInformer.GetTargetStore().GetByKey(cluster.Name, uidConfigMapNamespacedName.String())
if !found || err != nil {
glog.Errorf("Failed to get ConfigMap %q for cluster %q. Will try again later: %v", uidConfigMapNamespacedName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.configMapReviewDelay)
return
}
glog.V(4).Infof("Successfully got ConfigMap %q for cluster %q.", uidConfigMapNamespacedName, clusterName)
configMap, ok := configMapObj.(*v1.ConfigMap)
if !ok {
glog.Errorf("Internal error: The object in the ConfigMap cache for cluster %q configmap %q is not a *ConfigMap", cluster.Name, uidConfigMapNamespacedName)
return
}
ic.reconcileConfigMap(cluster, configMap)
return
}
}
/*
reconcileConfigMap ensures that the configmap in the cluster has a UID
consistent with the federation cluster's associated annotation.
1. If the UID in the configmap differs from the UID stored in the cluster's annotation, the configmap is updated.
2. If the UID annotation is missing from the cluster, the cluster's UID annotation is updated to be consistent
with the master cluster.
3. If there is no elected master cluster, this cluster attempts to elect itself as the master cluster.
In cases 2 and 3, the configmaps will be updated in the next cycle, triggered by the federation cluster update(s)
*/
func (ic *IngressController) reconcileConfigMap(cluster *federation_api.Cluster, configMap *v1.ConfigMap) {
ic.Lock() // TODO: Reduce the scope of this master election lock.
defer ic.Unlock()
configMapNsName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.V(4).Infof("Reconciling ConfigMap %q in cluster %q", configMapNsName, cluster.Name)
clusterIngressUID, clusterIngressUIDExists := cluster.ObjectMeta.Annotations[uidAnnotationKey]
configMapUID, ok := configMap.Data[uidKey]
if !ok {
glog.Errorf("Warning: ConfigMap %q in cluster %q does not contain data key %q. Therefore it cannot become the master.", configMapNsName, cluster.Name, uidKey)
}
if !clusterIngressUIDExists || clusterIngressUID == "" {
ic.updateClusterIngressUIDToMasters(cluster, configMapUID) // Second argument is the fallback, in case this is the only cluster, in which case it becomes the master
return
}
if configMapUID != clusterIngressUID { // An update is required
glog.V(4).Infof("Ingress UID update is required: configMapUID %q not equal to clusterIngressUID %q", configMapUID, clusterIngressUID)
configMap.Data[uidKey] = clusterIngressUID
operations := []util.FederatedOperation{{
Type: util.OperationTypeUpdate,
Obj: configMap,
ClusterName: cluster.Name,
}}
glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations)
err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout)
if err != nil {
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay)
}
} else {
glog.V(4).Infof("Ingress UID update is not required: configMapUID %q is equal to clusterIngressUID %q", configMapUID, clusterIngressUID)
}
}
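// Worked example of the three cases above (illustrative UIDs, not taken from real clusters):
// suppose cluster "us-east1" already has data["uid"] == "a1b2c3" in its ingress-uid ConfigMap
// and carries the annotation kubernetes.io/ingress.uid=a1b2c3, while a newly joined cluster
// "eu-west1" has data["uid"] == "z9y8x7" and no annotation yet. Reconciling eu-west1 hits case 2:
// us-east1 is found as the elected master and "a1b2c3" is copied onto eu-west1's cluster
// annotation. The resulting cluster update triggers another reconcile, which now hits case 1 and
// rewrites eu-west1's ConfigMap data["uid"] to "a1b2c3", unifying the ingress UIDs.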
/*
getMasterCluster returns the cluster which is the elected master w.r.t. ingress UID, and its ingress UID.
If there is no elected master cluster, an error is returned.
All other clusters must use the ingress UID of the elected master.
*/
func (ic *IngressController) getMasterCluster() (master *federation_api.Cluster, ingressUID string, err error) {
clusters, err := ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
return nil, "", err
}
for _, c := range clusters {
UID, exists := c.ObjectMeta.Annotations[uidAnnotationKey]
if exists && UID != "" { // Found the master cluster
glog.V(4).Infof("Found master cluster %q with annotation %q=%q", c.Name, uidAnnotationKey, UID)
return c, UID, nil
}
}
return nil, "", fmt.Errorf("Failed to find master cluster with annotation %q", uidAnnotationKey)
}
/*
updateClusterIngressUIDToMasters takes the ingress UID annotation on the master cluster and applies it to cluster.
If there is no master cluster, then fallbackUID is used (and hence this cluster becomes the master).
*/
func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federation_api.Cluster, fallbackUID string) {
masterCluster, masterUID, err := ic.getMasterCluster()
clusterObj, clusterErr := conversion.NewCloner().DeepCopy(cluster) // Make a clone so that we don't clobber our input param
cluster, ok := clusterObj.(*federation_api.Cluster)
if clusterErr != nil || !ok {
glog.Errorf("Internal error: Failed clone cluster resource while attempting to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err)
return
}
if err == nil {
if masterCluster.Name != cluster.Name { // We're not the master, need to get in sync
cluster.ObjectMeta.Annotations[uidAnnotationKey] = masterUID
if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil {
glog.Errorf("Failed to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err)
return
} else {
glog.V(4).Infof("Successfully added master ingress UID annotation (%q = %q) from master cluster %q to cluster %q.", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name)
}
} else {
glog.V(4).Infof("Cluster %q with ingress UID is already the master with annotation (%q = %q), no need to update.", cluster.Name, uidAnnotationKey, cluster.ObjectMeta.Annotations[uidAnnotationKey])
}
} else {
glog.V(2).Infof("No master cluster found to source an ingress UID from for cluster %q. Attempting to elect new master cluster %q with ingress UID %q = %q", cluster.Name, cluster.Name, uidAnnotationKey, fallbackUID)
if fallbackUID != "" {
cluster.ObjectMeta.Annotations[uidAnnotationKey] = fallbackUID
if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil {
glog.Errorf("Failed to add ingress UID annotation (%q = %q) to cluster %q. No master elected. Will try again later: %v", uidAnnotationKey, fallbackUID, cluster.Name, err)
} else {
glog.V(4).Infof("Successfully added ingress UID annotation (%q = %q) to cluster %q.", uidAnnotationKey, fallbackUID, cluster.Name)
}
} else {
glog.Errorf("No master cluster exists, and fallbackUID for cluster %q is invalid (%q). This probably means that no clusters have an ingress controller configmap with key %q. Federated Ingress currently supports clusters running Google Loadbalancer Controller (\"GLBC\")", cluster.Name, fallbackUID, uidKey)
}
}
}
func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
glog.V(4).Infof("Reconciling ingress %q", ingress)
glog.V(4).Infof("Reconciling ingress %q for all clusters", ingress)
if !ic.isSynced() {
ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
return
@ -269,22 +593,29 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
glog.V(4).Infof("Ingress %q is not federated. Ignoring.", ingress)
return
}
baseIngress := baseIngressObj.(*extensions_v1beta1.Ingress)
baseIngress, ok := baseIngressObj.(*extensions_v1beta1.Ingress)
if !ok {
glog.Errorf("Internal Error: Object retrieved from ingressInformerStore with key %q is not of correct type *extensions_v1beta1.Ingress: %v", key, baseIngressObj)
} else {
glog.V(4).Infof("Base (federated) ingress: %v", baseIngress)
}
clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
return
} else {
glog.V(4).Infof("Found %d ready clusters across which to reconcile ingress %q", len(clusters), ingress)
}
operations := make([]util.FederatedOperation, 0)
for clusterIndex, cluster := range clusters {
_, baseIPExists := baseIngress.ObjectMeta.Annotations[staticIPAnnotationKey]
clusterIngressObj, found, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
baseIPName, baseIPAnnotationExists := baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable]
clusterIngressObj, clusterIngressFound, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", ingress, cluster.Name, err)
glog.Errorf("Failed to get cached ingress %s for cluster %s, will retry: %v", ingress, cluster.Name, err)
ic.deliverIngress(ingress, 0, true)
return
}
@ -292,8 +623,26 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
ObjectMeta: baseIngress.ObjectMeta,
Spec: baseIngress.Spec,
}
objMeta, err := conversion.NewCloner().DeepCopy(baseIngress.ObjectMeta)
if err != nil {
glog.Errorf("Error deep copying ObjectMeta: %v", err)
}
objSpec, err := conversion.NewCloner().DeepCopy(baseIngress.Spec)
if err != nil {
glog.Errorf("Error deep copying Spec: %v", err)
}
desiredIngress.ObjectMeta, ok = objMeta.(v1.ObjectMeta)
if !ok {
glog.Errorf("Internal error: Failed to cast to v1.ObjectMeta: %v", objMeta)
}
desiredIngress.Spec, ok = objSpec.(extensions_v1beta1.IngressSpec)
if !ok {
glog.Errorf("Internal error: Failed to cast to extensions_v1beta1.IngressSpec: %v", objSpec)
}
glog.V(4).Infof("Desired Ingress: %v", desiredIngress)
if !found {
if !clusterIngressFound {
glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name)
// We can't supply server-created fields when creating a new object.
desiredIngress.ObjectMeta.ResourceVersion = ""
desiredIngress.ObjectMeta.UID = ""
@ -307,63 +656,87 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
// Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated
// index 0, but eventually all ingresses will share the single global IP recorded in the annotation
// of the federated ingress.
if baseIPExists || (clusterIndex == 0) {
if baseIPAnnotationExists || (clusterIndex == 0) {
glog.V(4).Infof("No existing Ingress %s in cluster %s (index %d) and static IP annotation (%q) on base ingress - queuing a create operation", ingress, cluster.Name, clusterIndex, staticIPNameKeyWritable)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredIngress,
ClusterName: cluster.Name,
})
} else {
glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex)
}
} else {
clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required", ingress, cluster.Name)
clusterIPName, clusterIPExists := clusterIngress.ObjectMeta.Annotations[staticIPAnnotationKey]
if !baseIPExists && clusterIPExists {
// Add annotation to federated ingress via API.
original, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Get(baseIngress.Name)
if err == nil {
original.ObjectMeta.Annotations[staticIPAnnotationKey] = clusterIPName
if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(original); err != nil {
glog.Errorf("Failed to add static IP annotation to federated ingress %q: %v", ingress, err)
glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required (in either direction)", ingress, cluster.Name)
clusterIPName, clusterIPNameExists := clusterIngress.ObjectMeta.Annotations[staticIPNameKeyReadonly]
baseLBStatusExists := len(baseIngress.Status.LoadBalancer.Ingress) > 0
clusterLBStatusExists := len(clusterIngress.Status.LoadBalancer.Ingress) > 0
logStr := fmt.Sprintf("Cluster ingress %q has annotation %q=%q, loadbalancer status exists? [%v], federated ingress has annotation %q=%q, loadbalancer status exists? [%v]. %%s annotation and/or loadbalancer status from cluster ingress to federated ingress.", ingress, staticIPNameKeyReadonly, clusterIPName, clusterLBStatusExists, staticIPNameKeyWritable, baseIPName, baseLBStatusExists)
if (!baseIPAnnotationExists && clusterIPNameExists) || (!baseLBStatusExists && clusterLBStatusExists) { // copy the IP name from the readonly annotation on the cluster ingress, to the writable annotation on the federated ingress
glog.V(4).Infof(logStr, "Transferring")
if !baseIPAnnotationExists && clusterIPNameExists {
baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName
}
if !baseLBStatusExists && clusterLBStatusExists {
lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer)
lbstatus, ok := lbstatusObj.(*v1.LoadBalancerStatus)
if lbErr != nil || !ok {
glog.Errorf("Internal error: Failed to clone LoadBalancerStatus of %q in cluster %q while attempting to update master loadbalancer ingress status, will try again later. error: %v, Object to be cloned: %v", ingress, cluster.Name, lbErr, lbstatusObj)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return
}
baseIngress.Status.LoadBalancer = *lbstatus
}
glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress)
if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return
} else {
glog.Errorf("Failed to get federated ingress %q: %v", ingress, err)
glog.V(4).Infof("Successfully added static IP annotation to federated ingress: %q", ingress)
ic.deliverIngress(ingress, ic.smallDelay, false)
return
}
} else {
glog.V(4).Infof(logStr, "Not transferring")
}
// Update existing ingress, if needed.
if !util.ObjectMetaIsEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) ||
!reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) {
// TODO: In some cases Ingress controllers in the clusters add annotations, so we ideally need to exclude those from
// the equivalence comparison to cut down on unnecessary updates.
// Update existing cluster ingress, if needed.
if util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) &&
reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) &&
reflect.DeepEqual(baseIngress.Status.LoadBalancer.Ingress, clusterIngress.Status.LoadBalancer.Ingress) {
glog.V(4).Infof("Ingress %q in cluster %q does not need an update: cluster ingress is equivalent to federated ingress", ingress, cluster.Name)
} else {
glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress)
// We need to use server-created fields from the cluster, not the desired object when updating.
desiredIngress.ObjectMeta.ResourceVersion = clusterIngress.ObjectMeta.ResourceVersion
desiredIngress.ObjectMeta.UID = clusterIngress.ObjectMeta.UID
// Merge any annotations on the federated ingress onto the underlying cluster ingress,
// overwriting duplicates.
// TODO: We should probably use a PATCH operation for this instead.
for key, val := range baseIngress.ObjectMeta.Annotations {
desiredIngress.ObjectMeta.Annotations[key] = val
}
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster",
"Updating ingress in cluster %s", cluster.Name)
if !util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) {
// Merge any annotations on the federated ingress onto the underlying cluster ingress,
// overwriting duplicates.
for key, val := range baseIngress.ObjectMeta.Annotations {
desiredIngress.ObjectMeta.Annotations[key] = val
}
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster",
"Updating ingress in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredIngress,
ClusterName: cluster.Name,
})
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredIngress,
ClusterName: cluster.Name,
})
// TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster.
// This is only for consistency, so that the federation ingress metadata matches the underlying clusters. It's not actually required.
}
}
}
}
if len(operations) == 0 {
// Everything is in order
glog.V(4).Infof("Ingress %q is up-to-date in all clusters - no propagation to clusters required.", ingress)
return
}
glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
err = ic.federatedUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedUpdateInCluster",
err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedClusterUpdate",
"Ingress update in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
@ -371,7 +744,4 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return
}
// Everything is in order but let's be doubly sure - TODO: quinton: Why? This seems like a hack.
ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
}
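
As a usage note (again a sketch under assumptions, not part of this commit): after the controller has reconciled, each member Cluster object in the federation API carries the elected ingress UID in its kubernetes.io/ingress.uid annotation, so an out-of-band check could look like the following. The clientset import path follows the fake clientset path used in the test below, and the Clusters().Get signature is assumed to match the Update call used above.

package uidsketch

import (
	federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
)

// getAssignedIngressUID returns the ingress UID that the federated ingress controller has
// stamped onto the named member cluster, or "" if no UID has been assigned to it yet.
func getAssignedIngressUID(fedClient federation_release_1_4.Interface, clusterName string) (string, error) {
	cluster, err := fedClient.Federation().Clusters().Get(clusterName)
	if err != nil {
		return "", err
	}
	return cluster.ObjectMeta.Annotations["kubernetes.io/ingress.uid"], nil
}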


@ -24,6 +24,7 @@ import (
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -35,29 +36,40 @@ import (
)
func TestIngressController(t *testing.T) {
fakeClusterList := federation_api.ClusterList{Items: []federation_api.Cluster{}}
fakeConfigMapList1 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}}
fakeConfigMapList2 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}}
cluster1 := NewCluster("cluster1", api_v1.ConditionTrue)
cluster2 := NewCluster("cluster2", api_v1.ConditionTrue)
cfg1 := NewConfigMap("foo")
cfg2 := NewConfigMap("bar") // Different UID from cfg1, so that we can check that they get reconciled.
fakeClient := &fake_federation_release_1_4.Clientset{}
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterFakeList("ingresses", &fakeClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
ingressWatch := RegisterFakeWatch("ingresses", &fakeClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
t.Log("Creating fake infrastructure")
fedClient := &fake_federation_release_1_4.Clientset{}
RegisterFakeList("clusters", &fedClient.Fake, &fakeClusterList)
RegisterFakeList("ingresses", &fedClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake)
fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch)
fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)
cluster1Client := &fake_kube_release_1_4.Clientset{}
cluster1Watch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
RegisterFakeList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
cluster1CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1Watch)
RegisterFakeList("configmaps", &cluster1Client.Fake, &fakeConfigMapList1)
cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake)
cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
cluster2Client := &fake_kube_release_1_4.Clientset{}
cluster2Watch := RegisterFakeWatch("ingresses", &cluster2Client.Fake)
RegisterFakeList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
cluster2CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2Watch)
RegisterFakeList("configmaps", &cluster2Client.Fake, &fakeConfigMapList2)
cluster2IngressWatch := RegisterFakeWatch("ingresses", &cluster2Client.Fake)
cluster2ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster2Client.Fake)
cluster2IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2IngressWatch)
cluster2ConfigMapUpdateChan := RegisterFakeCopyOnUpdate("configmaps", &cluster2Client.Fake, cluster2ConfigMapWatch)
ingressController := NewIngressController(fakeClient)
informer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) {
clientFactoryFunc := func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
@ -66,50 +78,129 @@ func TestIngressController(t *testing.T) {
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
}
ingressController := NewIngressController(fedClient)
ingressInformer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer)
ingressInformer.SetClientFactory(clientFactoryFunc)
configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer)
configMapInformer.SetClientFactory(clientFactoryFunc)
ingressController.clusterAvailableDelay = time.Second
ingressController.ingressReviewDelay = 50 * time.Millisecond
ingressController.configMapReviewDelay = 50 * time.Millisecond
ingressController.smallDelay = 20 * time.Millisecond
ingressController.updateTimeout = 5 * time.Second
stop := make(chan struct{})
t.Log("Running Ingress Controller")
ingressController.Run(stop)
ing1 := extensions_v1beta1.Ingress{
ObjectMeta: api_v1.ObjectMeta{
Name: "test-ingress",
Namespace: "mynamespace",
SelfLink: "/api/v1/namespaces/mynamespaces/ingress/test-ingress",
Name: "test-ingress",
Namespace: "mynamespace",
SelfLink: "/api/v1/namespaces/mynamespace/ingress/test-ingress",
Annotations: map[string]string{},
},
Status: extensions_v1beta1.IngressStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: make([]api_v1.LoadBalancerIngress, 0, 0),
},
},
}
t.Log("Adding cluster 1")
clusterWatch.Add(cluster1)
t.Log("Adding Ingress UID ConfigMap to cluster 1")
cluster1ConfigMapWatch.Add(cfg1)
t.Log("Checking that UID annotation on Cluster 1 annotation was correctly updated")
cluster := GetClusterFromChan(fedClusterUpdateChan)
assert.NotNil(t, cluster)
assert.Equal(t, cluster.ObjectMeta.Annotations[uidAnnotationKey], cfg1.Data[uidKey])
// Test add federated ingress.
ingressWatch.Add(&ing1)
createdIngress := GetIngressFromChan(cluster1CreateChan)
t.Log("Adding Federated Ingress")
fedIngressWatch.Add(&ing1)
t.Log("Checking that Ingress was correctly created in cluster 1")
createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan)
assert.NotNil(t, createdIngress)
assert.True(t, reflect.DeepEqual(&ing1, createdIngress))
assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent")
// Test that IP address gets transferred from cluster ingress to federated ingress.
t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, api_v1.LoadBalancerIngress{IP: "1.2.3.4"})
cluster1IngressWatch.Modify(createdIngress)
updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
if updatedIngress != nil {
assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))
}
// Test update federated ingress.
ing1.Annotations = map[string]string{
"A": "B",
}
ingressWatch.Modify(&ing1)
updatedIngress := GetIngressFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedIngress)
assert.True(t, reflect.DeepEqual(&ing1, updatedIngress))
updatedIngress.ObjectMeta.Annotations["A"] = "B"
t.Log("Modifying Federated Ingress")
fedIngressWatch.Modify(updatedIngress)
t.Log("Checking that Ingress was correctly updated in cluster 1")
updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan)
assert.NotNil(t, updatedIngress2)
assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(updatedIngress2.ObjectMeta, updatedIngress.ObjectMeta), "Metadata of updated object is not equivalent")
// Test add cluster
ing1.Annotations[staticIPAnnotationKey] = "foo" // Make sure that the base object has a static IP name first.
ingressWatch.Modify(&ing1)
t.Log("Adding a second cluster")
ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
fedIngressWatch.Modify(&ing1)
clusterWatch.Add(cluster2)
createdIngress2 := GetIngressFromChan(cluster2CreateChan)
// First check that the original values are not equal - see above comment
assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test"))
cluster2ConfigMapWatch.Add(cfg2)
t.Log("Checking that the ingress got created in cluster 2")
createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan)
assert.NotNil(t, createdIngress2)
assert.True(t, reflect.DeepEqual(&ing1, createdIngress2))
assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")
t.Log("Checking that the configmap in cluster 2 got updated.")
updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan)
assert.NotNil(t, updatedConfigMap2, fmt.Sprintf("ConfigMap in cluster 2 was not updated (or more likely the test is broken and the API type written is wrong)"))
if updatedConfigMap2 != nil {
assert.Equal(t, cfg1.Data[uidKey], updatedConfigMap2.Data[uidKey],
fmt.Sprintf("UID's in configmaps in cluster's 1 and 2 are not equal (%q != %q)", cfg1.Data["uid"], updatedConfigMap2.Data["uid"]))
}
close(stop)
}
func GetIngressFromChan(c chan runtime.Object) *extensions_v1beta1.Ingress {
ingress := GetObjectFromChan(c).(*extensions_v1beta1.Ingress)
func GetIngressFromChan(t *testing.T, c chan runtime.Object) *extensions_v1beta1.Ingress {
obj := GetObjectFromChan(c)
ingress, ok := obj.(*extensions_v1beta1.Ingress)
if !ok {
t.Logf("Object on channel was not of type *extensions_v1beta1.Ingress: %v", obj)
}
return ingress
}
func GetConfigMapFromChan(c chan runtime.Object) *api_v1.ConfigMap {
configMap, _ := GetObjectFromChan(c).(*api_v1.ConfigMap)
return configMap
}
func GetClusterFromChan(c chan runtime.Object) *federation_api.Cluster {
cluster, _ := GetObjectFromChan(c).(*federation_api.Cluster)
return cluster
}
func NewConfigMap(uid string) *api_v1.ConfigMap {
return &api_v1.ConfigMap{
ObjectMeta: api_v1.ObjectMeta{
Name: uidConfigMapName,
Namespace: uidConfigMapNamespace,
SelfLink: "/api/v1/namespaces/" + uidConfigMapNamespace + "/configmap/" + uidConfigMapName,
Annotations: map[string]string{},
},
Data: map[string]string{
uidKey: uid,
},
}
}


@ -162,7 +162,7 @@ func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay ti
d.DeliverAt(key, value, time.Now().Add(delay))
}
// Gets target chanel of the deliverer.
// Gets target channel of the deliverer.
func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem {
return d.targetChannel
}


@ -85,9 +85,9 @@ type FederationView interface {
ClustersSynced() bool
}
// A structure that combines an informer running agains federated api server and listening for cluster updates
// A structure that combines an informer running against federated api server and listening for cluster updates
// with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new
// cluster is added to the federation an informer is created for it using TargetInformerFactory. Infomrers are stoped
// cluster is added to the federation an informer is created for it using TargetInformerFactory. Informers are stopped
// when a cluster is either put offline or deleted. It is assumed that some controller keeps an eye on the cluster list
// and thus the clusters in ETCD are up to date.
type FederatedInformer interface {
@ -186,18 +186,22 @@ func NewFederatedInformer(
if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster)
}
} else {
glog.Errorf("Cluster %v not added. Not of correct type, or cluster not ready.", cur)
}
},
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*federation_api.Cluster)
if !ok {
glog.Errorf("Internal error: Cluster %v not updated. Old cluster not of correct type.", old)
return
}
curCluster, ok := cur.(*federation_api.Cluster)
if !ok {
glog.Errorf("Internal error: Cluster %v not updated. New cluster not of correct type.", cur)
return
}
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) || !reflect.DeepEqual(oldCluster.ObjectMeta.Annotations, curCluster.ObjectMeta.Annotations) {
var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name)
@ -213,6 +217,8 @@ func NewFederatedInformer(
clusterLifecycle.ClusterAvailable(curCluster)
}
}
} else {
glog.V(4).Infof("Cluster %v not updated to %v as ready status and specs are identical", oldCluster, curCluster)
}
},
},
@ -258,11 +264,14 @@ type federatedStoreImpl struct {
}
func (f *federatedInformerImpl) Stop() {
glog.V(4).Infof("Stopping federated informer.")
f.Lock()
defer f.Unlock()
glog.V(4).Infof("... Closing cluster informer channel.")
close(f.clusterInformer.stopChan)
for _, informer := range f.targetInformers {
for key, informer := range f.targetInformers {
glog.V(4).Infof("... Closing informer channel for %q.", key)
close(informer.stopChan)
}
}
@ -291,14 +300,16 @@ func (f *federatedInformerImpl) GetClientsetForCluster(clusterName string) (kube
func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (kube_release_1_4.Interface, error) {
// No locking needed. Will happen in f.GetCluster.
glog.V(4).Infof("Getting clientset for cluster %q", clusterName)
if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil {
glog.V(4).Infof("Got clientset for cluster %q", clusterName)
return f.clientFactory(cluster)
} else {
if err != nil {
return nil, err
}
}
return nil, fmt.Errorf("cluster %s not found", clusterName)
return nil, fmt.Errorf("cluster %q not found", clusterName)
}
// GetReadyClusters returns all clusters for which the sub-informers are run.
@ -441,7 +452,7 @@ func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]FederatedObject,
return result, nil
}
// GetKey for returns the key under which the item would be put in the store.
// GetKeyFor returns the key under which the item would be put in the store.
func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
// TODO: support other keying functions.
key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item)


@ -64,10 +64,14 @@ func (wd *WatcherDispatcher) Add(obj runtime.Object) {
func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
glog.V(4).Infof("->WatcherDispatcher.Modify(%v)", obj)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj})
for _, watcher := range wd.watchers {
for i, watcher := range wd.watchers {
if !watcher.IsStopped() {
glog.V(4).Infof("->Watcher(%d).Modify(%v)", i, obj)
watcher.Modify(obj)
} else {
glog.V(4).Infof("->Watcher(%d) is stopped. Not calling Modify(%v)", i, obj)
}
}
}
@ -173,7 +177,7 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object {
select {
case obj := <-c:
return obj
case <-time.After(10 * time.Second):
case <-time.After(20 * time.Second):
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
return nil
}


@ -1,43 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"reflect"
"k8s.io/kubernetes/pkg/api/v1"
)
/*
ObjectMetaIsEquivalent determines whether two ObjectMeta's (typically one from a federated API object,
and the other from a cluster object) are equivalent.
*/
func ObjectMetaIsEquivalent(m1, m2 v1.ObjectMeta) bool {
// First make all of the read-only fields equal, then perform a deep equality comparison
m1.SelfLink = m2.SelfLink // Might be different in different cluster contexts.
m1.UID = m2.UID // Definitely different in different cluster contexts
m1.ResourceVersion = m2.ResourceVersion // Definitely different in different cluster contexts
m1.Generation = m2.Generation // Might be different in different cluster contexts.
m1.CreationTimestamp = m2.CreationTimestamp // Definitely different in different cluster contexts.
m1.DeletionTimestamp = m2.DeletionTimestamp // Might be different in different cluster contexts.
m1.OwnerReferences = nil // Might be different in different cluster contexts.
m2.OwnerReferences = nil
m1.Finalizers = nil // Might be different in different cluster contexts.
m2.Finalizers = nil
return reflect.DeepEqual(m1, m2)
}


@ -20,6 +20,8 @@ import (
"sync"
"k8s.io/kubernetes/pkg/runtime"
"github.com/golang/glog"
)
// Interface can be implemented by anything that knows how to watch and report changes.
@ -100,6 +102,7 @@ func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
glog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}