fix(kubernetes): kube env permissions when down [EE-5427] (#10327)

pull/10331/head
Prabhat Khera 2023-09-19 08:57:27 +12:00 committed by GitHub
parent cc37ccfe4d
commit 14853f6da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 282 additions and 103 deletions

View File

@ -43,6 +43,7 @@ import (
kubecli "github.com/portainer/portainer/api/kubernetes/cli" kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/ldap" "github.com/portainer/portainer/api/ldap"
"github.com/portainer/portainer/api/oauth" "github.com/portainer/portainer/api/oauth"
"github.com/portainer/portainer/api/pendingactions"
"github.com/portainer/portainer/api/scheduler" "github.com/portainer/portainer/api/scheduler"
"github.com/portainer/portainer/api/stacks/deployments" "github.com/portainer/portainer/api/stacks/deployments"
"github.com/portainer/portainer/pkg/featureflags" "github.com/portainer/portainer/pkg/featureflags"
@ -263,11 +264,12 @@ func initSnapshotService(
dockerClientFactory *dockerclient.ClientFactory, dockerClientFactory *dockerclient.ClientFactory,
kubernetesClientFactory *kubecli.ClientFactory, kubernetesClientFactory *kubecli.ClientFactory,
shutdownCtx context.Context, shutdownCtx context.Context,
pendingActionsService *pendingactions.PendingActionsService,
) (portainer.SnapshotService, error) { ) (portainer.SnapshotService, error) {
dockerSnapshotter := docker.NewSnapshotter(dockerClientFactory) dockerSnapshotter := docker.NewSnapshotter(dockerClientFactory)
kubernetesSnapshotter := kubernetes.NewSnapshotter(kubernetesClientFactory) kubernetesSnapshotter := kubernetes.NewSnapshotter(kubernetesClientFactory)
snapshotService, err := snapshot.NewService(snapshotIntervalFromFlag, dataStore, dockerSnapshotter, kubernetesSnapshotter, shutdownCtx) snapshotService, err := snapshot.NewService(snapshotIntervalFromFlag, dataStore, dockerSnapshotter, kubernetesSnapshotter, shutdownCtx, pendingActionsService)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -454,15 +456,17 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
dockerClientFactory := initDockerClientFactory(digitalSignatureService, reverseTunnelService) dockerClientFactory := initDockerClientFactory(digitalSignatureService, reverseTunnelService)
kubernetesClientFactory, err := initKubernetesClientFactory(digitalSignatureService, reverseTunnelService, dataStore, instanceID, *flags.AddrHTTPS, settings.UserSessionTimeout) kubernetesClientFactory, err := initKubernetesClientFactory(digitalSignatureService, reverseTunnelService, dataStore, instanceID, *flags.AddrHTTPS, settings.UserSessionTimeout)
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx) authorizationService := authorization.NewService(dataStore)
authorizationService.K8sClientFactory = kubernetesClientFactory
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory, authorizationService, shutdownCtx)
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed initializing snapshot service") log.Fatal().Err(err).Msg("failed initializing snapshot service")
} }
snapshotService.Start() snapshotService.Start()
authorizationService := authorization.NewService(dataStore)
authorizationService.K8sClientFactory = kubernetesClientFactory
kubernetesTokenCacheManager := kubeproxy.NewTokenCacheManager() kubernetesTokenCacheManager := kubeproxy.NewTokenCacheManager()
kubeClusterAccessService := kubernetes.NewKubeClusterAccessService(*flags.BaseURL, *flags.AddrHTTPS, sslSettings.CertPath) kubeClusterAccessService := kubernetes.NewKubeClusterAccessService(*flags.BaseURL, *flags.AddrHTTPS, sslSettings.CertPath)
@ -622,6 +626,7 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
DemoService: demoService, DemoService: demoService,
UpgradeService: upgradeService, UpgradeService: upgradeService,
AdminCreationDone: adminCreationDone, AdminCreationDone: adminCreationDone,
PendingActionsService: pendingActionsService,
} }
} }

View File

