diff --git a/federation/pkg/federation-controller/cluster/BUILD b/federation/pkg/federation-controller/cluster/BUILD index c31d1cf63d..49c2464555 100644 --- a/federation/pkg/federation-controller/cluster/BUILD +++ b/federation/pkg/federation-controller/cluster/BUILD @@ -13,15 +13,12 @@ go_test( deps = [ "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//pkg/api:go_default_library", "//pkg/api/testapi:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", - "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library", ], ) @@ -49,7 +46,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/federation/pkg/federation-controller/cluster/cluster_client.go b/federation/pkg/federation-controller/cluster/cluster_client.go index 1c374f0ea7..c9f963588c 100644 --- a/federation/pkg/federation-controller/cluster/cluster_client.go +++ b/federation/pkg/federation-controller/cluster/cluster_client.go @@ -24,7 +24,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" federation_v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" 
"k8s.io/kubernetes/federation/pkg/federation-controller/util" @@ -38,8 +37,7 @@ const ( ) type ClusterClient struct { - discoveryClient *discovery.DiscoveryClient - kubeClient *clientset.Clientset + kubeClient *clientset.Clientset } func NewClusterClientSet(c *federation_v1beta1.Cluster) (*ClusterClient, error) { @@ -49,10 +47,6 @@ func NewClusterClientSet(c *federation_v1beta1.Cluster) (*ClusterClient, error) } var clusterClientSet = ClusterClient{} if clusterConfig != nil { - clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName))) - if clusterClientSet.discoveryClient == nil { - return nil, nil - } clusterClientSet.kubeClient = clientset.NewForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName))) if clusterClientSet.kubeClient == nil { return nil, nil @@ -97,7 +91,7 @@ func (self *ClusterClient) GetClusterHealthStatus() *federation_v1beta1.ClusterS LastProbeTime: currentTime, LastTransitionTime: currentTime, } - body, err := self.discoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw() + body, err := self.kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw() if err != nil { clusterStatus.Conditions = append(clusterStatus.Conditions, newNodeOfflineCondition) } else { @@ -107,6 +101,15 @@ func (self *ClusterClient) GetClusterHealthStatus() *federation_v1beta1.ClusterS clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition) } } + + zones, region, err := self.GetClusterZones() + if err != nil { + glog.Warningf("Failed to get zones and region for cluster with client %v: %v", self, err) + } else { + clusterStatus.Zones = zones + clusterStatus.Region = region + } + return &clusterStatus } diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go index 47d3a268db..6820a71e76 100644 --- 
a/federation/pkg/federation-controller/cluster/clustercontroller.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -108,6 +108,7 @@ func (cc *ClusterController) delFromClusterSetByName(clusterName string) { glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName) cc.knownClusterSet.Delete(clusterName) delete(cc.clusterKubeClientMap, clusterName) + delete(cc.clusterClusterStatusMap, clusterName) } func (cc *ClusterController) addToClusterSet(obj interface{}) { @@ -141,77 +142,37 @@ func (cc *ClusterController) Run(stopChan <-chan struct{}) { go cc.clusterController.Run(stopChan) // monitor cluster status periodically, in phase 1 we just get the health state from "/healthz" go wait.Until(func() { - if err := cc.UpdateClusterStatus(); err != nil { + if err := cc.updateClusterStatus(); err != nil { glog.Errorf("Error monitoring cluster status: %v", err) } }, cc.clusterMonitorPeriod, stopChan) } -func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) { - cc.mu.RLock() - clusterClient, found := cc.clusterKubeClientMap[cluster.Name] - cc.mu.RUnlock() - client := &clusterClient - if !found { - glog.Infof("It's a new cluster, a cluster client will be created") - client, err := NewClusterClientSet(cluster) - if err != nil || client == nil { - glog.Errorf("Failed to create cluster client, err: %v", err) - return nil, err - } - } - return client, nil -} - -func (cc *ClusterController) GetClusterStatus(cluster *federationv1beta1.Cluster) (*federationv1beta1.ClusterStatus, error) { - // just get the status of cluster, by requesting the restapi "/healthz" - clusterClient, err := cc.GetClusterClient(cluster) - if err != nil { - return nil, err - } - clusterStatus := clusterClient.GetClusterHealthStatus() - return clusterStatus, nil -} - -// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi -func (cc *ClusterController) UpdateClusterStatus() 
error { +// updateClusterStatus checks cluster status and gets the metrics from the cluster's REST API +func (cc *ClusterController) updateClusterStatus() error { clusters, err := cc.federationClient.Federation().Clusters().List(metav1.ListOptions{}) if err != nil { return err } - cc.mu.Lock() for _, cluster := range clusters.Items { - cc.addToClusterSetWithoutLock(&cluster) - } - cc.mu.Unlock() - - // If there's a difference between lengths of known clusters and observed clusters - if len(cc.knownClusterSet) != len(clusters.Items) { - observedSet := make(sets.String) - for _, cluster := range clusters.Items { - observedSet.Insert(cluster.Name) - } - deleted := cc.knownClusterSet.Difference(observedSet) - - cc.mu.Lock() - for clusterName := range deleted { - cc.delFromClusterSetByName(clusterName) - } - cc.mu.Unlock() - } - for _, cluster := range clusters.Items { - clusterStatusNew, err := cc.GetClusterStatus(&cluster) - if err != nil { - glog.Infof("Failed to Get the status of cluster: %v", cluster.Name) + cc.mu.RLock() + // skip updating status of the cluster which is not yet added to knownClusterSet. 
+ if !cc.knownClusterSet.Has(cluster.Name) { + cc.mu.RUnlock() continue } - cc.mu.RLock() - clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name] + clusterClient, clientFound := cc.clusterKubeClientMap[cluster.Name] + clusterStatusOld, statusFound := cc.clusterClusterStatusMap[cluster.Name] cc.mu.RUnlock() - if !found { - glog.Infof("There is no status stored for cluster: %v before", cluster.Name) + if !clientFound { + glog.Warningf("Failed to get client for cluster %s", cluster.Name) + continue + } + clusterStatusNew := clusterClient.GetClusterHealthStatus() + if !statusFound { + glog.Infof("There is no status stored for cluster: %v before", cluster.Name) } else { hasTransition := false if len(clusterStatusNew.Conditions) != len(clusterStatusOld.Conditions) { @@ -233,22 +194,6 @@ func (cc *ClusterController) UpdateClusterStatus() error { } } - cc.mu.RLock() - clusterClient, found := cc.clusterKubeClientMap[cluster.Name] - cc.mu.RUnlock() - if !found { - glog.Warningf("Failed to get client for cluster %s", cluster.Name) - continue - } - - zones, region, err := clusterClient.GetClusterZones() - if err != nil { - glog.Warningf("Failed to get zones and region for cluster %s: %v", cluster.Name, err) - // Don't return err here, as we want the rest of the status update to proceed. 
- } else { - clusterStatusNew.Zones = zones - clusterStatusNew.Region = region - } cc.mu.Lock() cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew cc.mu.Unlock() diff --git a/federation/pkg/federation-controller/cluster/clustercontroller_test.go b/federation/pkg/federation-controller/cluster/clustercontroller_test.go index 3b4c34c9be..8ae0acc029 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller_test.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller_test.go @@ -22,17 +22,15 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - controllerutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" ) @@ -125,16 +123,9 @@ func TestUpdateClusterStatusOK(t *testing.T) { } federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) - // Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens. 
- originalGetter := controllerutil.KubeconfigGetterForSecret - controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter { - return func() (*clientcmdapi.Config, error) { - return &clientcmdapi.Config{}, nil - } - } - manager := newClusterController(federationClientSet, 5) - err = manager.UpdateClusterStatus() + manager.addToClusterSet(federationCluster) + err = manager.updateClusterStatus() if err != nil { t.Errorf("Failed to Update Cluster Status: %v", err) } @@ -146,9 +137,6 @@ func TestUpdateClusterStatusOK(t *testing.T) { t.Errorf("Failed to Update Cluster Status") } } - - // Reset KubeconfigGetterForSecret - controllerutil.KubeconfigGetterForSecret = originalGetter } // Test races between informer's updates and routine updates of cluster status @@ -170,29 +158,16 @@ func TestUpdateClusterRace(t *testing.T) { } federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) - // Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens. - originalGetter := controllerutil.KubeconfigGetterForSecret - controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter { - return func() (*clientcmdapi.Config, error) { - return &clientcmdapi.Config{}, nil - } - } + manager := newClusterController(federationClientSet, 1*time.Millisecond) - manager := newClusterController(federationClientSet, 5) - - go func() { - for { - manager.UpdateClusterStatus() - } - }() + stop := make(chan struct{}) + manager.Run(stop) // try to trigger the race in UpdateClusterStatus for i := 0; i < 10; i++ { - manager.GetClusterClient(federationCluster) manager.addToClusterSet(federationCluster) manager.delFromClusterSet(federationCluster) } - // Reset KubeconfigGetterForSecret - controllerutil.KubeconfigGetterForSecret = originalGetter + close(stop) }