fix(pendingactions): refactor pending actions [EE-7011] (#11780)

pull/11604/merge
Matt Hook 2024-05-09 08:10:10 +12:00 committed by GitHub
parent 9685e260ea
commit 5a5a10821d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 380 additions and 293 deletions

View File

@ -43,6 +43,8 @@ import (
"github.com/portainer/portainer/api/ldap" "github.com/portainer/portainer/api/ldap"
"github.com/portainer/portainer/api/oauth" "github.com/portainer/portainer/api/oauth"
"github.com/portainer/portainer/api/pendingactions" "github.com/portainer/portainer/api/pendingactions"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/portainer/portainer/api/pendingactions/handlers"
"github.com/portainer/portainer/api/scheduler" "github.com/portainer/portainer/api/scheduler"
"github.com/portainer/portainer/api/stacks/deployments" "github.com/portainer/portainer/api/stacks/deployments"
"github.com/portainer/portainer/pkg/featureflags" "github.com/portainer/portainer/pkg/featureflags"
@ -482,7 +484,10 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, proxyManager, *flags.Assets) kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, proxyManager, *flags.Assets)
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory, dockerClientFactory, authorizationService, shutdownCtx, *flags.Assets, kubernetesDeployer) pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory)
pendingActionsService.RegisterHandler(actions.CleanNAPWithOverridePolicies, handlers.NewHandlerCleanNAPWithOverridePolicies(authorizationService, dataStore))
pendingActionsService.RegisterHandler(actions.DeleteK8sRegistrySecrets, handlers.NewHandlerDeleteRegistrySecrets(authorizationService, dataStore, kubernetesClientFactory))
pendingActionsService.RegisterHandler(actions.PostInitMigrateEnvironment, handlers.NewHandlerPostInitMigrateEnvironment(authorizationService, dataStore, kubernetesClientFactory, dockerClientFactory, *flags.Assets, kubernetesDeployer))
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService) snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
if err != nil { if err != nil {

View File

@ -71,7 +71,7 @@ type (
} }
PendingActionsService interface { PendingActionsService interface {
BaseCRUD[portainer.PendingActions, portainer.PendingActionsID] BaseCRUD[portainer.PendingAction, portainer.PendingActionID]
GetNextIdentifier() int GetNextIdentifier() int
DeleteByEndpointID(ID portainer.EndpointID) error DeleteByEndpointID(ID portainer.EndpointID) error
} }

View File

@ -14,11 +14,11 @@ const (
) )
type Service struct { type Service struct {
dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID] dataservices.BaseDataService[portainer.PendingAction, portainer.PendingActionID]
} }
type ServiceTx struct { type ServiceTx struct {
dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID] dataservices.BaseDataServiceTx[portainer.PendingAction, portainer.PendingActionID]
} }
func NewService(connection portainer.Connection) (*Service, error) { func NewService(connection portainer.Connection) (*Service, error) {
@ -28,20 +28,20 @@ func NewService(connection portainer.Connection) (*Service, error) {
} }
return &Service{ return &Service{
BaseDataService: dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID]{ BaseDataService: dataservices.BaseDataService[portainer.PendingAction, portainer.PendingActionID]{
Bucket: BucketName, Bucket: BucketName,
Connection: connection, Connection: connection,
}, },
}, nil }, nil
} }
func (s Service) Create(config *portainer.PendingActions) error { func (s Service) Create(config *portainer.PendingAction) error {
return s.Connection.UpdateTx(func(tx portainer.Transaction) error { return s.Connection.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Create(config) return s.Tx(tx).Create(config)
}) })
} }
func (s Service) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error { func (s Service) Update(ID portainer.PendingActionID, config *portainer.PendingAction) error {
return s.Connection.UpdateTx(func(tx portainer.Transaction) error { return s.Connection.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Update(ID, config) return s.Tx(tx).Update(ID, config)
}) })
@ -55,7 +55,7 @@ func (s Service) DeleteByEndpointID(ID portainer.EndpointID) error {
func (service *Service) Tx(tx portainer.Transaction) ServiceTx { func (service *Service) Tx(tx portainer.Transaction) ServiceTx {
return ServiceTx{ return ServiceTx{
BaseDataServiceTx: dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID]{ BaseDataServiceTx: dataservices.BaseDataServiceTx[portainer.PendingAction, portainer.PendingActionID]{
Bucket: BucketName, Bucket: BucketName,
Connection: service.Connection, Connection: service.Connection,
Tx: tx, Tx: tx,
@ -63,16 +63,16 @@ func (service *Service) Tx(tx portainer.Transaction) ServiceTx {
} }
} }
func (s ServiceTx) Create(config *portainer.PendingActions) error { func (s ServiceTx) Create(config *portainer.PendingAction) error {
return s.Tx.CreateObject(BucketName, func(id uint64) (int, interface{}) { return s.Tx.CreateObject(BucketName, func(id uint64) (int, interface{}) {
config.ID = portainer.PendingActionsID(id) config.ID = portainer.PendingActionID(id)
config.CreatedAt = time.Now().Unix() config.CreatedAt = time.Now().Unix()
return int(config.ID), config return int(config.ID), config
}) })
} }
func (s ServiceTx) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error { func (s ServiceTx) Update(ID portainer.PendingActionID, config *portainer.PendingAction) error {
return s.BaseDataServiceTx.Update(ID, config) return s.BaseDataServiceTx.Update(ID, config)
} }

View File

@ -0,0 +1,33 @@
package migrator
import (
"github.com/segmentio/encoding/json"
"github.com/rs/zerolog/log"
)
func (migrator *Migrator) migratePendingActionsDataForDB130() error {
log.Info().Msg("Migrating pending actions data")
pendingActions, err := migrator.pendingActionsService.ReadAll()
if err != nil {
return err
}
for _, pa := range pendingActions {
actionData, err := json.Marshal(pa.ActionData)
if err != nil {
return err
}
pa.ActionData = string(actionData)
// Update the pending action
err = migrator.pendingActionsService.Update(pa.ID, &pa)
if err != nil {
return err
}
}
return nil
}

View File

@ -239,8 +239,11 @@ func (m *Migrator) initMigrations() {
m.addMigrations("2.20.2", m.addMigrations("2.20.2",
m.cleanPendingActionsForDeletedEndpointsForDB111, m.cleanPendingActionsForDeletedEndpointsForDB111,
) )
m.addMigrations("2.22.0",
m.migratePendingActionsDataForDB130,
)
// Add new migrations below... // Add new migrations above...
// One function per migration, each versions migration funcs in the same file. // One function per migration, each versions migration funcs in the same file.
} }

View File

@ -5,47 +5,48 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/pendingactions/actions" "github.com/portainer/portainer/api/pendingactions/actions"
"github.com/portainer/portainer/api/pendingactions/handlers"
) )
type cleanNAPWithOverridePolicies struct {
EndpointGroupID portainer.EndpointGroupID
}
func Test_ConvertCleanNAPWithOverridePoliciesPayload(t *testing.T) { func Test_ConvertCleanNAPWithOverridePoliciesPayload(t *testing.T) {
t.Run("test ConvertCleanNAPWithOverridePoliciesPayload", func(t *testing.T) { t.Run("test ConvertCleanNAPWithOverridePoliciesPayload", func(t *testing.T) {
_, store := MustNewTestStore(t, true, false) _, store := MustNewTestStore(t, true, false)
defer store.Close() defer store.Close()
gid := portainer.EndpointGroupID(1)
testData := []struct { testData := []struct {
Name string Name string
PendingAction portainer.PendingActions PendingAction portainer.PendingAction
Expected *actions.CleanNAPWithOverridePoliciesPayload Expected any
Err bool Err bool
}{ }{
{ {
Name: "test actiondata with EndpointGroupID 1", Name: "test actiondata with EndpointGroupID 1",
PendingAction: portainer.PendingActions{ PendingAction: handlers.NewCleanNAPWithOverridePolicies(
EndpointID: 1, 1,
Action: "CleanNAPWithOverridePolicies", &gid,
ActionData: &actions.CleanNAPWithOverridePoliciesPayload{ ),
EndpointGroupID: 1, Expected: portainer.EndpointGroupID(1),
},
},
Expected: &actions.CleanNAPWithOverridePoliciesPayload{
EndpointGroupID: 1,
},
}, },
{ {
Name: "test actionData nil", Name: "test actionData nil",
PendingAction: portainer.PendingActions{ PendingAction: handlers.NewCleanNAPWithOverridePolicies(
EndpointID: 2, 2,
Action: "CleanNAPWithOverridePolicies", nil,
ActionData: nil, ),
},
Expected: nil, Expected: nil,
}, },
{ {
Name: "test actionData empty and expected error", Name: "test actionData empty and expected error",
PendingAction: portainer.PendingActions{ PendingAction: portainer.PendingAction{
EndpointID: 2, EndpointID: 2,
Action: "CleanNAPWithOverridePolicies", Action: actions.CleanNAPWithOverridePolicies,
ActionData: "", ActionData: "",
}, },
Expected: nil, Expected: nil,
@ -68,22 +69,24 @@ func Test_ConvertCleanNAPWithOverridePoliciesPayload(t *testing.T) {
for _, endpointPendingAction := range pendingActions { for _, endpointPendingAction := range pendingActions {
t.Run(d.Name, func(t *testing.T) { t.Run(d.Name, func(t *testing.T) {
if endpointPendingAction.Action == "CleanNAPWithOverridePolicies" { if endpointPendingAction.Action == actions.CleanNAPWithOverridePolicies {
actionData, err := actions.ConvertCleanNAPWithOverridePoliciesPayload(endpointPendingAction.ActionData) var payload cleanNAPWithOverridePolicies
err := endpointPendingAction.UnmarshallActionData(&payload)
if d.Err && err == nil { if d.Err && err == nil {
t.Error(err) t.Error(err)
} }
if d.Expected == nil && actionData != nil { if d.Expected == nil && payload.EndpointGroupID != 0 {
t.Errorf("expected nil , got %d", actionData) t.Errorf("expected nil, got %d", payload.EndpointGroupID)
} }
if d.Expected != nil && actionData == nil { if d.Expected != nil {
t.Errorf("expected not nil , got %d", actionData) expected := d.Expected.(portainer.EndpointGroupID)
} if d.Expected != nil && expected != payload.EndpointGroupID {
t.Errorf("expected EndpointGroupID %d, got %d", expected, payload.EndpointGroupID)
if d.Expected != nil && actionData.EndpointGroupID != d.Expected.EndpointGroupID { }
t.Errorf("expected EndpointGroupID %d , got %d", d.Expected.EndpointGroupID, actionData.EndpointGroupID)
} }
} }
}) })

View File

@ -74,7 +74,7 @@ func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
// this function exists for readability, not reusability // this function exists for readability, not reusability
// TODO: This should be moved into pending actions as part of the pending action migration // TODO: This should be moved into pending actions as part of the pending action migration
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error { func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
migrateEnvPendingAction := portainer.PendingActions{ migrateEnvPendingAction := portainer.PendingAction{
EndpointID: environmentID, EndpointID: environmentID,
Action: actions.PostInitMigrateEnvironment, Action: actions.PostInitMigrateEnvironment,
} }

View File

@ -941,6 +941,6 @@
} }
], ],
"version": { "version": {
"VERSION": "{\"SchemaVersion\":\"2.22.0\",\"MigratorCount\":0,\"Edition\":1,\"InstanceID\":\"463d5c47-0ea5-4aca-85b1-405ceefee254\"}" "VERSION": "{\"SchemaVersion\":\"2.22.0\",\"MigratorCount\":1,\"Edition\":1,\"InstanceID\":\"463d5c47-0ea5-4aca-85b1-405ceefee254\"}"
} }
} }

View File

@ -8,7 +8,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/tag" "github.com/portainer/portainer/api/internal/tag"
pendingActionActions "github.com/portainer/portainer/api/pendingactions/actions" "github.com/portainer/portainer/api/pendingactions/handlers"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -157,13 +157,7 @@ func (handler *Handler) updateEndpointGroup(tx dataservices.DataStoreTx, endpoin
if err != nil { if err != nil {
// Update flag with endpoint and continue // Update flag with endpoint and continue
go func(endpointID portainer.EndpointID, endpointGroupID portainer.EndpointGroupID) { go func(endpointID portainer.EndpointID, endpointGroupID portainer.EndpointGroupID) {
err := handler.PendingActionsService.Create(portainer.PendingActions{ err := handler.PendingActionsService.Create(handlers.NewCleanNAPWithOverridePolicies(endpointID, &endpointGroupID))
EndpointID: endpointID,
Action: "CleanNAPWithOverridePolicies",
ActionData: &pendingActionActions.CleanNAPWithOverridePoliciesPayload{
EndpointGroupID: endpointGroupID,
},
})
if err != nil { if err != nil {
log.Error().Err(err).Msgf("Unable to create pending action to clean NAP with override policies for endpoint (%d) and endpoint group (%d).", endpointID, endpointGroupID) log.Error().Err(err).Msgf("Unable to create pending action to clean NAP with override policies for endpoint (%d) and endpoint group (%d).", endpointID, endpointGroupID)
} }

View File

@ -9,6 +9,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/client" "github.com/portainer/portainer/api/http/client"
"github.com/portainer/portainer/api/pendingactions/handlers"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -266,11 +267,7 @@ func (handler *Handler) endpointUpdate(w http.ResponseWriter, r *http.Request) *
if endpoint.Type == portainer.KubernetesLocalEnvironment || endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment { if endpoint.Type == portainer.KubernetesLocalEnvironment || endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment {
err = handler.AuthorizationService.CleanNAPWithOverridePolicies(handler.DataStore, endpoint, nil) err = handler.AuthorizationService.CleanNAPWithOverridePolicies(handler.DataStore, endpoint, nil)
if err != nil { if err != nil {
handler.PendingActionsService.Create(portainer.PendingActions{ handler.PendingActionsService.Create(handlers.NewCleanNAPWithOverridePolicies(endpoint.ID, nil))
EndpointID: endpoint.ID,
Action: "CleanNAPWithOverridePolicies",
ActionData: nil,
})
log.Warn().Err(err).Msgf("Unable to clean NAP with override policies for endpoint (%d). Will try to update when endpoint is online.", endpoint.ID) log.Warn().Err(err).Msgf("Unable to clean NAP with override policies for endpoint (%d). Will try to update when endpoint is online.", endpoint.ID)
} }
} }

View File

@ -7,8 +7,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
httperrors "github.com/portainer/portainer/api/http/errors" httperrors "github.com/portainer/portainer/api/http/errors"
"github.com/portainer/portainer/api/http/security" "github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/pendingactions" "github.com/portainer/portainer/api/pendingactions/handlers"
"github.com/portainer/portainer/api/pendingactions/actions"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -90,17 +89,9 @@ func (handler *Handler) deleteKubernetesSecrets(registry *portainer.Registry) er
} }
if len(failedNamespaces) > 0 { if len(failedNamespaces) > 0 {
handler.PendingActionsService.Create(portainer.PendingActions{ handler.PendingActionsService.Create(
EndpointID: endpointId, handlers.NewDeleteK8sRegistrySecrets(portainer.EndpointID(endpointId), registry.ID, failedNamespaces),
Action: actions.DeletePortainerK8sRegistrySecrets, )
// When extracting the data, this is the type we need to pull out
// i.e. pendingactions.DeletePortainerK8sRegistrySecretsData
ActionData: pendingactions.DeletePortainerK8sRegistrySecretsData{
RegistryID: registry.ID,
Namespaces: failedNamespaces,
},
})
} }
} }
} }

50
api/pending_action.go Normal file
View File

@ -0,0 +1,50 @@
package portainer
import "github.com/segmentio/encoding/json"
type (
PendingActionID int
PendingAction struct {
ID PendingActionID `json:"ID"`
EndpointID EndpointID `json:"EndpointID"`
Action string `json:"Action"`
ActionData any `json:"ActionData"`
CreatedAt int64 `json:"CreatedAt"`
}
PendingActionHandler interface {
Execute(PendingAction, *Endpoint) error
}
)
// MarshalJSON marshals the PendingAction struct to JSON
// and converts the ActionData field to an embedded json string
// This makes unmarshalling the ActionData field easier
func (pa PendingAction) MarshalJSON() ([]byte, error) {
// Create a map to hold the marshalled fields
data := map[string]any{
"ID": pa.ID,
"EndpointID": pa.EndpointID,
"Action": pa.Action,
"CreatedAt": pa.CreatedAt,
}
actionDataBytes, err := json.Marshal(pa.ActionData)
if err != nil {
return nil, err
}
data["ActionData"] = string(actionDataBytes)
// Marshal the map
return json.Marshal(data)
}
// Unmarshal the ActionData field from a string to a specific type.
func (pa PendingAction) UnmarshallActionData(v any) error {
s, ok := pa.ActionData.(string)
if !ok {
return nil
}
return json.Unmarshal([]byte(s), v)
}

View File

@ -1,7 +1,7 @@
package actions package actions
const ( const (
CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies" CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies"
DeletePortainerK8sRegistrySecrets = "DeletePortainerK8sRegistrySecrets" DeleteK8sRegistrySecrets = "DeleteK8sRegistrySecrets"
PostInitMigrateEnvironment = "PostInitMigrateEnvironment" PostInitMigrateEnvironment = "PostInitMigrateEnvironment"
) )

View File

@ -1,44 +0,0 @@
package actions
import (
"fmt"
portainer "github.com/portainer/portainer/api"
)
type (
CleanNAPWithOverridePoliciesPayload struct {
EndpointGroupID portainer.EndpointGroupID
}
)
func ConvertCleanNAPWithOverridePoliciesPayload(actionData interface{}) (*CleanNAPWithOverridePoliciesPayload, error) {
var payload CleanNAPWithOverridePoliciesPayload
if actionData == nil {
return nil, nil
}
// backward compatible with old data format
if endpointGroupId, ok := actionData.(float64); ok {
payload.EndpointGroupID = portainer.EndpointGroupID(endpointGroupId)
return &payload, nil
}
data, ok := actionData.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("failed to convert actionData to map[string]interface{}")
}
for key, value := range data {
switch key {
case "EndpointGroupID":
if endpointGroupID, ok := value.(float64); ok {
payload.EndpointGroupID = portainer.EndpointGroupID(endpointGroupID)
}
}
}
return &payload, nil
}

View File

@ -1,76 +0,0 @@
package pendingactions
import (
"fmt"
portainer "github.com/portainer/portainer/api"
"github.com/rs/zerolog/log"
)
type DeletePortainerK8sRegistrySecretsData struct {
RegistryID portainer.RegistryID `json:"RegistryID"`
Namespaces []string `json:"Namespaces"`
}
func (service *PendingActionsService) DeleteKubernetesRegistrySecrets(endpoint *portainer.Endpoint, registryData *DeletePortainerK8sRegistrySecretsData) error {
if endpoint == nil || registryData == nil {
return nil
}
kubeClient, err := service.kubeFactory.GetKubeClient(endpoint)
if err != nil {
return err
}
for _, namespace := range registryData.Namespaces {
err = kubeClient.DeleteRegistrySecret(registryData.RegistryID, namespace)
if err != nil {
return err
}
}
return nil
}
// Failure in this code is basically a bug. So if we get one we should log it and continue.
func convertToDeletePortainerK8sRegistrySecretsData(actionData interface{}) (*DeletePortainerK8sRegistrySecretsData, error) {
var registryData DeletePortainerK8sRegistrySecretsData
// Due to the way data is stored and subsequently read from the database, we can't directly type assert the actionData to
// the type DeletePortainerK8sRegistrySecretsData. It's stored as a map[string]interface{} and we need to extract the
// data from that map.
if data, ok := actionData.(map[string]interface{}); ok {
for key, value := range data {
switch key {
case "Namespaces":
if namespaces, ok := value.([]interface{}); ok {
registryData.Namespaces = make([]string, len(namespaces))
for i, ns := range namespaces {
if namespace, ok := ns.(string); ok {
registryData.Namespaces[i] = namespace
}
}
} else {
// we shouldn't ever see this. It's a bug if we do.
log.Debug().Msgf("DeletePortainerK8sRegistrySecrets: Failed to convert Namespaces to []interface{}")
}
case "RegistryID":
if registryID, ok := value.(float64); ok {
registryData.RegistryID = portainer.RegistryID(registryID)
} else {
// we shouldn't ever see this. It's a bug if we do.
log.Debug().Msgf("DeletePortainerK8sRegistrySecrets: Failed to convert RegistryID to float64")
}
}
}
log.Debug().Msgf("DeletePortainerK8sRegistrySecrets: %+v", registryData)
} else {
// this should not happen. It's a bug if it does. As the actionData is defined
// by what portainer puts in it. It never comes from a user or external source so it shouldn't fail.
// Nevertheless we should check it in case of db corruption or developer mistake down the road
return nil, fmt.Errorf("type assertion failed in convertToDeletePortainerK8sRegistrySecretsData")
}
return &registryData, nil
}

View File

@ -0,0 +1,82 @@
package handlers
import (
"fmt"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/rs/zerolog/log"
)
type (
cleanNAPWithOverridePolicies struct {
EndpointGroupID portainer.EndpointGroupID
}
HandlerCleanNAPWithOverridePolicies struct {
authorizationService *authorization.Service
dataStore dataservices.DataStore
}
)
// NewCleanNAPWithOverridePolicies creates a new CleanNAPWithOverridePolicies pending action
func NewCleanNAPWithOverridePolicies(endpointID portainer.EndpointID, gid *portainer.EndpointGroupID) portainer.PendingAction {
pendingAction := portainer.PendingAction{
EndpointID: endpointID,
Action: actions.CleanNAPWithOverridePolicies,
}
if gid != nil {
pendingAction.ActionData = cleanNAPWithOverridePolicies{
EndpointGroupID: *gid,
}
}
return pendingAction
}
// NewHandlerCleanNAPWithOverridePolicies creates a new handler to execute CleanNAPWithOverridePolicies pending action
func NewHandlerCleanNAPWithOverridePolicies(
authorizationService *authorization.Service,
dataStore dataservices.DataStore,
) *HandlerCleanNAPWithOverridePolicies {
return &HandlerCleanNAPWithOverridePolicies{
authorizationService: authorizationService,
dataStore: dataStore,
}
}
func (h *HandlerCleanNAPWithOverridePolicies) Execute(pendingAction portainer.PendingAction, endpoint *portainer.Endpoint) error {
if pendingAction.ActionData == nil {
h.authorizationService.CleanNAPWithOverridePolicies(h.dataStore, endpoint, nil)
return nil
}
var payload cleanNAPWithOverridePolicies
err := pendingAction.UnmarshallActionData(&payload)
if err != nil {
log.Error().Err(err).Msgf("Error unmarshalling endpoint group ID for cleaning NAP with override policies for environment %d", endpoint.ID)
return fmt.Errorf("failed to unmarshal endpoint group ID for cleaning NAP with override policies for environment %d: %w", endpoint.ID, err)
}
if payload.EndpointGroupID == 0 {
h.authorizationService.CleanNAPWithOverridePolicies(h.dataStore, endpoint, nil)
return nil
}
endpointGroup, err := h.dataStore.EndpointGroup().Read(portainer.EndpointGroupID(payload.EndpointGroupID))
if err != nil {
log.Error().Err(err).Msgf("Error reading environment group to clean NAP with override policies for environment %d and environment group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to retrieve environment group %d: %w", payload.EndpointGroupID, err)
}
err = h.authorizationService.CleanNAPWithOverridePolicies(h.dataStore, endpoint, endpointGroup)
if err != nil {
log.Error().Err(err).Msgf("Error cleaning NAP with override policies for environment %d and environment group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to clean NAP with override policies for environment %d and environment group %d: %w", endpoint.ID, endpointGroup.ID, err)
}
return nil
}

View File

@ -0,0 +1,73 @@
package handlers
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/authorization"
kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions/actions"
)
type (
HandlerDeleteK8sRegistrySecrets struct {
authorizationService *authorization.Service
dataStore dataservices.DataStore
kubeFactory *kubecli.ClientFactory
}
deleteK8sRegistrySecretsData struct {
RegistryID portainer.RegistryID `json:"RegistryID"`
Namespaces []string `json:"Namespaces"`
}
)
// NewDeleteK8sRegistrySecrets creates a new DeleteK8sRegistrySecrets pending action
func NewDeleteK8sRegistrySecrets(endpointID portainer.EndpointID, registryID portainer.RegistryID, namespaces []string) portainer.PendingAction {
return portainer.PendingAction{
EndpointID: endpointID,
Action: actions.DeleteK8sRegistrySecrets,
ActionData: &deleteK8sRegistrySecretsData{
RegistryID: registryID,
Namespaces: namespaces,
},
}
}
// NewHandlerDeleteRegistrySecrets creates a new handler to execute DeleteK8sRegistrySecrets pending action
func NewHandlerDeleteRegistrySecrets(
authorizationService *authorization.Service,
dataStore dataservices.DataStore,
kubeFactory *kubecli.ClientFactory,
) *HandlerDeleteK8sRegistrySecrets {
return &HandlerDeleteK8sRegistrySecrets{
authorizationService: authorizationService,
dataStore: dataStore,
kubeFactory: kubeFactory,
}
}
func (h *HandlerDeleteK8sRegistrySecrets) Execute(pa portainer.PendingAction, endpoint *portainer.Endpoint) error {
if endpoint == nil || pa.ActionData == nil {
return nil
}
var registryData deleteK8sRegistrySecretsData
err := pa.UnmarshallActionData(&registryData)
if err != nil {
return err
}
kubeClient, err := h.kubeFactory.GetKubeClient(endpoint)
if err != nil {
return err
}
for _, namespace := range registryData.Namespaces {
err = kubeClient.DeleteRegistrySecret(registryData.RegistryID, namespace)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,58 @@
package handlers
import (
"fmt"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/datastore/postinit"
dockerClient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/internal/authorization"
kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/rs/zerolog/log"
)
type HandlerPostInitMigrateEnvironment struct {
authorizationService *authorization.Service
dataStore dataservices.DataStore
kubeFactory *kubecli.ClientFactory
dockerFactory *dockerClient.ClientFactory
assetsPath string
kubernetesDeployer portainer.KubernetesDeployer
}
// NewPostInitMigrateEnvironment creates a new PostInitMigrateEnvironment pending action
func NewHandlerPostInitMigrateEnvironment(
authorizationService *authorization.Service,
dataStore dataservices.DataStore,
kubeFactory *kubecli.ClientFactory,
dockerFactory *dockerClient.ClientFactory,
assetsPath string,
kubernetesDeployer portainer.KubernetesDeployer,
) *HandlerPostInitMigrateEnvironment {
return &HandlerPostInitMigrateEnvironment{
authorizationService: authorizationService,
dataStore: dataStore,
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
assetsPath: assetsPath,
kubernetesDeployer: kubernetesDeployer,
}
}
func (h *HandlerPostInitMigrateEnvironment) Execute(_ portainer.PendingAction, endpoint *portainer.Endpoint) error {
postInitMigrator := postinit.NewPostInitMigrator(
h.kubeFactory,
h.dockerFactory,
h.dataStore,
h.assetsPath,
h.kubernetesDeployer,
)
err := postInitMigrator.MigrateEnvironment(endpoint)
if err != nil {
log.Error().Err(err).Msgf("Error running post-init migrations for edge environment %d", endpoint.ID)
return fmt.Errorf("failed running post-init migrations for edge environment %d: %w", endpoint.ID, err)
}
return nil
}

View File

@ -1,62 +1,44 @@
package pendingactions package pendingactions
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/datastore/postinit"
dockerClient "github.com/portainer/portainer/api/docker/client"
"github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
kubecli "github.com/portainer/portainer/api/kubernetes/cli" kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions/actions"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type ( type PendingActionsService struct {
PendingActionsService struct { kubeFactory *kubecli.ClientFactory
authorizationService *authorization.Service dataStore dataservices.DataStore
kubeFactory *kubecli.ClientFactory mu sync.Mutex
dockerFactory *dockerClient.ClientFactory }
dataStore dataservices.DataStore
shutdownCtx context.Context
assetsPath string
kubernetesDeployer portainer.KubernetesDeployer
mu sync.Mutex var handlers = make(map[string]portainer.PendingActionHandler)
}
)
func NewService( func NewService(
dataStore dataservices.DataStore, dataStore dataservices.DataStore,
kubeFactory *kubecli.ClientFactory, kubeFactory *kubecli.ClientFactory,
dockerFactory *dockerClient.ClientFactory,
authorizationService *authorization.Service,
shutdownCtx context.Context,
assetsPath string,
kubernetesDeployer portainer.KubernetesDeployer,
) *PendingActionsService { ) *PendingActionsService {
return &PendingActionsService{ return &PendingActionsService{
dataStore: dataStore, dataStore: dataStore,
shutdownCtx: shutdownCtx, kubeFactory: kubeFactory,
authorizationService: authorizationService, mu: sync.Mutex{},
kubeFactory: kubeFactory,
dockerFactory: dockerFactory,
assetsPath: assetsPath,
kubernetesDeployer: kubernetesDeployer,
mu: sync.Mutex{},
} }
} }
func (service *PendingActionsService) Create(r portainer.PendingActions) error { func (service *PendingActionsService) RegisterHandler(name string, handler portainer.PendingActionHandler) {
handlers[name] = handler
}
func (service *PendingActionsService) Create(r portainer.PendingAction) error {
return service.dataStore.PendingActions().Create(&r) return service.dataStore.PendingActions().Create(&r)
} }
func (service *PendingActionsService) Execute(id portainer.EndpointID) error { func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
service.mu.Lock() service.mu.Lock()
defer service.mu.Unlock() defer service.mu.Unlock()
@ -108,73 +90,18 @@ func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
return nil return nil
} }
func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingActions, endpoint *portainer.Endpoint) error { func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingAction, endpoint *portainer.Endpoint) error {
log.Debug().Msgf("Executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID) log.Debug().Msgf("Executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID)
defer func() { defer func() {
log.Debug().Msgf("End executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID) log.Debug().Msgf("End executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID)
}() }()
switch pendingAction.Action { handler, ok := handlers[pendingAction.Action]
case actions.CleanNAPWithOverridePolicies: if !ok {
pendingActionData, err := actions.ConvertCleanNAPWithOverridePoliciesPayload(pendingAction.ActionData) log.Warn().Msgf("No handler found for pending action %s", pendingAction.Action)
if err != nil {
return fmt.Errorf("failed to parse pendingActionData for CleanNAPWithOverridePoliciesPayload")
}
if pendingActionData == nil || pendingActionData.EndpointGroupID == 0 {
service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, nil)
return nil
}
endpointGroupID := pendingActionData.EndpointGroupID
endpointGroup, err := service.dataStore.EndpointGroup().Read(portainer.EndpointGroupID(endpointGroupID))
if err != nil {
log.Error().Err(err).Msgf("Error reading environment group to clean NAP with override policies for environment %d and environment group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to retrieve environment group %d: %w", endpointGroupID, err)
}
err = service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, endpointGroup)
if err != nil {
log.Error().Err(err).Msgf("Error cleaning NAP with override policies for environment %d and environment group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to clean NAP with override policies for environment %d and environment group %d: %w", endpoint.ID, endpointGroup.ID, err)
}
return nil
case actions.DeletePortainerK8sRegistrySecrets:
if pendingAction.ActionData == nil {
return nil
}
registryData, err := convertToDeletePortainerK8sRegistrySecretsData(pendingAction.ActionData)
if err != nil {
return fmt.Errorf("failed to parse pendingActionData: %w", err)
}
err = service.DeleteKubernetesRegistrySecrets(endpoint, registryData)
if err != nil {
log.Warn().Err(err).Int("endpoint_id", int(endpoint.ID)).Msgf("Unable to delete kubernetes registry secrets")
return fmt.Errorf("failed to delete kubernetes registry secrets for environment %d: %w", endpoint.ID, err)
}
return nil
case actions.PostInitMigrateEnvironment:
postInitMigrator := postinit.NewPostInitMigrator(
service.kubeFactory,
service.dockerFactory,
service.dataStore,
service.assetsPath,
service.kubernetesDeployer,
)
err := postInitMigrator.MigrateEnvironment(endpoint)
if err != nil {
log.Error().Err(err).Msgf("Error running post-init migrations for edge environment %d", endpoint.ID)
return fmt.Errorf("failed running post-init migrations for edge environment %d: %w", endpoint.ID, err)
}
return nil return nil
} }
return nil return handler.Execute(pendingAction, endpoint)
} }

View File

@ -381,15 +381,6 @@ type (
// EdgeStackStatusType represents an edge stack status type // EdgeStackStatusType represents an edge stack status type
EdgeStackStatusType int EdgeStackStatusType int
PendingActionsID int
PendingActions struct {
ID PendingActionsID `json:"ID"`
EndpointID EndpointID `json:"EndpointID"`
Action string `json:"Action"`
ActionData interface{} `json:"ActionData"`
CreatedAt int64 `json:"CreatedAt"`
}
// Environment(Endpoint) represents a Docker environment(endpoint) with all the info required // Environment(Endpoint) represents a Docker environment(endpoint) with all the info required
// to connect to it // to connect to it
Endpoint struct { Endpoint struct {