fix(migration): run post init migrations for edge after server starts [EE-6905] (#11548)

Co-authored-by: testa113 <testa113>
pull/11673/head
Ali 2024-04-23 16:15:23 +12:00 committed by GitHub
parent 00d8391a02
commit efc88c0073
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 358 additions and 231 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/datastore"
"github.com/portainer/portainer/api/datastore/migrator"
"github.com/portainer/portainer/api/datastore/postinit"
"github.com/portainer/portainer/api/demo"
"github.com/portainer/portainer/api/docker"
dockerclient "github.com/portainer/portainer/api/docker/client"
@ -457,14 +458,6 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
authorizationService := authorization.NewService(dataStore)
authorizationService.K8sClientFactory = kubernetesClientFactory
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory, authorizationService, shutdownCtx)
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
if err != nil {
log.Fatal().Err(err).Msg("failed initializing snapshot service")
}
snapshotService.Start()
kubernetesTokenCacheManager := kubeproxy.NewTokenCacheManager()
kubeClusterAccessService := kubernetes.NewKubeClusterAccessService(*flags.BaseURL, *flags.AddrHTTPS, sslSettings.CertPath)
@ -489,6 +482,14 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, proxyManager, *flags.Assets)
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory, dockerClientFactory, authorizationService, shutdownCtx, *flags.Assets, kubernetesDeployer)
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
if err != nil {
log.Fatal().Err(err).Msg("failed initializing snapshot service")
}
snapshotService.Start()
helmPackageManager, err := initHelmPackageManager(*flags.Assets)
if err != nil {
log.Fatal().Err(err).Msg("failed initializing helm package manager")
@ -578,10 +579,12 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
// but some more complex migrations require access to a kubernetes or docker
// client. Therefore we run a separate migration process just before
// starting the server.
postInitMigrator := datastore.NewPostInitMigrator(
postInitMigrator := postinit.NewPostInitMigrator(
kubernetesClientFactory,
dockerClientFactory,
dataStore,
*flags.Assets,
kubernetesDeployer,
)
if err := postInitMigrator.PostInitMigrate(); err != nil {
log.Fatal().Err(err).Msg("failure during post init migrations")
@ -650,6 +653,7 @@ func main() {
Msg("starting Portainer")
err := server.Start()
log.Info().Err(err).Msg("HTTP server exited")
}
}

View File

@ -1,117 +0,0 @@
package datastore
import (
"context"
"github.com/docker/docker/api/types/container"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
dockerclient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/kubernetes/cli"
"github.com/rs/zerolog/log"
)
type PostInitMigrator struct {
kubeFactory *cli.ClientFactory
dockerFactory *dockerclient.ClientFactory
dataStore dataservices.DataStore
}
func NewPostInitMigrator(kubeFactory *cli.ClientFactory, dockerFactory *dockerclient.ClientFactory, dataStore dataservices.DataStore) *PostInitMigrator {
return &PostInitMigrator{
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
dataStore: dataStore,
}
}
func (migrator *PostInitMigrator) PostInitMigrate() error {
if err := migrator.PostInitMigrateIngresses(); err != nil {
return err
}
migrator.PostInitMigrateGPUs()
return nil
}
func (migrator *PostInitMigrator) PostInitMigrateIngresses() error {
endpoints, err := migrator.dataStore.Endpoint().Endpoints()
if err != nil {
return err
}
for i := range endpoints {
// Early exit if we do not need to migrate!
if !endpoints[i].PostInitMigrations.MigrateIngresses {
return nil
}
err := migrator.kubeFactory.MigrateEndpointIngresses(&endpoints[i])
if err != nil {
log.Debug().Err(err).Msg("failure migrating endpoint ingresses")
}
}
return nil
}
// PostInitMigrateGPUs will check all docker endpoints for containers with GPUs and set EnableGPUManagement to true if any are found
// If there's an error getting the containers, we'll log it and move on
func (migrator *PostInitMigrator) PostInitMigrateGPUs() {
environments, err := migrator.dataStore.Endpoint().Endpoints()
if err != nil {
log.Err(err).Msg("failure getting endpoints")
return
}
for i := range environments {
if environments[i].Type == portainer.DockerEnvironment {
// // Early exit if we do not need to migrate!
if !environments[i].PostInitMigrations.MigrateGPUs {
return
}
// set the MigrateGPUs flag to false so we don't run this again
environments[i].PostInitMigrations.MigrateGPUs = false
migrator.dataStore.Endpoint().UpdateEndpoint(environments[i].ID, &environments[i])
// create a docker client
dockerClient, err := migrator.dockerFactory.CreateClient(&environments[i], "", nil)
if err != nil {
log.Err(err).Msg("failure creating docker client for environment: " + environments[i].Name)
return
}
defer dockerClient.Close()
// get all containers
containers, err := dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
if err != nil {
log.Err(err).Msg("failed to list containers")
return
}
// check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole endpoint
containersLoop:
for _, container := range containers {
// https://www.sobyte.net/post/2022-10/go-docker/ has nice documentation on the docker client with GPUs
containerDetails, err := dockerClient.ContainerInspect(context.Background(), container.ID)
if err != nil {
log.Err(err).Msg("failed to inspect container")
return
}
deviceRequests := containerDetails.HostConfig.Resources.DeviceRequests
for _, deviceRequest := range deviceRequests {
if deviceRequest.Driver == "nvidia" {
environments[i].EnableGPUManagement = true
migrator.dataStore.Endpoint().UpdateEndpoint(environments[i].ID, &environments[i])
break containersLoop
}
}
}
}
}
}

View File

@ -0,0 +1,203 @@
package postinit
import (
"context"
"fmt"
"reflect"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
dockerClient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/rs/zerolog/log"
)
type PostInitMigrator struct {
kubeFactory *cli.ClientFactory
dockerFactory *dockerClient.ClientFactory
dataStore dataservices.DataStore
assetsPath string
kubernetesDeployer portainer.KubernetesDeployer
}
func NewPostInitMigrator(
kubeFactory *cli.ClientFactory,
dockerFactory *dockerClient.ClientFactory,
dataStore dataservices.DataStore,
assetsPath string,
kubernetesDeployer portainer.KubernetesDeployer,
) *PostInitMigrator {
return &PostInitMigrator{
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
dataStore: dataStore,
assetsPath: assetsPath,
kubernetesDeployer: kubernetesDeployer,
}
}
// PostInitMigrate will run all post-init migrations, which require docker/kube clients for all edge or non-edge environments
func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
environments, err := postInitMigrator.dataStore.Endpoint().Endpoints()
if err != nil {
log.Error().Err(err).Msg("Error getting environments")
return err
}
for _, environment := range environments {
// edge environments will run after the server starts, in pending actions
if endpointutils.IsEdgeEndpoint(&environment) {
log.Info().Msgf("Adding pending action 'PostInitMigrateEnvironment' for environment %d", environment.ID)
err = postInitMigrator.createPostInitMigrationPendingAction(environment.ID)
if err != nil {
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environment.ID)
}
} else {
// non-edge environments will run before the server starts.
err = postInitMigrator.MigrateEnvironment(&environment)
if err != nil {
log.Error().Err(err).Msgf("Error running post-init migrations for non-edge environment %d", environment.ID)
}
}
}
return nil
}
// try to create a post init migration pending action. If it already exists, do nothing
// this function exists for readability, not reusability
// TODO: This should be moved into pending actions as part of the pending action migration
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
migrateEnvPendingAction := portainer.PendingActions{
EndpointID: environmentID,
Action: actions.PostInitMigrateEnvironment,
}
// Get all pending actions and filter them by endpoint, action and action args that are equal to the migrateEnvPendingAction
pendingActions, err := postInitMigrator.dataStore.PendingActions().ReadAll()
if err != nil {
log.Error().Err(err).Msgf("Error retrieving pending actions")
return fmt.Errorf("failed to retrieve pending actions for environment %d: %w", environmentID, err)
}
for _, pendingAction := range pendingActions {
if pendingAction.EndpointID == environmentID &&
pendingAction.Action == migrateEnvPendingAction.Action &&
reflect.DeepEqual(pendingAction.ActionData, migrateEnvPendingAction.ActionData) {
log.Debug().Msgf("Migration pending action for environment %d already exists, skipping creating another", environmentID)
return nil
}
}
// If there are no pending actions for the given endpoint, create one
err = postInitMigrator.dataStore.PendingActions().Create(&migrateEnvPendingAction)
if err != nil {
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environmentID)
}
return nil
}
// MigrateEnvironment runs migrations on a single environment
func (migrator *PostInitMigrator) MigrateEnvironment(environment *portainer.Endpoint) error {
log.Info().Msgf("Executing post init migration for environment %d", environment.ID)
switch {
case endpointutils.IsKubernetesEndpoint(environment):
// get the kubeclient for the environment, and skip all kube migrations if there's an error
kubeclient, err := migrator.kubeFactory.GetKubeClient(environment)
if err != nil {
log.Error().Err(err).Msgf("Error creating kubeclient for environment: %d", environment.ID)
return err
}
// if one environment fails, it is logged and the next migration runs. The error is returned at the end and handled by pending actions
err = migrator.MigrateIngresses(*environment, kubeclient)
if err != nil {
return err
}
return nil
case endpointutils.IsDockerEndpoint(environment):
// get the docker client for the environment, and skip all docker migrations if there's an error
dockerClient, err := migrator.dockerFactory.CreateClient(environment, "", nil)
if err != nil {
log.Error().Err(err).Msgf("Error creating docker client for environment: %d", environment.ID)
return err
}
defer dockerClient.Close()
migrator.MigrateGPUs(*environment, dockerClient)
}
return nil
}
func (migrator *PostInitMigrator) MigrateIngresses(environment portainer.Endpoint, kubeclient *cli.KubeClient) error {
// Early exit if we do not need to migrate!
if !environment.PostInitMigrations.MigrateIngresses {
return nil
}
log.Debug().Msgf("Migrating ingresses for environment %d", environment.ID)
err := migrator.kubeFactory.MigrateEndpointIngresses(&environment, migrator.dataStore, kubeclient)
if err != nil {
log.Error().Err(err).Msgf("Error migrating ingresses for environment %d", environment.ID)
return err
}
return nil
}
// MigrateGPUs will check all docker endpoints for containers with GPUs and set EnableGPUManagement to true if any are found
// If there's an error getting the containers, we'll log it and move on
func (migrator *PostInitMigrator) MigrateGPUs(e portainer.Endpoint, dockerClient *client.Client) error {
return migrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
environment, err := tx.Endpoint().Endpoint(e.ID)
if err != nil {
log.Error().Err(err).Msgf("Error getting environment %d", environment.ID)
return err
}
// Early exit if we do not need to migrate!
if !environment.PostInitMigrations.MigrateGPUs {
return nil
}
log.Debug().Msgf("Migrating GPUs for environment %d", e.ID)
// get all containers
containers, err := dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
if err != nil {
log.Error().Err(err).Msgf("failed to list containers for environment %d", environment.ID)
return err
}
// check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole environment
containersLoop:
for _, container := range containers {
// https://www.sobyte.net/post/2022-10/go-docker/ has nice documentation on the docker client with GPUs
containerDetails, err := dockerClient.ContainerInspect(context.Background(), container.ID)
if err != nil {
log.Error().Err(err).Msg("failed to inspect container")
continue
}
deviceRequests := containerDetails.HostConfig.Resources.DeviceRequests
for _, deviceRequest := range deviceRequests {
if deviceRequest.Driver == "nvidia" {
environment.EnableGPUManagement = true
break containersLoop
}
}
}
// set the MigrateGPUs flag to false so we don't run this again
environment.PostInitMigrations.MigrateGPUs = false
err = tx.Endpoint().UpdateEndpoint(environment.ID, environment)
if err != nil {
log.Error().Err(err).Msgf("Error updating EnableGPUManagement flag for environment %d", environment.ID)
return err
}
return nil
})
}