@ -35,6 +35,7 @@ type (
User() UserService User() UserService
Version() VersionService Version() VersionService
Webhook() WebhookService Webhook() WebhookService
PendingActions() PendingActionsService
} }
DataStore interface { DataStore interface {
@ -72,6 +73,11 @@ type (
GetNextIdentifier() int GetNextIdentifier() int
} }
PendingActionsService interface {
BaseCRUD[portainer.PendingActions, portainer.PendingActionsID]
GetNextIdentifier() int
}
// EdgeStackService represents a service to manage Edge stacks // EdgeStackService represents a service to manage Edge stacks
EdgeStackService interface { EdgeStackService interface {
EdgeStacks() ([]portainer.EdgeStack, error) EdgeStacks() ([]portainer.EdgeStack, error)

View File

@ -0,0 +1,74 @@
package pendingactions
import (
"time"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
)
const (
BucketName = "pending_actions"
)
type Service struct {
dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID]
}
type ServiceTx struct {
dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID]
}
func NewService(connection portainer.Connection) (*Service, error) {
err := connection.SetServiceName(BucketName)
if err != nil {
return nil, err
}
return &Service{
BaseDataService: dataservices.BaseDataService[portainer.PendingActions, portainer.PendingActionsID]{
Bucket: BucketName,
Connection: connection,
},
}, nil
}
func (s Service) Create(config *portainer.PendingActions) error {
return s.Connection.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Create(config)
})
}
func (s Service) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error {
return s.Connection.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Update(ID, config)
})
}
func (service *Service) Tx(tx portainer.Transaction) ServiceTx {
return ServiceTx{
BaseDataServiceTx: dataservices.BaseDataServiceTx[portainer.PendingActions, portainer.PendingActionsID]{
Bucket: BucketName,
Connection: service.Connection,
Tx: tx,
},
}
}
func (s ServiceTx) Create(config *portainer.PendingActions) error {
return s.Tx.CreateObject(BucketName, func(id uint64) (int, interface{}) {
config.ID = portainer.PendingActionsID(id)
config.CreatedAt = time.Now().Unix()
return int(config.ID), config
})
}
func (s ServiceTx) Update(ID portainer.PendingActionsID, config *portainer.PendingActions) error {
return s.BaseDataServiceTx.Update(ID, config)
}
// GetNextIdentifier returns the next identifier for a custom template.
func (service *Service) GetNextIdentifier() int {
return service.Connection.GetNextIdentifier(BucketName)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/portainer/portainer/api/dataservices/extension" "github.com/portainer/portainer/api/dataservices/extension"
"github.com/portainer/portainer/api/dataservices/fdoprofile" "github.com/portainer/portainer/api/dataservices/fdoprofile"
"github.com/portainer/portainer/api/dataservices/helmuserrepository" "github.com/portainer/portainer/api/dataservices/helmuserrepository"
"github.com/portainer/portainer/api/dataservices/pendingactions"
"github.com/portainer/portainer/api/dataservices/registry" "github.com/portainer/portainer/api/dataservices/registry"
"github.com/portainer/portainer/api/dataservices/resourcecontrol" "github.com/portainer/portainer/api/dataservices/resourcecontrol"
"github.com/portainer/portainer/api/dataservices/role" "github.com/portainer/portainer/api/dataservices/role"
@ -72,6 +73,7 @@ type Store struct {
UserService *user.Service UserService *user.Service
VersionService *version.Service VersionService *version.Service
WebhookService *webhook.Service WebhookService *webhook.Service
PendingActionsService *pendingactions.Service
} }
func (store *Store) initServices() error { func (store *Store) initServices() error {
@ -238,9 +240,20 @@ func (store *Store) initServices() error {
} }
store.ScheduleService = scheduleService store.ScheduleService = scheduleService
pendingActionsService, err := pendingactions.NewService(store.connection)
if err != nil {
return err
}
store.PendingActionsService = pendingActionsService
return nil return nil
} }
// PendingActions gives access to the PendingActions data management layer
func (store *Store) PendingActions() dataservices.PendingActionsService {
return store.PendingActionsService
}
// CustomTemplate gives access to the CustomTemplate data management layer // CustomTemplate gives access to the CustomTemplate data management layer
func (store *Store) CustomTemplate() dataservices.CustomTemplateService { func (store *Store) CustomTemplate() dataservices.CustomTemplateService {
return store.CustomTemplateService return store.CustomTemplateService

View File

@ -16,6 +16,8 @@ func (tx *StoreTx) IsErrObjectNotFound(err error) bool {
func (tx *StoreTx) CustomTemplate() dataservices.CustomTemplateService { return nil } func (tx *StoreTx) CustomTemplate() dataservices.CustomTemplateService { return nil }
func (tx *StoreTx) PendingActions() dataservices.PendingActionsService { return nil }
func (tx *StoreTx) EdgeGroup() dataservices.EdgeGroupService { func (tx *StoreTx) EdgeGroup() dataservices.EdgeGroupService {
return tx.store.EdgeGroupService.Tx(tx.tx) return tx.store.EdgeGroupService.Tx(tx.tx)
} }

View File

@ -73,7 +73,6 @@
}, },
"LastCheckInDate": 0, "LastCheckInDate": 0,
"Name": "local", "Name": "local",
"PendingActions": null,
"PostInitMigrations": { "PostInitMigrations": {
"MigrateGPUs": true, "MigrateGPUs": true,
"MigrateIngresses": true "MigrateIngresses": true

View File

@ -7,7 +7,6 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/utils"
"github.com/portainer/portainer/api/internal/tag" "github.com/portainer/portainer/api/internal/tag"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
@ -156,9 +155,12 @@ func (handler *Handler) updateEndpointGroup(tx dataservices.DataStoreTx, endpoin
err = handler.AuthorizationService.CleanNAPWithOverridePolicies(tx, &endpoint, endpointGroup) err = handler.AuthorizationService.CleanNAPWithOverridePolicies(tx, &endpoint, endpointGroup)
if err != nil { if err != nil {
// Update flag with endpoint and continue // Update flag with endpoint and continue
endpoint.PendingActions = utils.GetUpdatedEndpointPendingActions(&endpoint, "CleanNAPWithOverridePolicies", endpointGroup.ID) handler.PendingActionsService.Create(portainer.PendingActions{
err = tx.Endpoint().UpdateEndpoint(endpoint.ID, &endpoint) EndpointID: endpoint.ID,
log.Warn().Err(err).Msgf("Unable to update user authorizations for endpoint (%d) and endpopint group (%d)", endpoint.ID, endpointGroup.ID) Action: "CleanNAPWithOverridePolicies",
ActionData: endpointGroup.ID,
})
log.Warn().Err(err).Msgf("Unable to update user authorizations for endpoint (%d) and endpoint group (%d).", endpoint.ID, endpointGroup.ID)
} }
} }
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/security" "github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/pendingactions"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -14,8 +15,9 @@ import (
// Handler is the HTTP handler used to handle environment(endpoint) group operations. // Handler is the HTTP handler used to handle environment(endpoint) group operations.
type Handler struct { type Handler struct {
*mux.Router *mux.Router
AuthorizationService *authorization.Service AuthorizationService *authorization.Service
DataStore dataservices.DataStore DataStore dataservices.DataStore
PendingActionsService *pendingactions.PendingActionsService
} }
// NewHandler creates a handler to manage environment(endpoint) group operations. // NewHandler creates a handler to manage environment(endpoint) group operations.

View File

@ -4,7 +4,6 @@ import (
"net/http" "net/http"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/http/utils"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
@ -79,8 +78,8 @@ func (handler *Handler) endpointInspect(w http.ResponseWriter, r *http.Request)
} }
} }
// Run the pending actions // Execute endpoint pending actions
utils.RunPendingActions(endpoint, handler.DataStore, handler.AuthorizationService) handler.PendingActionsService.Execute(endpoint.ID)
return response.JSON(w, endpoint) return response.JSON(w, endpoint)
} }

View File

@ -196,7 +196,7 @@ func setupEndpointListHandler(t *testing.T, endpoints []portainer.Endpoint) *Han
handler := NewHandler(bouncer, nil) handler := NewHandler(bouncer, nil)
handler.DataStore = store handler.DataStore = store
handler.ComposeStackManager = testhelpers.NewComposeStackManager() handler.ComposeStackManager = testhelpers.NewComposeStackManager()
handler.SnapshotService, _ = snapshot.NewService("1s", store, nil, nil, nil) handler.SnapshotService, _ = snapshot.NewService("1s", store, nil, nil, nil, nil)
return handler return handler
} }

View File

@ -12,6 +12,7 @@ import (
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
"github.com/rs/zerolog/log"
) )
type endpointUpdatePayload struct { type endpointUpdatePayload struct {
@ -264,7 +265,12 @@ func (handler *Handler) endpointUpdate(w http.ResponseWriter, r *http.Request) *
if endpoint.Type == portainer.KubernetesLocalEnvironment || endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment { if endpoint.Type == portainer.KubernetesLocalEnvironment || endpoint.Type == portainer.AgentOnKubernetesEnvironment || endpoint.Type == portainer.EdgeAgentOnKubernetesEnvironment {
err = handler.AuthorizationService.CleanNAPWithOverridePolicies(handler.DataStore, endpoint, nil) err = handler.AuthorizationService.CleanNAPWithOverridePolicies(handler.DataStore, endpoint, nil)
if err != nil { if err != nil {
return httperror.InternalServerError("Unable to update user authorizations", err) handler.PendingActionsService.Create(portainer.PendingActions{
EndpointID: endpoint.ID,
Action: "CleanNAPWithOverridePolicies",
ActionData: nil,
})
log.Warn().Err(err).Msgf("Unable to clean NAP with override policies for endpoint (%d). Will try to update when endpoint is online.", endpoint.ID)
} }
} }
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/portainer/portainer/api/http/security" "github.com/portainer/portainer/api/http/security"
"github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/kubernetes/cli" "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -25,18 +26,19 @@ func hideFields(endpoint *portainer.Endpoint) {
// Handler is the HTTP handler used to handle environment(endpoint) operations. // Handler is the HTTP handler used to handle environment(endpoint) operations.
type Handler struct { type Handler struct {
*mux.Router *mux.Router
requestBouncer security.BouncerService requestBouncer security.BouncerService
demoService *demo.Service demoService *demo.Service
DataStore dataservices.DataStore DataStore dataservices.DataStore
FileService portainer.FileService FileService portainer.FileService
ProxyManager *proxy.Manager ProxyManager *proxy.Manager
ReverseTunnelService portainer.ReverseTunnelService ReverseTunnelService portainer.ReverseTunnelService
SnapshotService portainer.SnapshotService SnapshotService portainer.SnapshotService
K8sClientFactory *cli.ClientFactory K8sClientFactory *cli.ClientFactory
ComposeStackManager portainer.ComposeStackManager ComposeStackManager portainer.ComposeStackManager
AuthorizationService *authorization.Service AuthorizationService *authorization.Service
BindAddress string BindAddress string
BindAddressHTTPS string BindAddressHTTPS string
PendingActionsService *pendingactions.PendingActionsService
} }
// NewHandler creates a handler to manage environment(endpoint) operations. // NewHandler creates a handler to manage environment(endpoint) operations.

View File

@ -64,6 +64,7 @@ import (
"github.com/portainer/portainer/api/internal/upgrade" "github.com/portainer/portainer/api/internal/upgrade"
k8s "github.com/portainer/portainer/api/kubernetes" k8s "github.com/portainer/portainer/api/kubernetes"
"github.com/portainer/portainer/api/kubernetes/cli" "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/portainer/portainer/api/pendingactions"
"github.com/portainer/portainer/api/scheduler" "github.com/portainer/portainer/api/scheduler"
"github.com/portainer/portainer/api/stacks/deployments" "github.com/portainer/portainer/api/stacks/deployments"
"github.com/portainer/portainer/pkg/libhelm" "github.com/portainer/portainer/pkg/libhelm"
@ -110,6 +111,7 @@ type Server struct {
DemoService *demo.Service DemoService *demo.Service
UpgradeService upgrade.Service UpgradeService upgrade.Service
AdminCreationDone chan struct{} AdminCreationDone chan struct{}
PendingActionsService *pendingactions.PendingActionsService
} }
// Start starts the HTTP server // Start starts the HTTP server
@ -178,12 +180,14 @@ func (server *Server) Start() error {
endpointHandler.AuthorizationService = server.AuthorizationService endpointHandler.AuthorizationService = server.AuthorizationService
endpointHandler.BindAddress = server.BindAddress endpointHandler.BindAddress = server.BindAddress
endpointHandler.BindAddressHTTPS = server.BindAddressHTTPS endpointHandler.BindAddressHTTPS = server.BindAddressHTTPS
endpointHandler.PendingActionsService = server.PendingActionsService
var endpointEdgeHandler = endpointedge.NewHandler(requestBouncer, server.DataStore, server.FileService, server.ReverseTunnelService) var endpointEdgeHandler = endpointedge.NewHandler(requestBouncer, server.DataStore, server.FileService, server.ReverseTunnelService)
var endpointGroupHandler = endpointgroups.NewHandler(requestBouncer) var endpointGroupHandler = endpointgroups.NewHandler(requestBouncer)
endpointGroupHandler.AuthorizationService = server.AuthorizationService endpointGroupHandler.AuthorizationService = server.AuthorizationService
endpointGroupHandler.DataStore = server.DataStore endpointGroupHandler.DataStore = server.DataStore
endpointGroupHandler.PendingActionsService = server.PendingActionsService
var endpointProxyHandler = endpointproxy.NewHandler(requestBouncer) var endpointProxyHandler = endpointproxy.NewHandler(requestBouncer)
endpointProxyHandler.DataStore = server.DataStore endpointProxyHandler.DataStore = server.DataStore

View File

@ -1,61 +0,0 @@
package utils
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/authorization"
"github.com/rs/zerolog/log"
)
func EndpointPendingActions(endpoint *portainer.Endpoint) *portainer.EndpointPendingActions {
return endpoint.PendingActions
}
func GetUpdatedEndpointPendingActions(endpoint *portainer.Endpoint, action string, value interface{}) *portainer.EndpointPendingActions {
if endpoint.PendingActions == nil {
endpoint.PendingActions = &portainer.EndpointPendingActions{}
}
switch action {
case "CleanNAPWithOverridePolicies":
endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups = append(endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups, value.(portainer.EndpointGroupID))
}
return endpoint.PendingActions
}
func RunPendingActions(endpoint *portainer.Endpoint, dataStore dataservices.DataStoreTx, authorizationService *authorization.Service) error {
if endpoint.PendingActions == nil {
return nil
}
log.Info().Msgf("Running pending actions for endpoint %d", endpoint.ID)
if endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups != nil {
log.Info().Int("endpoint_id", int(endpoint.ID)).Msgf("Cleaning NAP with override policies for endpoint groups %v", endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups)
failedEndpointGroupIDs := make([]portainer.EndpointGroupID, 0)
for _, endpointGroupID := range endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups {
endpointGroup, err := dataStore.EndpointGroup().Read(portainer.EndpointGroupID(endpointGroupID))
if err != nil {
log.Error().Err(err).Msgf("Error reading endpoint group to clean NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID)
failedEndpointGroupIDs = append(failedEndpointGroupIDs, endpointGroupID)
continue
}
err = authorizationService.CleanNAPWithOverridePolicies(dataStore, endpoint, endpointGroup)
if err != nil {
failedEndpointGroupIDs = append(failedEndpointGroupIDs, endpointGroupID)
log.Error().Err(err).Msgf("Error cleaning NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID)
}
}
endpoint.PendingActions.CleanNAPWithOverridePolicies.EndpointGroups = failedEndpointGroupIDs
err := dataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint)
if err != nil {
log.Error().Err(err).Msgf("While running pending actions, error updating endpoint %d", endpoint.ID)
return err
}
}
return nil
}

View File

@ -10,9 +10,8 @@ import (
"github.com/portainer/portainer/api/agent" "github.com/portainer/portainer/api/agent"
"github.com/portainer/portainer/api/crypto" "github.com/portainer/portainer/api/crypto"
"github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/http/utils"
"github.com/portainer/portainer/api/internal/authorization"
"github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/endpointutils"
"github.com/portainer/portainer/api/pendingactions"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -27,10 +26,18 @@ type Service struct {
dockerSnapshotter portainer.DockerSnapshotter dockerSnapshotter portainer.DockerSnapshotter
kubernetesSnapshotter portainer.KubernetesSnapshotter kubernetesSnapshotter portainer.KubernetesSnapshotter
shutdownCtx context.Context shutdownCtx context.Context
pendingActionsService *pendingactions.PendingActionsService
} }
// NewService creates a new instance of a service // NewService creates a new instance of a service
func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStore, dockerSnapshotter portainer.DockerSnapshotter, kubernetesSnapshotter portainer.KubernetesSnapshotter, shutdownCtx context.Context) (*Service, error) { func NewService(
snapshotIntervalFromFlag string,
dataStore dataservices.DataStore,
dockerSnapshotter portainer.DockerSnapshotter,
kubernetesSnapshotter portainer.KubernetesSnapshotter,
shutdownCtx context.Context,
pendingActionsService *pendingactions.PendingActionsService,
) (*Service, error) {
interval, err := parseSnapshotFrequency(snapshotIntervalFromFlag, dataStore) interval, err := parseSnapshotFrequency(snapshotIntervalFromFlag, dataStore)
if err != nil { if err != nil {
return nil, err return nil, err
@ -43,6 +50,7 @@ func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStor
dockerSnapshotter: dockerSnapshotter, dockerSnapshotter: dockerSnapshotter,
kubernetesSnapshotter: kubernetesSnapshotter, kubernetesSnapshotter: kubernetesSnapshotter,
shutdownCtx: shutdownCtx, shutdownCtx: shutdownCtx,
pendingActionsService: pendingActionsService,
}, nil }, nil
} }
@ -263,7 +271,7 @@ func (service *Service) snapshotEndpoints() error {
snapshotError := service.SnapshotEndpoint(&endpoint) snapshotError := service.SnapshotEndpoint(&endpoint)
service.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { service.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
updateEndpointStatus(tx, &endpoint, snapshotError) updateEndpointStatus(tx, &endpoint, snapshotError, service.pendingActionsService)
return nil return nil
}) })
} }
@ -271,7 +279,7 @@ func (service *Service) snapshotEndpoints() error {
return nil return nil
} }
func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint, snapshotError error) { func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint, snapshotError error, pendingActionsService *pendingactions.PendingActionsService) {
latestEndpointReference, err := tx.Endpoint().Endpoint(endpoint.ID) latestEndpointReference, err := tx.Endpoint().Endpoint(endpoint.ID)
if latestEndpointReference == nil { if latestEndpointReference == nil {
log.Debug(). log.Debug().
@ -304,7 +312,7 @@ func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpo
// Run the pending actions // Run the pending actions
if latestEndpointReference.Status == portainer.EndpointStatusUp { if latestEndpointReference.Status == portainer.EndpointStatusUp {
utils.RunPendingActions(latestEndpointReference, tx, authorization.NewService(tx)) pendingActionsService.Execute(endpoint.ID)
} }
} }

View File

@ -34,6 +34,7 @@ type testDatastore struct {
user dataservices.UserService user dataservices.UserService
version dataservices.VersionService version dataservices.VersionService
webhook dataservices.WebhookService webhook dataservices.WebhookService
pendingActionsService dataservices.PendingActionsService
} }
func (d *testDatastore) BackupTo(io.Writer) error { return nil } func (d *testDatastore) BackupTo(io.Writer) error { return nil }
@ -82,6 +83,10 @@ func (d *testDatastore) User() dataservices.UserService { re
func (d *testDatastore) Version() dataservices.VersionService { return d.version } func (d *testDatastore) Version() dataservices.VersionService { return d.version }
func (d *testDatastore) Webhook() dataservices.WebhookService { return d.webhook } func (d *testDatastore) Webhook() dataservices.WebhookService { return d.webhook }
func (d *testDatastore) PendingActions() dataservices.PendingActionsService {
return d.pendingActionsService
}
func (d *testDatastore) IsErrObjectNotFound(e error) bool { func (d *testDatastore) IsErrObjectNotFound(e error) bool {
return false return false
} }

View File

@ -0,0 +1,113 @@
package pendingactions
import (
"context"
"fmt"
"sync"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/internal/authorization"
kubecli "github.com/portainer/portainer/api/kubernetes/cli"
"github.com/rs/zerolog/log"
)
type (
PendingActionsService struct {
authorizationService *authorization.Service
clientFactory *kubecli.ClientFactory
dataStore dataservices.DataStore
shutdownCtx context.Context
mu sync.Mutex
}
)
func NewService(
dataStore dataservices.DataStore,
clientFactory *kubecli.ClientFactory,
authorizationService *authorization.Service,
shutdownCtx context.Context,
) *PendingActionsService {
return &PendingActionsService{
dataStore: dataStore,
shutdownCtx: shutdownCtx,
authorizationService: authorizationService,
clientFactory: clientFactory,
mu: sync.Mutex{},
}
}
func (service *PendingActionsService) Create(r portainer.PendingActions) error {
return service.dataStore.PendingActions().Create(&r)
}
func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
service.mu.Lock()
defer service.mu.Unlock()
endpoint, err := service.dataStore.Endpoint().Endpoint(id)
if err != nil {
return fmt.Errorf("failed to retrieve endpoint %d: %w", id, err)
}
if endpoint.Status != portainer.EndpointStatusUp {
log.Debug().Msgf("Endpoint %d is not up", id)
return fmt.Errorf("endpoint %d is not up: %w", id, err)
}
pendingActions, err := service.dataStore.PendingActions().ReadAll()
if err != nil {
log.Error().Err(err).Msgf("failed to retrieve pending actions")
return fmt.Errorf("failed to retrieve pending actions for endpoint %d: %w", id, err)
}
for _, endpointPendingAction := range pendingActions {
if endpointPendingAction.EndpointID == id {
err := service.executePendingAction(endpointPendingAction, endpoint)
if err != nil {
log.Error().Err(err).Msgf("failed to execute pending action")
return fmt.Errorf("failed to execute pending action: %w", err)
} else {
// delete the pending action
err := service.dataStore.PendingActions().Delete(endpointPendingAction.ID)
if err != nil {
log.Error().Err(err).Msgf("failed to delete pending action")
return fmt.Errorf("failed to delete pending action: %w", err)
}
}
}
}
return nil
}
func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingActions, endpoint *portainer.Endpoint) error {
log.Debug().Msgf("Executing pending action %s for endpoint %d", pendingAction.Action, pendingAction.EndpointID)
defer func() {
log.Debug().Msgf("End executing pending action %s for endpoint %d", pendingAction.Action, pendingAction.EndpointID)
}()
switch pendingAction.Action {
case "CleanNAPWithOverridePolicies":
if (pendingAction.ActionData == nil) || (pendingAction.ActionData.(portainer.EndpointGroupID) == 0) {
service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, nil)
} else {
endpointGroupID := pendingAction.ActionData.(portainer.EndpointGroupID)
endpointGroup, err := service.dataStore.EndpointGroup().Read(portainer.EndpointGroupID(endpointGroupID))
if err != nil {
log.Error().Err(err).Msgf("Error reading endpoint group to clean NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to retrieve endpoint group %d: %w", endpointGroupID, err)
}
err = service.authorizationService.CleanNAPWithOverridePolicies(service.dataStore, endpoint, endpointGroup)
if err != nil {
log.Error().Err(err).Msgf("Error cleaning NAP with override policies for endpoint %d and endpoint group %d", endpoint.ID, endpointGroup.ID)
return fmt.Errorf("failed to clean NAP with override policies for endpoint %d and endpoint group %d: %w", endpoint.ID, endpointGroup.ID, err)
}
}
return nil
}
return nil
}

