fix(kubernetes): removes kube client cache when edge proxy is removed (#4487) (#4574)

Co-authored-by: Simon Meng <simon.meng@portainer.io>
pull/4689/head
cong meng 2021-01-08 11:55:42 +13:00 committed by GitHub
parent 7848bcf2f4
commit c9f68a4d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 38 deletions

View File

@ -3,6 +3,7 @@ package kubernetes
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"github.com/portainer/portainer/api/http/security"
@ -13,14 +14,16 @@ import (
type (
localTransport struct {
httpTransport *http.Transport
tokenManager *tokenManager
httpTransport *http.Transport
tokenManager *tokenManager
endpointIdentifier portainer.EndpointID
}
agentTransport struct {
httpTransport *http.Transport
tokenManager *tokenManager
signatureService portainer.DigitalSignatureService
httpTransport *http.Transport
tokenManager *tokenManager
signatureService portainer.DigitalSignatureService
endpointIdentifier portainer.EndpointID
}
edgeTransport struct {
@ -50,21 +53,11 @@ func NewLocalTransport(tokenManager *tokenManager) (*localTransport, error) {
// RoundTrip is the implementation of the the http.RoundTripper interface
func (transport *localTransport) RoundTrip(request *http.Request) (*http.Response, error) {
tokenData, err := security.RetrieveTokenData(request)
token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier)
if err != nil {
return nil, err
}
var token string
if tokenData.Role == portainer.AdministratorRole {
token = transport.tokenManager.getAdminServiceAccountToken()
} else {
token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID))
if err != nil {
return nil, err
}
}
request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
return transport.httpTransport.RoundTrip(request)
@ -85,21 +78,11 @@ func NewAgentTransport(signatureService portainer.DigitalSignatureService, tlsCo
// RoundTrip is the implementation of the the http.RoundTripper interface
func (transport *agentTransport) RoundTrip(request *http.Request) (*http.Response, error) {
tokenData, err := security.RetrieveTokenData(request)
token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier)
if err != nil {
return nil, err
}
var token string
if tokenData.Role == portainer.AdministratorRole {
token = transport.tokenManager.getAdminServiceAccountToken()
} else {
token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID))
if err != nil {
return nil, err
}
}
request.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token)
signature, err := transport.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
@ -127,21 +110,11 @@ func NewEdgeTransport(reverseTunnelService portainer.ReverseTunnelService, endpo
// RoundTrip is the implementation of the the http.RoundTripper interface
func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response, error) {
tokenData, err := security.RetrieveTokenData(request)
token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier)
if err != nil {
return nil, err
}
var token string
if tokenData.Role == portainer.AdministratorRole {
token = transport.tokenManager.getAdminServiceAccountToken()
} else {
token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID))
if err != nil {
return nil, err
}
}
request.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token)
response, err := transport.httpTransport.RoundTrip(request)
@ -154,3 +127,27 @@ func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response
return response, err
}
func getRoundTripToken(
request *http.Request,
tokenManager *tokenManager,
endpointIdentifier portainer.EndpointID,
) (string, error) {
tokenData, err := security.RetrieveTokenData(request)
if err != nil {
return "", err
}
var token string
if tokenData.Role == portainer.AdministratorRole {
token = tokenManager.getAdminServiceAccountToken()
} else {
token, err = tokenManager.getUserServiceAccountToken(int(tokenData.ID))
if err != nil {
log.Printf("Failed retrieving service account token: %v", err)
return "", err
}
}
return token, nil
}

View File

@ -21,6 +21,7 @@ type (
proxyFactory *factory.ProxyFactory
endpointProxies cmap.ConcurrentMap
legacyExtensionProxies cmap.ConcurrentMap
k8sClientFactory *cli.ClientFactory
}
)
@ -29,6 +30,7 @@ func NewManager(dataStore portainer.DataStore, signatureService portainer.Digita
return &Manager{
endpointProxies: cmap.New(),
legacyExtensionProxies: cmap.New(),
k8sClientFactory: kubernetesClientFactory,
proxyFactory: factory.NewProxyFactory(dataStore, signatureService, tunnelService, clientFactory, kubernetesClientFactory, kubernetesTokenCacheManager),
}
}
@ -56,8 +58,11 @@ func (manager *Manager) GetEndpointProxy(endpoint *portainer.Endpoint) http.Hand
}
// DeleteEndpointProxy deletes the proxy associated to a key
// and cleans the k8s endpoint client cache. DeleteEndpointProxy
// is currently only called for edge connection clean up.
func (manager *Manager) DeleteEndpointProxy(endpoint *portainer.Endpoint) {
manager.endpointProxies.Remove(string(endpoint.ID))
manager.k8sClientFactory.RemoveKubeClient(endpoint)
}
// CreateLegacyExtensionProxy creates a new HTTP reverse proxy for a legacy extension and adds it to the registered proxies

View File

@ -40,6 +40,11 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers
}
}
// Remove the cached kube client so a new one can be created
func (factory *ClientFactory) RemoveKubeClient(endpoint *portainer.Endpoint) {
factory.endpointClients.Remove(strconv.Itoa(int(endpoint.ID)))
}
// GetKubeClient checks if an existing client is already registered for the endpoint and returns it if one is found.
// If no client is registered, it will create a new client, register it, and returns it.
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {