feat(kube): use local kubectl for all deployments (#5488)

pull/5751/head
Chaim Lev-Ari 2021-09-24 07:56:22 +03:00 committed by GitHub
parent 5ad3cacefd
commit d4f581a596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 126 additions and 197 deletions

View File

@ -56,6 +56,32 @@ func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) *porta
}
}
// GetActiveTunnel retrieves an active tunnel which allows communicating with edge agent
func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (*portainer.TunnelDetails, error) {
tunnel := service.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentIdle || tunnel.Status == portainer.EdgeAgentManagementRequired {
err := service.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
return nil, fmt.Errorf("failed opening tunnel to endpoint: %w", err)
}
if endpoint.EdgeCheckinInterval == 0 {
settings, err := service.dataStore.Settings().Settings()
if err != nil {
return nil, fmt.Errorf("failed fetching settings from db: %w", err)
}
endpoint.EdgeCheckinInterval = settings.EdgeAgentCheckinInterval
}
waitForAgentToConnect := time.Duration(endpoint.EdgeCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
}
tunnel = service.GetTunnelDetails(endpoint.ID)
return tunnel, nil
}
// SetTunnelStatusToActive update the status of the tunnel associated to the specified environment(endpoint).
// It sets the status to ACTIVE.
func (service *Service) SetTunnelStatusToActive(endpointID portainer.EndpointID) {

View File

@ -99,8 +99,8 @@ func initSwarmStackManager(assetsPath string, configPath string, signatureServic
return exec.NewSwarmStackManager(assetsPath, configPath, signatureService, fileService, reverseTunnelService)
}
func initKubernetesDeployer(kubernetesTokenCacheManager *kubeproxy.TokenCacheManager, kubernetesClientFactory *kubecli.ClientFactory, dataStore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, assetsPath string) portainer.KubernetesDeployer {
return exec.NewKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, signatureService, assetsPath)
func initKubernetesDeployer(kubernetesTokenCacheManager *kubeproxy.TokenCacheManager, kubernetesClientFactory *kubecli.ClientFactory, dataStore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, proxyManager *proxy.Manager, assetsPath string) portainer.KubernetesDeployer {
return exec.NewKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, signatureService, proxyManager, assetsPath)
}
func initHelmPackageManager(assetsPath string) (libhelm.HelmPackageManager, error) {
@ -469,7 +469,7 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
log.Fatalf("failed initializing swarm stack manager: %s", err)
}
kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, *flags.Assets)
kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, proxyManager, *flags.Assets)
helmPackageManager, err := initHelmPackageManager(*flags.Assets)
if err != nil {

View File

@ -47,7 +47,7 @@ func (manager *ComposeStackManager) ComposeSyntaxMaxVersion() string {
func (manager *ComposeStackManager) Up(ctx context.Context, stack *portainer.Stack, endpoint *portainer.Endpoint) error {
url, proxy, err := manager.fetchEndpointProxy(endpoint)
if err != nil {
return errors.Wrap(err, "failed to featch environment proxy")
return errors.Wrap(err, "failed to fetch endpoint proxy")
}
if proxy != nil {
@ -90,12 +90,12 @@ func (manager *ComposeStackManager) fetchEndpointProxy(endpoint *portainer.Endpo
return "", nil, nil
}
proxy, err := manager.proxyManager.CreateComposeProxyServer(endpoint)
proxy, err := manager.proxyManager.CreateAgentProxyServer(endpoint)
if err != nil {
return "", nil, err
}
return fmt.Sprintf("tcp://127.0.0.1:%d", proxy.Port), proxy, nil
return fmt.Sprintf("http://127.0.0.1:%d", proxy.Port), proxy, nil
}
func createEnvFile(stack *portainer.Stack) (string, error) {

View File

@ -2,24 +2,21 @@ package exec
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os/exec"
"path"
"runtime"
"strings"
"time"
"github.com/pkg/errors"
"github.com/portainer/portainer/api/http/proxy"
"github.com/portainer/portainer/api/http/proxy/factory"
"github.com/portainer/portainer/api/http/proxy/factory/kubernetes"
"github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/kubernetes/cli"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/crypto"
)
// KubernetesDeployer represents a service to deploy resources inside a Kubernetes environment(endpoint).
@ -30,10 +27,11 @@ type KubernetesDeployer struct {
signatureService portainer.DigitalSignatureService
kubernetesClientFactory *cli.ClientFactory
kubernetesTokenCacheManager *kubernetes.TokenCacheManager
proxyManager *proxy.Manager
}
// NewKubernetesDeployer initializes a new KubernetesDeployer service.
func NewKubernetesDeployer(kubernetesTokenCacheManager *kubernetes.TokenCacheManager, kubernetesClientFactory *cli.ClientFactory, datastore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, binaryPath string) *KubernetesDeployer {
func NewKubernetesDeployer(kubernetesTokenCacheManager *kubernetes.TokenCacheManager, kubernetesClientFactory *cli.ClientFactory, datastore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, proxyManager *proxy.Manager, binaryPath string) *KubernetesDeployer {
return &KubernetesDeployer{
binaryPath: binaryPath,
dataStore: datastore,
@ -41,6 +39,7 @@ func NewKubernetesDeployer(kubernetesTokenCacheManager *kubernetes.TokenCacheMan
signatureService: signatureService,
kubernetesClientFactory: kubernetesClientFactory,
kubernetesTokenCacheManager: kubernetesTokenCacheManager,
proxyManager: proxyManager,
}
}
@ -50,14 +49,14 @@ func (deployer *KubernetesDeployer) getToken(request *http.Request, endpoint *po
return "", err
}
kubecli, err := deployer.kubernetesClientFactory.GetKubeClient(endpoint)
kubeCLI, err := deployer.kubernetesClientFactory.GetKubeClient(endpoint)
if err != nil {
return "", err
}
tokenCache := deployer.kubernetesTokenCacheManager.GetOrCreateTokenCache(int(endpoint.ID))
tokenManager, err := kubernetes.NewTokenManager(kubecli, deployer.dataStore, tokenCache, setLocalAdminToken)
tokenManager, err := kubernetes.NewTokenManager(kubeCLI, deployer.dataStore, tokenCache, setLocalAdminToken)
if err != nil {
return "", err
}
@ -80,153 +79,44 @@ func (deployer *KubernetesDeployer) getToken(request *http.Request, endpoint *po
// Deploy will deploy a Kubernetes manifest inside a specific namespace in a Kubernetes environment(endpoint).
// Otherwise it will use kubectl to deploy the manifest.
func (deployer *KubernetesDeployer) Deploy(request *http.Request, endpoint *portainer.Endpoint, stackConfig string, namespace string) (string, error) {
if endpoint.Type == portainer.KubernetesLocalEnvironment {
token, err := deployer.getToken(request, endpoint, true)
command := path.Join(deployer.binaryPath, "kubectl")
if runtime.GOOS == "windows" {
command = path.Join(deployer.binaryPath, "kubectl.exe")
}
args := make([]string, 0)
if endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment {
url, proxy, err := deployer.getAgentURL(endpoint)
if err != nil {
return "", err
return "", errors.WithMessage(err, "failed generating endpoint URL")
}
command := path.Join(deployer.binaryPath, "kubectl")
if runtime.GOOS == "windows" {
command = path.Join(deployer.binaryPath, "kubectl.exe")
}
args := make([]string, 0)
args = append(args, "--server", endpoint.URL)
defer proxy.Close()
args = append(args, "--server", url)
args = append(args, "--insecure-skip-tls-verify")
args = append(args, "--token", token)
args = append(args, "--namespace", namespace)
args = append(args, "apply", "-f", "-")
var stderr bytes.Buffer
cmd := exec.Command(command, args...)
cmd.Stderr = &stderr
cmd.Stdin = strings.NewReader(stackConfig)
output, err := cmd.Output()
if err != nil {
return "", errors.New(stderr.String())
}
return string(output), nil
}
// agent
endpointURL := endpoint.URL
if endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment {
tunnel := deployer.reverseTunnelService.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentIdle {
err := deployer.reverseTunnelService.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
return "", err
}
settings, err := deployer.dataStore.Settings().Settings()
if err != nil {
return "", err
}
waitForAgentToConnect := time.Duration(settings.EdgeAgentCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
}
endpointURL = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
}
transport := &http.Transport{}
if endpoint.TLSConfig.TLS {
tlsConfig, err := crypto.CreateTLSConfigurationFromDisk(endpoint.TLSConfig.TLSCACertPath, endpoint.TLSConfig.TLSCertPath, endpoint.TLSConfig.TLSKeyPath, endpoint.TLSConfig.TLSSkipVerify)
if err != nil {
return "", err
}
transport.TLSClientConfig = tlsConfig
}
httpCli := &http.Client{
Transport: transport,
}
if !strings.HasPrefix(endpointURL, "http") {
endpointURL = fmt.Sprintf("https://%s", endpointURL)
}
url, err := url.Parse(fmt.Sprintf("%s/v2/kubernetes/stack", endpointURL))
token, err := deployer.getToken(request, endpoint, endpoint.Type == portainer.KubernetesLocalEnvironment)
if err != nil {
return "", err
}
reqPayload, err := json.Marshal(
struct {
StackConfig string
Namespace string
}{
StackConfig: stackConfig,
Namespace: namespace,
})
args = append(args, "--token", token)
args = append(args, "--namespace", namespace)
args = append(args, "apply", "-f", "-")
var stderr bytes.Buffer
cmd := exec.Command(command, args...)
cmd.Stderr = &stderr
cmd.Stdin = strings.NewReader(stackConfig)
output, err := cmd.Output()
if err != nil {
return "", err
return "", errors.Wrapf(err, "failed to execute kubectl command: %q", stderr.String())
}
req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(reqPayload))
if err != nil {
return "", err
}
signature, err := deployer.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
if err != nil {
return "", err
}
token, err := deployer.getToken(request, endpoint, false)
if err != nil {
return "", err
}
req.Header.Set(portainer.PortainerAgentPublicKeyHeader, deployer.signatureService.EncodedPublicKey())
req.Header.Set(portainer.PortainerAgentSignatureHeader, signature)
req.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token)
resp, err := httpCli.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var errorResponseData struct {
Message string
Details string
}
err = json.NewDecoder(resp.Body).Decode(&errorResponseData)
if err != nil {
output, parseStringErr := ioutil.ReadAll(resp.Body)
if parseStringErr != nil {
return "", parseStringErr
}
return "", fmt.Errorf("Failed parsing, body: %s, error: %w", output, err)
}
return "", fmt.Errorf("Deployment to agent failed: %s", errorResponseData.Details)
}
var responseData struct{ Output string }
err = json.NewDecoder(resp.Body).Decode(&responseData)
if err != nil {
parsedOutput, parseStringErr := ioutil.ReadAll(resp.Body)
if parseStringErr != nil {
return "", parseStringErr
}
return "", fmt.Errorf("Failed decoding, body: %s, err: %w", parsedOutput, err)
}
return responseData.Output, nil
return string(output), nil
}
// ConvertCompose leverages the kompose binary to deploy a compose compliant manifest.
@ -251,3 +141,12 @@ func (deployer *KubernetesDeployer) ConvertCompose(data []byte) ([]byte, error)
return output, nil
}
func (deployer *KubernetesDeployer) getAgentURL(endpoint *portainer.Endpoint) (string, *factory.ProxyServer, error) {
proxy, err := deployer.proxyManager.CreateAgentProxyServer(endpoint)
if err != nil {
return "", nil, err
}
return fmt.Sprintf("http://127.0.0.1:%d/kubernetes", proxy.Port), proxy, nil
}

View File

@ -6,29 +6,36 @@ import (
"net"
"net/http"
"net/url"
"strings"
"github.com/pkg/errors"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/crypto"
"github.com/portainer/portainer/api/http/proxy/factory/dockercompose"
"github.com/portainer/portainer/api/http/proxy/factory/agent"
"github.com/portainer/portainer/api/internal/endpointutils"
)
// ProxyServer provide an extedned proxy with a local server to forward requests
// ProxyServer provide an extended proxy with a local server to forward requests
type ProxyServer struct {
server *http.Server
Port int
}
func (factory *ProxyFactory) NewDockerComposeAgentProxy(endpoint *portainer.Endpoint) (*ProxyServer, error) {
// NewAgentProxy creates a new instance of ProxyServer that wrap http requests with agent headers
func (factory *ProxyFactory) NewAgentProxy(endpoint *portainer.Endpoint) (*ProxyServer, error) {
urlString := endpoint.URL
if endpointutils.IsEdgeEndpoint((endpoint)) {
tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return nil, errors.Wrap(err, "failed starting tunnel")
}
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
return &ProxyServer{
Port: factory.reverseTunnelService.GetTunnelDetails(endpoint.ID).Port,
}, nil
urlString = fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
}
endpointURL, err := url.Parse(endpoint.URL)
endpointURL, err := parseURL(urlString)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed parsing url %s", endpoint.URL)
}
endpointURL.Scheme = "http"
@ -37,7 +44,7 @@ func (factory *ProxyFactory) NewDockerComposeAgentProxy(endpoint *portainer.Endp
if endpoint.TLSConfig.TLS || endpoint.TLSConfig.TLSSkipVerify {
config, err := crypto.CreateTLSConfigurationFromDisk(endpoint.TLSConfig.TLSCACertPath, endpoint.TLSConfig.TLSCertPath, endpoint.TLSConfig.TLSKeyPath, endpoint.TLSConfig.TLSSkipVerify)
if err != nil {
return nil, err
return nil, errors.WithMessage(err, "failed generating tls configuration")
}
httpTransport.TLSClientConfig = config
@ -46,7 +53,7 @@ func (factory *ProxyFactory) NewDockerComposeAgentProxy(endpoint *portainer.Endp
proxy := newSingleHostReverseProxyWithHostHeader(endpointURL)
proxy.Transport = dockercompose.NewAgentTransport(factory.signatureService, httpTransport)
proxy.Transport = agent.NewTransport(factory.signatureService, httpTransport)
proxyServer := &ProxyServer{
server: &http.Server{
@ -57,7 +64,7 @@ func (factory *ProxyFactory) NewDockerComposeAgentProxy(endpoint *portainer.Endp
err = proxyServer.start()
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed starting proxy server")
}
return proxyServer, nil
@ -91,3 +98,15 @@ func (proxy *ProxyServer) Close() {
proxy.server.Close()
}
}
// parseURL parses the endpointURL using url.Parse.
//
// to prevent an error when url has port but no protocol prefix
// we add `//` prefix if needed
func parseURL(endpointURL string) (*url.URL, error) {
if !strings.HasPrefix(endpointURL, "http") && !strings.HasPrefix(endpointURL, "tcp") && !strings.HasPrefix(endpointURL, "//") {
endpointURL = fmt.Sprintf("//%s", endpointURL)
}
return url.Parse(endpointURL)
}

View File

@ -1,4 +1,4 @@
package dockercompose
package agent
import (
"net/http"
@ -7,17 +7,17 @@ import (
)
type (
// AgentTransport is an http.Transport wrapper that adds custom http headers to communicate to an Agent
AgentTransport struct {
// Transport is an http.Transport wrapper that adds custom http headers to communicate to an Agent
Transport struct {
httpTransport *http.Transport
signatureService portainer.DigitalSignatureService
endpointIdentifier portainer.EndpointID
}
)
// NewAgentTransport returns a new transport that can be used to send signed requests to a Portainer agent
func NewAgentTransport(signatureService portainer.DigitalSignatureService, httpTransport *http.Transport) *AgentTransport {
transport := &AgentTransport{
// NewTransport returns a new transport that can be used to send signed requests to a Portainer agent
func NewTransport(signatureService portainer.DigitalSignatureService, httpTransport *http.Transport) *Transport {
transport := &Transport{
httpTransport: httpTransport,
signatureService: signatureService,
}
@ -26,8 +26,7 @@ func NewAgentTransport(signatureService portainer.DigitalSignatureService, httpT
}
// RoundTrip is the implementation of the the http.RoundTripper interface
func (transport *AgentTransport) RoundTrip(request *http.Request) (*http.Response, error) {
func (transport *Transport) RoundTrip(request *http.Request) (*http.Response, error) {
signature, err := transport.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
if err != nil {
return nil, err

View File

@ -48,10 +48,10 @@ func (manager *Manager) CreateAndRegisterEndpointProxy(endpoint *portainer.Endpo
return proxy, nil
}
// CreateComposeProxyServer creates a new HTTP reverse proxy based on environment(endpoint) properties and and adds it to the registered proxies.
// CreateAgentProxyServer creates a new HTTP reverse proxy based on environment(endpoint) properties and and adds it to the registered proxies.
// It can also be used to create a new HTTP reverse proxy and replace an already registered proxy.
func (manager *Manager) CreateComposeProxyServer(endpoint *portainer.Endpoint) (*factory.ProxyServer, error) {
return manager.proxyFactory.NewDockerComposeAgentProxy(endpoint)
func (manager *Manager) CreateAgentProxyServer(endpoint *portainer.Endpoint) (*factory.ProxyServer, error) {
return manager.proxyFactory.NewAgentProxy(endpoint)
}
// GetEndpointProxy returns the proxy associated to a key

View File

@ -24,3 +24,7 @@ func IsDockerEndpoint(endpoint *portainer.Endpoint) bool {
endpoint.Type == portainer.AgentOnDockerEnvironment ||
endpoint.Type == portainer.EdgeAgentOnDockerEnvironment
}
func IsEdgeEndpoint(endpoint *portainer.Endpoint) bool {
return endpoint.Type == portainer.EdgeAgentOnDockerEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment
}

View File

@ -1,14 +1,13 @@
package cli
import (
"errors"
"fmt"
"net/http"
"strconv"
"sync"
"time"
cmap "github.com/orcaman/concurrent-map"
"github.com/pkg/errors"
portainer "github.com/portainer/portainer/api"
"k8s.io/client-go/kubernetes"
@ -116,36 +115,18 @@ func (rt *agentHeaderRoundTripper) RoundTrip(req *http.Request) (*http.Response,
func (factory *ClientFactory) buildAgentClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
endpointURL := fmt.Sprintf("https://%s/kubernetes", endpoint.URL)
return factory.createRemoteClient(endpointURL);
return factory.createRemoteClient(endpointURL)
}
func (factory *ClientFactory) buildEdgeClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentIdle {
err := factory.reverseTunnelService.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
return nil, fmt.Errorf("failed opening tunnel to environment: %w", err)
}
if endpoint.EdgeCheckinInterval == 0 {
settings, err := factory.dataStore.Settings().Settings()
if err != nil {
return nil, fmt.Errorf("failed fetching settings from db: %w", err)
}
endpoint.EdgeCheckinInterval = settings.EdgeAgentCheckinInterval
}
waitForAgentToConnect := time.Duration(endpoint.EdgeCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
tunnel = factory.reverseTunnelService.GetTunnelDetails(endpoint.ID)
tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return nil, errors.Wrap(err, "failed activating tunnel")
}
endpointURL := fmt.Sprintf("http://127.0.0.1:%d/kubernetes", tunnel.Port)
return factory.createRemoteClient(endpointURL);
return factory.createRemoteClient(endpointURL)
}
func (factory *ClientFactory) createRemoteClient(endpointURL string) (*kubernetes.Clientset, error) {

View File

@ -1326,6 +1326,7 @@ type (
SetTunnelStatusToIdle(endpointID EndpointID)
KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration)
GetTunnelDetails(endpointID EndpointID) *TunnelDetails
GetActiveTunnel(endpoint *Endpoint) (*TunnelDetails, error)
AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob)
RemoveEdgeJob(edgeJobID EdgeJobID)
}