fix(edge) EE-1720 activate tunnel and remove proxy cache when needed (#5775)

Co-authored-by: Simon Meng <simon.meng@portainer.io>
pull/5913/head
cong meng 3 years ago committed by GitHub
parent 588ce549ad
commit 41999e149f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3,6 +3,7 @@ package chisel
import (
"context"
"fmt"
"github.com/portainer/portainer/api/http/proxy"
"log"
"net/http"
"strconv"
@ -32,6 +33,7 @@ type Service struct {
snapshotService portainer.SnapshotService
chiselServer *chserver.Server
shutdownCtx context.Context
ProxyManager *proxy.Manager
}
// NewService returns a pointer to a new instance of Service
@ -215,18 +217,13 @@ func (service *Service) checkTunnels() {
}
}
if len(tunnel.Jobs) > 0 {
endpointID, err := strconv.Atoi(item.Key)
if err != nil {
log.Printf("[ERROR] [chisel,conversion] Invalid environment identifier (id: %s): %s", item.Key, err)
continue
}
service.SetTunnelStatusToIdle(portainer.EndpointID(endpointID))
} else {
service.tunnelDetailsMap.Remove(item.Key)
endpointID, err := strconv.Atoi(item.Key)
if err != nil {
log.Printf("[ERROR] [chisel,conversion] Invalid environment identifier (id: %s): %s", item.Key, err)
continue
}
service.SetTunnelStatusToIdle(portainer.EndpointID(endpointID))
}
}

