From 2aec348814af358d0e1e5ce5e4f68305b0a7ac76 Mon Sep 17 00:00:00 2001 From: cong meng Date: Tue, 21 Sep 2021 13:12:31 +1200 Subject: [PATCH] fix(k8s) keep tunnel alive for websocket connection EE-1690 (#5679) * fix(k8s) EE-1690 keep tunnel alive for websocket connection * fix(k8s) EE-1690 fix comment Co-authored-by: Simon Meng --- api/chisel/service.go | 50 +++++++++++++++++++++++++++++ api/http/handler/websocket/proxy.go | 3 ++ api/kubernetes/cli/pod.go | 6 ++-- api/portainer.go | 3 ++ 4 files changed, 58 insertions(+), 4 deletions(-) diff --git a/api/chisel/service.go b/api/chisel/service.go index 9af0f4866..f22281bd5 100644 --- a/api/chisel/service.go +++ b/api/chisel/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "strconv" "time" @@ -42,6 +43,55 @@ func NewService(dataStore portainer.DataStore, shutdownCtx context.Context) *Ser } } +// pingAgent ping the given agent so that the agent can keep the tunnel alive +func (service *Service) pingAgent(endpointID portainer.EndpointID) error{ + tunnel := service.GetTunnelDetails(endpointID) + requestURL := fmt.Sprintf("http://127.0.0.1:%d/ping", tunnel.Port) + req, err := http.NewRequest(http.MethodHead, requestURL, nil) + if err != nil { + return err + } + + httpClient := &http.Client{ + Timeout: 3 * time.Second, + } + _, err = httpClient.Do(req) + if err != nil { + return err + } + + return nil +} + +// KeepTunnelAlive keeps the tunnel of the given environment for maxAlive duration, or until ctx is done +func (service *Service) KeepTunnelAlive(endpointID portainer.EndpointID, ctx context.Context, maxAlive time.Duration) { + go func() { + log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: start for %.0f minutes]\n", endpointID, maxAlive.Minutes()) + maxAliveTicker := time.NewTicker(maxAlive) + defer maxAliveTicker.Stop() + pingTicker := time.NewTicker(tunnelCleanupInterval) + defer pingTicker.Stop() + + for { + select { + case <-pingTicker.C: + service.SetTunnelStatusToActive(endpointID) + err := service.pingAgent(endpointID) + if err != nil { + log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [warning: ping agent err=%s]\n", endpointID, err) + } + case <-maxAliveTicker.C: + log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: stop as %.0f minutes timeout]\n", endpointID, maxAlive.Minutes()) + return + case <-ctx.Done(): + err := ctx.Err() + log.Printf("[DEBUG] [chisel,KeepTunnelAlive] [endpoint_id: %d] [message: stop as err=%s]\n", endpointID, err) + return + } + } + }() +} + // StartTunnelServer starts a tunnel server on the specified addr and port. // It uses a seed to generate a new private/public key pair. If the seed cannot // be found inside the database, it will generate a new one randomly and persist it. diff --git a/api/http/handler/websocket/proxy.go b/api/http/handler/websocket/proxy.go index a03cb6637..c9a3b07de 100644 --- a/api/http/handler/websocket/proxy.go +++ b/api/http/handler/websocket/proxy.go @@ -35,6 +35,9 @@ func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r } handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID) + + handler.ReverseTunnelService.KeepTunnelAlive(params.endpoint.ID, r.Context(), portainer.WebSocketKeepAlive) + proxy.ServeHTTP(w, r) return nil diff --git a/api/kubernetes/cli/pod.go b/api/kubernetes/cli/pod.go index 3db620d55..43d66434c 100644 --- a/api/kubernetes/cli/pod.go +++ b/api/kubernetes/cli/pod.go @@ -21,9 +21,7 @@ const shellPodImage = "portainer/kubectl-shell" // - The shell pod will be automatically removed after a specified max life (prevent zombie pods) // - The shell pod will be automatically removed if request is cancelled (or client closes websocket connection) func (kcl *KubeClient) CreateUserShellPod(ctx context.Context, serviceAccountName string) (*portainer.KubernetesShellPod, error) { - // Schedule the pod for automatic removal - maxPodKeepAlive := 1 * time.Hour - maxPodKeepAliveSecondsStr := fmt.Sprintf("%d", int(maxPodKeepAlive.Seconds())) + maxPodKeepAliveSecondsStr := fmt.Sprintf("%d", int(portainer.WebSocketKeepAlive.Seconds())) podPrefix := userShellPodPrefix(serviceAccountName) @@ -81,7 +79,7 @@ func (kcl *KubeClient) CreateUserShellPod(ctx context.Context, serviceAccountNam // Handle pod lifecycle/cleanup - terminate pod after maxPodKeepAlive or upon request (long-lived) cancellation go func() { select { - case <-time.After(maxPodKeepAlive): + case <-time.After(portainer.WebSocketKeepAlive): log.Println("[DEBUG] [internal,kubernetes/pod] [message: pod removal schedule duration exceeded]") kcl.cli.CoreV1().Pods(portainerNamespace).Delete(shellPod.Name, nil) case <-ctx.Done(): diff --git a/api/portainer.go b/api/portainer.go index 787e62fe8..9ebc4d7d7 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -1324,6 +1324,7 @@ type ( SetTunnelStatusToActive(endpointID EndpointID) SetTunnelStatusToRequired(endpointID EndpointID) error SetTunnelStatusToIdle(endpointID EndpointID) + KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration) GetTunnelDetails(endpointID EndpointID) *TunnelDetails AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob) RemoveEdgeJob(edgeJobID EdgeJobID) @@ -1493,6 +1494,8 @@ const ( DefaultUserSessionTimeout = "8h" // DefaultUserSessionTimeout represents the default timeout after which the user session is cleared DefaultKubeconfigExpiry = "0" + // WebSocketKeepAlive web socket keep alive for edge environments + WebSocketKeepAlive = 1 * time.Hour ) const (