You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
portainer/api/datastore/postinit/migrate_post_init.go

204 lines
7.6 KiB

package postinit
import (
"context"
"fmt"
"reflect"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
dockerClient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/rs/zerolog/log"
)
type PostInitMigrator struct {
kubeFactory *cli.ClientFactory
dockerFactory *dockerClient.ClientFactory
dataStore dataservices.DataStore
assetsPath string
kubernetesDeployer portainer.KubernetesDeployer
}
func NewPostInitMigrator(
kubeFactory *cli.ClientFactory,
dockerFactory *dockerClient.ClientFactory,
dataStore dataservices.DataStore,
assetsPath string,
kubernetesDeployer portainer.KubernetesDeployer,
) *PostInitMigrator {
return &PostInitMigrator{
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
dataStore: dataStore,
assetsPath: assetsPath,
kubernetesDeployer: kubernetesDeployer,
}
}
// PostInitMigrate will run all post-init migrations, which require docker/kube clients for all edge or non-edge environments
func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
environments, err := postInitMigrator.dataStore.Endpoint().Endpoints()
if err != nil {
log.Error().Err(err).Msg("Error getting environments")
return err
}
for _, environment := range environments {
// edge environments will run after the server starts, in pending actions
if endpointutils.IsEdgeEndpoint(&environment) {
log.Info().Msgf("Adding pending action 'PostInitMigrateEnvironment' for environment %d", environment.ID)
err = postInitMigrator.createPostInitMigrationPendingAction(environment.ID)
if err != nil {
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environment.ID)
}
} else {
// non-edge environments will run before the server starts.
err = postInitMigrator.MigrateEnvironment(&environment)
if err != nil {
log.Error().Err(err).Msgf("Error running post-init migrations for non-edge environment %d", environment.ID)
}
}
}
return nil
}
// try to create a post init migration pending action. If it already exists, do nothing
// this function exists for readability, not reusability
// TODO: This should be moved into pending actions as part of the pending action migration
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
migrateEnvPendingAction := portainer.PendingActions{
EndpointID: environmentID,
Action: actions.PostInitMigrateEnvironment,
}
// Get all pending actions and filter them by endpoint, action and action args that are equal to the migrateEnvPendingAction
pendingActions, err := postInitMigrator.dataStore.PendingActions().ReadAll()
if err != nil {
log.Error().Err(err).Msgf("Error retrieving pending actions")
return fmt.Errorf("failed to retrieve pending actions for environment %d: %w", environmentID, err)
}
for _, pendingAction := range pendingActions {
if pendingAction.EndpointID == environmentID &&
pendingAction.Action == migrateEnvPendingAction.Action &&
reflect.DeepEqual(pendingAction.ActionData, migrateEnvPendingAction.ActionData) {
log.Debug().Msgf("Migration pending action for environment %d already exists, skipping creating another", environmentID)
return nil
}
}
// If there are no pending actions for the given endpoint, create one
err = postInitMigrator.dataStore.PendingActions().Create(&migrateEnvPendingAction)
if err != nil {
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environmentID)
}
return nil
}
// MigrateEnvironment runs migrations on a single environment
func (migrator *PostInitMigrator) MigrateEnvironment(environment *portainer.Endpoint) error {
log.Info().Msgf("Executing post init migration for environment %d", environment.ID)
switch {
case endpointutils.IsKubernetesEndpoint(environment):
// get the kubeclient for the environment, and skip all kube migrations if there's an error
kubeclient, err := migrator.kubeFactory.GetKubeClient(environment)
if err != nil {
log.Error().Err(err).Msgf("Error creating kubeclient for environment: %d", environment.ID)
return err
}
// if one environment fails, it is logged and the next migration runs. The error is returned at the end and handled by pending actions
err = migrator.MigrateIngresses(*environment, kubeclient)
if err != nil {
return err
}
return nil
case endpointutils.IsDockerEndpoint(environment):
// get the docker client for the environment, and skip all docker migrations if there's an error
dockerClient, err := migrator.dockerFactory.CreateClient(environment, "", nil)
if err != nil {
log.Error().Err(err).Msgf("Error creating docker client for environment: %d", environment.ID)
return err
}
defer dockerClient.Close()
migrator.MigrateGPUs(*environment, dockerClient)
}
return nil
}
func (migrator *PostInitMigrator) MigrateIngresses(environment portainer.Endpoint, kubeclient *cli.KubeClient) error {
// Early exit if we do not need to migrate!
if !environment.PostInitMigrations.MigrateIngresses {
return nil
}
log.Debug().Msgf("Migrating ingresses for environment %d", environment.ID)
err := migrator.kubeFactory.MigrateEndpointIngresses(&environment, migrator.dataStore, kubeclient)
if err != nil {
log.Error().Err(err).Msgf("Error migrating ingresses for environment %d", environment.ID)
return err
}
return nil
}
// MigrateGPUs will check all docker endpoints for containers with GPUs and set EnableGPUManagement to true if any are found
// If there's an error getting the containers, we'll log it and move on
func (migrator *PostInitMigrator) MigrateGPUs(e portainer.Endpoint, dockerClient *client.Client) error {
return migrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
environment, err := tx.Endpoint().Endpoint(e.ID)
if err != nil {
log.Error().Err(err).Msgf("Error getting environment %d", environment.ID)
return err
}
// Early exit if we do not need to migrate!
if !environment.PostInitMigrations.MigrateGPUs {
return nil
}
log.Debug().Msgf("Migrating GPUs for environment %d", e.ID)
// get all containers
containers, err := dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
if err != nil {
log.Error().Err(err).Msgf("failed to list containers for environment %d", environment.ID)
return err
}
// check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole environment
containersLoop:
for _, container := range containers {
// https://www.sobyte.net/post/2022-10/go-docker/ has nice documentation on the docker client with GPUs
containerDetails, err := dockerClient.ContainerInspect(context.Background(), container.ID)
if err != nil {
log.Error().Err(err).Msg("failed to inspect container")
continue
}
deviceRequests := containerDetails.HostConfig.Resources.DeviceRequests
for _, deviceRequest := range deviceRequests {
if deviceRequest.Driver == "nvidia" {
environment.EnableGPUManagement = true
break containersLoop
}
}
}
// set the MigrateGPUs flag to false so we don't run this again
environment.PostInitMigrations.MigrateGPUs = false
err = tx.Endpoint().UpdateEndpoint(environment.ID, environment)
if err != nil {
log.Error().Err(err).Msgf("Error updating EnableGPUManagement flag for environment %d", environment.ID)
return err
}
return nil
})
}