mirror of https://github.com/portainer/portainer
fix(kubernetes): fix data-race in GetKubeClient() EE-4436 (#8498)
parent
347f66b1f1
commit
2b17cb9104
|
@ -12,8 +12,8 @@ import (
|
|||
|
||||
// NamespaceAccessPoliciesDeleteNamespace removes stored policies associated with a given namespace
|
||||
func (kcl *KubeClient) NamespaceAccessPoliciesDeleteNamespace(ns string) error {
|
||||
kcl.lock.Lock()
|
||||
defer kcl.lock.Unlock()
|
||||
kcl.mu.Lock()
|
||||
defer kcl.mu.Unlock()
|
||||
|
||||
policies, err := kcl.GetNamespaceAccessPolicies()
|
||||
if err != nil {
|
||||
|
@ -42,6 +42,7 @@ func (kcl *KubeClient) GetNamespaceAccessPolicies() (map[string]portainer.K8sNam
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return policies, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package cli
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
|
@ -40,7 +39,6 @@ func Test_NamespaceAccessPoliciesDeleteNamespace_updatesPortainerConfig_whenConf
|
|||
k := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
config := &ktypes.ConfigMap{
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
|
@ -29,16 +28,17 @@ type (
|
|||
reverseTunnelService portainer.ReverseTunnelService
|
||||
signatureService portainer.DigitalSignatureService
|
||||
instanceID string
|
||||
endpointClients cmap.ConcurrentMap
|
||||
endpointClients map[string]*KubeClient
|
||||
endpointProxyClients *cache.Cache
|
||||
AddrHTTPS string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// KubeClient represent a service used to execute Kubernetes operations
|
||||
KubeClient struct {
|
||||
cli kubernetes.Interface
|
||||
instanceID string
|
||||
lock *sync.Mutex
|
||||
mu sync.Mutex
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -57,7 +57,7 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers
|
|||
signatureService: signatureService,
|
||||
reverseTunnelService: reverseTunnelService,
|
||||
instanceID: instanceID,
|
||||
endpointClients: cmap.New(),
|
||||
endpointClients: make(map[string]*KubeClient),
|
||||
endpointProxyClients: cache.New(timeout, timeout),
|
||||
AddrHTTPS: addrHTTPS,
|
||||
}, nil
|
||||
|
@ -69,50 +69,58 @@ func (factory *ClientFactory) GetInstanceID() (instanceID string) {
|
|||
|
||||
// Remove the cached kube client so a new one can be created
|
||||
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
|
||||
factory.endpointClients.Remove(strconv.Itoa(int(endpointID)))
|
||||
factory.mu.Lock()
|
||||
delete(factory.endpointClients, strconv.Itoa(int(endpointID)))
|
||||
factory.mu.Unlock()
|
||||
}
|
||||
|
||||
// GetKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found.
|
||||
// If no client is registered, it will create a new client, register it, and returns it.
|
||||
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
|
||||
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
factory.mu.Lock()
|
||||
defer factory.mu.Unlock()
|
||||
|
||||
key := strconv.Itoa(int(endpoint.ID))
|
||||
client, ok := factory.endpointClients.Get(key)
|
||||
client, ok := factory.endpointClients[key]
|
||||
if !ok {
|
||||
client, err := factory.createCachedAdminKubeClient(endpoint)
|
||||
var err error
|
||||
|
||||
client, err = factory.createCachedAdminKubeClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
factory.endpointClients.Set(key, client)
|
||||
return client, nil
|
||||
factory.endpointClients[key] = client
|
||||
}
|
||||
|
||||
return client.(portainer.KubeClient), nil
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// GetProxyKubeClient retrieves a KubeClient from the cache. You should be
|
||||
// calling SetProxyKubeClient before first. It is normally, called the
|
||||
// kubernetes middleware.
|
||||
func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (portainer.KubeClient, bool) {
|
||||
func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (*KubeClient, bool) {
|
||||
client, ok := factory.endpointProxyClients.Get(endpointID + "." + token)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return client.(portainer.KubeClient), true
|
||||
|
||||
return client.(*KubeClient), true
|
||||
}
|
||||
|
||||
// SetProxyKubeClient stores a kubeclient in the cache.
|
||||
func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli portainer.KubeClient) {
|
||||
func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli *KubeClient) {
|
||||
factory.endpointProxyClients.Set(endpointID+"."+token, cli, 0)
|
||||
}
|
||||
|
||||
// CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID, and
|
||||
// Kubernetes config.
|
||||
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (portainer.KubeClient, error) {
|
||||
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (*KubeClient, error) {
|
||||
config, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cliConfig, err := config.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -126,28 +134,22 @@ func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, k
|
|||
return nil, err
|
||||
}
|
||||
|
||||
kubecli := &KubeClient{
|
||||
return &KubeClient{
|
||||
cli: cli,
|
||||
instanceID: factory.instanceID,
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
return kubecli, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
|
||||
func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
cli, err := factory.CreateClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubecli := &KubeClient{
|
||||
return &KubeClient{
|
||||
cli: cli,
|
||||
instanceID: factory.instanceID,
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
return kubecli, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateClient returns a pointer to a new Clientset instance
|
||||
|
|
|
@ -3,7 +3,6 @@ package cli
|
|||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
|
@ -19,7 +18,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
kcl := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, true)
|
||||
|
@ -37,12 +35,10 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
kcl := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, true)
|
||||
assert.Error(t, err)
|
||||
|
||||
})
|
||||
|
||||
t.Run("if called with the same state, should skip (exit without error)", func(t *testing.T) {
|
||||
|
@ -61,7 +57,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
systemNamespaceLabel: strconv.FormatBool(test.isSystem),
|
||||
}}}),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, test.isSystem)
|
||||
|
@ -81,7 +76,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
kcl := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, true)
|
||||
|
@ -102,7 +96,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
kcl := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, false)
|
||||
|
@ -125,7 +118,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
systemNamespaceLabel: "true",
|
||||
}}}),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, false)
|
||||
|
@ -159,7 +151,6 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
kcl := &KubeClient{
|
||||
cli: kfake.NewSimpleClientset(namespace, config),
|
||||
instanceID: "instance",
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
err := kcl.ToggleSystemState(nsName, true)
|
||||
|
@ -178,6 +169,5 @@ func Test_ToggleSystemState(t *testing.T) {
|
|||
actualPolicies, err := kcl.GetNamespaceAccessPolicies()
|
||||
assert.NoError(t, err, "failed to fetch policies")
|
||||
assert.Equal(t, expectedPolicies, actualPolicies)
|
||||
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue