diff --git a/api/bolt/datastore.go b/api/bolt/datastore.go index 05ec10d10..9ace83b9a 100644 --- a/api/bolt/datastore.go +++ b/api/bolt/datastore.go @@ -9,6 +9,7 @@ import ( "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/bolt/dockerhub" "github.com/portainer/portainer/api/bolt/edgegroup" + "github.com/portainer/portainer/api/bolt/edgejob" "github.com/portainer/portainer/api/bolt/edgestack" "github.com/portainer/portainer/api/bolt/endpoint" "github.com/portainer/portainer/api/bolt/endpointgroup" @@ -44,6 +45,7 @@ type Store struct { fileService portainer.FileService DockerHubService *dockerhub.Service EdgeGroupService *edgegroup.Service + EdgeJobService *edgejob.Service EdgeStackService *edgestack.Service EndpointGroupService *endpointgroup.Service EndpointService *endpoint.Service @@ -184,6 +186,12 @@ func (store *Store) initServices() error { } store.EdgeGroupService = edgeGroupService + edgeJobService, err := edgejob.NewService(store.db) + if err != nil { + return err + } + store.EdgeJobService = edgeJobService + endpointgroupService, err := endpointgroup.NewService(store.db) if err != nil { return err @@ -293,6 +301,11 @@ func (store *Store) EdgeGroup() portainer.EdgeGroupService { return store.EdgeGroupService } +// EdgeJob gives access to the EdgeJob data management layer +func (store *Store) EdgeJob() portainer.EdgeJobService { + return store.EdgeJobService +} + // EdgeStack gives access to the EdgeStack data management layer func (store *Store) EdgeStack() portainer.EdgeStackService { return store.EdgeStackService @@ -333,11 +346,6 @@ func (store *Store) Role() portainer.RoleService { return store.RoleService } -// Schedule gives access to the Schedule data management layer -func (store *Store) Schedule() portainer.ScheduleService { - return store.ScheduleService -} - // Settings gives access to the Settings data management layer func (store *Store) Settings() portainer.SettingsService { return store.SettingsService diff --git a/api/bolt/edgejob/edgejob.go b/api/bolt/edgejob/edgejob.go new file mode 100644 index 000000000..f3354c7d8 --- /dev/null +++ b/api/bolt/edgejob/edgejob.go @@ -0,0 +1,101 @@ +package edgejob + +import ( + "github.com/boltdb/bolt" + "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/bolt/internal" +) + +const ( + // BucketName represents the name of the bucket where this service stores data. + BucketName = "edgejobs" +) + +// Service represents a service for managing edge jobs data. +type Service struct { + db *bolt.DB +} + +// NewService creates a new instance of a service. +func NewService(db *bolt.DB) (*Service, error) { + err := internal.CreateBucket(db, BucketName) + if err != nil { + return nil, err + } + + return &Service{ + db: db, + }, nil +} + +// EdgeJobs returns a list of Edge jobs +func (service *Service) EdgeJobs() ([]portainer.EdgeJob, error) { + var edgeJobs = make([]portainer.EdgeJob, 0) + + err := service.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var edgeJob portainer.EdgeJob + err := internal.UnmarshalObject(v, &edgeJob) + if err != nil { + return err + } + edgeJobs = append(edgeJobs, edgeJob) + } + + return nil + }) + + return edgeJobs, err +} + +// EdgeJob returns an Edge job by ID +func (service *Service) EdgeJob(ID portainer.EdgeJobID) (*portainer.EdgeJob, error) { + var edgeJob portainer.EdgeJob + identifier := internal.Itob(int(ID)) + + err := internal.GetObject(service.db, BucketName, identifier, &edgeJob) + if err != nil { + return nil, err + } + + return &edgeJob, nil +} + +// CreateEdgeJob creates a new Edge job +func (service *Service) CreateEdgeJob(edgeJob *portainer.EdgeJob) error { + return service.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + + if edgeJob.ID == 0 { + id, _ := bucket.NextSequence() + edgeJob.ID = portainer.EdgeJobID(id) + } + + data, err := internal.MarshalObject(edgeJob) + if err != nil { + return err + } + + return bucket.Put(internal.Itob(int(edgeJob.ID)), data) + }) +} + +// UpdateEdgeJob updates an Edge job by ID +func (service *Service) UpdateEdgeJob(ID portainer.EdgeJobID, edgeJob *portainer.EdgeJob) error { + identifier := internal.Itob(int(ID)) + return internal.UpdateObject(service.db, BucketName, identifier, edgeJob) +} + +// DeleteEdgeJob deletes an Edge job +func (service *Service) DeleteEdgeJob(ID portainer.EdgeJobID) error { + identifier := internal.Itob(int(ID)) + return internal.DeleteObject(service.db, BucketName, identifier) +} + +// GetNextIdentifier returns the next identifier for an endpoint. +func (service *Service) GetNextIdentifier() int { + return internal.GetNextIdentifier(service.db, BucketName) +} diff --git a/api/bolt/migrator/migrate_dbversion19.go b/api/bolt/migrator/migrate_dbversion19.go index 00a41a4e4..9f793f41f 100644 --- a/api/bolt/migrator/migrate_dbversion19.go +++ b/api/bolt/migrator/migrate_dbversion19.go @@ -2,10 +2,10 @@ package migrator import ( "strings" - - portainer "github.com/portainer/portainer/api" ) +const scheduleScriptExecutionJobType = 1 + func (m *Migrator) updateUsersToDBVersion20() error { return m.authorizationService.UpdateUsersAuthorizations() } @@ -28,7 +28,7 @@ func (m *Migrator) updateSchedulesToDBVersion20() error { } for _, schedule := range legacySchedules { - if schedule.JobType == portainer.ScriptExecutionJobType { + if schedule.JobType == scheduleScriptExecutionJobType { if schedule.CronExpression == "0 0 * * *" { schedule.CronExpression = "0 * * * *" } else if schedule.CronExpression == "0 0 0/2 * *" { diff --git a/api/bolt/migrator/migrate_dbversion23.go b/api/bolt/migrator/migrate_dbversion23.go index fe4deca48..01c97d152 100644 --- a/api/bolt/migrator/migrate_dbversion23.go +++ b/api/bolt/migrator/migrate_dbversion23.go @@ -1,6 +1,8 @@ package migrator -import portainer "github.com/portainer/portainer/api" +import ( + "github.com/portainer/portainer/api" +) func (m *Migrator) updateSettingsToDB24() error { legacySettings, err := m.settingsService.Settings() diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go index 39ba9a340..bac424fcb 100644 --- a/api/chisel/schedules.go +++ b/api/chisel/schedules.go @@ -6,42 +6,42 @@ import ( portainer "github.com/portainer/portainer/api" ) -// AddSchedule register a schedule inside the tunnel details associated to an endpoint. -func (service *Service) AddSchedule(endpointID portainer.EndpointID, schedule *portainer.EdgeSchedule) { +// AddEdgeJob register an EdgeJob inside the tunnel details associated to an endpoint. +func (service *Service) AddEdgeJob(endpointID portainer.EndpointID, edgeJob *portainer.EdgeJob) { tunnel := service.GetTunnelDetails(endpointID) - existingScheduleIndex := -1 - for idx, existingSchedule := range tunnel.Schedules { - if existingSchedule.ID == schedule.ID { - existingScheduleIndex = idx + existingJobIndex := -1 + for idx, existingJob := range tunnel.Jobs { + if existingJob.ID == edgeJob.ID { + existingJobIndex = idx break } } - if existingScheduleIndex == -1 { - tunnel.Schedules = append(tunnel.Schedules, *schedule) + if existingJobIndex == -1 { + tunnel.Jobs = append(tunnel.Jobs, *edgeJob) } else { - tunnel.Schedules[existingScheduleIndex] = *schedule + tunnel.Jobs[existingJobIndex] = *edgeJob } key := strconv.Itoa(int(endpointID)) service.tunnelDetailsMap.Set(key, tunnel) } -// RemoveSchedule will remove the specified schedule from each tunnel it was registered with. -func (service *Service) RemoveSchedule(scheduleID portainer.ScheduleID) { +// RemoveEdgeJob will remove the specified Edge job from each tunnel it was registered with. +func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { for item := range service.tunnelDetailsMap.IterBuffered() { tunnelDetails := item.Val.(*portainer.TunnelDetails) - updatedSchedules := make([]portainer.EdgeSchedule, 0) - for _, schedule := range tunnelDetails.Schedules { - if schedule.ID == scheduleID { + updatedJobs := make([]portainer.EdgeJob, 0) + for _, edgeJob := range tunnelDetails.Jobs { + if edgeJob.ID == edgeJobID { continue } - updatedSchedules = append(updatedSchedules, schedule) + updatedJobs = append(updatedJobs, edgeJob) } - tunnelDetails.Schedules = updatedSchedules + tunnelDetails.Jobs = updatedJobs service.tunnelDetailsMap.Set(item.Key, tunnelDetails) } } diff --git a/api/chisel/service.go b/api/chisel/service.go index e007a416a..e21b67358 100644 --- a/api/chisel/service.go +++ b/api/chisel/service.go @@ -155,7 +155,7 @@ func (service *Service) checkTunnels() { } } - if len(tunnel.Schedules) > 0 { + if len(tunnel.Jobs) > 0 { endpointID, err := strconv.Atoi(item.Key) if err != nil { log.Printf("[ERROR] [chisel,conversion] Invalid endpoint identifier (id: %s): %s", item.Key, err) diff --git a/api/chisel/tunnel.go b/api/chisel/tunnel.go index ba9495419..1306df48c 100644 --- a/api/chisel/tunnel.go +++ b/api/chisel/tunnel.go @@ -47,11 +47,11 @@ func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) *porta return tunnelDetails } - schedules := make([]portainer.EdgeSchedule, 0) + jobs := make([]portainer.EdgeJob, 0) return &portainer.TunnelDetails{ Status: portainer.EdgeAgentIdle, Port: 0, - Schedules: schedules, + Jobs: jobs, Credentials: "", } } diff --git a/api/cmd/portainer/main.go b/api/cmd/portainer/main.go index 3ec42000c..728018ccd 100644 --- a/api/cmd/portainer/main.go +++ b/api/cmd/portainer/main.go @@ -8,11 +8,11 @@ import ( "github.com/portainer/portainer/api/chisel" "github.com/portainer/portainer/api/internal/authorization" + "github.com/portainer/portainer/api/internal/snapshot" "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/bolt" "github.com/portainer/portainer/api/cli" - "github.com/portainer/portainer/api/cron" "github.com/portainer/portainer/api/crypto" "github.com/portainer/portainer/api/docker" "github.com/portainer/portainer/api/exec" @@ -115,75 +115,16 @@ func initSnapshotter(clientFactory *docker.ClientFactory) portainer.Snapshotter return docker.NewSnapshotter(clientFactory) } -func initJobScheduler() portainer.JobScheduler { - return cron.NewJobScheduler() -} - -func loadSnapshotSystemSchedule(jobScheduler portainer.JobScheduler, snapshotter portainer.Snapshotter, dataStore portainer.DataStore) error { - settings, err := dataStore.Settings().Settings() +func loadEdgeJobsFromDatabase(dataStore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService) error { + edgeJobs, err := dataStore.EdgeJob().EdgeJobs() if err != nil { return err } - schedules, err := dataStore.Schedule().SchedulesByJobType(portainer.SnapshotJobType) - if err != nil { - return err - } - - var snapshotSchedule *portainer.Schedule - if len(schedules) == 0 { - snapshotJob := &portainer.SnapshotJob{} - snapshotSchedule = &portainer.Schedule{ - ID: portainer.ScheduleID(dataStore.Schedule().GetNextIdentifier()), - Name: "system_snapshot", - CronExpression: "@every " + settings.SnapshotInterval, - Recurring: true, - JobType: portainer.SnapshotJobType, - SnapshotJob: snapshotJob, - Created: time.Now().Unix(), + for _, edgeJob := range edgeJobs { + for endpointID := range edgeJob.Endpoints { + reverseTunnelService.AddEdgeJob(endpointID, &edgeJob) } - } else { - snapshotSchedule = &schedules[0] - } - - snapshotJobContext := cron.NewSnapshotJobContext(dataStore, snapshotter) - snapshotJobRunner := cron.NewSnapshotJobRunner(snapshotSchedule, snapshotJobContext) - - err = jobScheduler.ScheduleJob(snapshotJobRunner) - if err != nil { - return err - } - - if len(schedules) == 0 { - return dataStore.Schedule().CreateSchedule(snapshotSchedule) - } - return nil -} - -func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, dataStore portainer.DataStore, fileService portainer.FileService, reverseTunnelService portainer.ReverseTunnelService) error { - schedules, err := dataStore.Schedule().Schedules() - if err != nil { - return err - } - - for _, schedule := range schedules { - - if schedule.JobType == portainer.ScriptExecutionJobType { - jobContext := cron.NewScriptExecutionJobContext(jobService, dataStore, fileService) - jobRunner := cron.NewScriptExecutionJobRunner(&schedule, jobContext) - - err = jobScheduler.ScheduleJob(jobRunner) - if err != nil { - return err - } - } - - if schedule.EdgeSchedule != nil { - for _, endpointID := range schedule.EdgeSchedule.Endpoints { - reverseTunnelService.AddSchedule(endpointID, schedule.EdgeSchedule) - } - } - } return nil @@ -357,10 +298,6 @@ func initEndpoint(flags *portainer.CLIFlags, dataStore portainer.DataStore, snap return createUnsecuredEndpoint(*flags.EndpointURL, dataStore, snapshotter) } -func initJobService(dockerClientFactory *docker.ClientFactory) portainer.JobService { - return docker.NewJobService(dockerClientFactory) -} - func initExtensionManager(fileService portainer.FileService, dataStore portainer.DataStore) (portainer.ExtensionManager, error) { extensionManager := exec.NewExtensionManager(fileService, dataStore) @@ -422,10 +359,14 @@ func main() { clientFactory := initClientFactory(digitalSignatureService, reverseTunnelService) - jobService := initJobService(clientFactory) - snapshotter := initSnapshotter(clientFactory) + snapshotService, err := snapshot.NewService(*flags.SnapshotInterval, dataStore, snapshotter) + if err != nil { + log.Fatal(err) + } + snapshotService.Start() + swarmStackManager, err := initSwarmStackManager(*flags.Assets, *flags.Data, digitalSignatureService, fileService, reverseTunnelService) if err != nil { log.Fatal(err) @@ -440,20 +381,11 @@ func main() { } } - jobScheduler := initJobScheduler() - - err = loadSchedulesFromDatabase(jobScheduler, jobService, dataStore, fileService, reverseTunnelService) + err = loadEdgeJobsFromDatabase(dataStore, reverseTunnelService) if err != nil { log.Fatal(err) } - err = loadSnapshotSystemSchedule(jobScheduler, snapshotter, dataStore) - if err != nil { - log.Fatal(err) - } - - jobScheduler.Start() - applicationStatus := initStatus(flags) err = initEndpoint(flags, dataStore, snapshotter) @@ -520,13 +452,12 @@ func main() { LDAPService: ldapService, GitService: gitService, SignatureService: digitalSignatureService, - JobScheduler: jobScheduler, + SnapshotService: snapshotService, Snapshotter: snapshotter, SSL: *flags.SSL, SSLCert: *flags.SSLCert, SSLKey: *flags.SSLKey, DockerClientFactory: clientFactory, - JobService: jobService, } log.Printf("Starting Portainer %s on %s", portainer.APIVersion, *flags.Addr) diff --git a/api/cron/job_script_execution.go b/api/cron/job_script_execution.go deleted file mode 100644 index b554e6047..000000000 --- a/api/cron/job_script_execution.go +++ /dev/null @@ -1,96 +0,0 @@ -package cron - -import ( - "log" - "time" - - "github.com/portainer/portainer/api" -) - -// ScriptExecutionJobRunner is used to run a ScriptExecutionJob -type ScriptExecutionJobRunner struct { - schedule *portainer.Schedule - context *ScriptExecutionJobContext - executedOnce bool -} - -// ScriptExecutionJobContext represents the context of execution of a ScriptExecutionJob -type ScriptExecutionJobContext struct { - dataStore portainer.DataStore - jobService portainer.JobService - fileService portainer.FileService -} - -// NewScriptExecutionJobContext returns a new context that can be used to execute a ScriptExecutionJob -func NewScriptExecutionJobContext(jobService portainer.JobService, dataStore portainer.DataStore, fileService portainer.FileService) *ScriptExecutionJobContext { - return &ScriptExecutionJobContext{ - jobService: jobService, - dataStore: dataStore, - fileService: fileService, - } -} - -// NewScriptExecutionJobRunner returns a new runner that can be scheduled -func NewScriptExecutionJobRunner(schedule *portainer.Schedule, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner { - return &ScriptExecutionJobRunner{ - schedule: schedule, - context: context, - executedOnce: false, - } -} - -// Run triggers the execution of the job. -// It will iterate through all the endpoints specified in the context to -// execute the script associated to the job. -func (runner *ScriptExecutionJobRunner) Run() { - if !runner.schedule.Recurring && runner.executedOnce { - return - } - runner.executedOnce = true - - scriptFile, err := runner.context.fileService.GetFileContent(runner.schedule.ScriptExecutionJob.ScriptPath) - if err != nil { - log.Printf("scheduled job error (script execution). Unable to retrieve script file (err=%s)\n", err) - return - } - - targets := make([]*portainer.Endpoint, 0) - for _, endpointID := range runner.schedule.ScriptExecutionJob.Endpoints { - endpoint, err := runner.context.dataStore.Endpoint().Endpoint(endpointID) - if err != nil { - log.Printf("scheduled job error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err) - return - } - - targets = append(targets, endpoint) - } - - runner.executeAndRetry(targets, scriptFile, 0) -} - -func (runner *ScriptExecutionJobRunner) executeAndRetry(endpoints []*portainer.Endpoint, script []byte, retryCount int) { - retryTargets := make([]*portainer.Endpoint, 0) - - for _, endpoint := range endpoints { - err := runner.context.jobService.ExecuteScript(endpoint, "", runner.schedule.ScriptExecutionJob.Image, script, runner.schedule) - if err == portainer.ErrUnableToPingEndpoint { - retryTargets = append(retryTargets, endpoint) - } else if err != nil { - log.Printf("scheduled job error (script execution). Unable to execute script (endpoint=%s) (err=%s)\n", endpoint.Name, err) - } - } - - retryCount++ - if retryCount >= runner.schedule.ScriptExecutionJob.RetryCount { - return - } - - time.Sleep(time.Duration(runner.schedule.ScriptExecutionJob.RetryInterval) * time.Second) - - runner.executeAndRetry(retryTargets, script, retryCount) -} - -// GetSchedule returns the schedule associated to the runner -func (runner *ScriptExecutionJobRunner) GetSchedule() *portainer.Schedule { - return runner.schedule -} diff --git a/api/cron/job_snapshot.go b/api/cron/job_snapshot.go deleted file mode 100644 index 7fedc0f6d..000000000 --- a/api/cron/job_snapshot.go +++ /dev/null @@ -1,85 +0,0 @@ -package cron - -import ( - "log" - - "github.com/portainer/portainer/api" -) - -// SnapshotJobRunner is used to run a SnapshotJob -type SnapshotJobRunner struct { - schedule *portainer.Schedule - context *SnapshotJobContext -} - -// SnapshotJobContext represents the context of execution of a SnapshotJob -type SnapshotJobContext struct { - dataStore portainer.DataStore - snapshotter portainer.Snapshotter -} - -// NewSnapshotJobContext returns a new context that can be used to execute a SnapshotJob -func NewSnapshotJobContext(dataStore portainer.DataStore, snapshotter portainer.Snapshotter) *SnapshotJobContext { - return &SnapshotJobContext{ - dataStore: dataStore, - snapshotter: snapshotter, - } -} - -// NewSnapshotJobRunner returns a new runner that can be scheduled -func NewSnapshotJobRunner(schedule *portainer.Schedule, context *SnapshotJobContext) *SnapshotJobRunner { - return &SnapshotJobRunner{ - schedule: schedule, - context: context, - } -} - -// GetSchedule returns the schedule associated to the runner -func (runner *SnapshotJobRunner) GetSchedule() *portainer.Schedule { - return runner.schedule -} - -// Run triggers the execution of the schedule. -// It will iterate through all the endpoints available in the database to -// create a snapshot of each one of them. -// As a snapshot can be a long process, to avoid any concurrency issue we -// retrieve the latest version of the endpoint right after a snapshot. -func (runner *SnapshotJobRunner) Run() { - go func() { - endpoints, err := runner.context.dataStore.Endpoint().Endpoints() - if err != nil { - log.Printf("background schedule error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err) - return - } - - for _, endpoint := range endpoints { - if endpoint.Type == portainer.AzureEnvironment || endpoint.Type == portainer.EdgeAgentEnvironment { - continue - } - - snapshot, snapshotError := runner.context.snapshotter.CreateSnapshot(&endpoint) - - latestEndpointReference, err := runner.context.dataStore.Endpoint().Endpoint(endpoint.ID) - if latestEndpointReference == nil { - log.Printf("background schedule error (endpoint snapshot). Endpoint not found inside the database anymore (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) - continue - } - - latestEndpointReference.Status = portainer.EndpointStatusUp - if snapshotError != nil { - log.Printf("background schedule error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, snapshotError) - latestEndpointReference.Status = portainer.EndpointStatusDown - } - - if snapshot != nil { - latestEndpointReference.Snapshots = []portainer.Snapshot{*snapshot} - } - - err = runner.context.dataStore.Endpoint().UpdateEndpoint(latestEndpointReference.ID, latestEndpointReference) - if err != nil { - log.Printf("background schedule error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) - return - } - } - }() -} diff --git a/api/cron/scheduler.go b/api/cron/scheduler.go deleted file mode 100644 index 870105010..000000000 --- a/api/cron/scheduler.go +++ /dev/null @@ -1,116 +0,0 @@ -package cron - -import ( - "github.com/portainer/portainer/api" - "github.com/robfig/cron/v3" -) - -// JobScheduler represents a service for managing crons -type JobScheduler struct { - cron *cron.Cron -} - -// NewJobScheduler initializes a new service -func NewJobScheduler() *JobScheduler { - return &JobScheduler{ - cron: cron.New(), - } -} - -// ScheduleJob schedules the execution of a job via a runner -func (scheduler *JobScheduler) ScheduleJob(runner portainer.JobRunner) error { - _, err := scheduler.cron.AddJob(runner.GetSchedule().CronExpression, runner) - return err -} - -// UpdateSystemJobSchedule updates the first occurence of the specified -// scheduled job based on the specified job type. -// It does so by re-creating a new cron -// and adding all the existing jobs. It will then re-schedule the new job -// with the update cron expression passed in parameter. -// NOTE: the cron library do not support updating schedules directly -// hence the work-around -func (scheduler *JobScheduler) UpdateSystemJobSchedule(jobType portainer.JobType, newCronExpression string) error { - cronEntries := scheduler.cron.Entries() - newCron := cron.New() - - for _, entry := range cronEntries { - if entry.Job.(portainer.JobRunner).GetSchedule().JobType == jobType { - _, err := newCron.AddJob(newCronExpression, entry.Job) - if err != nil { - return err - } - continue - } - - newCron.Schedule(entry.Schedule, entry.Job) - } - - scheduler.cron.Stop() - scheduler.cron = newCron - scheduler.cron.Start() - return nil -} - -// UpdateJobSchedule updates a specific scheduled job by re-creating a new cron -// and adding all the existing jobs. It will then re-schedule the new job -// via the specified JobRunner parameter. -// NOTE: the cron library do not support updating schedules directly -// hence the work-around -func (scheduler *JobScheduler) UpdateJobSchedule(runner portainer.JobRunner) error { - cronEntries := scheduler.cron.Entries() - newCron := cron.New() - - for _, entry := range cronEntries { - - if entry.Job.(portainer.JobRunner).GetSchedule().ID == runner.GetSchedule().ID { - - var jobRunner cron.Job = runner - if entry.Job.(portainer.JobRunner).GetSchedule().JobType == portainer.SnapshotJobType { - jobRunner = entry.Job - } - - _, err := newCron.AddJob(runner.GetSchedule().CronExpression, jobRunner) - if err != nil { - return err - } - continue - } - - newCron.Schedule(entry.Schedule, entry.Job) - } - - scheduler.cron.Stop() - scheduler.cron = newCron - scheduler.cron.Start() - return nil -} - -// UnscheduleJob remove a scheduled job by re-creating a new cron -// and adding all the existing jobs except for the one specified via scheduleID. -// NOTE: the cron library do not support removing schedules directly -// hence the work-around -func (scheduler *JobScheduler) UnscheduleJob(scheduleID portainer.ScheduleID) { - cronEntries := scheduler.cron.Entries() - newCron := cron.New() - - for _, entry := range cronEntries { - - if entry.Job.(portainer.JobRunner).GetSchedule().ID == scheduleID { - continue - } - - newCron.Schedule(entry.Schedule, entry.Job) - } - - scheduler.cron.Stop() - scheduler.cron = newCron - scheduler.cron.Start() -} - -// Start starts the scheduled jobs -func (scheduler *JobScheduler) Start() { - if len(scheduler.cron.Entries()) > 0 { - scheduler.cron.Start() - } -} diff --git a/api/docker/job.go b/api/docker/job.go deleted file mode 100644 index ff6dae2c2..000000000 --- a/api/docker/job.go +++ /dev/null @@ -1,115 +0,0 @@ -package docker - -import ( - "bytes" - "context" - "io" - "io/ioutil" - "strconv" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/network" - "github.com/docker/docker/api/types/strslice" - "github.com/docker/docker/client" - "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/archive" -) - -// JobService represents a service that handles the execution of jobs -type JobService struct { - dockerClientFactory *ClientFactory -} - -// NewJobService returns a pointer to a new job service -func NewJobService(dockerClientFactory *ClientFactory) *JobService { - return &JobService{ - dockerClientFactory: dockerClientFactory, - } -} - -// ExecuteScript will leverage a privileged container to execute a script against the specified endpoint/nodename. -// It will copy the script content specified as a parameter inside a container based on the specified image and execute it. -func (service *JobService) ExecuteScript(endpoint *portainer.Endpoint, nodeName, image string, script []byte, schedule *portainer.Schedule) error { - buffer, err := archive.TarFileInBuffer(script, "script.sh", 0700) - if err != nil { - return err - } - - cli, err := service.dockerClientFactory.CreateClient(endpoint, nodeName) - if err != nil { - return err - } - defer cli.Close() - - _, err = cli.Ping(context.Background()) - if err != nil { - return portainer.ErrUnableToPingEndpoint - } - - err = pullImage(cli, image) - if err != nil { - return err - } - - containerConfig := &container.Config{ - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - Tty: true, - WorkingDir: "/tmp", - Image: image, - Labels: map[string]string{ - "io.portainer.job.endpoint": strconv.Itoa(int(endpoint.ID)), - }, - Cmd: strslice.StrSlice([]string{"sh", "/tmp/script.sh"}), - } - - if schedule != nil { - containerConfig.Labels["io.portainer.schedule.id"] = strconv.Itoa(int(schedule.ID)) - } - - hostConfig := &container.HostConfig{ - Binds: []string{"/:/host", "/etc:/etc:ro", "/usr:/usr:ro", "/run:/run:ro", "/sbin:/sbin:ro", "/var:/var:ro"}, - NetworkMode: "host", - Privileged: true, - } - - networkConfig := &network.NetworkingConfig{} - - body, err := cli.ContainerCreate(context.Background(), containerConfig, hostConfig, networkConfig, "") - if err != nil { - return err - } - - if schedule != nil { - err = cli.ContainerRename(context.Background(), body.ID, schedule.Name+"_"+body.ID) - if err != nil { - return err - } - } - - copyOptions := types.CopyToContainerOptions{} - err = cli.CopyToContainer(context.Background(), body.ID, "/tmp", bytes.NewReader(buffer), copyOptions) - if err != nil { - return err - } - - startOptions := types.ContainerStartOptions{} - return cli.ContainerStart(context.Background(), body.ID, startOptions) -} - -func pullImage(cli *client.Client, image string) error { - imageReadCloser, err := cli.ImagePull(context.Background(), image, types.ImagePullOptions{}) - if err != nil { - return err - } - defer imageReadCloser.Close() - - _, err = io.Copy(ioutil.Discard, imageReadCloser) - if err != nil { - return err - } - - return nil -} diff --git a/api/filesystem/filesystem.go b/api/filesystem/filesystem.go index d65ad4f8d..7d3747ae4 100644 --- a/api/filesystem/filesystem.go +++ b/api/filesystem/filesystem.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "encoding/pem" + "fmt" "io/ioutil" "github.com/portainer/portainer/api" @@ -37,8 +38,8 @@ const ( PublicKeyFile = "portainer.pub" // BinaryStorePath represents the subfolder where binaries are stored in the file store folder. BinaryStorePath = "bin" - // ScheduleStorePath represents the subfolder where schedule files are stored. - ScheduleStorePath = "schedules" + // EdgeJobStorePath represents the subfolder where schedule files are stored. + EdgeJobStorePath = "edge_jobs" // ExtensionRegistryManagementStorePath represents the subfolder where files related to the // registry management extension are stored. ExtensionRegistryManagementStorePath = "extensions" @@ -392,22 +393,22 @@ func (service *Service) getContentFromPEMFile(filePath string) ([]byte, error) { return block.Bytes, nil } -// GetScheduleFolder returns the absolute path on the filesystem for a schedule based +// GetEdgeJobFolder returns the absolute path on the filesystem for an Edge job based // on its identifier. -func (service *Service) GetScheduleFolder(identifier string) string { - return path.Join(service.fileStorePath, ScheduleStorePath, identifier) +func (service *Service) GetEdgeJobFolder(identifier string) string { + return path.Join(service.fileStorePath, EdgeJobStorePath, identifier) } -// StoreScheduledJobFileFromBytes creates a subfolder in the ScheduleStorePath and stores a new file from bytes. +// StoreEdgeJobFileFromBytes creates a subfolder in the EdgeJobStorePath and stores a new file from bytes. // It returns the path to the folder where the file is stored. -func (service *Service) StoreScheduledJobFileFromBytes(identifier string, data []byte) (string, error) { - scheduleStorePath := path.Join(ScheduleStorePath, identifier) - err := service.createDirectoryInStore(scheduleStorePath) +func (service *Service) StoreEdgeJobFileFromBytes(identifier string, data []byte) (string, error) { + edgeJobStorePath := path.Join(EdgeJobStorePath, identifier) + err := service.createDirectoryInStore(edgeJobStorePath) if err != nil { return "", err } - filePath := path.Join(scheduleStorePath, createScheduledJobFileName(identifier)) + filePath := path.Join(edgeJobStorePath, createEdgeJobFileName(identifier)) r := bytes.NewReader(data) err = service.createFileInStore(filePath, r) if err != nil { @@ -417,6 +418,52 @@ func (service *Service) StoreScheduledJobFileFromBytes(identifier string, data [ return path.Join(service.fileStorePath, filePath), nil } -func createScheduledJobFileName(identifier string) string { +func createEdgeJobFileName(identifier string) string { return "job_" + identifier + ".sh" } + +// ClearEdgeJobTaskLogs clears the Edge job task logs +func (service *Service) ClearEdgeJobTaskLogs(edgeJobID string, taskID string) error { + path := service.getEdgeJobTaskLogPath(edgeJobID, taskID) + + err := os.Remove(path) + if err != nil { + return err + } + + return nil +} + +// GetEdgeJobTaskLogFileContent fetches the Edge job task logs +func (service *Service) GetEdgeJobTaskLogFileContent(edgeJobID string, taskID string) (string, error) { + path := service.getEdgeJobTaskLogPath(edgeJobID, taskID) + + fileContent, err := ioutil.ReadFile(path) + if err != nil { + return "", err + } + + return string(fileContent), nil +} + +// StoreEdgeJobTaskLogFileFromBytes stores the log file +func (service *Service) StoreEdgeJobTaskLogFileFromBytes(edgeJobID, taskID string, data []byte) error { + edgeJobStorePath := path.Join(EdgeJobStorePath, edgeJobID) + err := service.createDirectoryInStore(edgeJobStorePath) + if err != nil { + return err + } + + filePath := path.Join(edgeJobStorePath, fmt.Sprintf("logs_%s", taskID)) + r := bytes.NewReader(data) + err = service.createFileInStore(filePath, r) + if err != nil { + return err + } + + return nil +} + +func (service *Service) getEdgeJobTaskLogPath(edgeJobID string, taskID string) string { + return fmt.Sprintf("%s/logs_%s", service.GetEdgeJobFolder(edgeJobID), taskID) +} diff --git a/api/go.mod b/api/go.mod index c8b71965a..3af5acf37 100644 --- a/api/go.mod +++ b/api/go.mod @@ -28,7 +28,6 @@ require ( github.com/portainer/libcompose v0.5.3 github.com/portainer/libcrypto v0.0.0-20190723020515-23ebe86ab2c2 github.com/portainer/libhttp v0.0.0-20190806161843-ba068f58be33 - github.com/robfig/cron/v3 v3.0.0 golang.org/x/crypto v0.0.0-20191128160524-b544559bb6d1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/src-d/go-git.v4 v4.13.1 diff --git a/api/go.sum b/api/go.sum index 621d3a831..66b50d287 100644 --- a/api/go.sum +++ b/api/go.sum @@ -185,8 +185,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= -github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= -github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= diff --git a/api/http/handler/edgegroups/handler.go b/api/http/handler/edgegroups/handler.go index 5f8446f8a..374ca4ab2 100644 --- a/api/http/handler/edgegroups/handler.go +++ b/api/http/handler/edgegroups/handler.go @@ -21,14 +21,14 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { Router: mux.NewRouter(), } h.Handle("/edge_groups", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeGroupCreate))).Methods(http.MethodPost) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeGroupCreate)))).Methods(http.MethodPost) h.Handle("/edge_groups", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeGroupList))).Methods(http.MethodGet) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeGroupList)))).Methods(http.MethodGet) h.Handle("/edge_groups/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeGroupInspect))).Methods(http.MethodGet) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeGroupInspect)))).Methods(http.MethodGet) h.Handle("/edge_groups/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeGroupUpdate))).Methods(http.MethodPut) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeGroupUpdate)))).Methods(http.MethodPut) h.Handle("/edge_groups/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeGroupDelete))).Methods(http.MethodDelete) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeGroupDelete)))).Methods(http.MethodDelete) return h } diff --git a/api/http/handler/edgejobs/edgejob_create.go b/api/http/handler/edgejobs/edgejob_create.go new file mode 100644 index 000000000..76720006c --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_create.go @@ -0,0 +1,220 @@ +package edgejobs + +import ( + "errors" + "net/http" + "strconv" + "strings" + "time" + + "github.com/asaskevich/govalidator" + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +// POST /api/edge_jobs?method=file|string +func (handler *Handler) edgeJobCreate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + method, err := request.RetrieveQueryParameter(r, "method", false) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", err} + } + + switch method { + case "string": + return handler.createEdgeJobFromFileContent(w, r) + case "file": + return handler.createEdgeJobFromFile(w, r) + default: + return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter)} + } +} + +type edgeJobCreateFromFileContentPayload struct { + Name string + CronExpression string + Recurring bool + Endpoints []portainer.EndpointID + FileContent string +} + +func (payload *edgeJobCreateFromFileContentPayload) Validate(r *http.Request) error { + if govalidator.IsNull(payload.Name) { + return portainer.Error("Invalid Edge job name") + } + + if !govalidator.Matches(payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]*$`) { + return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + } + + if govalidator.IsNull(payload.CronExpression) { + return portainer.Error("Invalid cron expression") + } + + if payload.Endpoints == nil || len(payload.Endpoints) == 0 { + return portainer.Error("Invalid endpoints payload") + } + + if govalidator.IsNull(payload.FileContent) { + return portainer.Error("Invalid script file content") + } + + return nil +} + +func (handler *Handler) createEdgeJobFromFileContent(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + var payload edgeJobCreateFromFileContentPayload + err := request.DecodeAndValidateJSONPayload(r, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + edgeJob := handler.createEdgeJobObjectFromFileContentPayload(&payload) + + err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to schedule Edge job", err} + } + + return response.JSON(w, edgeJob) +} + +type edgeJobCreateFromFilePayload struct { + Name string + CronExpression string + Recurring bool + Endpoints []portainer.EndpointID + File []byte +} + +func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error { + name, err := request.RetrieveMultiPartFormValue(r, "Name", false) + if err != nil { + return errors.New("Invalid Edge job name") + } + + if !govalidator.Matches(name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { + return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + } + payload.Name = name + + cronExpression, err := request.RetrieveMultiPartFormValue(r, "CronExpression", false) + if err != nil { + return errors.New("Invalid cron expression") + } + payload.CronExpression = cronExpression + + var endpoints []portainer.EndpointID + err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false) + if err != nil { + return errors.New("Invalid endpoints") + } + payload.Endpoints = endpoints + + file, _, err := request.RetrieveMultiPartFormFile(r, "file") + if err != nil { + return portainer.Error("Invalid script file. Ensure that the file is uploaded correctly") + } + payload.File = file + + return nil +} + +func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + payload := &edgeJobCreateFromFilePayload{} + err := payload.Validate(r) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + edgeJob := handler.createEdgeJobObjectFromFilePayload(payload) + + err = handler.addAndPersistEdgeJob(edgeJob, payload.File) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to schedule Edge job", err} + } + + return response.JSON(w, edgeJob) +} + +func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreateFromFilePayload) *portainer.EdgeJob { + edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) + + endpoints := convertEndpointsToMetaObject(payload.Endpoints) + + edgeJob := &portainer.EdgeJob{ + ID: edgeJobIdentifier, + Name: payload.Name, + CronExpression: payload.CronExpression, + Recurring: payload.Recurring, + Created: time.Now().Unix(), + Endpoints: endpoints, + Version: 1, + } + + return edgeJob +} + +func (handler *Handler) createEdgeJobObjectFromFileContentPayload(payload *edgeJobCreateFromFileContentPayload) *portainer.EdgeJob { + edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) + + endpoints := convertEndpointsToMetaObject(payload.Endpoints) + + edgeJob := &portainer.EdgeJob{ + ID: edgeJobIdentifier, + Name: payload.Name, + CronExpression: payload.CronExpression, + Recurring: payload.Recurring, + Created: time.Now().Unix(), + Endpoints: endpoints, + Version: 1, + } + + return edgeJob +} + +func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte) error { + edgeCronExpression := strings.Split(edgeJob.CronExpression, " ") + if len(edgeCronExpression) == 6 { + edgeCronExpression = edgeCronExpression[1:] + } + edgeJob.CronExpression = strings.Join(edgeCronExpression, " ") + + for ID := range edgeJob.Endpoints { + endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) + if err != nil { + return err + } + + if endpoint.Type != portainer.EdgeAgentEnvironment { + delete(edgeJob.Endpoints, ID) + } + } + + if len(edgeJob.Endpoints) == 0 { + return errors.New("Endpoints are mandatory for an Edge job") + } + + scriptPath, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), file) + if err != nil { + return err + } + edgeJob.ScriptPath = scriptPath + + for endpointID := range edgeJob.Endpoints { + handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + } + + return handler.DataStore.EdgeJob().CreateEdgeJob(edgeJob) +} + +func convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeJobEndpointMeta { + endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + + for _, endpointID := range endpoints { + endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{} + } + + return endpointsMap +} diff --git a/api/http/handler/edgejobs/edgejob_delete.go b/api/http/handler/edgejobs/edgejob_delete.go new file mode 100644 index 000000000..da3e39ac6 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_delete.go @@ -0,0 +1,40 @@ +package edgejobs + +import ( + "net/http" + "strconv" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(edgeJobID)) + err = handler.FileService.RemoveDirectory(edgeJobFolder) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the files associated to the Edge job on the filesystem", err} + } + + handler.ReverseTunnelService.RemoveEdgeJob(edgeJob.ID) + + err = handler.DataStore.EdgeJob().DeleteEdgeJob(edgeJob.ID) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the Edge job from the database", err} + } + + return response.Empty(w) +} diff --git a/api/http/handler/edgejobs/edgejob_file.go b/api/http/handler/edgejobs/edgejob_file.go new file mode 100644 index 000000000..cd02f0919 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_file.go @@ -0,0 +1,36 @@ +package edgejobs + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +type edgeJobFileResponse struct { + FileContent string `json:"FileContent"` +} + +// GET request on /api/edge_jobs/:id/file +func (handler *Handler) edgeJobFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + edgeJobFileContent, err := handler.FileService.GetFileContent(edgeJob.ScriptPath) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve Edge job script file from disk", err} + } + + return response.JSON(w, &edgeJobFileResponse{FileContent: string(edgeJobFileContent)}) +} diff --git a/api/http/handler/edgejobs/edgejob_inspect.go b/api/http/handler/edgejobs/edgejob_inspect.go new file mode 100644 index 000000000..49857bb39 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_inspect.go @@ -0,0 +1,43 @@ +package edgejobs + +import ( + "net/http" + + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" +) + +type edgeJobInspectResponse struct { + *portainer.EdgeJob + Endpoints []portainer.EndpointID +} + +func (handler *Handler) edgeJobInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + endpointIDs := []portainer.EndpointID{} + + for endpointID := range edgeJob.Endpoints { + endpointIDs = append(endpointIDs, endpointID) + } + + responseObj := edgeJobInspectResponse{ + EdgeJob: edgeJob, + Endpoints: endpointIDs, + } + + return response.JSON(w, responseObj) +} diff --git a/api/http/handler/edgejobs/edgejob_list.go b/api/http/handler/edgejobs/edgejob_list.go new file mode 100644 index 000000000..c95d0946b --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_list.go @@ -0,0 +1,18 @@ +package edgejobs + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/response" +) + +// GET request on /api/edge_jobs +func (handler *Handler) edgeJobList(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobs, err := handler.DataStore.EdgeJob().EdgeJobs() + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve Edge jobs from the database", err} + } + + return response.JSON(w, edgeJobs) +} diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go new file mode 100644 index 000000000..c100260b3 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go @@ -0,0 +1,52 @@ +package edgejobs + +import ( + "net/http" + "strconv" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +// DELETE request on /api/edge_jobs/:id/tasks/:taskID/logs +func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + taskID, err := request.RetrieveNumericRouteVariableValue(r, "taskID") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Task identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + endpointID := portainer.EndpointID(taskID) + + meta := edgeJob.Endpoints[endpointID] + meta.CollectLogs = false + meta.LogsStatus = portainer.EdgeJobLogsStatusIdle + edgeJob.Endpoints[endpointID] = meta + + err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to clear log file from disk", err} + } + + handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + + err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist Edge job changes in the database", err} + } + + return response.Empty(w) +} diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go new file mode 100644 index 000000000..e19af696c --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go @@ -0,0 +1,46 @@ +package edgejobs + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +// POST request on /api/edge_jobs/:id/tasks/:taskID/logs +func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + taskID, err := request.RetrieveNumericRouteVariableValue(r, "taskID") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Task identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + endpointID := portainer.EndpointID(taskID) + + meta := edgeJob.Endpoints[endpointID] + meta.CollectLogs = true + meta.LogsStatus = portainer.EdgeJobLogsStatusPending + edgeJob.Endpoints[endpointID] = meta + + handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + + err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist Edge job changes in the database", err} + } + + return response.Empty(w) +} diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_inspect.go b/api/http/handler/edgejobs/edgejob_tasklogs_inspect.go new file mode 100644 index 000000000..5e63efa1b --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_tasklogs_inspect.go @@ -0,0 +1,36 @@ +package edgejobs + +import ( + "net/http" + "strconv" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" +) + +type fileResponse struct { + FileContent string `json:"FileContent"` +} + +// GET request on /api/edge_jobs/:id/tasks/:taskID/logs +func (handler *Handler) edgeJobTaskLogsInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + taskID, err := request.RetrieveNumericRouteVariableValue(r, "taskID") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Task identifier route variable", err} + } + + logFileContent, err := handler.FileService.GetEdgeJobTaskLogFileContent(strconv.Itoa(edgeJobID), strconv.Itoa(taskID)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve log file from disk", err} + } + + return response.JSON(w, &fileResponse{FileContent: string(logFileContent)}) +} + +// fmt.Sprintf("/tmp/edge_jobs/%s/logs_%s", edgeJobID, taskID) diff --git a/api/http/handler/edgejobs/edgejob_tasks_list.go b/api/http/handler/edgejobs/edgejob_tasks_list.go new file mode 100644 index 000000000..57bbf3784 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_tasks_list.go @@ -0,0 +1,56 @@ +package edgejobs + +import ( + "fmt" + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +type taskContainer struct { + ID string `json:"Id"` + EndpointID portainer.EndpointID `json:"EndpointId"` + LogsStatus portainer.EdgeJobLogsStatus `json:"LogsStatus"` +} + +// GET request on /api/edge_jobs/:id/tasks +func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + settings, err := handler.DataStore.Settings().Settings() + if err != nil { + return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} + } + + if !settings.EnableEdgeComputeFeatures { + return &httperror.HandlerError{http.StatusServiceUnavailable, "Edge compute features are disabled", portainer.ErrHostManagementFeaturesDisabled} + } + + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + tasks := make([]taskContainer, 0) + + for endpointID, meta := range edgeJob.Endpoints { + + cronTask := taskContainer{ + ID: fmt.Sprintf("edgejob_task_%d_%d", edgeJob.ID, endpointID), + EndpointID: endpointID, + LogsStatus: meta.LogsStatus, + } + + tasks = append(tasks, cronTask) + } + + return response.JSON(w, tasks) +} diff --git a/api/http/handler/edgejobs/edgejob_update.go b/api/http/handler/edgejobs/edgejob_update.go new file mode 100644 index 000000000..b33ad67b8 --- /dev/null +++ b/api/http/handler/edgejobs/edgejob_update.go @@ -0,0 +1,128 @@ +package edgejobs + +import ( + "errors" + "net/http" + "strconv" + + "github.com/asaskevich/govalidator" + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer/api" +) + +type edgeJobUpdatePayload struct { + Name *string + CronExpression *string + Recurring *bool + Endpoints []portainer.EndpointID + FileContent *string +} + +func (payload *edgeJobUpdatePayload) Validate(r *http.Request) error { + if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { + return errors.New("Invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") + } + return nil +} + +func (handler *Handler) edgeJobUpdate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + settings, err := handler.DataStore.Settings().Settings() + if err != nil { + return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} + } + + if !settings.EnableEdgeComputeFeatures { + return &httperror.HandlerError{http.StatusServiceUnavailable, "Edge compute features are disabled", portainer.ErrHostManagementFeaturesDisabled} + } + + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid Edge job identifier route variable", err} + } + + var payload edgeJobUpdatePayload + err = request.DecodeAndValidateJSONPayload(r, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an Edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an Edge job with the specified identifier inside the database", err} + } + + err = handler.updateEdgeSchedule(edgeJob, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update Edge job", err} + } + + err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist Edge job changes inside the database", err} + } + + return response.JSON(w, edgeJob) +} + +func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *edgeJobUpdatePayload) error { + if payload.Name != nil { + edgeJob.Name = *payload.Name + } + + if payload.Endpoints != nil { + endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} + + for _, endpointID := range payload.Endpoints { + endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return err + } + + if endpoint.Type != portainer.EdgeAgentEnvironment { + continue + } + + if meta, ok := edgeJob.Endpoints[endpointID]; ok { + endpointsMap[endpointID] = meta + } else { + endpointsMap[endpointID] = portainer.EdgeJobEndpointMeta{} + } + } + + edgeJob.Endpoints = endpointsMap + } + + updateVersion := false + if payload.CronExpression != nil { + edgeJob.CronExpression = *payload.CronExpression + updateVersion = true + } + + if payload.FileContent != nil { + _, err := handler.FileService.StoreEdgeJobFileFromBytes(strconv.Itoa(int(edgeJob.ID)), []byte(*payload.FileContent)) + if err != nil { + return err + } + + updateVersion = true + } + + if payload.Recurring != nil { + edgeJob.Recurring = *payload.Recurring + updateVersion = true + } + + if updateVersion { + edgeJob.Version++ + } + + for endpointID := range edgeJob.Endpoints { + handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + } + + return nil +} diff --git a/api/http/handler/edgejobs/handler.go b/api/http/handler/edgejobs/handler.go new file mode 100644 index 000000000..35800b6e3 --- /dev/null +++ b/api/http/handler/edgejobs/handler.go @@ -0,0 +1,47 @@ +package edgejobs + +import ( + "net/http" + + "github.com/gorilla/mux" + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/http/security" +) + +// Handler is the HTTP handler used to handle Edge job operations. +type Handler struct { + *mux.Router + DataStore portainer.DataStore + FileService portainer.FileService + ReverseTunnelService portainer.ReverseTunnelService +} + +// NewHandler creates a handler to manage Edge job operations. +func NewHandler(bouncer *security.RequestBouncer) *Handler { + h := &Handler{ + Router: mux.NewRouter(), + } + + h.Handle("/edge_jobs", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobList)))).Methods(http.MethodGet) + h.Handle("/edge_jobs", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobCreate)))).Methods(http.MethodPost) + h.Handle("/edge_jobs/{id}", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobInspect)))).Methods(http.MethodGet) + h.Handle("/edge_jobs/{id}", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobUpdate)))).Methods(http.MethodPut) + h.Handle("/edge_jobs/{id}", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobDelete)))).Methods(http.MethodDelete) + h.Handle("/edge_jobs/{id}/file", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobFile)))).Methods(http.MethodGet) + h.Handle("/edge_jobs/{id}/tasks", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksList)))).Methods(http.MethodGet) + h.Handle("/edge_jobs/{id}/tasks/{taskID}/logs", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTaskLogsInspect)))).Methods(http.MethodGet) + h.Handle("/edge_jobs/{id}/tasks/{taskID}/logs", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksCollect)))).Methods(http.MethodPost) + h.Handle("/edge_jobs/{id}/tasks/{taskID}/logs", + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksClear)))).Methods(http.MethodDelete) + return h +} diff --git a/api/http/handler/edgestacks/handler.go b/api/http/handler/edgestacks/handler.go index 3c75f837e..2e0580d6d 100644 --- a/api/http/handler/edgestacks/handler.go +++ b/api/http/handler/edgestacks/handler.go @@ -25,17 +25,17 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { requestBouncer: bouncer, } h.Handle("/edge_stacks", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackCreate))).Methods(http.MethodPost) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackCreate)))).Methods(http.MethodPost) h.Handle("/edge_stacks", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackList))).Methods(http.MethodGet) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackList)))).Methods(http.MethodGet) h.Handle("/edge_stacks/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackInspect))).Methods(http.MethodGet) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackInspect)))).Methods(http.MethodGet) h.Handle("/edge_stacks/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackUpdate))).Methods(http.MethodPut) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackUpdate)))).Methods(http.MethodPut) h.Handle("/edge_stacks/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackDelete))).Methods(http.MethodDelete) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackDelete)))).Methods(http.MethodDelete) h.Handle("/edge_stacks/{id}/file", - bouncer.AdminAccess(httperror.LoggerHandler(h.edgeStackFile))).Methods(http.MethodGet) + bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackFile)))).Methods(http.MethodGet) h.Handle("/edge_stacks/{id}/status", bouncer.PublicAccess(httperror.LoggerHandler(h.edgeStackStatusUpdate))).Methods(http.MethodPut) return h diff --git a/api/http/handler/endpointedge/endpoint_edgejob_logs.go b/api/http/handler/endpointedge/endpoint_edgejob_logs.go new file mode 100644 index 000000000..8596ceeb5 --- /dev/null +++ b/api/http/handler/endpointedge/endpoint_edgejob_logs.go @@ -0,0 +1,77 @@ +package endpointedge + +import ( + "net/http" + "strconv" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + portainer "github.com/portainer/portainer/api" +) + +type logsPayload struct { + FileContent string +} + +func (payload *logsPayload) Validate(r *http.Request) error { + return nil +} + +// POST request on api/endpoints/:id/edge/jobs/:jobID/logs +func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + endpointID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid endpoint identifier route variable", err} + } + + endpoint, err := handler.DataStore.Endpoint().Endpoint(portainer.EndpointID(endpointID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an endpoint with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an endpoint with the specified identifier inside the database", err} + } + + err = handler.requestBouncer.AuthorizedEdgeEndpointOperation(r, endpoint) + if err != nil { + return &httperror.HandlerError{http.StatusForbidden, "Permission denied to access endpoint", err} + } + + edgeJobID, err := request.RetrieveNumericRouteVariableValue(r, "jobID") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid edge job identifier route variable", err} + } + + var payload logsPayload + err = request.DecodeAndValidateJSONPayload(r, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find an edge job with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an edge job with the specified identifier inside the database", err} + } + + err = handler.FileService.StoreEdgeJobTaskLogFileFromBytes(strconv.Itoa(edgeJobID), strconv.Itoa(endpointID), []byte(payload.FileContent)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to save task log to the filesystem", err} + } + + meta := edgeJob.Endpoints[endpoint.ID] + meta.CollectLogs = false + meta.LogsStatus = portainer.EdgeJobLogsStatusCollected + edgeJob.Endpoints[endpoint.ID] = meta + + err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + + handler.ReverseTunnelService.AddEdgeJob(endpoint.ID, edgeJob) + + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist edge job changes to the database", err} + } + + return response.JSON(w, nil) +} diff --git a/api/http/handler/endpointedge/handler.go b/api/http/handler/endpointedge/handler.go index 14aff281e..6ca96d3fa 100644 --- a/api/http/handler/endpointedge/handler.go +++ b/api/http/handler/endpointedge/handler.go @@ -13,9 +13,10 @@ import ( // Handler is the HTTP handler used to handle edge endpoint operations. type Handler struct { *mux.Router - requestBouncer *security.RequestBouncer - DataStore portainer.DataStore - FileService portainer.FileService + requestBouncer *security.RequestBouncer + DataStore portainer.DataStore + FileService portainer.FileService + ReverseTunnelService portainer.ReverseTunnelService } // NewHandler creates a handler to manage endpoint operations. @@ -27,6 +28,7 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { h.Handle("/{id}/edge/stacks/{stackId}", bouncer.PublicAccess(httperror.LoggerHandler(h.endpointEdgeStackInspect))).Methods(http.MethodGet) - + h.Handle("/{id}/edge/jobs/{jobID}/logs", + bouncer.PublicAccess(httperror.LoggerHandler(h.endpointEdgeJobsLogs))).Methods(http.MethodPost) return h } diff --git a/api/http/handler/endpoints/endpoint_job.go b/api/http/handler/endpoints/endpoint_job.go deleted file mode 100644 index 77f727ea3..000000000 --- a/api/http/handler/endpoints/endpoint_job.go +++ /dev/null @@ -1,111 +0,0 @@ -package endpoints - -import ( - "errors" - "net/http" - - "github.com/asaskevich/govalidator" - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" -) - -type endpointJobFromFilePayload struct { - Image string - File []byte -} - -type endpointJobFromFileContentPayload struct { - Image string - FileContent string -} - -func (payload *endpointJobFromFilePayload) Validate(r *http.Request) error { - file, _, err := request.RetrieveMultiPartFormFile(r, "File") - if err != nil { - return portainer.Error("Invalid Script file. Ensure that the file is uploaded correctly") - } - payload.File = file - - image, err := request.RetrieveMultiPartFormValue(r, "Image", false) - if err != nil { - return portainer.Error("Invalid image name") - } - payload.Image = image - - return nil -} - -func (payload *endpointJobFromFileContentPayload) Validate(r *http.Request) error { - if govalidator.IsNull(payload.FileContent) { - return portainer.Error("Invalid script file content") - } - - if govalidator.IsNull(payload.Image) { - return portainer.Error("Invalid image name") - } - - return nil -} - -// POST request on /api/endpoints/:id/job?method&nodeName -func (handler *Handler) endpointJob(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - endpointID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid endpoint identifier route variable", err} - } - - method, err := request.RetrieveQueryParameter(r, "method", false) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method", err} - } - - nodeName, _ := request.RetrieveQueryParameter(r, "nodeName", true) - - endpoint, err := handler.DataStore.Endpoint().Endpoint(portainer.EndpointID(endpointID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find an endpoint with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an endpoint with the specified identifier inside the database", err} - } - - switch method { - case "file": - return handler.executeJobFromFile(w, r, endpoint, nodeName) - case "string": - return handler.executeJobFromFileContent(w, r, endpoint, nodeName) - } - - return &httperror.HandlerError{http.StatusBadRequest, "Invalid value for query parameter: method. Value must be one of: string or file", errors.New(request.ErrInvalidQueryParameter)} -} - -func (handler *Handler) executeJobFromFile(w http.ResponseWriter, r *http.Request, endpoint *portainer.Endpoint, nodeName string) *httperror.HandlerError { - payload := &endpointJobFromFilePayload{} - err := payload.Validate(r) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} - } - - err = handler.JobService.ExecuteScript(endpoint, nodeName, payload.Image, payload.File, nil) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err} - } - - return response.Empty(w) -} - -func (handler *Handler) executeJobFromFileContent(w http.ResponseWriter, r *http.Request, endpoint *portainer.Endpoint, nodeName string) *httperror.HandlerError { - var payload endpointJobFromFileContentPayload - err := request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} - } - - err = handler.JobService.ExecuteScript(endpoint, nodeName, payload.Image, []byte(payload.FileContent), nil) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err} - } - - return response.Empty(w) -} diff --git a/api/http/handler/endpoints/endpoint_status_inspect.go b/api/http/handler/endpoints/endpoint_status_inspect.go index cb64bbfe8..a48dac06d 100644 --- a/api/http/handler/endpoints/endpoint_status_inspect.go +++ b/api/http/handler/endpoints/endpoint_status_inspect.go @@ -1,6 +1,7 @@ package endpoints import ( + "encoding/base64" "net/http" httperror "github.com/portainer/libhttp/error" @@ -14,13 +15,21 @@ type stackStatusResponse struct { Version int } +type edgeJobResponse struct { + ID portainer.EdgeJobID `json:"Id"` + CollectLogs bool `json:"CollectLogs"` + CronExpression string `json:"CronExpression"` + Script string `json:"Script"` + Version int `json:"Version"` +} + type endpointStatusInspectResponse struct { - Status string `json:"status"` - Port int `json:"port"` - Schedules []portainer.EdgeSchedule `json:"schedules"` - CheckinInterval int `json:"checkin"` - Credentials string `json:"credentials"` - Stacks []stackStatusResponse `json:"stacks"` + Status string `json:"status"` + Port int `json:"port"` + Schedules []edgeJobResponse `json:"schedules"` + CheckinInterval int `json:"checkin"` + Credentials string `json:"credentials"` + Stacks []stackStatusResponse `json:"stacks"` } // GET request on /api/endpoints/:id/status @@ -65,10 +74,30 @@ func (handler *Handler) endpointStatusInspect(w http.ResponseWriter, r *http.Req checkinInterval = endpoint.EdgeCheckinInterval } + schedules := []edgeJobResponse{} + for _, job := range tunnel.Jobs { + schedule := edgeJobResponse{ + ID: job.ID, + CronExpression: job.CronExpression, + CollectLogs: job.Endpoints[endpoint.ID].CollectLogs, + Version: job.Version, + } + + file, err := handler.FileService.GetFileContent(job.ScriptPath) + + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve Edge job script file", err} + } + + schedule.Script = base64.RawStdEncoding.EncodeToString(file) + + schedules = append(schedules, schedule) + } + statusResponse := endpointStatusInspectResponse{ Status: tunnel.Status, Port: tunnel.Port, - Schedules: tunnel.Schedules, + Schedules: schedules, CheckinInterval: checkinInterval, Credentials: tunnel.Credentials, } diff --git a/api/http/handler/endpoints/handler.go b/api/http/handler/endpoints/handler.go index 1d9ca9f03..6610d134c 100644 --- a/api/http/handler/endpoints/handler.go +++ b/api/http/handler/endpoints/handler.go @@ -26,7 +26,6 @@ type Handler struct { DataStore portainer.DataStore AuthorizationService *authorization.Service FileService portainer.FileService - JobService portainer.JobService ProxyManager *proxy.Manager ReverseTunnelService portainer.ReverseTunnelService Snapshotter portainer.Snapshotter @@ -55,8 +54,6 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { bouncer.RestrictedAccess(httperror.LoggerHandler(h.endpointExtensionAdd))).Methods(http.MethodPost) h.Handle("/endpoints/{id}/extensions/{extensionType}", bouncer.RestrictedAccess(httperror.LoggerHandler(h.endpointExtensionRemove))).Methods(http.MethodDelete) - h.Handle("/endpoints/{id}/job", - bouncer.AdminAccess(httperror.LoggerHandler(h.endpointJob))).Methods(http.MethodPost) h.Handle("/endpoints/{id}/snapshot", bouncer.AdminAccess(httperror.LoggerHandler(h.endpointSnapshot))).Methods(http.MethodPost) h.Handle("/endpoints/{id}/status", diff --git a/api/http/handler/handler.go b/api/http/handler/handler.go index 8b167b12e..8dd9c3492 100644 --- a/api/http/handler/handler.go +++ b/api/http/handler/handler.go @@ -4,18 +4,13 @@ import ( "net/http" "strings" + "github.com/portainer/portainer/api/http/handler/auth" + "github.com/portainer/portainer/api/http/handler/dockerhub" "github.com/portainer/portainer/api/http/handler/edgegroups" + "github.com/portainer/portainer/api/http/handler/edgejobs" "github.com/portainer/portainer/api/http/handler/edgestacks" "github.com/portainer/portainer/api/http/handler/edgetemplates" "github.com/portainer/portainer/api/http/handler/endpointedge" - "github.com/portainer/portainer/api/http/handler/support" - - "github.com/portainer/portainer/api/http/handler/schedules" - - "github.com/portainer/portainer/api/http/handler/roles" - - "github.com/portainer/portainer/api/http/handler/auth" - "github.com/portainer/portainer/api/http/handler/dockerhub" "github.com/portainer/portainer/api/http/handler/endpointgroups" "github.com/portainer/portainer/api/http/handler/endpointproxy" "github.com/portainer/portainer/api/http/handler/endpoints" @@ -24,9 +19,11 @@ import ( "github.com/portainer/portainer/api/http/handler/motd" "github.com/portainer/portainer/api/http/handler/registries" "github.com/portainer/portainer/api/http/handler/resourcecontrols" + "github.com/portainer/portainer/api/http/handler/roles" "github.com/portainer/portainer/api/http/handler/settings" "github.com/portainer/portainer/api/http/handler/stacks" "github.com/portainer/portainer/api/http/handler/status" + "github.com/portainer/portainer/api/http/handler/support" "github.com/portainer/portainer/api/http/handler/tags" "github.com/portainer/portainer/api/http/handler/teammemberships" "github.com/portainer/portainer/api/http/handler/teams" @@ -42,6 +39,7 @@ type Handler struct { AuthHandler *auth.Handler DockerHubHandler *dockerhub.Handler EdgeGroupsHandler *edgegroups.Handler + EdgeJobsHandler *edgejobs.Handler EdgeStacksHandler *edgestacks.Handler EdgeTemplatesHandler *edgetemplates.Handler EndpointEdgeHandler *endpointedge.Handler @@ -54,7 +52,6 @@ type Handler struct { RegistryHandler *registries.Handler ResourceControlHandler *resourcecontrols.Handler RoleHandler *roles.Handler - SchedulesHanlder *schedules.Handler SettingsHandler *settings.Handler StackHandler *stacks.Handler StatusHandler *status.Handler @@ -76,10 +73,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.StripPrefix("/api", h.AuthHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/dockerhub"): http.StripPrefix("/api", h.DockerHubHandler).ServeHTTP(w, r) - case strings.HasPrefix(r.URL.Path, "/api/edge_stacks"): - http.StripPrefix("/api", h.EdgeStacksHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/edge_groups"): http.StripPrefix("/api", h.EdgeGroupsHandler).ServeHTTP(w, r) + case strings.HasPrefix(r.URL.Path, "/api/edge_jobs"): + http.StripPrefix("/api", h.EdgeJobsHandler).ServeHTTP(w, r) + case strings.HasPrefix(r.URL.Path, "/api/edge_stacks"): + http.StripPrefix("/api", h.EdgeStacksHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/edge_templates"): http.StripPrefix("/api", h.EdgeTemplatesHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/endpoint_groups"): @@ -107,8 +106,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.StripPrefix("/api", h.ResourceControlHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/roles"): http.StripPrefix("/api", h.RoleHandler).ServeHTTP(w, r) - case strings.HasPrefix(r.URL.Path, "/api/schedules"): - http.StripPrefix("/api", h.SchedulesHanlder).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/settings"): http.StripPrefix("/api", h.SettingsHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/stacks"): diff --git a/api/http/handler/schedules/handler.go b/api/http/handler/schedules/handler.go deleted file mode 100644 index 2d4382bf3..000000000 --- a/api/http/handler/schedules/handler.go +++ /dev/null @@ -1,43 +0,0 @@ -package schedules - -import ( - "net/http" - - "github.com/gorilla/mux" - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/http/security" -) - -// Handler is the HTTP handler used to handle schedule operations. -type Handler struct { - *mux.Router - DataStore portainer.DataStore - FileService portainer.FileService - JobService portainer.JobService - JobScheduler portainer.JobScheduler - ReverseTunnelService portainer.ReverseTunnelService -} - -// NewHandler creates a handler to manage schedule operations. -func NewHandler(bouncer *security.RequestBouncer) *Handler { - h := &Handler{ - Router: mux.NewRouter(), - } - - h.Handle("/schedules", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleList))).Methods(http.MethodGet) - h.Handle("/schedules", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleCreate))).Methods(http.MethodPost) - h.Handle("/schedules/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleInspect))).Methods(http.MethodGet) - h.Handle("/schedules/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleUpdate))).Methods(http.MethodPut) - h.Handle("/schedules/{id}", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleDelete))).Methods(http.MethodDelete) - h.Handle("/schedules/{id}/file", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleFile))).Methods(http.MethodGet) - h.Handle("/schedules/{id}/tasks", - bouncer.AdminAccess(httperror.LoggerHandler(h.scheduleTasks))).Methods(http.MethodGet) - return h -} diff --git a/api/http/handler/schedules/schedule_create.go b/api/http/handler/schedules/schedule_create.go deleted file mode 100644 index 9e54bbcab..000000000 --- a/api/http/handler/schedules/schedule_create.go +++ /dev/null @@ -1,280 +0,0 @@ -package schedules - -import ( - "encoding/base64" - "errors" - "net/http" - "strconv" - "strings" - "time" - - "github.com/asaskevich/govalidator" - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/cron" -) - -type scheduleCreateFromFilePayload struct { - Name string - Image string - CronExpression string - Recurring bool - Endpoints []portainer.EndpointID - File []byte - RetryCount int - RetryInterval int -} - -type scheduleCreateFromFileContentPayload struct { - Name string - CronExpression string - Recurring bool - Image string - Endpoints []portainer.EndpointID - FileContent string - RetryCount int - RetryInterval int -} - -func (payload *scheduleCreateFromFilePayload) Validate(r *http.Request) error { - name, err := request.RetrieveMultiPartFormValue(r, "Name", false) - if err != nil { - return errors.New("Invalid schedule name") - } - - if !govalidator.Matches(name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { - return errors.New("Invalid schedule name format. Allowed characters are: [a-zA-Z0-9_.-]") - } - payload.Name = name - - image, err := request.RetrieveMultiPartFormValue(r, "Image", false) - if err != nil { - return errors.New("Invalid schedule image") - } - payload.Image = image - - cronExpression, err := request.RetrieveMultiPartFormValue(r, "CronExpression", false) - if err != nil { - return errors.New("Invalid cron expression") - } - payload.CronExpression = cronExpression - - var endpoints []portainer.EndpointID - err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false) - if err != nil { - return errors.New("Invalid endpoints") - } - payload.Endpoints = endpoints - - file, _, err := request.RetrieveMultiPartFormFile(r, "file") - if err != nil { - return portainer.Error("Invalid script file. Ensure that the file is uploaded correctly") - } - payload.File = file - - retryCount, _ := request.RetrieveNumericMultiPartFormValue(r, "RetryCount", true) - payload.RetryCount = retryCount - - retryInterval, _ := request.RetrieveNumericMultiPartFormValue(r, "RetryInterval", true) - payload.RetryInterval = retryInterval - - return nil -} - -func (payload *scheduleCreateFromFileContentPayload) Validate(r *http.Request) error { - if govalidator.IsNull(payload.Name) { - return portainer.Error("Invalid schedule name") - } - - if !govalidator.Matches(payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { - return errors.New("Invalid schedule name format. Allowed characters are: [a-zA-Z0-9_.-]") - } - - if govalidator.IsNull(payload.Image) { - return portainer.Error("Invalid schedule image") - } - - if govalidator.IsNull(payload.CronExpression) { - return portainer.Error("Invalid cron expression") - } - - if payload.Endpoints == nil || len(payload.Endpoints) == 0 { - return portainer.Error("Invalid endpoints payload") - } - - if govalidator.IsNull(payload.FileContent) { - return portainer.Error("Invalid script file content") - } - - if payload.RetryCount != 0 && payload.RetryInterval == 0 { - return portainer.Error("RetryInterval must be set") - } - - return nil -} - -// POST /api/schedules?method=file|string -func (handler *Handler) scheduleCreate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - method, err := request.RetrieveQueryParameter(r, "method", false) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", err} - } - - switch method { - case "string": - return handler.createScheduleFromFileContent(w, r) - case "file": - return handler.createScheduleFromFile(w, r) - default: - return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter)} - } -} - -func (handler *Handler) createScheduleFromFileContent(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - var payload scheduleCreateFromFileContentPayload - err := request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} - } - - schedule := handler.createScheduleObjectFromFileContentPayload(&payload) - - err = handler.addAndPersistSchedule(schedule, []byte(payload.FileContent)) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to schedule script job", err} - } - - return response.JSON(w, schedule) -} - -func (handler *Handler) createScheduleFromFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - payload := &scheduleCreateFromFilePayload{} - err := payload.Validate(r) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} - } - - schedule := handler.createScheduleObjectFromFilePayload(payload) - - err = handler.addAndPersistSchedule(schedule, payload.File) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to schedule script job", err} - } - - return response.JSON(w, schedule) -} - -func (handler *Handler) createScheduleObjectFromFilePayload(payload *scheduleCreateFromFilePayload) *portainer.Schedule { - scheduleIdentifier := portainer.ScheduleID(handler.DataStore.Schedule().GetNextIdentifier()) - - job := &portainer.ScriptExecutionJob{ - Endpoints: payload.Endpoints, - Image: payload.Image, - RetryCount: payload.RetryCount, - RetryInterval: payload.RetryInterval, - } - - schedule := &portainer.Schedule{ - ID: scheduleIdentifier, - Name: payload.Name, - CronExpression: payload.CronExpression, - Recurring: payload.Recurring, - JobType: portainer.ScriptExecutionJobType, - ScriptExecutionJob: job, - Created: time.Now().Unix(), - } - - return schedule -} - -func (handler *Handler) createScheduleObjectFromFileContentPayload(payload *scheduleCreateFromFileContentPayload) *portainer.Schedule { - scheduleIdentifier := portainer.ScheduleID(handler.DataStore.Schedule().GetNextIdentifier()) - - job := &portainer.ScriptExecutionJob{ - Endpoints: payload.Endpoints, - Image: payload.Image, - RetryCount: payload.RetryCount, - RetryInterval: payload.RetryInterval, - } - - schedule := &portainer.Schedule{ - ID: scheduleIdentifier, - Name: payload.Name, - CronExpression: payload.CronExpression, - Recurring: payload.Recurring, - JobType: portainer.ScriptExecutionJobType, - ScriptExecutionJob: job, - Created: time.Now().Unix(), - } - - return schedule -} - -func (handler *Handler) addAndPersistSchedule(schedule *portainer.Schedule, file []byte) error { - nonEdgeEndpointIDs := make([]portainer.EndpointID, 0) - edgeEndpointIDs := make([]portainer.EndpointID, 0) - - edgeCronExpression := strings.Split(schedule.CronExpression, " ") - if len(edgeCronExpression) == 6 { - edgeCronExpression = edgeCronExpression[1:] - } - - for _, ID := range schedule.ScriptExecutionJob.Endpoints { - - endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) - if err != nil { - return err - } - - if endpoint.Type != portainer.EdgeAgentEnvironment { - nonEdgeEndpointIDs = append(nonEdgeEndpointIDs, endpoint.ID) - } else { - edgeEndpointIDs = append(edgeEndpointIDs, endpoint.ID) - } - } - - if len(edgeEndpointIDs) > 0 { - edgeSchedule := &portainer.EdgeSchedule{ - ID: schedule.ID, - CronExpression: strings.Join(edgeCronExpression, " "), - Script: base64.RawStdEncoding.EncodeToString(file), - Endpoints: edgeEndpointIDs, - Version: 1, - } - - for _, endpointID := range edgeEndpointIDs { - handler.ReverseTunnelService.AddSchedule(endpointID, edgeSchedule) - } - - schedule.EdgeSchedule = edgeSchedule - } - - schedule.ScriptExecutionJob.Endpoints = nonEdgeEndpointIDs - - scriptPath, err := handler.FileService.StoreScheduledJobFileFromBytes(strconv.Itoa(int(schedule.ID)), file) - if err != nil { - return err - } - - schedule.ScriptExecutionJob.ScriptPath = scriptPath - - jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.DataStore, handler.FileService) - jobRunner := cron.NewScriptExecutionJobRunner(schedule, jobContext) - - err = handler.JobScheduler.ScheduleJob(jobRunner) - if err != nil { - return err - } - - return handler.DataStore.Schedule().CreateSchedule(schedule) -} diff --git a/api/http/handler/schedules/schedule_delete.go b/api/http/handler/schedules/schedule_delete.go deleted file mode 100644 index 8970e35bf..000000000 --- a/api/http/handler/schedules/schedule_delete.go +++ /dev/null @@ -1,55 +0,0 @@ -package schedules - -import ( - "errors" - "net/http" - "strconv" - - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" -) - -func (handler *Handler) scheduleDelete(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} - } - - schedule, err := handler.DataStore.Schedule().Schedule(portainer.ScheduleID(scheduleID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} - } - - if schedule.JobType == portainer.SnapshotJobType || schedule.JobType == portainer.EndpointSyncJobType { - return &httperror.HandlerError{http.StatusBadRequest, "Cannot remove system schedules", errors.New("Cannot remove system schedule")} - } - - scheduleFolder := handler.FileService.GetScheduleFolder(strconv.Itoa(scheduleID)) - err = handler.FileService.RemoveDirectory(scheduleFolder) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the files associated to the schedule on the filesystem", err} - } - - handler.ReverseTunnelService.RemoveSchedule(schedule.ID) - - handler.JobScheduler.UnscheduleJob(schedule.ID) - - err = handler.DataStore.Schedule().DeleteSchedule(portainer.ScheduleID(scheduleID)) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the schedule from the database", err} - } - - return response.Empty(w) -} diff --git a/api/http/handler/schedules/schedule_file.go b/api/http/handler/schedules/schedule_file.go deleted file mode 100644 index 263ff6eb2..000000000 --- a/api/http/handler/schedules/schedule_file.go +++ /dev/null @@ -1,49 +0,0 @@ -package schedules - -import ( - "errors" - "net/http" - - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" -) - -type scheduleFileResponse struct { - ScheduleFileContent string `json:"ScheduleFileContent"` -} - -// GET request on /api/schedules/:id/file -func (handler *Handler) scheduleFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} - } - - schedule, err := handler.DataStore.Schedule().Schedule(portainer.ScheduleID(scheduleID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} - } - - if schedule.JobType != portainer.ScriptExecutionJobType { - return &httperror.HandlerError{http.StatusBadRequest, "Unable to retrieve script file", errors.New("This type of schedule do not have any associated script file")} - } - - scheduleFileContent, err := handler.FileService.GetFileContent(schedule.ScriptExecutionJob.ScriptPath) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve schedule script file from disk", err} - } - - return response.JSON(w, &scheduleFileResponse{ScheduleFileContent: string(scheduleFileContent)}) -} diff --git a/api/http/handler/schedules/schedule_inspect.go b/api/http/handler/schedules/schedule_inspect.go deleted file mode 100644 index 9067c81f7..000000000 --- a/api/http/handler/schedules/schedule_inspect.go +++ /dev/null @@ -1,35 +0,0 @@ -package schedules - -import ( - "net/http" - - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" - - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" -) - -func (handler *Handler) scheduleInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} - } - - schedule, err := handler.DataStore.Schedule().Schedule(portainer.ScheduleID(scheduleID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} - } - - return response.JSON(w, schedule) -} diff --git a/api/http/handler/schedules/schedule_list.go b/api/http/handler/schedules/schedule_list.go deleted file mode 100644 index 399e7e1bb..000000000 --- a/api/http/handler/schedules/schedule_list.go +++ /dev/null @@ -1,27 +0,0 @@ -package schedules - -import ( - "net/http" - - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" -) - -// GET request on /api/schedules -func (handler *Handler) scheduleList(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - schedules, err := handler.DataStore.Schedule().Schedules() - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve schedules from the database", err} - } - - return response.JSON(w, schedules) -} diff --git a/api/http/handler/schedules/schedule_tasks.go b/api/http/handler/schedules/schedule_tasks.go deleted file mode 100644 index 48dfc35aa..000000000 --- a/api/http/handler/schedules/schedule_tasks.go +++ /dev/null @@ -1,114 +0,0 @@ -package schedules - -import ( - "encoding/json" - "errors" - "fmt" - "net/http" - "strconv" - - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" -) - -type taskContainer struct { - ID string `json:"Id"` - EndpointID portainer.EndpointID `json:"EndpointId"` - Status string `json:"Status"` - Created float64 `json:"Created"` - Labels map[string]string `json:"Labels"` - Edge bool `json:"Edge"` -} - -// GET request on /api/schedules/:id/tasks -func (handler *Handler) scheduleTasks(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} - } - - schedule, err := handler.DataStore.Schedule().Schedule(portainer.ScheduleID(scheduleID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} - } - - if schedule.JobType != portainer.ScriptExecutionJobType { - return &httperror.HandlerError{http.StatusBadRequest, "Unable to retrieve schedule tasks", errors.New("This type of schedule do not have any associated tasks")} - } - - tasks := make([]taskContainer, 0) - - for _, endpointID := range schedule.ScriptExecutionJob.Endpoints { - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) - if err == portainer.ErrObjectNotFound { - continue - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an endpoint with the specified identifier inside the database", err} - } - - endpointTasks, err := extractTasksFromContainerSnasphot(endpoint, schedule.ID) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find extract schedule tasks from endpoint snapshot", err} - } - - tasks = append(tasks, endpointTasks...) - } - - if schedule.EdgeSchedule != nil { - for _, endpointID := range schedule.EdgeSchedule.Endpoints { - - cronTask := taskContainer{ - ID: fmt.Sprintf("schedule_%d", schedule.EdgeSchedule.ID), - EndpointID: endpointID, - Edge: true, - Status: "", - Created: 0, - Labels: map[string]string{}, - } - - tasks = append(tasks, cronTask) - } - } - - return response.JSON(w, tasks) -} - -func extractTasksFromContainerSnasphot(endpoint *portainer.Endpoint, scheduleID portainer.ScheduleID) ([]taskContainer, error) { - endpointTasks := make([]taskContainer, 0) - if len(endpoint.Snapshots) == 0 { - return endpointTasks, nil - } - - b, err := json.Marshal(endpoint.Snapshots[0].SnapshotRaw.Containers) - if err != nil { - return nil, err - } - - var containers []taskContainer - err = json.Unmarshal(b, &containers) - if err != nil { - return nil, err - } - - for _, container := range containers { - if container.Labels["io.portainer.schedule.id"] == strconv.Itoa(int(scheduleID)) { - container.EndpointID = endpoint.ID - container.Edge = false - endpointTasks = append(endpointTasks, container) - } - } - - return endpointTasks, nil -} diff --git a/api/http/handler/schedules/schedule_update.go b/api/http/handler/schedules/schedule_update.go deleted file mode 100644 index 2bf2faf39..000000000 --- a/api/http/handler/schedules/schedule_update.go +++ /dev/null @@ -1,175 +0,0 @@ -package schedules - -import ( - "encoding/base64" - "errors" - "net/http" - "strconv" - - "github.com/asaskevich/govalidator" - httperror "github.com/portainer/libhttp/error" - "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" - "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/cron" -) - -type scheduleUpdatePayload struct { - Name *string - Image *string - CronExpression *string - Recurring *bool - Endpoints []portainer.EndpointID - FileContent *string - RetryCount *int - RetryInterval *int -} - -func (payload *scheduleUpdatePayload) Validate(r *http.Request) error { - if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { - return errors.New("Invalid schedule name format. Allowed characters are: [a-zA-Z0-9_.-]") - } - return nil -} - -func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - settings, err := handler.DataStore.Settings().Settings() - if err != nil { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Unable to retrieve settings", err} - } - if !settings.EnableHostManagementFeatures { - return &httperror.HandlerError{http.StatusServiceUnavailable, "Host management features are disabled", portainer.ErrHostManagementFeaturesDisabled} - } - - scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} - } - - var payload scheduleUpdatePayload - err = request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { - return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} - } - - schedule, err := handler.DataStore.Schedule().Schedule(portainer.ScheduleID(scheduleID)) - if err == portainer.ErrObjectNotFound { - return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} - } else if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} - } - - updateJobSchedule := false - if schedule.EdgeSchedule != nil { - err := handler.updateEdgeSchedule(schedule, &payload) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update Edge schedule", err} - } - } else { - updateJobSchedule = updateSchedule(schedule, &payload) - } - - if payload.FileContent != nil { - _, err := handler.FileService.StoreScheduledJobFileFromBytes(strconv.Itoa(scheduleID), []byte(*payload.FileContent)) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist script file changes on the filesystem", err} - } - updateJobSchedule = true - } - - if updateJobSchedule { - jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.DataStore, handler.FileService) - jobRunner := cron.NewScriptExecutionJobRunner(schedule, jobContext) - err := handler.JobScheduler.UpdateJobSchedule(jobRunner) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update job scheduler", err} - } - } - - err = handler.DataStore.Schedule().UpdateSchedule(portainer.ScheduleID(scheduleID), schedule) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist schedule changes inside the database", err} - } - - return response.JSON(w, schedule) -} - -func (handler *Handler) updateEdgeSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) error { - if payload.Name != nil { - schedule.Name = *payload.Name - } - - if payload.Endpoints != nil { - - edgeEndpointIDs := make([]portainer.EndpointID, 0) - - for _, ID := range payload.Endpoints { - endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) - if err != nil { - return err - } - - if endpoint.Type == portainer.EdgeAgentEnvironment { - edgeEndpointIDs = append(edgeEndpointIDs, endpoint.ID) - } - } - - schedule.EdgeSchedule.Endpoints = edgeEndpointIDs - } - - if payload.CronExpression != nil { - schedule.EdgeSchedule.CronExpression = *payload.CronExpression - schedule.EdgeSchedule.Version++ - } - - if payload.FileContent != nil { - schedule.EdgeSchedule.Script = base64.RawStdEncoding.EncodeToString([]byte(*payload.FileContent)) - schedule.EdgeSchedule.Version++ - } - - for _, endpointID := range schedule.EdgeSchedule.Endpoints { - handler.ReverseTunnelService.AddSchedule(endpointID, schedule.EdgeSchedule) - } - - return nil -} - -func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool { - updateJobSchedule := false - - if payload.Name != nil { - schedule.Name = *payload.Name - } - - if payload.Endpoints != nil { - schedule.ScriptExecutionJob.Endpoints = payload.Endpoints - updateJobSchedule = true - } - - if payload.CronExpression != nil { - schedule.CronExpression = *payload.CronExpression - updateJobSchedule = true - } - - if payload.Recurring != nil { - schedule.Recurring = *payload.Recurring - updateJobSchedule = true - } - - if payload.Image != nil { - schedule.ScriptExecutionJob.Image = *payload.Image - updateJobSchedule = true - } - - if payload.RetryCount != nil { - schedule.ScriptExecutionJob.RetryCount = *payload.RetryCount - updateJobSchedule = true - } - - if payload.RetryInterval != nil { - schedule.ScriptExecutionJob.RetryInterval = *payload.RetryInterval - updateJobSchedule = true - } - - return updateJobSchedule -} diff --git a/api/http/handler/settings/handler.go b/api/http/handler/settings/handler.go index 8534b1867..c4471a2c5 100644 --- a/api/http/handler/settings/handler.go +++ b/api/http/handler/settings/handler.go @@ -8,6 +8,7 @@ import ( httperror "github.com/portainer/libhttp/error" "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/http/security" + "github.com/portainer/portainer/api/internal/snapshot" ) func hideFields(settings *portainer.Settings) { @@ -21,9 +22,9 @@ type Handler struct { AuthorizationService *authorization.Service DataStore portainer.DataStore FileService portainer.FileService - JobScheduler portainer.JobScheduler JWTService portainer.JWTService LDAPService portainer.LDAPService + SnapshotService *snapshot.Service } // NewHandler creates a handler to manage settings operations. diff --git a/api/http/handler/settings/settings_update.go b/api/http/handler/settings/settings_update.go index 81c1aa249..79047e146 100644 --- a/api/http/handler/settings/settings_update.go +++ b/api/http/handler/settings/settings_update.go @@ -186,26 +186,11 @@ func (handler *Handler) updateVolumeBrowserSetting(settings *portainer.Settings) func (handler *Handler) updateSnapshotInterval(settings *portainer.Settings, snapshotInterval string) error { settings.SnapshotInterval = snapshotInterval - schedules, err := handler.DataStore.Schedule().SchedulesByJobType(portainer.SnapshotJobType) + err := handler.SnapshotService.SetSnapshotInterval(snapshotInterval) if err != nil { return err } - if len(schedules) != 0 { - snapshotSchedule := schedules[0] - snapshotSchedule.CronExpression = "@every " + snapshotInterval - - err := handler.JobScheduler.UpdateSystemJobSchedule(portainer.SnapshotJobType, snapshotSchedule.CronExpression) - if err != nil { - return err - } - - err = handler.DataStore.Schedule().UpdateSchedule(snapshotSchedule.ID, &snapshotSchedule) - if err != nil { - return err - } - } - return nil } diff --git a/api/http/security/bouncer.go b/api/http/security/bouncer.go index 3fc61656a..1fe96ad32 100644 --- a/api/http/security/bouncer.go +++ b/api/http/security/bouncer.go @@ -362,3 +362,22 @@ func (bouncer *RequestBouncer) newRestrictedContextRequest(userID portainer.User return requestContext, nil } + +// EdgeComputeOperation defines a restriced edge compute operation. +// Use of this operation will only be authorized if edgeCompute is enabled in settings +func (bouncer *RequestBouncer) EdgeComputeOperation(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + settings, err := bouncer.dataStore.Settings().Settings() + if err != nil { + httperror.WriteError(w, http.StatusServiceUnavailable, "Unable to retrieve settings", err) + return + } + + if !settings.EnableEdgeComputeFeatures { + httperror.WriteError(w, http.StatusServiceUnavailable, "Edge compute features are disabled", errors.New("Edge compute features are disabled")) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/api/http/server.go b/api/http/server.go index 11987901d..b86378512 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -8,6 +8,7 @@ import ( "github.com/portainer/portainer/api/http/handler/edgetemplates" "github.com/portainer/portainer/api/http/handler/endpointedge" "github.com/portainer/portainer/api/http/handler/support" + "github.com/portainer/portainer/api/internal/snapshot" "github.com/portainer/portainer/api/http/handler/roles" @@ -16,6 +17,7 @@ import ( "github.com/portainer/portainer/api/http/handler" "github.com/portainer/portainer/api/http/handler/auth" "github.com/portainer/portainer/api/http/handler/dockerhub" + "github.com/portainer/portainer/api/http/handler/edgejobs" "github.com/portainer/portainer/api/http/handler/endpointgroups" "github.com/portainer/portainer/api/http/handler/endpointproxy" "github.com/portainer/portainer/api/http/handler/endpoints" @@ -24,7 +26,6 @@ import ( "github.com/portainer/portainer/api/http/handler/motd" "github.com/portainer/portainer/api/http/handler/registries" "github.com/portainer/portainer/api/http/handler/resourcecontrols" - "github.com/portainer/portainer/api/http/handler/schedules" "github.com/portainer/portainer/api/http/handler/settings" "github.com/portainer/portainer/api/http/handler/stacks" "github.com/portainer/portainer/api/http/handler/status" @@ -54,7 +55,7 @@ type Server struct { ComposeStackManager portainer.ComposeStackManager CryptoService portainer.CryptoService SignatureService portainer.DigitalSignatureService - JobScheduler portainer.JobScheduler + SnapshotService *snapshot.Service Snapshotter portainer.Snapshotter FileService portainer.FileService DataStore portainer.DataStore @@ -67,7 +68,6 @@ type Server struct { SSLCert string SSLKey string DockerClientFactory *docker.ClientFactory - JobService portainer.JobService } // Start starts the HTTP server @@ -98,6 +98,11 @@ func (server *Server) Start() error { var edgeGroupsHandler = edgegroups.NewHandler(requestBouncer) edgeGroupsHandler.DataStore = server.DataStore + var edgeJobsHandler = edgejobs.NewHandler(requestBouncer) + edgeJobsHandler.DataStore = server.DataStore + edgeJobsHandler.FileService = server.FileService + edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService + var edgeStacksHandler = edgestacks.NewHandler(requestBouncer) edgeStacksHandler.DataStore = server.DataStore edgeStacksHandler.FileService = server.FileService @@ -110,7 +115,6 @@ func (server *Server) Start() error { endpointHandler.DataStore = server.DataStore endpointHandler.AuthorizationService = authorizationService endpointHandler.FileService = server.FileService - endpointHandler.JobService = server.JobService endpointHandler.ProxyManager = proxyManager endpointHandler.ReverseTunnelService = server.ReverseTunnelService endpointHandler.Snapshotter = server.Snapshotter @@ -118,6 +122,7 @@ func (server *Server) Start() error { var endpointEdgeHandler = endpointedge.NewHandler(requestBouncer) endpointEdgeHandler.DataStore = server.DataStore endpointEdgeHandler.FileService = server.FileService + endpointEdgeHandler.ReverseTunnelService = server.ReverseTunnelService var endpointGroupHandler = endpointgroups.NewHandler(requestBouncer) endpointGroupHandler.DataStore = server.DataStore @@ -145,20 +150,13 @@ func (server *Server) Start() error { var resourceControlHandler = resourcecontrols.NewHandler(requestBouncer) resourceControlHandler.DataStore = server.DataStore - var schedulesHandler = schedules.NewHandler(requestBouncer) - schedulesHandler.DataStore = server.DataStore - schedulesHandler.FileService = server.FileService - schedulesHandler.JobService = server.JobService - schedulesHandler.JobScheduler = server.JobScheduler - schedulesHandler.ReverseTunnelService = server.ReverseTunnelService - var settingsHandler = settings.NewHandler(requestBouncer) settingsHandler.AuthorizationService = authorizationService settingsHandler.DataStore = server.DataStore settingsHandler.FileService = server.FileService - settingsHandler.JobScheduler = server.JobScheduler settingsHandler.JWTService = server.JWTService settingsHandler.LDAPService = server.LDAPService + settingsHandler.SnapshotService = server.SnapshotService var stackHandler = stacks.NewHandler(requestBouncer) stackHandler.DataStore = server.DataStore @@ -207,6 +205,7 @@ func (server *Server) Start() error { AuthHandler: authHandler, DockerHubHandler: dockerHubHandler, EdgeGroupsHandler: edgeGroupsHandler, + EdgeJobsHandler: edgeJobsHandler, EdgeStacksHandler: edgeStacksHandler, EdgeTemplatesHandler: edgeTemplatesHandler, EndpointGroupHandler: endpointGroupHandler, @@ -230,7 +229,6 @@ func (server *Server) Start() error { UserHandler: userHandler, WebSocketHandler: websocketHandler, WebhookHandler: webhookHandler, - SchedulesHanlder: schedulesHandler, } if server.SSL { diff --git a/api/internal/snapshot/snapshot.go b/api/internal/snapshot/snapshot.go new file mode 100644 index 000000000..f5c0c49a5 --- /dev/null +++ b/api/internal/snapshot/snapshot.go @@ -0,0 +1,129 @@ +package snapshot + +import ( + "log" + "time" + + "github.com/portainer/portainer/api" +) + +// Service repesents a service to manage system snapshots +type Service struct { + dataStore portainer.DataStore + refreshSignal chan struct{} + snapshotIntervalInSeconds float64 + snapshotter portainer.Snapshotter +} + +// NewService creates a new instance of a service +func NewService(snapshotInterval string, dataStore portainer.DataStore, snapshotter portainer.Snapshotter) (*Service, error) { + snapshotFrequency, err := time.ParseDuration(snapshotInterval) + if err != nil { + return nil, err + } + + return &Service{ + dataStore: dataStore, + snapshotIntervalInSeconds: snapshotFrequency.Seconds(), + snapshotter: snapshotter, + }, nil +} + +// Start starts the service +func (service *Service) Start() { + if service.refreshSignal != nil { + return + } + + service.refreshSignal = make(chan struct{}) + service.startSnapshotLoop() +} + +func (service *Service) stop() { + if service.refreshSignal == nil { + return + } + + close(service.refreshSignal) +} + +// SetSnapshotInterval sets the snapshot interval and resets the service +func (service *Service) SetSnapshotInterval(snapshotInterval string) error { + service.stop() + + snapshotFrequency, err := time.ParseDuration(snapshotInterval) + if err != nil { + return err + } + service.snapshotIntervalInSeconds = snapshotFrequency.Seconds() + + service.Start() + + return nil +} + +func (service *Service) startSnapshotLoop() error { + ticker := time.NewTicker(time.Duration(service.snapshotIntervalInSeconds) * time.Second) + go func() { + err := service.snapshotEndpoints() + if err != nil { + log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (endpoint snapshot).] [error: %s]", err) + } + + for { + select { + case <-ticker.C: + err := service.snapshotEndpoints() + if err != nil { + log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (endpoint snapshot).] [error: %s]", err) + } + + case <-service.refreshSignal: + log.Println("[DEBUG] [internal,snapshot] [message: shutting down Snapshot service]") + ticker.Stop() + return + } + } + }() + + return nil +} + +func (service *Service) snapshotEndpoints() error { + endpoints, err := service.dataStore.Endpoint().Endpoints() + if err != nil { + return err + } + + for _, endpoint := range endpoints { + if endpoint.Type == portainer.EdgeAgentEnvironment { + continue + } + + snapshot, snapshotError := service.snapshotter.CreateSnapshot(&endpoint) + + latestEndpointReference, err := service.dataStore.Endpoint().Endpoint(endpoint.ID) + if latestEndpointReference == nil { + log.Printf("background schedule error (endpoint snapshot). Endpoint not found inside the database anymore (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + continue + } + + latestEndpointReference.Status = portainer.EndpointStatusUp + if snapshotError != nil { + log.Printf("background schedule error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, snapshotError) + latestEndpointReference.Status = portainer.EndpointStatusDown + } + + if snapshot != nil { + latestEndpointReference.Snapshots = []portainer.Snapshot{*snapshot} + } + + err = service.dataStore.Endpoint().UpdateEndpoint(latestEndpointReference.ID, latestEndpointReference) + if err != nil { + log.Printf("background schedule error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + continue + } + } + + return nil +} diff --git a/api/portainer.go b/api/portainer.go index 4182ce66d..05edb3866 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -76,6 +76,7 @@ type ( DockerHub() DockerHubService EdgeGroup() EdgeGroupService + EdgeJob() EdgeJobService EdgeStack() EdgeStackService Endpoint() EndpointService EndpointGroup() EndpointGroupService @@ -84,7 +85,6 @@ type ( Registry() RegistryService ResourceControl() ResourceControlService Role() RoleService - Schedule() ScheduleService Settings() SettingsService Stack() StackService Tag() TagService @@ -117,7 +117,32 @@ type ( // EdgeGroupID represents an Edge group identifier EdgeGroupID int + // EdgeJob represents a job that can run on Edge environments. + EdgeJob struct { + ID EdgeJobID `json:"Id"` + Created int64 `json:"Created"` + CronExpression string `json:"CronExpression"` + Endpoints map[EndpointID]EdgeJobEndpointMeta `json:"Endpoints"` + Name string `json:"Name"` + ScriptPath string `json:"ScriptPath"` + Recurring bool `json:"Recurring"` + Version int `json:"Version"` + } + + // EdgeJobEndpointMeta represents a meta data object for an Edge job and Endpoint relation + EdgeJobEndpointMeta struct { + LogsStatus EdgeJobLogsStatus + CollectLogs bool + } + + // EdgeJobID represents an Edge job identifier + EdgeJobID int + + // EdgeJobLogsStatus represent status of logs collection job + EdgeJobLogsStatus int + // EdgeSchedule represents a scheduled job that can run on Edge environments. + // Deprecated in favor of EdgeJob EdgeSchedule struct { ID ScheduleID `json:"Id"` CronExpression string `json:"CronExpression"` @@ -417,22 +442,19 @@ type ( // It only contains a pointer to one of the JobRunner implementations // based on the JobType. // NOTE: The Recurring option is only used by ScriptExecutionJob at the moment + // Deprecated in favor of EdgeJob Schedule struct { - ID ScheduleID `json:"Id"` - Name string - CronExpression string - Recurring bool - Created int64 - JobType JobType - EdgeSchedule *EdgeSchedule - ScriptExecutionJob *ScriptExecutionJob - SnapshotJob *SnapshotJob - - // Deprecated fields - EndpointSyncJob *EndpointSyncJob + ID ScheduleID `json:"Id"` + Name string + CronExpression string + Recurring bool + Created int64 + JobType JobType + EdgeSchedule *EdgeSchedule } // ScheduleID represents a schedule identifier. + // Deprecated in favor of EdgeJob ScheduleID int // ScriptExecutionJob represents a scheduled job that can execute a script via a privileged container @@ -484,9 +506,6 @@ type ( SnapshotRaw SnapshotRaw `json:"SnapshotRaw"` } - // SnapshotJob represents a scheduled job that can create endpoint snapshots - SnapshotJob struct{} - // SnapshotRaw represents all the information related to a snapshot as returned by the Docker API SnapshotRaw struct { Containers interface{} `json:"Containers"` @@ -664,7 +683,7 @@ type ( Status string LastActivity time.Time Port int - Schedules []EdgeSchedule + Jobs []EdgeJob Credentials string } @@ -741,6 +760,35 @@ type ( UpdateDockerHub(registry *DockerHub) error } + // EdgeGroupService represents a service to manage Edge groups + EdgeGroupService interface { + EdgeGroups() ([]EdgeGroup, error) + EdgeGroup(ID EdgeGroupID) (*EdgeGroup, error) + CreateEdgeGroup(group *EdgeGroup) error + UpdateEdgeGroup(ID EdgeGroupID, group *EdgeGroup) error + DeleteEdgeGroup(ID EdgeGroupID) error + } + + // EdgeJobService represents a service to manage Edge jobs + EdgeJobService interface { + EdgeJobs() ([]EdgeJob, error) + EdgeJob(ID EdgeJobID) (*EdgeJob, error) + CreateEdgeJob(edgeJob *EdgeJob) error + UpdateEdgeJob(ID EdgeJobID, edgeJob *EdgeJob) error + DeleteEdgeJob(ID EdgeJobID) error + GetNextIdentifier() int + } + + // EdgeStackService represents a service to manage Edge stacks + EdgeStackService interface { + EdgeStacks() ([]EdgeStack, error) + EdgeStack(ID EdgeStackID) (*EdgeStack, error) + CreateEdgeStack(edgeStack *EdgeStack) error + UpdateEdgeStack(ID EdgeStackID, edgeStack *EdgeStack) error + DeleteEdgeStack(ID EdgeStackID) error + GetNextIdentifier() int + } + // EndpointService represents a service for managing endpoint data EndpointService interface { Endpoint(ID EndpointID) (*Endpoint, error) @@ -806,8 +854,11 @@ type ( LoadKeyPair() ([]byte, []byte, error) WriteJSONToFile(path string, content interface{}) error FileExists(path string) (bool, error) - StoreScheduledJobFileFromBytes(identifier string, data []byte) (string, error) - GetScheduleFolder(identifier string) string + StoreEdgeJobFileFromBytes(identifier string, data []byte) (string, error) + GetEdgeJobFolder(identifier string) string + ClearEdgeJobTaskLogs(edgeJobID, taskID string) error + GetEdgeJobTaskLogFileContent(edgeJobID, taskID string) (string, error) + StoreEdgeJobTaskLogFileFromBytes(edgeJobID, taskID string, data []byte) error ExtractExtensionArchive(data []byte) error GetBinaryFolder() string } @@ -818,26 +869,6 @@ type ( ClonePrivateRepositoryWithBasicAuth(repositoryURL, referenceName string, destination, username, password string) error } - // JobRunner represents a service that can be used to run a job - JobRunner interface { - Run() - GetSchedule() *Schedule - } - - // JobScheduler represents a service to run jobs on a periodic basis - JobScheduler interface { - ScheduleJob(runner JobRunner) error - UpdateJobSchedule(runner JobRunner) error - UpdateSystemJobSchedule(jobType JobType, newCronExpression string) error - UnscheduleJob(ID ScheduleID) - Start() - } - - // JobService represents a service to manage job execution on hosts - JobService interface { - ExecuteScript(endpoint *Endpoint, nodeName, image string, script []byte, schedule *Schedule) error - } - // JWTService represents a service for managing JWT tokens JWTService interface { GenerateToken(data *TokenData) (string, error) @@ -879,8 +910,8 @@ type ( SetTunnelStatusToRequired(endpointID EndpointID) error SetTunnelStatusToIdle(endpointID EndpointID) GetTunnelDetails(endpointID EndpointID) *TunnelDetails - AddSchedule(endpointID EndpointID, schedule *EdgeSchedule) - RemoveSchedule(scheduleID ScheduleID) + AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob) + RemoveEdgeJob(edgeJobID EdgeJobID) } // RoleService represents a service for managing user roles @@ -891,17 +922,6 @@ type ( UpdateRole(ID RoleID, role *Role) error } - // ScheduleService represents a service for managing schedule data - ScheduleService interface { - Schedule(ID ScheduleID) (*Schedule, error) - Schedules() ([]Schedule, error) - SchedulesByJobType(jobType JobType) ([]Schedule, error) - CreateSchedule(schedule *Schedule) error - UpdateSchedule(ID ScheduleID, schedule *Schedule) error - DeleteSchedule(ID ScheduleID) error - GetNextIdentifier() int - } - // SettingsService represents a service for managing application settings SettingsService interface { Settings() (*Settings, error) @@ -1001,25 +1021,6 @@ type ( WebhookByToken(token string) (*Webhook, error) DeleteWebhook(serviceID WebhookID) error } - - // EdgeGroupService represents a service to manage Edge groups - EdgeGroupService interface { - EdgeGroups() ([]EdgeGroup, error) - EdgeGroup(ID EdgeGroupID) (*EdgeGroup, error) - CreateEdgeGroup(group *EdgeGroup) error - UpdateEdgeGroup(ID EdgeGroupID, group *EdgeGroup) error - DeleteEdgeGroup(ID EdgeGroupID) error - } - - // EdgeStackService represents a service to manage Edge stacks - EdgeStackService interface { - EdgeStacks() ([]EdgeStack, error) - EdgeStack(ID EdgeStackID) (*EdgeStack, error) - CreateEdgeStack(edgeStack *EdgeStack) error - UpdateEdgeStack(ID EdgeStackID, edgeStack *EdgeStack) error - DeleteEdgeStack(ID EdgeStackID) error - GetNextIdentifier() int - } ) const ( @@ -1072,6 +1073,16 @@ const ( AuthenticationOAuth ) +const ( + _ EdgeJobLogsStatus = iota + // EdgeJobLogsStatusIdle represents an idle log collection job + EdgeJobLogsStatusIdle + // EdgeJobLogsStatusPending represents a pending log collection job + EdgeJobLogsStatusPending + // EdgeJobLogsStatusCollected represents a completed log collection job + EdgeJobLogsStatusCollected +) + const ( _ EdgeStackStatusType = iota //StatusOk represents a successfully deployed edge stack @@ -1120,14 +1131,8 @@ const ( const ( _ JobType = iota - // ScriptExecutionJobType is a non-system job used to execute a script against a list of - // endpoints via privileged containers - ScriptExecutionJobType // SnapshotJobType is a system job used to create endpoint snapshots - SnapshotJobType - // EndpointSyncJobType is a system job used to synchronize endpoints from - // an external definition store (Deprecated) - EndpointSyncJobType + SnapshotJobType = 2 ) const ( diff --git a/app/constants.js b/app/constants.js index 3df08083c..9ea17dcd3 100644 --- a/app/constants.js +++ b/app/constants.js @@ -3,6 +3,7 @@ angular .constant('API_ENDPOINT_AUTH', 'api/auth') .constant('API_ENDPOINT_DOCKERHUB', 'api/dockerhub') .constant('API_ENDPOINT_EDGE_GROUPS', 'api/edge_groups') + .constant('API_ENDPOINT_EDGE_JOBS', 'api/edge_jobs') .constant('API_ENDPOINT_EDGE_STACKS', 'api/edge_stacks') .constant('API_ENDPOINT_EDGE_TEMPLATES', 'api/edge_templates') .constant('API_ENDPOINT_ENDPOINTS', 'api/endpoints') @@ -11,7 +12,6 @@ angular .constant('API_ENDPOINT_EXTENSIONS', 'api/extensions') .constant('API_ENDPOINT_REGISTRIES', 'api/registries') .constant('API_ENDPOINT_RESOURCE_CONTROLS', 'api/resource_controls') - .constant('API_ENDPOINT_SCHEDULES', 'api/schedules') .constant('API_ENDPOINT_SETTINGS', 'api/settings') .constant('API_ENDPOINT_STACKS', 'api/stacks') .constant('API_ENDPOINT_STATUS', 'api/status') diff --git a/app/docker/__module.js b/app/docker/__module.js index 8a1ecdaa3..816877528 100644 --- a/app/docker/__module.js +++ b/app/docker/__module.js @@ -175,16 +175,6 @@ angular.module('portainer.docker', ['portainer.app']).config([ }, }; - var hostJob = { - name: 'docker.host.job', - url: '/job', - views: { - 'content@': { - component: 'hostJobView', - }, - }, - }; - var events = { name: 'docker.events', url: '/events', @@ -299,16 +289,6 @@ angular.module('portainer.docker', ['portainer.app']).config([ }, }; - var nodeJob = { - name: 'docker.nodes.node.job', - url: '/job', - views: { - 'content@': { - component: 'nodeJobView', - }, - }, - }; - var secrets = { name: 'docker.secrets', url: '/secrets', @@ -495,7 +475,6 @@ angular.module('portainer.docker', ['portainer.app']).config([ $stateRegistryProvider.register(dashboard); $stateRegistryProvider.register(host); $stateRegistryProvider.register(hostBrowser); - $stateRegistryProvider.register(hostJob); $stateRegistryProvider.register(events); $stateRegistryProvider.register(images); $stateRegistryProvider.register(image); @@ -507,7 +486,6 @@ angular.module('portainer.docker', ['portainer.app']).config([ $stateRegistryProvider.register(nodes); $stateRegistryProvider.register(node); $stateRegistryProvider.register(nodeBrowser); - $stateRegistryProvider.register(nodeJob); $stateRegistryProvider.register(secrets); $stateRegistryProvider.register(secret); $stateRegistryProvider.register(secretCreation); diff --git a/app/docker/components/datatables/host-jobs-datatable/jobsDatatable.html b/app/docker/components/datatables/host-jobs-datatable/jobsDatatable.html deleted file mode 100644 index 16e44d5e0..000000000 --- a/app/docker/components/datatables/host-jobs-datatable/jobsDatatable.html +++ /dev/null @@ -1,130 +0,0 @@ -
- - Id - - - - | -
-
- State
-
-
-
-
- Filter
- Filter
-
-
- |
- - - - - Created - - | -||||||
---|---|---|---|---|---|---|---|---|
- {{ item | containername }} - | -- {{ item.Status }} - - {{ item.Status }} - | -- {{ item.Created | getisodatefromtimestamp }} - | -||||||
Loading... | -||||||||
No jobs available. | -
+ + + + Endpoint + + | ++ Actions + | +|||||||
---|---|---|---|---|---|---|---|---|
+ {{ item.Endpoint.Name }} + | +
+ |
+ |||||||
Loading... | +||||||||
No result available. | +
+ + This is a beta feature. +
+ +- - - - Endpoint - - | -- - Id - - - - | -- - State - - - - | -- - - - Created - - | -|||||
---|---|---|---|---|---|---|---|---|
- {{ item.Endpoint.Name }} - Download logs - | -- {{ item.Id | truncate: 32 }} - - - | -- {{ item.Status }} - - - | -- {{ item.Created | getisodatefromtimestamp }} - - - | -|||||
Loading... | -||||||||
No tasks available. | -