From d490061c1f9fc2030cfe41953d72f78f9169fe9e Mon Sep 17 00:00:00 2001
From: Matt Hook
Date: Thu, 30 May 2024 14:55:09 +1200
Subject: [PATCH] fix(pendingactions): fix deadlock and improve debug logging [EE-7049] (#11868)

Execute no longer returns an error. It now starts a goroutine that runs
the pending actions, so the snapshot code that calls it while working
with a datastore transaction is not blocked. Failures are logged instead
of returned, and a pending action that fails no longer prevents the
remaining actions of the same environment from being attempted.

---
 api/internal/snapshot/snapshot.go    |  5 +---
 api/pendingactions/pendingactions.go | 50 ++++++++++++++++++++--------
 2 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/api/internal/snapshot/snapshot.go b/api/internal/snapshot/snapshot.go
index 69e068397..e38d5366a 100644
--- a/api/internal/snapshot/snapshot.go
+++ b/api/internal/snapshot/snapshot.go
@@ -305,10 +305,7 @@ func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpo
 
 	// Run the pending actions
 	if latestEndpointReference.Status == portainer.EndpointStatusUp {
-		err = pendingActionsService.Execute(endpoint.ID)
-		if err != nil {
-			log.Error().Err(err).Msg("background schedule error (environment snapshot), unable to execute pending actions")
-		}
+		pendingActionsService.Execute(endpoint.ID)
 	}
 }
 
diff --git a/api/pendingactions/pendingactions.go b/api/pendingactions/pendingactions.go
index aac840632..22ac3ebc4 100644
--- a/api/pendingactions/pendingactions.go
+++ b/api/pendingactions/pendingactions.go
@@ -55,14 +55,26 @@ func (service *PendingActionsService) Create(r portainer.PendingActions) error {
 	return service.dataStore.PendingActions().Create(&r)
 }
 
-func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
+// Execute runs the pending actions of the given environment in a background
+// goroutine and returns immediately. Failures are logged by the goroutine;
+// nothing is reported back to the caller.
+func (service *PendingActionsService) Execute(id portainer.EndpointID) {
+	// Run in a goroutine to avoid blocking the main thread due to db tx
+	go service.execute(id)
+}
 
+// execute runs every pending action that belongs to environmentID while
+// holding the service mutex. It returns early when the environment cannot be
+// loaded or is found not to be up; an action that fails is logged and left in
+// the datastore, and the remaining actions are still processed.
+func (service *PendingActionsService) execute(environmentID portainer.EndpointID) {
 	service.mu.Lock()
 	defer service.mu.Unlock()
 
-	endpoint, err := service.dataStore.Endpoint().Endpoint(id)
+	endpoint, err := service.dataStore.Endpoint().Endpoint(environmentID)
 	if err != nil {
-		return fmt.Errorf("failed to retrieve environment %d: %w", id, err)
+		log.Debug().Msgf("failed to retrieve environment %d: %v", environmentID, err)
+		return
 	}
 
 	isKubernetesEndpoint := endpointutils.IsKubernetesEndpoint(endpoint) && !endpointutils.IsEdgeEndpoint(endpoint)
@@ -70,16 +82,16 @@ func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
 	// EndpointStatusUp is only relevant for non-Kubernetes endpoints
 	// Sometimes the endpoint is UP but the status is not updated in the database
 	if !isKubernetesEndpoint && endpoint.Status != portainer.EndpointStatusUp {
-		log.Debug().Msgf("Environment %q (id: %d) is not up", endpoint.Name, id)
-		return fmt.Errorf("environment %q (id: %d) is not up", endpoint.Name, id)
+		log.Debug().Msgf("Environment %q (id: %d) is not up", endpoint.Name, environmentID)
+		return
 	}
 
 	// For Kubernetes endpoints, we need to check if the endpoint is up by creating a kube client
 	if isKubernetesEndpoint {
 		if client, _ := service.kubeFactory.GetKubeClient(endpoint); client != nil {
 			if _, err = client.ServerVersion(); err != nil {
-				log.Debug().Err(err).Msgf("Environment %q (id: %d) is not up", endpoint.Name, id)
-				return fmt.Errorf("environment %q (id: %d) is not up", endpoint.Name, id)
+				log.Debug().Err(err).Msgf("Environment %q (id: %d) is not up", endpoint.Name, environmentID)
+				return
 			}
 		}
 	}
@@ -87,26 +99,36 @@ func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
 	pendingActions, err := service.dataStore.PendingActions().ReadAll()
 	if err != nil {
 		log.Error().Err(err).Msgf("failed to retrieve pending actions")
-		return fmt.Errorf("failed to retrieve pending actions for environment %d: %w", id, err)
+		return
 	}
 
-	for _, endpointPendingAction := range pendingActions {
-		if endpointPendingAction.EndpointID == id {
+	if len(pendingActions) > 0 {
+		log.Debug().Msgf("Found %d pending actions", len(pendingActions))
+		log.Debug().Msgf("PendingActions %+v", pendingActions)
+	}
+
+	announced := false
+	for _, endpointPendingAction := range pendingActions {
+		if endpointPendingAction.EndpointID == environmentID {
+			if !announced {
+				// We have at least 1 pending action for this environment
+				log.Debug().Msgf("Executing pending actions for environment %d", environmentID)
+				announced = true
+			}
+
 			err := service.executePendingAction(endpointPendingAction, endpoint)
 			if err != nil {
 				log.Warn().Err(err).Msgf("failed to execute pending action")
-				return fmt.Errorf("failed to execute pending action: %w", err)
+				continue
 			}
 
 			err = service.dataStore.PendingActions().Delete(endpointPendingAction.ID)
 			if err != nil {
 				log.Error().Err(err).Msgf("failed to delete pending action")
-				return fmt.Errorf("failed to delete pending action: %w", err)
+				continue
 			}
 		}
 	}
-
-	return nil
 }
 
 func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingActions, endpoint *portainer.Endpoint) error {