mirror of https://github.com/portainer/portainer
fix(tunnels): fix a deadlock with the tunnels EE-2751 (#6649)
parent f8fd28bb61
commit f8cbb54ba5

@@ -5,16 +5,14 @@ import (
 	"fmt"
 	"log"
 	"net/http"
-	"strconv"
 	"sync"
 	"time"
 
-	"github.com/portainer/portainer/api/http/proxy"
-
 	"github.com/dchest/uniuri"
 	chserver "github.com/jpillora/chisel/server"
 	portainer "github.com/portainer/portainer/api"
 	"github.com/portainer/portainer/api/dataservices"
+	"github.com/portainer/portainer/api/http/proxy"
 )
 
 const (
@@ -29,7 +27,7 @@ const (
 type Service struct {
 	serverFingerprint string
 	serverPort        string
-	tunnelDetailsMap  map[string]*portainer.TunnelDetails
+	tunnelDetailsMap  map[portainer.EndpointID]*portainer.TunnelDetails
 	dataStore         dataservices.DataStore
 	snapshotService   portainer.SnapshotService
 	chiselServer      *chserver.Server
@@ -41,7 +39,7 @@ type Service struct {
 // NewService returns a pointer to a new instance of Service
 func NewService(dataStore dataservices.DataStore, shutdownCtx context.Context) *Service {
 	return &Service{
-		tunnelDetailsMap: make(map[string]*portainer.TunnelDetails),
+		tunnelDetailsMap: make(map[portainer.EndpointID]*portainer.TunnelDetails),
 		dataStore:        dataStore,
 		shutdownCtx:      shutdownCtx,
 	}
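The struct and constructor hunks above change the tunnel map key from a string to portainer.EndpointID, which is what later allows the strconv.Itoa/Atoi round-trips to be dropped. A minimal, self-contained sketch of the same idea, using illustrative stand-in types rather than Portainer's own:

package main

import "fmt"

// EndpointID stands in for a typed identifier such as portainer.EndpointID.
type EndpointID int

// TunnelDetails stands in for the per-endpoint tunnel state.
type TunnelDetails struct {
	Port int
}

func main() {
	// Keying the map by the typed ID avoids the strconv.Itoa / strconv.Atoi
	// round-trips that a map[string]*TunnelDetails forces on every access.
	tunnels := make(map[EndpointID]*TunnelDetails)

	id := EndpointID(42)
	tunnels[id] = &TunnelDetails{Port: 52341}

	if tunnel, ok := tunnels[id]; ok {
		fmt.Printf("endpoint %d listens on port %d\n", id, tunnel.Port)
	}
}
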
@@ -183,48 +181,41 @@ func (service *Service) startTunnelVerificationLoop() {
 }
 
 func (service *Service) checkTunnels() {
-	service.mu.Lock()
+	tunnels := make(map[portainer.EndpointID]portainer.TunnelDetails)
 
+	service.mu.Lock()
 	for key, tunnel := range service.tunnelDetailsMap {
+		tunnels[key] = *tunnel
+	}
+	service.mu.Unlock()
+
+	for endpointID, tunnel := range tunnels {
 		if tunnel.LastActivity.IsZero() || tunnel.Status == portainer.EdgeAgentIdle {
 			continue
 		}
 
 		elapsed := time.Since(tunnel.LastActivity)
-		log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %s] [status: %s] [status_time_seconds: %f] [message: environment tunnel monitoring]", key, tunnel.Status, elapsed.Seconds())
+		log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %d] [status: %s] [status_time_seconds: %f] [message: environment tunnel monitoring]", endpointID, tunnel.Status, elapsed.Seconds())
 
 		if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed.Seconds() < requiredTimeout.Seconds() {
 			continue
 		} else if tunnel.Status == portainer.EdgeAgentManagementRequired && elapsed.Seconds() > requiredTimeout.Seconds() {
-			log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %s] [status: %s] [status_time_seconds: %f] [timeout_seconds: %f] [message: REQUIRED state timeout exceeded]", key, tunnel.Status, elapsed.Seconds(), requiredTimeout.Seconds())
+			log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %d] [status: %s] [status_time_seconds: %f] [timeout_seconds: %f] [message: REQUIRED state timeout exceeded]", endpointID, tunnel.Status, elapsed.Seconds(), requiredTimeout.Seconds())
 		}
 
 		if tunnel.Status == portainer.EdgeAgentActive && elapsed.Seconds() < activeTimeout.Seconds() {
 			continue
 		} else if tunnel.Status == portainer.EdgeAgentActive && elapsed.Seconds() > activeTimeout.Seconds() {
-			log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %s] [status: %s] [status_time_seconds: %f] [timeout_seconds: %f] [message: ACTIVE state timeout exceeded]", key, tunnel.Status, elapsed.Seconds(), activeTimeout.Seconds())
+			log.Printf("[DEBUG] [chisel,monitoring] [endpoint_id: %d] [status: %s] [status_time_seconds: %f] [timeout_seconds: %f] [message: ACTIVE state timeout exceeded]", endpointID, tunnel.Status, elapsed.Seconds(), activeTimeout.Seconds())
 
-			endpointID, err := strconv.Atoi(key)
+			err := service.snapshotEnvironment(endpointID, tunnel.Port)
 			if err != nil {
-				log.Printf("[ERROR] [chisel,snapshot,conversion] Invalid environment identifier (id: %s): %s", key, err)
-			}
-
-			err = service.snapshotEnvironment(portainer.EndpointID(endpointID), tunnel.Port)
-			if err != nil {
-				log.Printf("[ERROR] [snapshot] Unable to snapshot Edge environment (id: %s): %s", key, err)
+				log.Printf("[ERROR] [snapshot] Unable to snapshot Edge environment (id: %d): %s", endpointID, err)
 			}
 		}
 
-		endpointID, err := strconv.Atoi(key)
-		if err != nil {
-			log.Printf("[ERROR] [chisel,conversion] Invalid environment identifier (id: %s): %s", key, err)
-			continue
-		}
-
-		service.setTunnelStatusToIdle(portainer.EndpointID(endpointID))
+		service.SetTunnelStatusToIdle(portainer.EndpointID(endpointID))
 	}
-
-	service.mu.Unlock()
 }
 
 func (service *Service) snapshotEnvironment(endpointID portainer.EndpointID, tunnelPort int) error {
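The checkTunnels hunk above carries the actual deadlock fix: the tunnel map is copied while the mutex is held, the mutex is released, and only then does the slow per-tunnel work (snapshotting, resetting the status via the exported, self-locking SetTunnelStatusToIdle) run against the copy. A minimal sketch of this copy-under-lock pattern, with illustrative names rather than Portainer's types:

package main

import (
	"fmt"
	"sync"
)

type tunnel struct {
	port int
}

type service struct {
	mu      sync.Mutex
	tunnels map[int]*tunnel
}

// check copies the map while holding the lock, then releases the lock and
// works on the copy, so per-entry work that locks again cannot deadlock.
func (s *service) check() {
	snapshot := make(map[int]tunnel)

	s.mu.Lock()
	for id, t := range s.tunnels {
		snapshot[id] = *t
	}
	s.mu.Unlock()

	for id, t := range snapshot {
		// The lock is no longer held here, so calling a method that takes
		// the lock itself (markIdle below) cannot block this goroutine.
		fmt.Printf("checking tunnel %d on port %d\n", id, t.port)
		s.markIdle(id)
	}
}

// markIdle takes the lock itself, as an exported status setter would.
func (s *service) markIdle(id int) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.tunnels, id)
}

func main() {
	s := &service{tunnels: map[int]*tunnel{1: {port: 52341}, 2: {port: 52342}}}
	s.check()
	fmt.Println("remaining tunnels:", len(s.tunnels))
}

Copying the map costs a little memory per pass, but it bounds how long the mutex is held, which is the trade-off the fix makes.
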
@@ -4,13 +4,11 @@ import (
 	"encoding/base64"
 	"fmt"
 	"math/rand"
-	"strconv"
 	"strings"
 	"time"
 
-	"github.com/portainer/libcrypto"
-
 	"github.com/dchest/uniuri"
+	"github.com/portainer/libcrypto"
 	portainer "github.com/portainer/portainer/api"
 )
 
@@ -19,6 +17,7 @@ const (
 	maxAvailablePort = 65535
 )
 
+// NOTE: it needs to be called with the lock acquired
 // getUnusedPort is used to generate an unused random port in the dynamic port range.
 // Dynamic ports (also called private ports) are 49152 to 65535.
 func (service *Service) getUnusedPort() int {
@@ -39,9 +38,8 @@ func randomInt(min, max int) int {
 }
 
+// NOTE: it needs to be called with the lock acquired
 func (service *Service) getTunnelDetails(endpointID portainer.EndpointID) *portainer.TunnelDetails {
-	key := strconv.Itoa(int(endpointID))
-
-	if tunnel, ok := service.tunnelDetailsMap[key]; ok {
+	if tunnel, ok := service.tunnelDetailsMap[endpointID]; ok {
 		return tunnel
 	}
 
@@ -49,7 +47,7 @@ func (service *Service) getTunnelDetails(endpointID portainer.EndpointID) *porta
 		Status: portainer.EdgeAgentIdle,
 	}
 
-	service.tunnelDetailsMap[key] = tunnel
+	service.tunnelDetailsMap[endpointID] = tunnel
 
 	return tunnel
 }
@@ -103,8 +101,12 @@ func (service *Service) SetTunnelStatusToActive(endpointID portainer.EndpointID)
 	service.mu.Unlock()
 }
 
-// NOTE: it needs to be called with the lock acquired
-func (service *Service) setTunnelStatusToIdle(endpointID portainer.EndpointID) {
+// SetTunnelStatusToIdle update the status of the tunnel associated to the specified environment(endpoint).
+// It sets the status to IDLE.
+// It removes any existing credentials associated to the tunnel.
+func (service *Service) SetTunnelStatusToIdle(endpointID portainer.EndpointID) {
+	service.mu.Lock()
+
 	tunnel := service.getTunnelDetails(endpointID)
 	tunnel.Status = portainer.EdgeAgentIdle
 	tunnel.Port = 0
@@ -117,14 +119,7 @@ func (service *Service) setTunnelStatusToIdle(endpointID portainer.EndpointID) {
 	}
 
 	service.ProxyManager.DeleteEndpointProxy(endpointID)
-}
-
-// SetTunnelStatusToIdle update the status of the tunnel associated to the specified environment(endpoint).
-// It sets the status to IDLE.
-// It removes any existing credentials associated to the tunnel.
-func (service *Service) SetTunnelStatusToIdle(endpointID portainer.EndpointID) {
-	service.mu.Lock()
-	service.setTunnelStatusToIdle(endpointID)
+
 	service.mu.Unlock()
 }
 
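The second file merges the private setTunnelStatusToIdle helper and its thin SetTunnelStatusToIdle wrapper into a single exported method that manages the mutex itself, and annotates the helpers that still expect the caller to hold the lock. A small sketch of that convention, again with illustrative stand-in types rather than Portainer's own:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[int]string
}

// NOTE: it needs to be called with the lock acquired
func (r *registry) get(id int) string {
	if v, ok := r.items[id]; ok {
		return v
	}
	r.items[id] = "idle"
	return r.items[id]
}

// Reset is the exported entry point: rather than a private helper plus a thin
// locking wrapper, one method takes the lock, does the work through the
// lock-held helper, and unlocks on its way out.
func (r *registry) Reset(id int) {
	r.mu.Lock()

	previous := r.get(id)
	delete(r.items, id)

	r.mu.Unlock()

	fmt.Printf("reset %d (was %q)\n", id, previous)
}

func main() {
	r := &registry{items: map[int]string{7: "active"}}
	r.Reset(7)
}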