@ -59,6 +59,12 @@ func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) *porta
// GetActiveTunnel retrieves an active tunnel which allows communicating with edge agent
func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (*portainer.TunnelDetails, error) {
tunnel := service.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentActive {
// update the LastActivity
service.SetTunnelStatusToActive(endpoint.ID)
}
if tunnel.Status == portainer.EdgeAgentIdle || tunnel.Status == portainer.EdgeAgentManagementRequired {
err := service.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
@ -74,9 +80,18 @@ func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (*portaine
endpoint.EdgeCheckinInterval = settings.EdgeAgentCheckinInterval
}
waitForAgentToConnect := time.Duration(endpoint.EdgeCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
waitForAgentToConnect := 2 * time.Duration(endpoint.EdgeCheckinInterval)
for waitForAgentToConnect >= 0 {
waitForAgentToConnect--
time.Sleep(time.Second)
tunnel = service.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentActive {
break
}
}
}
tunnel = service.GetTunnelDetails(endpoint.ID)
return tunnel, nil
@ -112,6 +127,8 @@ func (service *Service) SetTunnelStatusToIdle(endpointID portainer.EndpointID) {
key := strconv.Itoa(int(endpointID))
service.tunnelDetailsMap.Set(key, tunnel)
service.ProxyManager.DeleteEndpointProxy(endpointID)
}
// SetTunnelStatusToRequired update the status of the tunnel associated to the specified environment(endpoint).

@ -467,6 +467,8 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
proxyManager := proxy.NewManager(dataStore, digitalSignatureService, reverseTunnelService, dockerClientFactory, kubernetesClientFactory, kubernetesTokenCacheManager)
reverseTunnelService.ProxyManager = proxyManager
dockerConfigPath := fileService.GetDockerConfigPath()
composeStackManager := initComposeStackManager(*flags.Assets, dockerConfigPath, reverseTunnelService, proxyManager)

@ -91,7 +91,11 @@ func createEdgeClient(endpoint *portainer.Endpoint, signatureService portainer.D
headers[portainer.PortainerAgentTargetHeader] = nodeName
}
tunnel := reverseTunnelService.GetTunnelDetails(endpoint.ID)
tunnel, err := reverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return nil, err
}
endpointURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
return client.NewClientWithOpts(

@ -44,19 +44,26 @@ func NewSwarmStackManager(binaryPath, configPath string, signatureService portai
}
// Login executes the docker login command against a list of registries (including DockerHub).
func (manager *SwarmStackManager) Login(registries []portainer.Registry, endpoint *portainer.Endpoint) {
command, args := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
func (manager *SwarmStackManager) Login(registries []portainer.Registry, endpoint *portainer.Endpoint) error {
command, args, err := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
if err != nil {
return err
}
for _, registry := range registries {
if registry.Authentication {
registryArgs := append(args, "login", "--username", registry.Username, "--password", registry.Password, registry.URL)
runCommandAndCaptureStdErr(command, registryArgs, nil, "")
}
}
return nil
}
// Logout executes the docker logout command.
func (manager *SwarmStackManager) Logout(endpoint *portainer.Endpoint) error {
command, args := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
command, args, err := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
if err != nil {
return err
}
args = append(args, "logout")
return runCommandAndCaptureStdErr(command, args, nil, "")
}
@ -64,7 +71,10 @@ func (manager *SwarmStackManager) Logout(endpoint *portainer.Endpoint) error {
// Deploy executes the docker stack deploy command.
func (manager *SwarmStackManager) Deploy(stack *portainer.Stack, prune bool, endpoint *portainer.Endpoint) error {
filePaths := stackutils.GetStackFilePaths(stack)
command, args := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
command, args, err := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
if err != nil {
return err
}
if prune {
args = append(args, "stack", "deploy", "--prune", "--with-registry-auth")
@ -84,7 +94,10 @@ func (manager *SwarmStackManager) Deploy(stack *portainer.Stack, prune bool, end
// Remove executes the docker stack rm command.
func (manager *SwarmStackManager) Remove(stack *portainer.Stack, endpoint *portainer.Endpoint) error {
command, args := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
command, args, err := manager.prepareDockerCommandAndArgs(manager.binaryPath, manager.configPath, endpoint)
if err != nil {
return err
}
args = append(args, "stack", "rm", stack.Name)
return runCommandAndCaptureStdErr(command, args, nil, "")
}
@ -108,7 +121,7 @@ func runCommandAndCaptureStdErr(command string, args []string, env []string, wor
return nil
}
func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, configPath string, endpoint *portainer.Endpoint) (string, []string) {
func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, configPath string, endpoint *portainer.Endpoint) (string, []string, error) {
// Assume Linux as a default
command := path.Join(binaryPath, "docker")
@ -121,7 +134,10 @@ func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, config
endpointURL := endpoint.URL
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
tunnel := manager.reverseTunnelService.GetTunnelDetails(endpoint.ID)
tunnel, err := manager.reverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return "", nil, err
}
endpointURL = fmt.Sprintf("tcp://127.0.0.1:%d", tunnel.Port)
}
@ -141,7 +157,7 @@ func (manager *SwarmStackManager) prepareDockerCommandAndArgs(binaryPath, config
}
}
return command, args
return command, args, nil
}
func (manager *SwarmStackManager) updateDockerCLIConfiguration(configPath string) error {

@ -2,14 +2,12 @@ package endpointproxy
import (
"errors"
"strconv"
"strings"
"time"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/portainer/api"
bolterrors "github.com/portainer/portainer/api/bolt/errors"
"strconv"
"strings"
"net/http"
)
@ -37,22 +35,9 @@ func (handler *Handler) proxyRequestsToDockerAPI(w http.ResponseWriter, r *http.
return &httperror.HandlerError{http.StatusInternalServerError, "No Edge agent registered with the environment", errors.New("No agent available")}
}
tunnel := handler.ReverseTunnelService.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentIdle {
handler.ProxyManager.DeleteEndpointProxy(endpoint)
err := handler.ReverseTunnelService.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update tunnel status", err}
}
settings, err := handler.DataStore.Settings().Settings()
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve settings from the database", err}
}
waitForAgentToConnect := time.Duration(settings.EdgeAgentCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
_, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to get the active tunnel", err}
}
}

@ -3,13 +3,11 @@ package endpointproxy
import (
"errors"
"fmt"
"strings"
"time"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
portainer "github.com/portainer/portainer/api"
bolterrors "github.com/portainer/portainer/api/bolt/errors"
"strings"
"net/http"
)
@ -37,22 +35,9 @@ func (handler *Handler) proxyRequestsToKubernetesAPI(w http.ResponseWriter, r *h
return &httperror.HandlerError{http.StatusInternalServerError, "No Edge agent registered with the environment", errors.New("No agent available")}
}
tunnel := handler.ReverseTunnelService.GetTunnelDetails(endpoint.ID)
if tunnel.Status == portainer.EdgeAgentIdle {
handler.ProxyManager.DeleteEndpointProxy(endpoint)
err := handler.ReverseTunnelService.SetTunnelStatusToRequired(endpoint.ID)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update tunnel status", err}
}
settings, err := handler.DataStore.Settings().Settings()
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve settings from the database", err}
}
waitForAgentToConnect := time.Duration(settings.EdgeAgentCheckinInterval) * time.Second
time.Sleep(waitForAgentToConnect * 2)
_, err := handler.ReverseTunnelService.GetActiveTunnel(endpoint)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to get the active tunnel", err}
}
}