View File

@ -373,10 +373,13 @@ type (
//EdgeStackStatusType represents an edge stack status type //EdgeStackStatusType represents an edge stack status type
EdgeStackStatusType int EdgeStackStatusType int
EndpointPendingActions struct { PendingActionsID int
CleanNAPWithOverridePolicies struct { PendingActions struct {
EndpointGroups []EndpointGroupID `json:"EndpointGroups"` ID PendingActionsID `json:"ID"`
} `json:"CleanNAPWithOverridePolicies"` EndpointID EndpointID `json:"EndpointID"`
Action string `json:"Action"`
ActionData interface{} `json:"ActionData"`
CreatedAt int64 `json:"CreatedAt"`
} }
// Environment(Endpoint) represents a Docker environment(endpoint) with all the info required // Environment(Endpoint) represents a Docker environment(endpoint) with all the info required
@ -434,9 +437,6 @@ type (
// Whether we need to run any "post init migrations". // Whether we need to run any "post init migrations".
PostInitMigrations EndpointPostInitMigrations `json:"PostInitMigrations"` PostInitMigrations EndpointPostInitMigrations `json:"PostInitMigrations"`
// Whether we need to run any action when an endpoint is back online.
PendingActions *EndpointPendingActions `json:"PendingActions"`
Edge EnvironmentEdgeSettings Edge EnvironmentEdgeSettings
Agent struct { Agent struct {