View File

@ -8,6 +8,7 @@ import (
httperrors "github.com/portainer/portainer/api/http/errors"
"github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/pendingactions"
"github.com/portainer/portainer/api/pendingactions/actions"
httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response"
@ -91,7 +92,7 @@ func (handler *Handler) deleteKubernetesSecrets(registry *portainer.Registry) er
if len(failedNamespaces) > 0 {
handler.PendingActionsService.Create(portainer.PendingActions{
EndpointID: endpointId,
Action: pendingactions.DeletePortainerK8sRegistrySecrets,
Action: actions.DeletePortainerK8sRegistrySecrets,
// When extracting the data, this is the type we need to pull out
// i.e. pendingactions.DeletePortainerK8sRegistrySecretsData

View File

@ -61,7 +61,6 @@ import (
"github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/internal/authorization"
edgestackservice "github.com/portainer/portainer/api/internal/edge/edgestacks"
"github.com/portainer/portainer/api/internal/snapshot"
"github.com/portainer/portainer/api/internal/ssl"
"github.com/portainer/portainer/api/internal/upgrade"
k8s "github.com/portainer/portainer/api/kubernetes"
@ -382,7 +381,8 @@ func (server *Server) Start() error {
go shutdown(server.ShutdownCtx, httpsServer)
go snapshot.NewBackgroundSnapshotter(server.DataStore, server.ReverseTunnelService)
// Temporarily disable for EE-6905 until we have a solution for the snapshotter
// go snapshot.NewBackgroundSnapshotter(server.DataStore, server.ReverseTunnelService)
return httpsServer.ListenAndServeTLS("", "")
}

View File

@ -10,6 +10,7 @@ import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/rs/zerolog/log"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
@ -286,106 +287,111 @@ func buildLocalConfig() (*rest.Config, error) {
return config, nil
}
func (factory *ClientFactory) MigrateEndpointIngresses(e *portainer.Endpoint) error {
// classes is a list of controllers which have been manually added to the
// cluster setup view. These need to all be allowed globally, but then
// blocked in specific namespaces which they were not previously allowed in.
classes := e.Kubernetes.Configuration.IngressClasses
// We need a kube client to gather namespace level permissions. In pre-2.16
// versions of portainer, the namespace level permissions were stored by
// creating an actual ingress rule in the cluster with a particular
// annotation indicating that it's name (the class name) should be allowed.
cli, err := factory.GetKubeClient(e)
if err != nil {
return err
}
detected, err := cli.GetIngressControllers()
if err != nil {
return err
}
// newControllers is a set of all currently detected controllers.
newControllers := make(map[string]struct{})
for _, controller := range detected {
newControllers[controller.ClassName] = struct{}{}
}
namespaces, err := cli.GetNamespaces()
if err != nil {
return err
}
// Set of namespaces, if any, in which "allow none" should be true.
allow := make(map[string]map[string]struct{})
for _, c := range classes {
allow[c.Name] = make(map[string]struct{})
}
allow["none"] = make(map[string]struct{})
for namespace := range namespaces {
// Compare old annotations with currently detected controllers.
ingresses, err := cli.GetIngresses(namespace)
func (factory *ClientFactory) MigrateEndpointIngresses(e *portainer.Endpoint, datastore dataservices.DataStore, cli *KubeClient) error {
return datastore.UpdateTx(func(tx dataservices.DataStoreTx) error {
environment, err := tx.Endpoint().Endpoint(e.ID)
if err != nil {
return fmt.Errorf("failure getting ingresses during migration")
log.Error().Err(err).Msgf("Error retrieving environment %d", e.ID)
return err
}
for _, ingress := range ingresses {
oldController, ok := ingress.Annotations["ingress.portainer.io/ingress-type"]
if !ok {
// Skip rules without our old annotation.
continue
}
if _, ok := newControllers[oldController]; ok {
// Skip rules which match a detected controller.
// TODO: Allow this particular controller.
allow[oldController][ingress.Namespace] = struct{}{}
continue
}
// classes is a list of controllers which have been manually added to the
// cluster setup view. These need to all be allowed globally, but then
// blocked in specific namespaces which they were not previously allowed in.
classes := environment.Kubernetes.Configuration.IngressClasses
allow["none"][ingress.Namespace] = struct{}{}
// In pre-2.16 versions of portainer, the namespace level permissions were stored by
// creating an actual ingress rule in the cluster with a particular
// annotation indicating that it's name (the class name) should be allowed.
detected, err := cli.GetIngressControllers()
if err != nil {
log.Error().Err(err).Msgf("Error getting ingress controllers in environment %d", environment.ID)
return err
}
}
// Locally, disable "allow none" for namespaces not inside shouldAllowNone.
var newClasses []portainer.KubernetesIngressClassConfig
for _, c := range classes {
var blocked []string
// newControllers is a set of all currently detected controllers.
newControllers := make(map[string]struct{})
for _, controller := range detected {
newControllers[controller.ClassName] = struct{}{}
}
namespaces, err := cli.GetNamespaces()
if err != nil {
log.Error().Err(err).Msgf("Error getting namespaces in environment %d", environment.ID)
return err
}
// Set of namespaces, if any, in which "allow none" should be true.
allow := make(map[string]map[string]struct{})
for _, c := range classes {
allow[c.Name] = make(map[string]struct{})
}
allow["none"] = make(map[string]struct{})
for namespace := range namespaces {
if _, ok := allow[c.Name][namespace]; ok {
continue
// Compare old annotations with currently detected controllers.
ingresses, err := cli.GetIngresses(namespace)
if err != nil {
log.Error().Err(err).Msgf("Error getting ingresses in environment %d", environment.ID)
return err
}
for _, ingress := range ingresses {
oldController, ok := ingress.Annotations["ingress.portainer.io/ingress-type"]
if !ok {
// Skip rules without our old annotation.
continue
}
if _, ok := newControllers[oldController]; ok {
// Skip rules which match a detected controller.
// TODO: Allow this particular controller.
allow[oldController][ingress.Namespace] = struct{}{}
continue
}
allow["none"][ingress.Namespace] = struct{}{}
}
blocked = append(blocked, namespace)
}
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: c.Name,
Type: c.Type,
GloballyBlocked: false,
BlockedNamespaces: blocked,
})
}
// Handle "none".
if len(allow["none"]) != 0 {
e.Kubernetes.Configuration.AllowNoneIngressClass = true
var disallowNone []string
for namespace := range namespaces {
if _, ok := allow["none"][namespace]; ok {
continue
// Locally, disable "allow none" for namespaces not inside shouldAllowNone.
var newClasses []portainer.KubernetesIngressClassConfig
for _, c := range classes {
var blocked []string
for namespace := range namespaces {
if _, ok := allow[c.Name][namespace]; ok {
continue
}
blocked = append(blocked, namespace)
}
disallowNone = append(disallowNone, namespace)
}
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: "none",
Type: "custom",
GloballyBlocked: false,
BlockedNamespaces: disallowNone,
})
}
e.Kubernetes.Configuration.IngressClasses = newClasses
e.PostInitMigrations.MigrateIngresses = false
return factory.dataStore.Endpoint().UpdateEndpoint(e.ID, e)
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: c.Name,
Type: c.Type,
GloballyBlocked: false,
BlockedNamespaces: blocked,
})
}
// Handle "none".
if len(allow["none"]) != 0 {
environment.Kubernetes.Configuration.AllowNoneIngressClass = true
var disallowNone []string
for namespace := range namespaces {
if _, ok := allow["none"][namespace]; ok {
continue
}
disallowNone = append(disallowNone, namespace)
}
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: "none",
Type: "custom",
GloballyBlocked: false,
BlockedNamespaces: disallowNone,
})
}
environment.Kubernetes.Configuration.IngressClasses = newClasses
environment.PostInitMigrations.MigrateIngresses = false
return tx.Endpoint().UpdateEndpoint(environment.ID, environment)
})
}

View File

@ -0,0 +1,7 @@
package actions
const (
CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies"
DeletePortainerK8sRegistrySecrets = "DeletePortainerK8sRegistrySecrets"
PostInitMigrateEnvironment = "PostInitMigrateEnvironment"
)

View File

@ -17,7 +17,7 @@ func (service *PendingActionsService) DeleteKubernetesRegistrySecrets(endpoint *
return nil
}
kubeClient, err := service.clientFactory.GetKubeClient(endpoint)
kubeClient, err := service.kubeFactory.GetKubeClient(endpoint)
if err != nil {
return err
}

View File

@ -7,23 +7,24 @@ import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/datastore/postinit"
dockerClient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/internal/endpointutils"
kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/rs/zerolog/log"
)
const (
CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies"
DeletePortainerK8sRegistrySecrets = "DeletePortainerK8sRegistrySecrets"
)
type (
PendingActionsService struct {
authorizationService *authorization.Service
clientFactory *kubecli.ClientFactory
kubeFactory *kubecli.ClientFactory
dockerFactory *dockerClient.ClientFactory
dataStore dataservices.DataStore
shutdownCtx context.Context
assetsPath string
kubernetesDeployer portainer.KubernetesDeployer
mu sync.Mutex
}
@ -31,15 +32,21 @@ type (
func NewService(
dataStore dataservices.DataStore,
clientFactory *kubecli.ClientFactory,
kubeFactory *kubecli.ClientFactory,
dockerFactory *dockerClient.ClientFactory,
authorizationService *authorization.Service,
shutdownCtx context.Context,
assetsPath string,
kubernetesDeployer portainer.KubernetesDeployer,
) *PendingActionsService {
return &PendingActionsService{
dataStore: dataStore,
shutdownCtx: shutdownCtx,
authorizationService: authorizationService,
clientFactory: clientFactory,
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
assetsPath: assetsPath,
kubernetesDeployer: kubernetesDeployer,
mu: sync.Mutex{},
}
}
@ -69,7 +76,7 @@ func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
// For Kubernetes endpoints, we need to check if the endpoint is up by creating a kube client
if isKubernetesEndpoint {
_, err := service.clientFactory.GetKubeClient(endpoint)
_, err := service.kubeFactory.GetKubeClient(endpoint)
if err != nil {
log.Debug().Err(err).Msgf("Environment %q (id: %d) is not up", endpoint.Name, id)
return fmt.Errorf("environment %q (id: %d) is not up", endpoint.Name, id)
@ -109,7 +116,7 @@ func (service *PendingActionsService) executePendingAction(pendingAction portain
}()
switch pendingAction.Action {
case CleanNAPWithOverridePolicies:
case actions.CleanNAPWithOverridePolicies:
if (pendingAction.ActionData == nil) || (pendingAction.ActionData.(portainer.EndpointGroupID) == 0) {
service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, nil)
return nil
@ -128,7 +135,7 @@ func (service *PendingActionsService) executePendingAction(pendingAction portain
}
return nil
case DeletePortainerK8sRegistrySecrets:
case actions.DeletePortainerK8sRegistrySecrets:
if pendingAction.ActionData == nil {
return nil
}
@ -144,6 +151,22 @@ func (service *PendingActionsService) executePendingAction(pendingAction portain
return fmt.Errorf("failed to delete kubernetes registry secrets for environment %d: %w", endpoint.ID, err)
}
return nil
case actions.PostInitMigrateEnvironment:
postInitMigrator := postinit.NewPostInitMigrator(
service.kubeFactory,
service.dockerFactory,
service.dataStore,
service.assetsPath,
service.kubernetesDeployer,
)
err := postInitMigrator.MigrateEnvironment(endpoint)
if err != nil {
log.Error().Err(err).Msgf("Error running post-init migrations for edge environment %d", endpoint.ID)
return fmt.Errorf("failed running post-init migrations for edge environment %d: %w", endpoint.ID, err)
}
return nil
}