@ -49,7 +49,7 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) *
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove environment from the database", err}
}
handler.ProxyManager.DeleteEndpointProxy(endpoint)
handler.ProxyManager.DeleteEndpointProxy(endpoint.ID)
err = handler.DataStore.EndpointRelation().DeleteEndpointRelation(endpoint.ID)
if err != nil {

@ -12,7 +12,10 @@ import (
)
func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r *http.Request, params *webSocketRequestParams) error {
tunnel := handler.ReverseTunnelService.GetTunnelDetails(params.endpoint.ID)
tunnel, err := handler.ReverseTunnelService.GetActiveTunnel(params.endpoint)
if err != nil {
return err
}
endpointURL, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port))
if err != nil {

@ -122,17 +122,11 @@ func (transport *Transport) createPrivateResourceControl(resourceIdentifier stri
}
func (transport *Transport) getInheritedResourceControlFromServiceOrStack(resourceIdentifier, nodeName string, resourceType portainer.ResourceControlType, resourceControls []portainer.ResourceControl) (*portainer.ResourceControl, error) {
client := transport.dockerClient
if nodeName != "" {
dockerClient, err := transport.dockerClientFactory.CreateClient(transport.endpoint, nodeName)
if err != nil {
return nil, err
}
defer dockerClient.Close()
client = dockerClient
client, err := transport.dockerClientFactory.CreateClient(transport.endpoint, nodeName)
if err != nil {
return nil, err
}
defer client.Close()
switch resourceType {
case portainer.ContainerResourceControl:

@ -14,7 +14,6 @@ import (
"strconv"
"strings"
"github.com/docker/docker/client"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/docker"
"github.com/portainer/portainer/api/http/proxy/factory/utils"
@ -33,7 +32,6 @@ type (
dataStore portainer.DataStore
signatureService portainer.DigitalSignatureService
reverseTunnelService portainer.ReverseTunnelService
dockerClient *client.Client
dockerClientFactory *docker.ClientFactory
}
@ -63,11 +61,6 @@ type (
// NewTransport returns a pointer to a new Transport instance.
func NewTransport(parameters *TransportParameters, httpTransport *http.Transport) (*Transport, error) {
dockerClient, err := parameters.DockerClientFactory.CreateClient(parameters.Endpoint, "")
if err != nil {
return nil, err
}
transport := &Transport{
endpoint: parameters.Endpoint,
dataStore: parameters.DataStore,
@ -75,7 +68,6 @@ func NewTransport(parameters *TransportParameters, httpTransport *http.Transport
reverseTunnelService: parameters.ReverseTunnelService,
dockerClientFactory: parameters.DockerClientFactory,
HTTPTransport: httpTransport,
dockerClient: dockerClient,
}
return transport, nil

@ -132,16 +132,12 @@ func (transport *Transport) decorateVolumeResourceCreationOperation(request *htt
volumeID := request.Header.Get("X-Portainer-VolumeName")
if volumeID != "" {
cli := transport.dockerClient
agentTargetHeader := request.Header.Get(portainer.PortainerAgentTargetHeader)
if agentTargetHeader != "" {
dockerClient, err := transport.dockerClientFactory.CreateClient(transport.endpoint, agentTargetHeader)
if err != nil {
return nil, err
}
defer dockerClient.Close()
cli = dockerClient
cli, err := transport.dockerClientFactory.CreateClient(transport.endpoint, agentTargetHeader)
if err != nil {
return nil, err
}
defer cli.Close()
_, err = cli.VolumeInspect(context.Background(), volumeID)
if err == nil {
@ -223,10 +219,13 @@ func (transport *Transport) getDockerID() (string, error) {
}
}
cli := transport.dockerClient
defer cli.Close()
client, err := transport.dockerClientFactory.CreateClient(transport.endpoint, "")
if err != nil {
return "", err
}
defer client.Close()
info, err := cli.Info(context.Background())
info, err := client.Info(context.Background())
if err != nil {
return "", err
}

@ -52,7 +52,7 @@ func (factory *ProxyFactory) newKubernetesLocalProxy(endpoint *portainer.Endpoin
func (factory *ProxyFactory) newKubernetesEdgeHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
tunnel := factory.reverseTunnelService.GetTunnelDetails(endpoint.ID)
rawURL := fmt.Sprintf("http://localhost:%d", tunnel.Port)
rawURL := fmt.Sprintf("http://127.0.0.1:%d", tunnel.Port)
endpointURL, err := url.Parse(rawURL)
if err != nil {

@ -67,9 +67,9 @@ func (manager *Manager) GetEndpointProxy(endpoint *portainer.Endpoint) http.Hand
// DeleteEndpointProxy deletes the proxy associated to a key
// and cleans the k8s environment(endpoint) client cache. DeleteEndpointProxy
// is currently only called for edge connection clean up.
func (manager *Manager) DeleteEndpointProxy(endpoint *portainer.Endpoint) {
manager.endpointProxies.Remove(fmt.Sprint(endpoint.ID))
manager.k8sClientFactory.RemoveKubeClient(endpoint)
func (manager *Manager) DeleteEndpointProxy(endpointID portainer.EndpointID) {
manager.endpointProxies.Remove(fmt.Sprint(endpointID))
manager.k8sClientFactory.RemoveKubeClient(endpointID)
}
// CreateLegacyExtensionProxy creates a new HTTP reverse proxy for a legacy extension and adds it to the registered proxies

@ -2,11 +2,11 @@ package cli
import (
"fmt"
cmap "github.com/orcaman/concurrent-map"
"net/http"
"strconv"
"sync"
cmap "github.com/orcaman/concurrent-map"
"github.com/pkg/errors"
portainer "github.com/portainer/portainer/api"
@ -45,8 +45,8 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers
}
// Remove the cached kube client so a new one can be created
func (factory *ClientFactory) RemoveKubeClient(endpoint *portainer.Endpoint) {
factory.endpointClients.Remove(strconv.Itoa(int(endpoint.ID)))
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
factory.endpointClients.Remove(strconv.Itoa(int(endpointID)))
}
// GetKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found.
@ -123,7 +123,6 @@ func (factory *ClientFactory) buildEdgeClient(endpoint *portainer.Endpoint) (*ku
if err != nil {
return nil, errors.Wrap(err, "failed activating tunnel")
}
endpointURL := fmt.Sprintf("http://127.0.0.1:%d/kubernetes", tunnel.Port)
return factory.createRemoteClient(endpointURL)

@ -1392,7 +1392,7 @@ type (
// SwarmStackManager represents a service to manage Swarm stacks
SwarmStackManager interface {
Login(registries []Registry, endpoint *Endpoint)
Login(registries []Registry, endpoint *Endpoint) error
Logout(endpoint *Endpoint) error
Deploy(stack *Stack, prune bool, endpoint *Endpoint) error
Remove(stack *Stack, endpoint *Endpoint) error

@ -1,4 +1,5 @@
import $ from 'jquery';
import { PortainerEndpointTypes } from 'Portainer/models/endpoint/models';
angular.module('portainer').run([
'$rootScope',
@ -49,7 +50,7 @@ angular.module('portainer').run([
function ping(EndpointProvider, SystemService) {
let endpoint = EndpointProvider.currentEndpoint();
if (endpoint !== undefined && endpoint.Type === 4) {
if (endpoint !== undefined && endpoint.Type == PortainerEndpointTypes.EdgeAgentOnDockerEnvironment) {
SystemService.ping(endpoint.Id);
}
}

@ -21,6 +21,10 @@ export default class EndpointHelper {
].includes(endpoint.Type);
}
static isEdgeEndpoint(endpoint) {
return [PortainerEndpointTypes.EdgeAgentOnDockerEnvironment, PortainerEndpointTypes.EdgeAgentOnKubernetesEnvironment].includes(endpoint.Type);
}
static mapGroupNameToEndpoint(endpoints, groups) {
for (var i = 0; i < endpoints.length; i++) {
var endpoint = endpoints[i];

Loading…
Cancel
Save