From 2b17cb91049e862951baa983383fa28f8cf8b252 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Tue, 14 Mar 2023 20:11:28 -0300 Subject: [PATCH] fix(kubernetes): fix data-race in GetKubeClient() EE-4436 (#8498) --- api/kubernetes/cli/access.go | 5 +-- api/kubernetes/cli/access_test.go | 2 -- api/kubernetes/cli/client.go | 54 ++++++++++++++-------------- api/kubernetes/cli/namespace_test.go | 10 ------ 4 files changed, 31 insertions(+), 40 deletions(-) diff --git a/api/kubernetes/cli/access.go b/api/kubernetes/cli/access.go index 729e3c801..20a8e070e 100644 --- a/api/kubernetes/cli/access.go +++ b/api/kubernetes/cli/access.go @@ -12,8 +12,8 @@ import ( // NamespaceAccessPoliciesDeleteNamespace removes stored policies associated with a given namespace func (kcl *KubeClient) NamespaceAccessPoliciesDeleteNamespace(ns string) error { - kcl.lock.Lock() - defer kcl.lock.Unlock() + kcl.mu.Lock() + defer kcl.mu.Unlock() policies, err := kcl.GetNamespaceAccessPolicies() if err != nil { @@ -42,6 +42,7 @@ func (kcl *KubeClient) GetNamespaceAccessPolicies() (map[string]portainer.K8sNam if err != nil { return nil, err } + return policies, nil } diff --git a/api/kubernetes/cli/access_test.go b/api/kubernetes/cli/access_test.go index db250546c..09bb28d9e 100644 --- a/api/kubernetes/cli/access_test.go +++ b/api/kubernetes/cli/access_test.go @@ -2,7 +2,6 @@ package cli import ( "context" - "sync" "testing" portainer "github.com/portainer/portainer/api" @@ -40,7 +39,6 @@ func Test_NamespaceAccessPoliciesDeleteNamespace_updatesPortainerConfig_whenConf k := &KubeClient{ cli: kfake.NewSimpleClientset(), instanceID: "instance", - lock: &sync.Mutex{}, } config := &ktypes.ConfigMap{ diff --git a/api/kubernetes/cli/client.go b/api/kubernetes/cli/client.go index c2316c0d7..0b2157c73 100644 --- a/api/kubernetes/cli/client.go +++ b/api/kubernetes/cli/client.go @@ -7,7 +7,6 @@ import ( "sync" "time" - cmap "github.com/orcaman/concurrent-map" "github.com/patrickmn/go-cache" "github.com/pkg/errors" portainer "github.com/portainer/portainer/api" @@ -29,16 +28,17 @@ type ( reverseTunnelService portainer.ReverseTunnelService signatureService portainer.DigitalSignatureService instanceID string - endpointClients cmap.ConcurrentMap + endpointClients map[string]*KubeClient endpointProxyClients *cache.Cache AddrHTTPS string + mu sync.Mutex } // KubeClient represent a service used to execute Kubernetes operations KubeClient struct { cli kubernetes.Interface instanceID string - lock *sync.Mutex + mu sync.Mutex } ) @@ -57,7 +57,7 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers signatureService: signatureService, reverseTunnelService: reverseTunnelService, instanceID: instanceID, - endpointClients: cmap.New(), + endpointClients: make(map[string]*KubeClient), endpointProxyClients: cache.New(timeout, timeout), AddrHTTPS: addrHTTPS, }, nil @@ -69,50 +69,58 @@ func (factory *ClientFactory) GetInstanceID() (instanceID string) { // Remove the cached kube client so a new one can be created func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) { - factory.endpointClients.Remove(strconv.Itoa(int(endpointID))) + factory.mu.Lock() + delete(factory.endpointClients, strconv.Itoa(int(endpointID))) + factory.mu.Unlock() } // GetKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found. // If no client is registered, it will create a new client, register it, and returns it. -func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) { +func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) { + factory.mu.Lock() + defer factory.mu.Unlock() + key := strconv.Itoa(int(endpoint.ID)) - client, ok := factory.endpointClients.Get(key) + client, ok := factory.endpointClients[key] if !ok { - client, err := factory.createCachedAdminKubeClient(endpoint) + var err error + + client, err = factory.createCachedAdminKubeClient(endpoint) if err != nil { return nil, err } - factory.endpointClients.Set(key, client) - return client, nil + factory.endpointClients[key] = client } - return client.(portainer.KubeClient), nil + return client, nil } // GetProxyKubeClient retrieves a KubeClient from the cache. You should be // calling SetProxyKubeClient before first. It is normally, called the // kubernetes middleware. -func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (portainer.KubeClient, bool) { +func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (*KubeClient, bool) { client, ok := factory.endpointProxyClients.Get(endpointID + "." + token) if !ok { return nil, false } - return client.(portainer.KubeClient), true + + return client.(*KubeClient), true } // SetProxyKubeClient stores a kubeclient in the cache. -func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli portainer.KubeClient) { +func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli *KubeClient) { factory.endpointProxyClients.Set(endpointID+"."+token, cli, 0) } // CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID, and // Kubernetes config. -func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (portainer.KubeClient, error) { +func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (*KubeClient, error) { config, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig)) if err != nil { return nil, err } + cliConfig, err := config.ClientConfig() if err != nil { return nil, err @@ -126,28 +134,22 @@ func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, k return nil, err } - kubecli := &KubeClient{ + return &KubeClient{ cli: cli, instanceID: factory.instanceID, - lock: &sync.Mutex{}, - } - - return kubecli, nil + }, nil } -func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) { +func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) { cli, err := factory.CreateClient(endpoint) if err != nil { return nil, err } - kubecli := &KubeClient{ + return &KubeClient{ cli: cli, instanceID: factory.instanceID, - lock: &sync.Mutex{}, - } - - return kubecli, nil + }, nil } // CreateClient returns a pointer to a new Clientset instance diff --git a/api/kubernetes/cli/namespace_test.go b/api/kubernetes/cli/namespace_test.go index 9d3647877..ea4821bab 100644 --- a/api/kubernetes/cli/namespace_test.go +++ b/api/kubernetes/cli/namespace_test.go @@ -3,7 +3,6 @@ package cli import ( "context" "strconv" - "sync" "testing" portainer "github.com/portainer/portainer/api" @@ -19,7 +18,6 @@ func Test_ToggleSystemState(t *testing.T) { kcl := &KubeClient{ cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, true) @@ -37,12 +35,10 @@ func Test_ToggleSystemState(t *testing.T) { kcl := &KubeClient{ cli: kfake.NewSimpleClientset(), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, true) assert.Error(t, err) - }) t.Run("if called with the same state, should skip (exit without error)", func(t *testing.T) { @@ -61,7 +57,6 @@ func Test_ToggleSystemState(t *testing.T) { systemNamespaceLabel: strconv.FormatBool(test.isSystem), }}}), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, test.isSystem) @@ -81,7 +76,6 @@ func Test_ToggleSystemState(t *testing.T) { kcl := &KubeClient{ cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, true) @@ -102,7 +96,6 @@ func Test_ToggleSystemState(t *testing.T) { kcl := &KubeClient{ cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, false) @@ -125,7 +118,6 @@ func Test_ToggleSystemState(t *testing.T) { systemNamespaceLabel: "true", }}}), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, false) @@ -159,7 +151,6 @@ func Test_ToggleSystemState(t *testing.T) { kcl := &KubeClient{ cli: kfake.NewSimpleClientset(namespace, config), instanceID: "instance", - lock: &sync.Mutex{}, } err := kcl.ToggleSystemState(nsName, true) @@ -178,6 +169,5 @@ func Test_ToggleSystemState(t *testing.T) { actualPolicies, err := kcl.GetNamespaceAccessPolicies() assert.NoError(t, err, "failed to fetch policies") assert.Equal(t, expectedPolicies, actualPolicies) - }) }