From 14853f6da0582a54fdcace87a9213f6d776fa436 Mon Sep 17 00:00:00 2001 From: Prabhat Khera <91852476+prabhat-org@users.noreply.github.com> Date: Tue, 19 Sep 2023 08:57:27 +1200 Subject: [PATCH] fix(kubernetes): kube env permissions when down [EE-5427] (#10327) --- api/cmd/portainer/main.go | 15 ++- api/dataservices/interface.go | 6 + .../pendingactions/pendingactions.go | 74 ++++++++++++ api/datastore/services.go | 13 ++ api/datastore/services_tx.go | 2 + .../test_data/output_24_to_latest.json | 1 - .../endpointgroups/endpointgroup_update.go | 10 +- api/http/handler/endpointgroups/handler.go | 6 +- .../handler/endpoints/endpoint_inspect.go | 5 +- .../handler/endpoints/endpoint_list_test.go | 2 +- api/http/handler/endpoints/endpoint_update.go | 8 +- api/http/handler/endpoints/handler.go | 26 ++-- api/http/server.go | 4 + api/http/utils/pendingActions.go | 61 ---------- api/internal/snapshot/snapshot.go | 20 +++- api/internal/testhelpers/datastore.go | 5 + api/pendingactions/pendingactions.go | 113 ++++++++++++++++++ api/portainer.go | 14 +-- 18 files changed, 282 insertions(+), 103 deletions(-) create mode 100644 api/dataservices/pendingactions/pendingactions.go delete mode 100644 api/http/utils/pendingActions.go create mode 100644 api/pendingactions/pendingactions.go diff --git a/api/cmd/portainer/main.go b/api/cmd/portainer/main.go index b4f954298..76a7bc250 100644 --- a/api/cmd/portainer/main.go +++ b/api/cmd/portainer/main.go @@ -43,6 +43,7 @@ import ( kubecli "github.com/portainer/portainer/api/kubernetes/cli" "github.com/portainer/portainer/api/ldap" "github.com/portainer/portainer/api/oauth" + "github.com/portainer/portainer/api/pendingactions" "github.com/portainer/portainer/api/scheduler" "github.com/portainer/portainer/api/stacks/deployments" "github.com/portainer/portainer/pkg/featureflags" @@ -263,11 +264,12 @@ func initSnapshotService( dockerClientFactory *dockerclient.ClientFactory, kubernetesClientFactory *kubecli.ClientFactory, shutdownCtx context.Context, + pendingActionsService *pendingactions.PendingActionsService, ) (portainer.SnapshotService, error) { dockerSnapshotter := docker.NewSnapshotter(dockerClientFactory) kubernetesSnapshotter := kubernetes.NewSnapshotter(kubernetesClientFactory) - snapshotService, err := snapshot.NewService(snapshotIntervalFromFlag, dataStore, dockerSnapshotter, kubernetesSnapshotter, shutdownCtx) + snapshotService, err := snapshot.NewService(snapshotIntervalFromFlag, dataStore, dockerSnapshotter, kubernetesSnapshotter, shutdownCtx, pendingActionsService) if err != nil { return nil, err } @@ -454,15 +456,17 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server { dockerClientFactory := initDockerClientFactory(digitalSignatureService, reverseTunnelService) kubernetesClientFactory, err := initKubernetesClientFactory(digitalSignatureService, reverseTunnelService, dataStore, instanceID, *flags.AddrHTTPS, settings.UserSessionTimeout) - snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx) + authorizationService := authorization.NewService(dataStore) + authorizationService.K8sClientFactory = kubernetesClientFactory + + pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory, authorizationService, shutdownCtx) + + snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService) if err != nil { log.Fatal().Err(err).Msg("failed initializing snapshot service") } snapshotService.Start() - authorizationService := authorization.NewService(dataStore) - authorizationService.K8sClientFactory = kubernetesClientFactory - kubernetesTokenCacheManager := kubeproxy.NewTokenCacheManager() kubeClusterAccessService := kubernetes.NewKubeClusterAccessService(*flags.BaseURL, *flags.AddrHTTPS, sslSettings.CertPath) @@ -622,6 +626,7 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server { DemoService: demoService, UpgradeService: upgradeService, AdminCreationDone: adminCreationDone, + PendingActionsService: pendingActionsService, } } diff --git a/api/dataservices/interface.go b/api/dataservices/interface.go index ecb84e15a..524512568 100644 --- a/api/dataservices/interface.go +++ b/api/dataservices/interface.go @@ -35,6 +35,7 @@ type ( User() UserService Version() VersionService Webhook() WebhookService + PendingActions() PendingActionsService } DataStore interface { @@ -72,6 +73,11 @@ type ( GetNextIdentifier() int } + PendingActionsService interface { + BaseCRUD[portainer.PendingActions, portainer.PendingActionsID] + GetNextIdentifier() int + } + // EdgeStackService represents a service to manage Edge stacks EdgeStackService interface { EdgeStacks() ([]portainer.EdgeStack, error) diff --git a/api/dataservices/pendingactions/pendingactions.go b/api/dataservices/pendingactions/pendingactions.go new file mode 100644 index 000000000..5b55a8fd9 --- /dev/null +++ b/api/dataservices/pendingactions/pendingactions.go @@ -0,0 +1,74 @@ +package pendingactions + +import ( + "time" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" +) + +const ( + BucketName = "pending_actions" +) + +type Service struct { + dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID] +} + +type ServiceTx struct { + dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID] +} + +func NewService(connection portainer.Connection) (*Service, error) { + err := connection.SetServiceName(BucketName) + if err != nil { + return nil, err + } + + return &Service{ + BaseDataService: dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID]{ + Bucket: BucketName, + Connection: connection, + }, + }, nil +} + +func (s Service) Create(config *portainer.PendingActions) error { + return s.Connection.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Create(config) + }) +} + +func (s Service) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error { + return s.Connection.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Update(ID, config) + }) +} + +func (service *Service) Tx(tx portainer.Transaction) ServiceTx { + return ServiceTx{ + BaseDataServiceTx: dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID]{ + Bucket: BucketName, + Connection: service.Connection, + Tx: tx, + }, + } +} + +func (s ServiceTx) Create(config *portainer.PendingActions) error { + return s.Tx.CreateObject(BucketName, func(id uint64) (int, interface{}) { + config.ID = portainer.PendingActionsID(id) + config.CreatedAt = time.Now().Unix() + + return int(config.ID), config + }) +} + +func (s ServiceTx) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error { + return s.BaseDataServiceTx.Update(ID, config) +} + +// GetNextIdentifier returns the next identifier for a custom template. +func (service *Service) GetNextIdentifier() int { + return service.Connection.GetNextIdentifier(BucketName) +} diff --git a/api/datastore/services.go b/api/datastore/services.go index efb0351a7..b0e5c764e 100644 --- a/api/datastore/services.go +++ b/api/datastore/services.go @@ -20,6 +20,7 @@ import ( "github.com/portainer/portainer/api/dataservices/extension" "github.com/portainer/portainer/api/dataservices/fdoprofile" "github.com/portainer/portainer/api/dataservices/helmuserrepository" + "github.com/portainer/portainer/api/dataservices/pendingactions" "github.com/portainer/portainer/api/dataservices/registry" "github.com/portainer/portainer/api/dataservices/resourcecontrol" "github.com/portainer/portainer/api/dataservices/role" @@ -72,6 +73,7 @@ type Store struct { UserService *user.Service VersionService *version.Service WebhookService *webhook.Service + PendingActionsService *pendingactions.Service } func (store *Store) initServices() error { @@ -238,9 +240,20 @@ func (store *Store) initServices() error { } store.ScheduleService = scheduleService + pendingActionsService, err := pendingactions.NewService(store.connection) + if err != nil { + return err + } + store.PendingActionsService = pendingActionsService + return nil } +// PendingActions gives access to the PendingActions data management layer +func (store *Store) PendingActions() dataservices.PendingActionsService { + return store.PendingActionsService +} + // CustomTemplate gives access to the CustomTemplate data management layer func (store *Store) CustomTemplate() dataservices.CustomTemplateService { return store.CustomTemplateService diff --git a/api/datastore/services_tx.go b/api/datastore/services_tx.go index 44375baaa..a7a40d6e4 100644 --- a/api/datastore/services_tx.go +++ b/api/datastore/services_tx.go @@ -16,6 +16,8 @@ func (tx *StoreTx) IsErrObjectNotFound(err error) bool { func (tx *StoreTx) CustomTemplate() dataservices.CustomTemplateService { return nil } +func (tx *StoreTx) PendingActions() dataservices.PendingActionsService { return nil } + func (tx *StoreTx) EdgeGroup() dataservices.EdgeGroupService { return tx.store.EdgeGroupService.Tx(tx.tx) } diff --git a/api/datastore/test_data/output_24_to_latest.json b/api/datastore/test_data/output_24_to_latest.json index 813c96051..8ada67237 100644 --- a/api/datastore/test_data/output_24_to_latest.json +++ b/api/datastore/test_data/output_24_to_latest.json @@ -73,7 +73,6 @@ }, "LastCheckInDate": 0, "Name": "local", - "PendingActions": null, "PostInitMigrations": { "MigrateGPUs": true, "MigrateIngresses": true diff --git a/api/http/handler/endpointgroups/endpointgroup_update.go b/api/http/handler/endpointgroups/endpointgroup_update.go index ae007b46e..84964695d 100644 --- a/api/http/handler/endpointgroups/endpointgroup_update.go +++ b/api/http/handler/endpointgroups/endpointgroup_update.go @@ -7,7 +7,6 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" - "github.com/portainer/portainer/api/http/utils" "github.com/portainer/portainer/api/internal/tag" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" @@ -156,9 +155,12 @@ func (handler *Handler) updateEndpointGroup(tx dataservices.DataStoreTx, endpoin err = handler.AuthorizationService.CleanNAPWithOverridePolicies(tx, &endpoint, endpointGroup) if err != nil { // Update flag with endpoint and continue - endpoint.PendingActions = utils.GetUpdatedEndpointPendingActions(&endpoint, "CleanNAPWithOverridePolicies", endpointGroup.ID) - err = tx.Endpoint().UpdateEndpoint(endpoint.ID, &endpoint) - log.Warn().Err(err).Msgf("Unable to update user authorizations for endpoint (%d) and endpopint group (%d)", endpoint.ID, endpointGroup.ID) + handler.PendingActionsService.Create(portainer.PendingActions{ + EndpointID: endpoint.ID, + Action: "CleanNAPWithOverridePolicies", + ActionData: endpointGroup.ID, + }) + log.Warn().Err(err).Msgf("Unable to update user authorizations for endpoint (%d) and endpoint group (%d).", endpoint.ID, endpointGroup.ID) } } } diff --git a/api/http/handler/endpointgroups/handler.go b/api/http/handler/endpointgroups/handler.go index e3284c457..6fe912d02 100644 --- a/api/http/handler/endpointgroups/handler.go +++ b/api/http/handler/endpointgroups/handler.go @@ -6,6 +6,7 @@ import ( "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/http/security" "github.com/portainer/portainer/api/internal/authorization" + "github.com/portainer/portainer/api/pendingactions" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/gorilla/mux" @@ -14,8 +15,9 @@ import ( // Handler is the HTTP handler used to handle environment(endpoint) group operations. type Handler struct { *mux.Router - AuthorizationService *authorization.Service - DataStore dataservices.DataStore + AuthorizationService *authorization.Service + DataStore dataservices.DataStore + PendingActionsService *pendingactions.PendingActionsService } // NewHandler creates a handler to manage environment(endpoint) group operations. diff --git a/api/http/handler/endpoints/endpoint_inspect.go b/api/http/handler/endpoints/endpoint_inspect.go index 23894dc44..e25e79720 100644 --- a/api/http/handler/endpoints/endpoint_inspect.go +++ b/api/http/handler/endpoints/endpoint_inspect.go @@ -4,7 +4,6 @@ import ( "net/http" portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/http/utils" "github.com/portainer/portainer/api/internal/endpointutils" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" @@ -79,8 +78,8 @@ func (handler *Handler) endpointInspect(w http.ResponseWriter, r *http.Request) } } - // Run the pending actions - utils.RunPendingActions(endpoint, handler.DataStore, handler.AuthorizationService) + // Execute endpoint pending actions + handler.PendingActionsService.Execute(endpoint.ID) return response.JSON(w, endpoint) } diff --git a/api/http/handler/endpoints/endpoint_list_test.go b/api/http/handler/endpoints/endpoint_list_test.go index 8ae1ba3bd..6806de38e 100644 --- a/api/http/handler/endpoints/endpoint_list_test.go +++ b/api/http/handler/endpoints/endpoint_list_test.go @@ -196,7 +196,7 @@ func setupEndpointListHandler(t *testing.T, endpoints []portainer.Endpoint) *Han handler := NewHandler(bouncer, nil) handler.DataStore = store handler.ComposeStackManager = testhelpers.NewComposeStackManager() - handler.SnapshotService, _ = snapshot.NewService("1s", store, nil, nil, nil) + handler.SnapshotService, _ = snapshot.NewService("1s", store, nil, nil, nil, nil) return handler } diff --git a/api/http/handler/endpoints/endpoint_update.go b/api/http/handler/endpoints/endpoint_update.go index 27e336437..6c2264338 100644 --- a/api/http/handler/endpoints/endpoint_update.go +++ b/api/http/handler/endpoints/endpoint_update.go @@ -12,6 +12,7 @@ import ( httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" + "github.com/rs/zerolog/log" ) type endpointUpdatePayload struct { @@ -264,7 +265,12 @@ func (handler *Handler) endpointUpdate(w http.ResponseWriter, r *http.Request) * if endpoint.Type == portainer.KubernetesLocalEnvironment || endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment { err = handler.AuthorizationService.CleanNAPWithOverridePolicies(handler.DataStore, endpoint, nil) if err != nil { - return httperror.InternalServerError("Unable to update user authorizations", err) + handler.PendingActionsService.Create(portainer.PendingActions{ + EndpointID: endpoint.ID, + Action: "CleanNAPWithOverridePolicies", + ActionData: nil, + }) + log.Warn().Err(err).Msgf("Unable to clean NAP with override policies for endpoint (%d). Will try to update when endpoint is online.", endpoint.ID) } } } diff --git a/api/http/handler/endpoints/handler.go b/api/http/handler/endpoints/handler.go index ac3f96dbf..59fb8f474 100644 --- a/api/http/handler/endpoints/handler.go +++ b/api/http/handler/endpoints/handler.go @@ -10,6 +10,7 @@ import ( "github.com/portainer/portainer/api/http/security" "github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/kubernetes/cli" + "github.com/portainer/portainer/api/pendingactions" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/gorilla/mux" @@ -25,18 +26,19 @@ func hideFields(endpoint *portainer.Endpoint) { // Handler is the HTTP handler used to handle environment(endpoint) operations. type Handler struct { *mux.Router - requestBouncer security.BouncerService - demoService *demo.Service - DataStore dataservices.DataStore - FileService portainer.FileService - ProxyManager *proxy.Manager - ReverseTunnelService portainer.ReverseTunnelService - SnapshotService portainer.SnapshotService - K8sClientFactory *cli.ClientFactory - ComposeStackManager portainer.ComposeStackManager - AuthorizationService *authorization.Service - BindAddress string - BindAddressHTTPS string + requestBouncer security.BouncerService + demoService *demo.Service + DataStore dataservices.DataStore + FileService portainer.FileService + ProxyManager *proxy.Manager + ReverseTunnelService portainer.ReverseTunnelService + SnapshotService portainer.SnapshotService + K8sClientFactory *cli.ClientFactory + ComposeStackManager portainer.ComposeStackManager + AuthorizationService *authorization.Service + BindAddress string + BindAddressHTTPS string + PendingActionsService *pendingactions.PendingActionsService } // NewHandler creates a handler to manage environment(endpoint) operations. diff --git a/api/http/server.go b/api/http/server.go index e69dfef39..7800524f1 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -64,6 +64,7 @@ import ( "github.com/portainer/portainer/api/internal/upgrade" k8s "github.com/portainer/portainer/api/kubernetes" "github.com/portainer/portainer/api/kubernetes/cli" + "github.com/portainer/portainer/api/pendingactions" "github.com/portainer/portainer/api/scheduler" "github.com/portainer/portainer/api/stacks/deployments" "github.com/portainer/portainer/pkg/libhelm" @@ -110,6 +111,7 @@ type Server struct { DemoService *demo.Service UpgradeService upgrade.Service AdminCreationDone chan struct{} + PendingActionsService *pendingactions.PendingActionsService } // Start starts the HTTP server @@ -178,12 +180,14 @@ func (server *Server) Start() error { endpointHandler.AuthorizationService = server.AuthorizationService endpointHandler.BindAddress = server.BindAddress endpointHandler.BindAddressHTTPS = server.BindAddressHTTPS + endpointHandler.PendingActionsService = server.PendingActionsService var endpointEdgeHandler = endpointedge.NewHandler(requestBouncer, server.DataStore, server.FileService, server.ReverseTunnelService) var endpointGroupHandler = endpointgroups.NewHandler(requestBouncer) endpointGroupHandler.AuthorizationService = server.AuthorizationService endpointGroupHandler.DataStore = server.DataStore + endpointGroupHandler.PendingActionsService = server.PendingActionsService var endpointProxyHandler = endpointproxy.NewHandler(requestBouncer) endpointProxyHandler.DataStore = server.DataStore diff --git a/api/http/utils/pendingActions.go b/api/http/utils/pendingActions.go deleted file mode 100644 index 9d2285580..000000000 --- a/api/http/utils/pendingActions.go +++ /dev/null @@ -1,61 +0,0 @@ -package utils - -import ( - portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/dataservices" - "github.com/portainer/portainer/api/internal/authorization" - "github.com/rs/zerolog/log" -) - -func EndpointPendingActions(endpoint *portainer.Endpoint) *portainer.EndpointPendingActions { - return endpoint.PendingActions -} - -func GetUpdatedEndpointPendingActions(endpoint *portainer.Endpoint, action string, value interface{}) *portainer.EndpointPendingActions { - if endpoint.PendingActions == nil { - endpoint.PendingActions = &portainer.EndpointPendingActions{} - } - - switch action { - case "CleanNAPWithOverridePolicies": - endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups = append(endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups, value.(portainer.EndpointGroupID)) - } - - return endpoint.PendingActions -} - -func RunPendingActions(endpoint *portainer.Endpoint, dataStore dataservices.DataStoreTx, authorizationService *authorization.Service) error { - - if endpoint.PendingActions == nil { - return nil - } - - log.Info().Msgf("Running pending actions for endpoint %d", endpoint.ID) - - if endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups != nil { - log.Info().Int("endpoint_id", int(endpoint.ID)).Msgf("Cleaning NAP with override policies for endpoint groups %v", endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups) - failedEndpointGroupIDs := make([]portainer.EndpointGroupID, 0) - for _, endpointGroupID := range endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups { - endpointGroup, err := dataStore.EndpointGroup().Read(portainer.EndpointGroupID(endpointGroupID)) - if err != nil { - log.Error().Err(err).Msgf("Error reading endpoint group to clean NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID) - failedEndpointGroupIDs = append(failedEndpointGroupIDs, endpointGroupID) - continue - } - err = authorizationService.CleanNAPWithOverridePolicies(dataStore, endpoint, endpointGroup) - if err != nil { - failedEndpointGroupIDs = append(failedEndpointGroupIDs, endpointGroupID) - log.Error().Err(err).Msgf("Error cleaning NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID) - } - } - - endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups = failedEndpointGroupIDs - err := dataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint) - if err != nil { - log.Error().Err(err).Msgf("While running pending actions, error updating endpoint %d", endpoint.ID) - return err - } - } - - return nil -} diff --git a/api/internal/snapshot/snapshot.go b/api/internal/snapshot/snapshot.go index 3f509353d..6d57abbcc 100644 --- a/api/internal/snapshot/snapshot.go +++ b/api/internal/snapshot/snapshot.go @@ -10,9 +10,8 @@ import ( "github.com/portainer/portainer/api/agent" "github.com/portainer/portainer/api/crypto" "github.com/portainer/portainer/api/dataservices" - "github.com/portainer/portainer/api/http/utils" - "github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/internal/endpointutils" + "github.com/portainer/portainer/api/pendingactions" "github.com/rs/zerolog/log" ) @@ -27,10 +26,18 @@ type Service struct { dockerSnapshotter portainer.DockerSnapshotter kubernetesSnapshotter portainer.KubernetesSnapshotter shutdownCtx context.Context + pendingActionsService *pendingactions.PendingActionsService } // NewService creates a new instance of a service -func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStore, dockerSnapshotter portainer.DockerSnapshotter, kubernetesSnapshotter portainer.KubernetesSnapshotter, shutdownCtx context.Context) (*Service, error) { +func NewService( + snapshotIntervalFromFlag string, + dataStore dataservices.DataStore, + dockerSnapshotter portainer.DockerSnapshotter, + kubernetesSnapshotter portainer.KubernetesSnapshotter, + shutdownCtx context.Context, + pendingActionsService *pendingactions.PendingActionsService, +) (*Service, error) { interval, err := parseSnapshotFrequency(snapshotIntervalFromFlag, dataStore) if err != nil { return nil, err @@ -43,6 +50,7 @@ func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStor dockerSnapshotter: dockerSnapshotter, kubernetesSnapshotter: kubernetesSnapshotter, shutdownCtx: shutdownCtx, + pendingActionsService: pendingActionsService, }, nil } @@ -263,7 +271,7 @@ func (service *Service) snapshotEndpoints() error { snapshotError := service.SnapshotEndpoint(&endpoint) service.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { - updateEndpointStatus(tx, &endpoint, snapshotError) + updateEndpointStatus(tx, &endpoint, snapshotError, service.pendingActionsService) return nil }) } @@ -271,7 +279,7 @@ func (service *Service) snapshotEndpoints() error { return nil } -func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint, snapshotError error) { +func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint, snapshotError error, pendingActionsService *pendingactions.PendingActionsService) { latestEndpointReference, err := tx.Endpoint().Endpoint(endpoint.ID) if latestEndpointReference == nil { log.Debug(). @@ -304,7 +312,7 @@ func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpo // Run the pending actions if latestEndpointReference.Status == portainer.EndpointStatusUp { - utils.RunPendingActions(latestEndpointReference, tx, authorization.NewService(tx)) + pendingActionsService.Execute(endpoint.ID) } } diff --git a/api/internal/testhelpers/datastore.go b/api/internal/testhelpers/datastore.go index 617499a00..89a3a943f 100644 --- a/api/internal/testhelpers/datastore.go +++ b/api/internal/testhelpers/datastore.go @@ -34,6 +34,7 @@ type testDatastore struct { user dataservices.UserService version dataservices.VersionService webhook dataservices.WebhookService + pendingActionsService dataservices.PendingActionsService } func (d *testDatastore) BackupTo(io.Writer) error { return nil } @@ -82,6 +83,10 @@ func (d *testDatastore) User() dataservices.UserService { re func (d *testDatastore) Version() dataservices.VersionService { return d.version } func (d *testDatastore) Webhook() dataservices.WebhookService { return d.webhook } +func (d *testDatastore) PendingActions() dataservices.PendingActionsService { + return d.pendingActionsService +} + func (d *testDatastore) IsErrObjectNotFound(e error) bool { return false } diff --git a/api/pendingactions/pendingactions.go b/api/pendingactions/pendingactions.go new file mode 100644 index 000000000..5a12ba41e --- /dev/null +++ b/api/pendingactions/pendingactions.go @@ -0,0 +1,113 @@ +package pendingactions + +import ( + "context" + "fmt" + "sync" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" + "github.com/portainer/portainer/api/internal/authorization" + kubecli "github.com/portainer/portainer/api/kubernetes/cli" + "github.com/rs/zerolog/log" +) + +type ( + PendingActionsService struct { + authorizationService *authorization.Service + clientFactory *kubecli.ClientFactory + dataStore dataservices.DataStore + shutdownCtx context.Context + + mu sync.Mutex + } +) + +func NewService( + dataStore dataservices.DataStore, + clientFactory *kubecli.ClientFactory, + authorizationService *authorization.Service, + shutdownCtx context.Context, +) *PendingActionsService { + return &PendingActionsService{ + dataStore: dataStore, + shutdownCtx: shutdownCtx, + authorizationService: authorizationService, + clientFactory: clientFactory, + mu: sync.Mutex{}, + } +} + +func (service *PendingActionsService) Create(r portainer.PendingActions) error { + return service.dataStore.PendingActions().Create(&r) +} + +func (service *PendingActionsService) Execute(id portainer.EndpointID) error { + + service.mu.Lock() + defer service.mu.Unlock() + + endpoint, err := service.dataStore.Endpoint().Endpoint(id) + if err != nil { + return fmt.Errorf("failed to retrieve endpoint %d: %w", id, err) + } + + if endpoint.Status != portainer.EndpointStatusUp { + log.Debug().Msgf("Endpoint %d is not up", id) + return fmt.Errorf("endpoint %d is not up: %w", id, err) + } + + pendingActions, err := service.dataStore.PendingActions().ReadAll() + if err != nil { + log.Error().Err(err).Msgf("failed to retrieve pending actions") + return fmt.Errorf("failed to retrieve pending actions for endpoint %d: %w", id, err) + } + + for _, endpointPendingAction := range pendingActions { + if endpointPendingAction.EndpointID == id { + err := service.executePendingAction(endpointPendingAction, endpoint) + if err != nil { + log.Error().Err(err).Msgf("failed to execute pending action") + return fmt.Errorf("failed to execute pending action: %w", err) + } else { + // delete the pending action + err := service.dataStore.PendingActions().Delete(endpointPendingAction.ID) + if err != nil { + log.Error().Err(err).Msgf("failed to delete pending action") + return fmt.Errorf("failed to delete pending action: %w", err) + } + } + } + } + + return nil +} + +func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingActions, endpoint *portainer.Endpoint) error { + log.Debug().Msgf("Executing pending action %s for endpoint %d", pendingAction.Action, pendingAction.EndpointID) + + defer func() { + log.Debug().Msgf("End executing pending action %s for endpoint %d", pendingAction.Action, pendingAction.EndpointID) + }() + + switch pendingAction.Action { + case "CleanNAPWithOverridePolicies": + if (pendingAction.ActionData == nil) || (pendingAction.ActionData.(portainer.EndpointGroupID) == 0) { + service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, nil) + } else { + endpointGroupID := pendingAction.ActionData.(portainer.EndpointGroupID) + endpointGroup, err := service.dataStore.EndpointGroup().Read(portainer.EndpointGroupID(endpointGroupID)) + if err != nil { + log.Error().Err(err).Msgf("Error reading endpoint group to clean NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID) + return fmt.Errorf("failed to retrieve endpoint group %d: %w", endpointGroupID, err) + } + err = service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, endpointGroup) + if err != nil { + log.Error().Err(err).Msgf("Error cleaning NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID) + return fmt.Errorf("failed to clean NAP with override policies for endpoint %d and endpoint group %d: %w", endpoint.ID, endpointGroup.ID, err) + } + } + return nil + } + return nil +} diff --git a/api/portainer.go b/api/portainer.go index 4e2880aa0..703bcfe82 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -373,10 +373,13 @@ type ( //EdgeStackStatusType represents an edge stack status type EdgeStackStatusType int - EndpointPendingActions struct { - CleanNAPWithOverridePolicies struct { - EndpointGroups []EndpointGroupID `json:"EndpointGroups"` - } `json:"CleanNAPWithOverridePolicies"` + PendingActionsID int + PendingActions struct { + ID PendingActionsID `json:"ID"` + EndpointID EndpointID `json:"EndpointID"` + Action string `json:"Action"` + ActionData interface{} `json:"ActionData"` + CreatedAt int64 `json:"CreatedAt"` } // Environment(Endpoint) represents a Docker environment(endpoint) with all the info required @@ -434,9 +437,6 @@ type ( // Whether we need to run any "post init migrations". PostInitMigrations EndpointPostInitMigrations `json:"PostInitMigrations"` - // Whether we need to run any action when an endpoint is back online. - PendingActions *EndpointPendingActions `json:"PendingActions"` - Edge EnvironmentEdgeSettings Agent struct {