mirror of https://github.com/k3s-io/k3s
Merge pull request #50316 from shashidharatd/flake-fix
Automatic merge from submit-queue

[Federation] Fix flake in TestUpdateClusterRace

**What this PR does / why we need it**: Fixes #50262.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #50262

**Special notes for your reviewer**: Although ClusterController locks and unlocks around its protected data, `clusterClient` is handed out as a pointer into that data, so the entry it refers to can still be deleted concurrently after the lock is released. It is also better to lock/unlock once for the whole of `UpdateClusterStatus` instead of taking and releasing the lock many times.

**Release note**:
```
NONE
```

/assign @madhusudancs
/cc @dminh @kubernetes/sig-federation-bugs
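The core of the race described above is that map entries could be mutated or deleted after the read lock was dropped while a caller still held a reference to them. Below is a minimal standalone sketch (illustrative names, not the federation types) of the safer shape this PR moves toward: keep values rather than pointers in the map and copy an entry out while holding the read lock, so a concurrent delete cannot invalidate what the reader is using.

```go
package main

import (
	"fmt"
	"sync"
)

type status struct{ healthy bool }

type controller struct {
	mu       sync.RWMutex
	statuses map[string]status // values, not pointers: lookups yield copies
}

// get copies the entry out under the read lock; the caller may keep using
// the copy after the lock is released, even if the key is deleted meanwhile.
func (c *controller) get(name string) (status, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[name]
	return s, ok
}

func (c *controller) del(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.statuses, name)
}

func main() {
	c := &controller{statuses: map[string]status{"cluster-a": {healthy: true}}}
	s, ok := c.get("cluster-a") // copy taken under RLock
	c.del("cluster-a")          // deletion cannot invalidate s
	fmt.Println(s, ok)
}
```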
commit ef0f723bfd
```diff
@@ -13,15 +13,12 @@ go_test(
     deps = [
         "//federation/apis/federation/v1beta1:go_default_library",
         "//federation/client/clientset_generated/federation_clientset:go_default_library",
         "//federation/pkg/federation-controller/util:go_default_library",
         "//pkg/api:go_default_library",
         "//pkg/api/testapi:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
         "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
     ],
 )
```
```diff
@@ -49,7 +46,6 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
-        "//vendor/k8s.io/client-go/discovery:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
     ],
```
```diff
@@ -24,7 +24,6 @@ import (
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/discovery"
 	restclient "k8s.io/client-go/rest"
 	federation_v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
```
```diff
@@ -38,7 +37,6 @@ const (
 )

 type ClusterClient struct {
-	discoveryClient *discovery.DiscoveryClient
 	kubeClient *clientset.Clientset
 }
```
```diff
@@ -49,10 +47,6 @@ func NewClusterClientSet(c *federation_v1beta1.Cluster) (*ClusterClient, error)
 	}
 	var clusterClientSet = ClusterClient{}
 	if clusterConfig != nil {
-		clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
-		if clusterClientSet.discoveryClient == nil {
-			return nil, nil
-		}
 		clusterClientSet.kubeClient = clientset.NewForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
 		if clusterClientSet.kubeClient == nil {
 			return nil, nil
```
```diff
@@ -97,7 +91,7 @@ func (self *ClusterClient) GetClusterHealthStatus() *federation_v1beta1.ClusterS
 		LastProbeTime:      currentTime,
 		LastTransitionTime: currentTime,
 	}
-	body, err := self.discoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
+	body, err := self.kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
 	if err != nil {
 		clusterStatus.Conditions = append(clusterStatus.Conditions, newNodeOfflineCondition)
 	} else {
```
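For context on the two `ClusterClient` hunks above: a clientset already carries its own discovery client, so the separately constructed `discoveryClient` field was redundant. A hedged sketch using plain client-go rather than the vendored federation wrappers, and using the context-free `Do()` of the client-go vintage in this tree:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	kubeClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(config, "cluster-controller"))

	// The clientset's embedded discovery client serves the same raw
	// "/healthz" request the removed discoveryClient field was used for.
	body, err := kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
	fmt.Println(string(body), err)
}
```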
```diff
@@ -107,6 +101,15 @@ func (self *ClusterClient) GetClusterHealthStatus() *federation_v1beta1.ClusterS
 			clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
 		}
 	}
+
+	zones, region, err := self.GetClusterZones()
+	if err != nil {
+		glog.Warningf("Failed to get zones and region for cluster with client %v: %v", self, err)
+	} else {
+		clusterStatus.Zones = zones
+		clusterStatus.Region = region
+	}
+
 	return &clusterStatus
 }
```
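This hunk moves zone/region discovery into `GetClusterHealthStatus` itself. The diff does not show `GetClusterZones`; as an assumption about the general approach of that era (not the vendored implementation), zones and region are typically aggregated from the well-known failure-domain labels on the cluster's nodes:

```go
package main

import "fmt"

const (
	zoneLabel   = "failure-domain.beta.kubernetes.io/zone"
	regionLabel = "failure-domain.beta.kubernetes.io/region"
)

// zonesAndRegion collects the failure-domain labels from a set of node
// label maps into a deduplicated zone list and a single region.
func zonesAndRegion(nodeLabels []map[string]string) (zones []string, region string) {
	seen := map[string]bool{}
	for _, labels := range nodeLabels {
		if z, ok := labels[zoneLabel]; ok && !seen[z] {
			seen[z] = true
			zones = append(zones, z)
		}
		if r, ok := labels[regionLabel]; ok {
			region = r // assume all nodes share one region
		}
	}
	return zones, region
}

func main() {
	nodes := []map[string]string{
		{zoneLabel: "us-east-1a", regionLabel: "us-east-1"},
		{zoneLabel: "us-east-1b", regionLabel: "us-east-1"},
	}
	fmt.Println(zonesAndRegion(nodes))
}
```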
```diff
@@ -108,6 +108,7 @@ func (cc *ClusterController) delFromClusterSetByName(clusterName string) {
 	glog.V(1).Infof("ClusterController observed a cluster deletion: %v", clusterName)
 	cc.knownClusterSet.Delete(clusterName)
 	delete(cc.clusterKubeClientMap, clusterName)
+	delete(cc.clusterClusterStatusMap, clusterName)
 }

 func (cc *ClusterController) addToClusterSet(obj interface{}) {
```
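The one-line addition above matters because the controller keys two maps by cluster name; removing only the client entry would leave a stale status entry behind. A toy illustration:

```go
package main

import "fmt"

func main() {
	clients := map[string]string{"c1": "client"}
	statuses := map[string]string{"c1": "Ready"}

	// Before the fix: only the client entry was removed on cluster deletion.
	delete(clients, "c1")
	fmt.Println(len(statuses)) // 1: a stale status entry survives

	// With the added line, both entries are removed together.
	delete(statuses, "c1")
	fmt.Println(len(statuses)) // 0
}
```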
```diff
@@ -141,77 +142,37 @@ func (cc *ClusterController) Run(stopChan <-chan struct{}) {
 	go cc.clusterController.Run(stopChan)
 	// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
 	go wait.Until(func() {
-		if err := cc.UpdateClusterStatus(); err != nil {
+		if err := cc.updateClusterStatus(); err != nil {
 			glog.Errorf("Error monitoring cluster status: %v", err)
 		}
 	}, cc.clusterMonitorPeriod, stopChan)
 }

-func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
-	cc.mu.RLock()
-	clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
-	cc.mu.RUnlock()
-	client := &clusterClient
-	if !found {
-		glog.Infof("It's a new cluster, a cluster client will be created")
-		client, err := NewClusterClientSet(cluster)
-		if err != nil || client == nil {
-			glog.Errorf("Failed to create cluster client, err: %v", err)
-			return nil, err
-		}
-	}
-	return client, nil
-}
-
-func (cc *ClusterController) GetClusterStatus(cluster *federationv1beta1.Cluster) (*federationv1beta1.ClusterStatus, error) {
-	// just get the status of cluster, by requesting the restapi "/healthz"
-	clusterClient, err := cc.GetClusterClient(cluster)
-	if err != nil {
-		return nil, err
-	}
-	clusterStatus := clusterClient.GetClusterHealthStatus()
-	return clusterStatus, nil
-}
-
-// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi
-func (cc *ClusterController) UpdateClusterStatus() error {
+// updateClusterStatus checks cluster status and get the metrics from cluster's restapi
+func (cc *ClusterController) updateClusterStatus() error {
 	clusters, err := cc.federationClient.Federation().Clusters().List(metav1.ListOptions{})
 	if err != nil {
 		return err
 	}

+	cc.mu.Lock()
 	for _, cluster := range clusters.Items {
 		cc.addToClusterSetWithoutLock(&cluster)
 	}
+	cc.mu.Unlock()

 	// If there's a difference between lengths of known clusters and observed clusters
 	if len(cc.knownClusterSet) != len(clusters.Items) {
 		observedSet := make(sets.String)
 		for _, cluster := range clusters.Items {
 			observedSet.Insert(cluster.Name)
 		}
 		deleted := cc.knownClusterSet.Difference(observedSet)

+		cc.mu.Lock()
 		for clusterName := range deleted {
 			cc.delFromClusterSetByName(clusterName)
 		}
+		cc.mu.Unlock()
 	}
 	for _, cluster := range clusters.Items {
-		clusterStatusNew, err := cc.GetClusterStatus(&cluster)
-		if err != nil {
-			glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
 		cc.mu.RLock()
 		// skip updating status of the cluster which is not yet added to knownClusterSet.
 		if !cc.knownClusterSet.Has(cluster.Name) {
 			cc.mu.RUnlock()
 			continue
 		}
-		cc.mu.RLock()
-		clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
+		clusterClient, clientFound := cc.clusterKubeClientMap[cluster.Name]
+		clusterStatusOld, statusFound := cc.clusterClusterStatusMap[cluster.Name]
 		cc.mu.RUnlock()
-		if !found {
-			glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
+
+		if !clientFound {
+			glog.Warningf("Failed to get client for cluster %s", cluster.Name)
+			continue
+		}
+		clusterStatusNew := clusterClient.GetClusterHealthStatus()
+		if !statusFound {
+			glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
 		} else {
 			hasTransition := false
 			if len(clusterStatusNew.Conditions) != len(clusterStatusOld.Conditions) {
```
```diff
@@ -233,22 +194,6 @@ func (cc *ClusterController) UpdateClusterStatus() error {
 			}
 		}

-		cc.mu.RLock()
-		clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
-		cc.mu.RUnlock()
-		if !found {
-			glog.Warningf("Failed to get client for cluster %s", cluster.Name)
-			continue
-		}
-
-		zones, region, err := clusterClient.GetClusterZones()
-		if err != nil {
-			glog.Warningf("Failed to get zones and region for cluster %s: %v", cluster.Name, err)
-			// Don't return err here, as we want the rest of the status update to proceed.
-		} else {
-			clusterStatusNew.Zones = zones
-			clusterStatusNew.Region = region
-		}
+		cc.mu.Lock()
 		cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
+		cc.mu.Unlock()
```
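Taken together, the controller hunks replace scattered lock/unlock pairs with one write lock per mutation and one read lock per loop iteration. A self-contained sketch (illustrative types, not the federation ones) of the snapshot pattern `updateClusterStatus` now follows: read everything the iteration needs in a single critical section, then do the slow work with no lock held.

```go
package main

import (
	"fmt"
	"sync"
)

// clusterState stands in for ClusterController's protected maps.
type clusterState struct {
	mu       sync.RWMutex
	known    map[string]bool
	clients  map[string]string
	statuses map[string]string
}

// snapshot copies everything one update iteration needs inside a single
// read-locked section, so the maps are observed consistently and the lock
// is not held across slow network calls.
func (s *clusterState) snapshot(name string) (client, oldStatus string, ok bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.known[name] {
		return "", "", false // skip clusters not yet registered
	}
	return s.clients[name], s.statuses[name], true
}

func main() {
	s := &clusterState{
		known:    map[string]bool{"c1": true},
		clients:  map[string]string{"c1": "client"},
		statuses: map[string]string{"c1": "Ready"},
	}
	fmt.Println(s.snapshot("c1"))
}
```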
```diff
@@ -22,17 +22,15 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
 	"time"

 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 	federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 	controllerutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
 )
```
```diff
@@ -125,16 +123,9 @@ func TestUpdateClusterStatusOK(t *testing.T) {
 	}
 	federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))

-	// Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens.
-	originalGetter := controllerutil.KubeconfigGetterForSecret
-	controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter {
-		return func() (*clientcmdapi.Config, error) {
-			return &clientcmdapi.Config{}, nil
-		}
-	}
-
 	manager := newClusterController(federationClientSet, 5)
-	err = manager.UpdateClusterStatus()
+	manager.addToClusterSet(federationCluster)
+	err = manager.updateClusterStatus()
 	if err != nil {
 		t.Errorf("Failed to Update Cluster Status: %v", err)
 	}
```
```diff
@@ -146,9 +137,6 @@ func TestUpdateClusterStatusOK(t *testing.T) {
 		t.Errorf("Failed to Update Cluster Status")
 	}
 }
-
-	// Reset KubeconfigGetterForSecret
-	controllerutil.KubeconfigGetterForSecret = originalGetter
 }

 // Test races between informer's updates and routine updates of cluster status
```
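The new `manager.addToClusterSet(federationCluster)` call in the test above is needed because `updateClusterStatus` now skips clusters that are not yet in `knownClusterSet`. A toy model of that guard:

```go
package main

import "fmt"

type controller struct {
	known    map[string]bool
	statuses map[string]string
}

// updateStatuses mirrors the new skip behaviour: names missing from the
// known set are never status-checked.
func (c *controller) updateStatuses(observed []string) {
	for _, name := range observed {
		if !c.known[name] { // not yet added to the known set: skip
			continue
		}
		c.statuses[name] = "Ready"
	}
}

func main() {
	c := &controller{known: map[string]bool{}, statuses: map[string]string{}}
	c.updateStatuses([]string{"c1"})
	fmt.Println(c.statuses) // map[]: c1 was never registered, so it is skipped

	c.known["c1"] = true // the test's addToClusterSet step
	c.updateStatuses([]string{"c1"})
	fmt.Println(c.statuses) // map[c1:Ready]
}
```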
```diff
@@ -170,29 +158,16 @@ func TestUpdateClusterRace(t *testing.T) {
 	}
 	federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))

-	// Override KubeconfigGetterForSecret to avoid having to setup service accounts and mount files with secret tokens.
-	originalGetter := controllerutil.KubeconfigGetterForSecret
-	controllerutil.KubeconfigGetterForSecret = func(s *api.Secret) clientcmd.KubeconfigGetter {
-		return func() (*clientcmdapi.Config, error) {
-			return &clientcmdapi.Config{}, nil
-		}
-	}
-
-	manager := newClusterController(federationClientSet, 5)
-
-	go func() {
-		for {
-			manager.UpdateClusterStatus()
-		}
-	}()
+	manager := newClusterController(federationClientSet, 1*time.Millisecond)
+
+	stop := make(chan struct{})
+	manager.Run(stop)

 	// try to trigger the race in UpdateClusterStatus
 	for i := 0; i < 10; i++ {
-		manager.GetClusterClient(federationCluster)
 		manager.addToClusterSet(federationCluster)
 		manager.delFromClusterSet(federationCluster)
 	}

-	// Reset KubeconfigGetterForSecret
-	controllerutil.KubeconfigGetterForSecret = originalGetter
+	close(stop)
 }
```
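Finally, the reworked race test drives the controller's own `Run` loop at a 1 ms monitor period instead of spinning `UpdateClusterStatus` in a bare goroutine, then mutates the cluster set from the test body; run under `go test -race`, this flags any unsynchronized access. A minimal standalone sketch of that shape (hypothetical names, in a scratch package):

```go
package cluster

import (
	"sync"
	"testing"
	"time"
)

type fakeController struct {
	mu    sync.RWMutex
	known map[string]bool
}

// run starts a background loop that periodically reads shared state,
// standing in for the controller's status-monitoring goroutine.
func (c *fakeController) run(stop <-chan struct{}) {
	go func() {
		for {
			select {
			case <-stop:
				return
			default:
			}
			c.mu.RLock()
			_ = len(c.known) // periodic status read, as the controller loop does
			c.mu.RUnlock()
			time.Sleep(time.Millisecond)
		}
	}()
}

func TestUpdateRaceSketch(t *testing.T) {
	c := &fakeController{known: map[string]bool{}}
	stop := make(chan struct{})
	c.run(stop)
	// Mutate the shared set while the background loop reads it; the race
	// detector checks the locking, as TestUpdateClusterRace does.
	for i := 0; i < 10; i++ {
		c.mu.Lock()
		c.known["cluster"] = true
		delete(c.known, "cluster")
		c.mu.Unlock()
	}
	close(stop)
}
```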