diff --git a/api/bolt/datastore.go b/api/bolt/datastore.go index 192bc0271..ccce5576c 100644 --- a/api/bolt/datastore.go +++ b/api/bolt/datastore.go @@ -13,6 +13,7 @@ import ( "github.com/portainer/portainer/bolt/migrator" "github.com/portainer/portainer/bolt/registry" "github.com/portainer/portainer/bolt/resourcecontrol" + "github.com/portainer/portainer/bolt/schedule" "github.com/portainer/portainer/bolt/settings" "github.com/portainer/portainer/bolt/stack" "github.com/portainer/portainer/bolt/tag" @@ -49,6 +50,7 @@ type Store struct { UserService *user.Service VersionService *version.Service WebhookService *webhook.Service + ScheduleService *schedule.Service } // NewStore initializes a new Store and the associated services @@ -240,5 +242,11 @@ func (store *Store) initServices() error { } store.WebhookService = webhookService + scheduleService, err := schedule.NewService(store.db) + if err != nil { + return err + } + store.ScheduleService = scheduleService + return nil } diff --git a/api/bolt/schedule/schedule.go b/api/bolt/schedule/schedule.go new file mode 100644 index 000000000..dab557dce --- /dev/null +++ b/api/bolt/schedule/schedule.go @@ -0,0 +1,103 @@ +package schedule + +import ( + "github.com/portainer/portainer" + "github.com/portainer/portainer/bolt/internal" + + "github.com/boltdb/bolt" +) + +const ( + // BucketName represents the name of the bucket where this service stores data. + BucketName = "schedules" +) + +// Service represents a service for managing schedule data. +type Service struct { + db *bolt.DB +} + +// NewService creates a new instance of a service. +func NewService(db *bolt.DB) (*Service, error) { + err := internal.CreateBucket(db, BucketName) + if err != nil { + return nil, err + } + + return &Service{ + db: db, + }, nil +} + +// Schedule returns a schedule by ID. +func (service *Service) Schedule(ID portainer.ScheduleID) (*portainer.Schedule, error) { + var schedule portainer.Schedule + identifier := internal.Itob(int(ID)) + + err := internal.GetObject(service.db, BucketName, identifier, &schedule) + if err != nil { + return nil, err + } + + return &schedule, nil +} + +// UpdateSchedule updates a schedule. +func (service *Service) UpdateSchedule(ID portainer.ScheduleID, schedule *portainer.Schedule) error { + identifier := internal.Itob(int(ID)) + return internal.UpdateObject(service.db, BucketName, identifier, schedule) +} + +// DeleteSchedule deletes a schedule. +func (service *Service) DeleteSchedule(ID portainer.ScheduleID) error { + identifier := internal.Itob(int(ID)) + return internal.DeleteObject(service.db, BucketName, identifier) +} + +// Schedules return a array containing all the schedules. +func (service *Service) Schedules() ([]portainer.Schedule, error) { + var schedules = make([]portainer.Schedule, 0) + + err := service.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var schedule portainer.Schedule + err := internal.UnmarshalObject(v, &schedule) + if err != nil { + return err + } + schedules = append(schedules, schedule) + } + + return nil + }) + + return schedules, err +} + +// CreateSchedule assign an ID to a new schedule and saves it. +func (service *Service) CreateSchedule(schedule *portainer.Schedule) error { + return service.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + + // We manually manage sequences for schedules + err := bucket.SetSequence(uint64(schedule.ID)) + if err != nil { + return err + } + + data, err := internal.MarshalObject(schedule) + if err != nil { + return err + } + + return bucket.Put(internal.Itob(int(schedule.ID)), data) + }) +} + +// GetNextIdentifier returns the next identifier for a schedule. +func (service *Service) GetNextIdentifier() int { + return internal.GetNextIdentifier(service.db, BucketName) +} diff --git a/api/cmd/portainer/main.go b/api/cmd/portainer/main.go index 7602b2a90..9e344dde1 100644 --- a/api/cmd/portainer/main.go +++ b/api/cmd/portainer/main.go @@ -111,18 +111,32 @@ func initSnapshotter(clientFactory *docker.ClientFactory) portainer.Snapshotter } func initJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter, flags *portainer.CLIFlags) (portainer.JobScheduler, error) { - jobScheduler := cron.NewJobScheduler(endpointService, snapshotter) + jobScheduler := cron.NewJobScheduler() if *flags.ExternalEndpoints != "" { log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.") - err := jobScheduler.ScheduleEndpointSyncJob(*flags.ExternalEndpoints, *flags.SyncInterval) + + endpointSyncTaskContext := &cron.EndpointSyncTaskContext{ + EndpointService: endpointService, + EndpointFilePath: *flags.ExternalEndpoints, + } + endpointSyncTask := cron.NewEndpointSyncTask(endpointSyncTaskContext) + + err := jobScheduler.ScheduleTask("@every "+*flags.SyncInterval, endpointSyncTask) if err != nil { return nil, err } } if *flags.Snapshot { - err := jobScheduler.ScheduleSnapshotJob(*flags.SnapshotInterval) + + endpointSnapshotTaskContext := &cron.SnapshotTaskContext{ + EndpointService: endpointService, + Snapshotter: snapshotter, + } + endpointSnapshotTask := cron.NewSnapshotTask(endpointSnapshotTaskContext) + + err := jobScheduler.ScheduleTask("@every "+*flags.SnapshotInterval, endpointSnapshotTask) if err != nil { return nil, err } @@ -131,6 +145,31 @@ func initJobScheduler(endpointService portainer.EndpointService, snapshotter por return jobScheduler, nil } +func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, fileService portainer.FileService) error { + schedules, err := scheduleService.Schedules() + if err != nil { + return err + } + + for _, schedule := range schedules { + taskContext := &cron.ScriptTaskContext{ + JobService: jobService, + EndpointService: endpointService, + FileService: fileService, + ScheduleID: schedule.ID, + TargetEndpoints: schedule.Endpoints, + } + + schedule.Task.(cron.ScriptTask).SetContext(taskContext) + err = jobScheduler.ScheduleTask(schedule.CronExpression, schedule.Task) + if err != nil { + return err + } + } + + return nil +} + func initStatus(endpointManagement, snapshot bool, flags *portainer.CLIFlags) *portainer.Status { return &portainer.Status{ Analytics: !*flags.NoAnalytics, @@ -421,6 +460,11 @@ func main() { log.Fatal(err) } + err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService) + if err != nil { + log.Fatal(err) + } + jobScheduler.Start() endpointManagement := true @@ -509,6 +553,7 @@ func main() { RegistryService: store.RegistryService, DockerHubService: store.DockerHubService, StackService: store.StackService, + ScheduleService: store.ScheduleService, TagService: store.TagService, TemplateService: store.TemplateService, WebhookService: store.WebhookService, diff --git a/api/cron/job_endpoint_snapshot.go b/api/cron/job_endpoint_snapshot.go deleted file mode 100644 index ff7cc333b..000000000 --- a/api/cron/job_endpoint_snapshot.go +++ /dev/null @@ -1,60 +0,0 @@ -package cron - -import ( - "log" - - "github.com/portainer/portainer" -) - -type ( - endpointSnapshotJob struct { - endpointService portainer.EndpointService - snapshotter portainer.Snapshotter - } -) - -func newEndpointSnapshotJob(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) endpointSnapshotJob { - return endpointSnapshotJob{ - endpointService: endpointService, - snapshotter: snapshotter, - } -} - -func (job endpointSnapshotJob) Snapshot() error { - - endpoints, err := job.endpointService.Endpoints() - if err != nil { - return err - } - - for _, endpoint := range endpoints { - if endpoint.Type == portainer.AzureEnvironment { - continue - } - - snapshot, err := job.snapshotter.CreateSnapshot(&endpoint) - endpoint.Status = portainer.EndpointStatusUp - if err != nil { - log.Printf("cron error: endpoint snapshot error (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) - endpoint.Status = portainer.EndpointStatusDown - } - - if snapshot != nil { - endpoint.Snapshots = []portainer.Snapshot{*snapshot} - } - - err = job.endpointService.UpdateEndpoint(endpoint.ID, &endpoint) - if err != nil { - return err - } - } - - return nil -} - -func (job endpointSnapshotJob) Run() { - err := job.Snapshot() - if err != nil { - log.Printf("cron error: snapshot job error (err=%s)\n", err) - } -} diff --git a/api/cron/scheduler.go b/api/cron/scheduler.go index 42abee371..6a6dc0159 100644 --- a/api/cron/scheduler.go +++ b/api/cron/scheduler.go @@ -1,77 +1,93 @@ package cron import ( - "log" - "github.com/portainer/portainer" "github.com/robfig/cron" ) // JobScheduler represents a service for managing crons. type JobScheduler struct { - cron *cron.Cron - endpointService portainer.EndpointService - snapshotter portainer.Snapshotter - - endpointFilePath string - endpointSyncInterval string + cron *cron.Cron } // NewJobScheduler initializes a new service. -func NewJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *JobScheduler { +func NewJobScheduler() *JobScheduler { return &JobScheduler{ - cron: cron.New(), - endpointService: endpointService, - snapshotter: snapshotter, + cron: cron.New(), } } -// ScheduleEndpointSyncJob schedules a cron job to synchronize the endpoints from a file -func (scheduler *JobScheduler) ScheduleEndpointSyncJob(endpointFilePath string, interval string) error { - - scheduler.endpointFilePath = endpointFilePath - scheduler.endpointSyncInterval = interval - - job := newEndpointSyncJob(endpointFilePath, scheduler.endpointService) - - err := job.Sync() - if err != nil { - return err - } - - return scheduler.cron.AddJob("@every "+interval, job) -} - -// ScheduleSnapshotJob schedules a cron job to create endpoint snapshots -func (scheduler *JobScheduler) ScheduleSnapshotJob(interval string) error { - job := newEndpointSnapshotJob(scheduler.endpointService, scheduler.snapshotter) - go job.Snapshot() - - return scheduler.cron.AddJob("@every "+interval, job) -} - -// UpdateSnapshotJob will update the schedules to match the new snapshot interval -func (scheduler *JobScheduler) UpdateSnapshotJob(interval string) { - // TODO: the cron library do not support removing/updating schedules. - // As a work-around we need to re-create the cron and reschedule the jobs. - // We should update the library. +// UpdateScheduledTask updates a specific scheduled task by re-creating a new cron +// and adding all the existing jobs. It will then re-schedule the new task +// based on the updatedTask parameter. +// NOTE: the cron library do not support updating schedules directly +// hence the work-around. +func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.ScheduleID, cronExpression string, updatedTask portainer.Task) error { jobs := scheduler.cron.Entries() - scheduler.cron.Stop() - - scheduler.cron = cron.New() + newCron := cron.New() for _, job := range jobs { - switch job.Job.(type) { - case endpointSnapshotJob: - scheduler.ScheduleSnapshotJob(interval) - case endpointSyncJob: - scheduler.ScheduleEndpointSyncJob(scheduler.endpointFilePath, scheduler.endpointSyncInterval) - default: - log.Println("Unsupported job") + + switch task := job.Job.(type) { + case ScriptTask: + if task.context.ScheduleID == scheduleID { + err := newCron.AddJob(cronExpression, updatedTask) + if err != nil { + return err + } + + continue + } + case SnapshotTask: + _, ok := updatedTask.(SnapshotTask) + if ok { + err := newCron.AddJob(cronExpression, job.Job) + if err != nil { + return err + } + + continue + } } + + newCron.Schedule(job.Schedule, job.Job) } + scheduler.cron.Stop() + scheduler.cron = newCron scheduler.cron.Start() + return nil +} + +// UnscheduleTask remove a schedule by re-creating a new cron +// and adding all the existing jobs except for the one specified via scheduleID. +// NOTE: the cron library do not support removing schedules directly +// hence the work-around. +func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) { + jobs := scheduler.cron.Entries() + + newCron := cron.New() + + for _, job := range jobs { + + switch task := job.Job.(type) { + case ScriptTask: + if task.context.ScheduleID == scheduleID { + continue + } + } + + newCron.Schedule(job.Schedule, job.Job) + } + + scheduler.cron.Stop() + scheduler.cron = newCron + scheduler.cron.Start() +} + +// ScheduleTask adds a new task to be scheduled in the cron. +func (scheduler *JobScheduler) ScheduleTask(cronExpression string, task portainer.Task) error { + return scheduler.cron.AddJob(cronExpression, task) } // Start starts the scheduled jobs diff --git a/api/cron/job_endpoint_sync.go b/api/cron/task_endpoint_sync.go similarity index 79% rename from api/cron/job_endpoint_sync.go rename to api/cron/task_endpoint_sync.go index 9fbf595f3..45b2474e7 100644 --- a/api/cron/job_endpoint_sync.go +++ b/api/cron/task_endpoint_sync.go @@ -10,9 +10,17 @@ import ( ) type ( - endpointSyncJob struct { - endpointService portainer.EndpointService - endpointFilePath string + // EndpointSyncTask represents a task used to synchronize endpoints + // based on an external file. It can be scheduled. + EndpointSyncTask struct { + context *EndpointSyncTaskContext + } + + // EndpointSyncTaskContext represents the context required for the execution + // of an EndpointSyncTask. + EndpointSyncTaskContext struct { + EndpointService portainer.EndpointService + EndpointFilePath string } synchronization struct { @@ -32,21 +40,52 @@ type ( } ) -const ( - // ErrEmptyEndpointArray is an error raised when the external endpoint source array is empty. - ErrEmptyEndpointArray = portainer.Error("External endpoint source is empty") -) +// NewEndpointSyncTask creates a new EndpointSyncTask using the specified +// context. +func NewEndpointSyncTask(context *EndpointSyncTaskContext) EndpointSyncTask { + return EndpointSyncTask{ + context: context, + } +} -func newEndpointSyncJob(endpointFilePath string, endpointService portainer.EndpointService) endpointSyncJob { - return endpointSyncJob{ - endpointService: endpointService, - endpointFilePath: endpointFilePath, +// Run triggers the execution of the endpoint synchronization process. +func (task EndpointSyncTask) Run() { + data, err := ioutil.ReadFile(task.context.EndpointFilePath) + if endpointSyncError(err) { + return + } + + var fileEndpoints []fileEndpoint + err = json.Unmarshal(data, &fileEndpoints) + if endpointSyncError(err) { + return + } + + if len(fileEndpoints) == 0 { + log.Println("background task error (endpoint synchronization). External endpoint source is empty") + return + } + + storedEndpoints, err := task.context.EndpointService.Endpoints() + if endpointSyncError(err) { + return + } + + convertedFileEndpoints := convertFileEndpoints(fileEndpoints) + + sync := prepareSyncData(storedEndpoints, convertedFileEndpoints) + if sync.requireSync() { + err = task.context.EndpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete) + if endpointSyncError(err) { + return + } + log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete)) } } func endpointSyncError(err error) bool { if err != nil { - log.Printf("cron error: synchronization job error (err=%s)\n", err) + log.Printf("background task error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err) return true } return false @@ -126,8 +165,7 @@ func (sync synchronization) requireSync() bool { return false } -// TMP: endpointSyncJob method to access logger, should be generic -func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization { +func prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization { endpointsToCreate := make([]*portainer.Endpoint, 0) endpointsToUpdate := make([]*portainer.Endpoint, 0) endpointsToDelete := make([]*portainer.Endpoint, 0) @@ -164,43 +202,3 @@ func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []port endpointsToDelete: endpointsToDelete, } } - -func (job endpointSyncJob) Sync() error { - data, err := ioutil.ReadFile(job.endpointFilePath) - if endpointSyncError(err) { - return err - } - - var fileEndpoints []fileEndpoint - err = json.Unmarshal(data, &fileEndpoints) - if endpointSyncError(err) { - return err - } - - if len(fileEndpoints) == 0 { - return ErrEmptyEndpointArray - } - - storedEndpoints, err := job.endpointService.Endpoints() - if endpointSyncError(err) { - return err - } - - convertedFileEndpoints := convertFileEndpoints(fileEndpoints) - - sync := job.prepareSyncData(storedEndpoints, convertedFileEndpoints) - if sync.requireSync() { - err = job.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete) - if endpointSyncError(err) { - return err - } - log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete)) - } - return nil -} - -func (job endpointSyncJob) Run() { - log.Println("cron: synchronization job started") - err := job.Sync() - endpointSyncError(err) -} diff --git a/api/cron/task_script.go b/api/cron/task_script.go new file mode 100644 index 000000000..fcde39ef5 --- /dev/null +++ b/api/cron/task_script.go @@ -0,0 +1,63 @@ +package cron + +import ( + "log" + + "github.com/portainer/portainer" +) + +// ScriptTaskContext represents the context required for the execution +// of a ScriptTask. +type ScriptTaskContext struct { + JobService portainer.JobService + EndpointService portainer.EndpointService + FileService portainer.FileService + ScheduleID portainer.ScheduleID + TargetEndpoints []portainer.EndpointID +} + +// ScriptTask represents a task used to execute a script inside a privileged +// container. It can be scheduled. +type ScriptTask struct { + Image string + ScriptPath string + context *ScriptTaskContext +} + +// NewScriptTask creates a new ScriptTask using the specified context. +func NewScriptTask(image, scriptPath string, context *ScriptTaskContext) ScriptTask { + return ScriptTask{ + Image: image, + ScriptPath: scriptPath, + context: context, + } +} + +// SetContext can be used to set/override the task context +func (task ScriptTask) SetContext(context *ScriptTaskContext) { + task.context = context +} + +// Run triggers the execution of the task. +// It will iterate through all the endpoints specified in the context to +// execute the script associated to the task. +func (task ScriptTask) Run() { + scriptFile, err := task.context.FileService.GetFileContent(task.ScriptPath) + if err != nil { + log.Printf("scheduled task error (script execution). Unable to retrieve script file (err=%s)\n", err) + return + } + + for _, endpointID := range task.context.TargetEndpoints { + endpoint, err := task.context.EndpointService.Endpoint(endpointID) + if err != nil { + log.Printf("scheduled task error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err) + return + } + + err = task.context.JobService.Execute(endpoint, "", task.Image, scriptFile) + if err != nil { + log.Printf("scheduled task error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err) + } + } +} diff --git a/api/cron/task_snapshot.go b/api/cron/task_snapshot.go new file mode 100644 index 000000000..df73d37e1 --- /dev/null +++ b/api/cron/task_snapshot.go @@ -0,0 +1,61 @@ +package cron + +import ( + "log" + + "github.com/portainer/portainer" +) + +// SnapshotTaskContext represents the context required for the execution +// of a SnapshotTask. +type SnapshotTaskContext struct { + EndpointService portainer.EndpointService + Snapshotter portainer.Snapshotter +} + +// SnapshotTask represents a task used to create endpoint snapshots. +// It can be scheduled. +type SnapshotTask struct { + context *SnapshotTaskContext +} + +// NewSnapshotTask creates a new ScriptTask using the specified context. +func NewSnapshotTask(context *SnapshotTaskContext) SnapshotTask { + return SnapshotTask{ + context: context, + } +} + +// Run triggers the execution of the task. +// It will iterate through all the endpoints available in the database to +// create a snapshot of each one of them. +func (task SnapshotTask) Run() { + endpoints, err := task.context.EndpointService.Endpoints() + if err != nil { + log.Printf("background task error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err) + return + } + + for _, endpoint := range endpoints { + if endpoint.Type == portainer.AzureEnvironment { + continue + } + + snapshot, err := task.context.Snapshotter.CreateSnapshot(&endpoint) + endpoint.Status = portainer.EndpointStatusUp + if err != nil { + log.Printf("background task error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + endpoint.Status = portainer.EndpointStatusDown + } + + if snapshot != nil { + endpoint.Snapshots = []portainer.Snapshot{*snapshot} + } + + err = task.context.EndpointService.UpdateEndpoint(endpoint.ID, &endpoint) + if err != nil { + log.Printf("background task error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + return + } + } +} diff --git a/api/docker/jobservice.go b/api/docker/job.go similarity index 97% rename from api/docker/jobservice.go rename to api/docker/job.go index d2559d7d2..eba82e3e5 100644 --- a/api/docker/jobservice.go +++ b/api/docker/job.go @@ -16,7 +16,7 @@ import ( "github.com/portainer/portainer/archive" ) -// JobService represnts a service that handles jobs on the host +// JobService represents a service that handles the execution of jobs type JobService struct { DockerClientFactory *ClientFactory } diff --git a/api/filesystem/filesystem.go b/api/filesystem/filesystem.go index 1f5ca322e..a0c85727d 100644 --- a/api/filesystem/filesystem.go +++ b/api/filesystem/filesystem.go @@ -5,6 +5,7 @@ import ( "encoding/json" "encoding/pem" "io/ioutil" + "strconv" "github.com/portainer/portainer" @@ -32,6 +33,8 @@ const ( PrivateKeyFile = "portainer.key" // PublicKeyFile represents the name on disk of the file containing the public key. PublicKeyFile = "portainer.pub" + // ScheduleStorePath represents the subfolder where schedule files are stored. + ScheduleStorePath = "schedules" ) // Service represents a service for managing files and directories. @@ -318,3 +321,33 @@ func (service *Service) getContentFromPEMFile(filePath string) ([]byte, error) { block, _ := pem.Decode(fileContent) return block.Bytes, nil } + +// GetScheduleFolder returns the absolute path on the FS for a schedule based +// on its identifier. +func (service *Service) GetScheduleFolder(scheduleIdentifier portainer.ScheduleID) string { + return path.Join(service.fileStorePath, ScheduleStorePath, strconv.Itoa(int(scheduleIdentifier))) +} + +// StoreScheduledJobFileFromBytes creates a subfolder in the ScheduleStorePath and stores a new file from bytes. +// It returns the path to the folder where the file is stored. +func (service *Service) StoreScheduledJobFileFromBytes(scheduleIdentifier portainer.ScheduleID, data []byte) (string, error) { + identifier := strconv.Itoa(int(scheduleIdentifier)) + scheduleStorePath := path.Join(ScheduleStorePath, identifier) + err := service.createDirectoryInStore(scheduleStorePath) + if err != nil { + return "", err + } + + filePath := path.Join(scheduleStorePath, createScheduledJobFileName(identifier)) + r := bytes.NewReader(data) + err = service.createFileInStore(filePath, r) + if err != nil { + return "", err + } + + return path.Join(service.fileStorePath, filePath), nil +} + +func createScheduledJobFileName(identifier string) string { + return "job_" + identifier + ".sh" +} diff --git a/api/http/handler/handler.go b/api/http/handler/handler.go index 40ae9f57d..f58a290aa 100644 --- a/api/http/handler/handler.go +++ b/api/http/handler/handler.go @@ -13,6 +13,7 @@ import ( "github.com/portainer/portainer/http/handler/motd" "github.com/portainer/portainer/http/handler/registries" "github.com/portainer/portainer/http/handler/resourcecontrols" + "github.com/portainer/portainer/http/handler/schedules" "github.com/portainer/portainer/http/handler/settings" "github.com/portainer/portainer/http/handler/stacks" "github.com/portainer/portainer/http/handler/status" @@ -49,6 +50,7 @@ type Handler struct { UserHandler *users.Handler WebSocketHandler *websocket.Handler WebhookHandler *webhooks.Handler + SchedulesHanlder *schedules.Handler } // ServeHTTP delegates a request to the appropriate subhandler. @@ -99,6 +101,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.StripPrefix("/api", h.WebSocketHandler).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/api/webhooks"): http.StripPrefix("/api", h.WebhookHandler).ServeHTTP(w, r) + case strings.HasPrefix(r.URL.Path, "/api/schedules"): + http.StripPrefix("/api", h.SchedulesHanlder).ServeHTTP(w, r) case strings.HasPrefix(r.URL.Path, "/"): h.FileHandler.ServeHTTP(w, r) } diff --git a/api/http/handler/schedules/handler.go b/api/http/handler/schedules/handler.go new file mode 100644 index 000000000..efe8ab6a0 --- /dev/null +++ b/api/http/handler/schedules/handler.go @@ -0,0 +1,51 @@ +package schedules + +import ( + "net/http" + + "github.com/gorilla/mux" + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/portainer" + "github.com/portainer/portainer/cron" + "github.com/portainer/portainer/http/security" +) + +// Handler is the HTTP handler used to handle schedule operations. +type Handler struct { + *mux.Router + ScheduleService portainer.ScheduleService + EndpointService portainer.EndpointService + FileService portainer.FileService + JobService portainer.JobService + JobScheduler portainer.JobScheduler +} + +// NewHandler creates a handler to manage schedule operations. +func NewHandler(bouncer *security.RequestBouncer) *Handler { + h := &Handler{ + Router: mux.NewRouter(), + } + + h.Handle("/schedules", + bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleList))).Methods(http.MethodGet) + h.Handle("/schedules", + bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleCreate))).Methods(http.MethodPost) + h.Handle("/schedules/{id}", + bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleInspect))).Methods(http.MethodGet) + h.Handle("/schedules/{id}", + bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleUpdate))).Methods(http.MethodPut) + h.Handle("/schedules/{id}", + bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleDelete))).Methods(http.MethodDelete) + + return h +} + +func (handler *Handler) createTaskExecutionContext(scheduleID portainer.ScheduleID, endpoints []portainer.EndpointID) *cron.ScriptTaskContext { + return &cron.ScriptTaskContext{ + JobService: handler.JobService, + EndpointService: handler.EndpointService, + FileService: handler.FileService, + ScheduleID: scheduleID, + TargetEndpoints: endpoints, + } +} diff --git a/api/http/handler/schedules/schedule_create.go b/api/http/handler/schedules/schedule_create.go new file mode 100644 index 000000000..88c124f07 --- /dev/null +++ b/api/http/handler/schedules/schedule_create.go @@ -0,0 +1,167 @@ +package schedules + +import ( + "errors" + "net/http" + + "github.com/asaskevich/govalidator" + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer" + "github.com/portainer/portainer/cron" +) + +type scheduleFromFilePayload struct { + Name string + Image string + CronExpression string + Endpoints []portainer.EndpointID + File []byte +} + +type scheduleFromFileContentPayload struct { + Name string + CronExpression string + Image string + Endpoints []portainer.EndpointID + FileContent string +} + +func (payload *scheduleFromFilePayload) Validate(r *http.Request) error { + name, err := request.RetrieveMultiPartFormValue(r, "Name", false) + if err != nil { + return err + } + payload.Name = name + + image, err := request.RetrieveMultiPartFormValue(r, "Image", false) + if err != nil { + return err + } + payload.Image = image + + cronExpression, err := request.RetrieveMultiPartFormValue(r, "Schedule", false) + if err != nil { + return err + } + payload.CronExpression = cronExpression + + var endpoints []portainer.EndpointID + err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false) + if err != nil { + return err + } + payload.Endpoints = endpoints + + file, _, err := request.RetrieveMultiPartFormFile(r, "File") + if err != nil { + return portainer.Error("Invalid Script file. Ensure that the file is uploaded correctly") + } + payload.File = file + + return nil +} + +func (payload *scheduleFromFileContentPayload) Validate(r *http.Request) error { + if govalidator.IsNull(payload.Name) { + return portainer.Error("Invalid schedule name") + } + + if govalidator.IsNull(payload.Image) { + return portainer.Error("Invalid schedule image") + } + + if govalidator.IsNull(payload.CronExpression) { + return portainer.Error("Invalid cron expression") + } + + if payload.Endpoints == nil || len(payload.Endpoints) == 0 { + return portainer.Error("Invalid endpoints payload") + } + + if govalidator.IsNull(payload.FileContent) { + return portainer.Error("Invalid script file content") + } + + return nil +} + +// POST /api/schedules?method=file/string +func (handler *Handler) scheduleCreate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + method, err := request.RetrieveQueryParameter(r, "method", false) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", err} + } + + switch method { + case "string": + return handler.createScheduleFromFileContent(w, r) + case "file": + return handler.createScheduleFromFile(w, r) + default: + return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter)} + } +} + +func (handler *Handler) createScheduleFromFileContent(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + var payload scheduleFromFileContentPayload + err := request.DecodeAndValidateJSONPayload(r, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + schedule, err := handler.createSchedule(payload.Name, payload.Image, payload.CronExpression, payload.Endpoints, []byte(payload.FileContent)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err} + } + + return response.JSON(w, schedule) +} + +func (handler *Handler) createScheduleFromFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + payload := &scheduleFromFilePayload{} + err := payload.Validate(r) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + schedule, err := handler.createSchedule(payload.Name, payload.Image, payload.CronExpression, payload.Endpoints, payload.File) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err} + } + + return response.JSON(w, schedule) +} + +func (handler *Handler) createSchedule(name, image, cronExpression string, endpoints []portainer.EndpointID, file []byte) (*portainer.Schedule, error) { + scheduleIdentifier := portainer.ScheduleID(handler.ScheduleService.GetNextIdentifier()) + + scriptPath, err := handler.FileService.StoreScheduledJobFileFromBytes(scheduleIdentifier, file) + if err != nil { + return nil, err + } + + taskContext := handler.createTaskExecutionContext(scheduleIdentifier, endpoints) + task := cron.NewScriptTask(image, scriptPath, taskContext) + + err = handler.JobScheduler.ScheduleTask(cronExpression, task) + if err != nil { + return nil, err + } + + schedule := &portainer.Schedule{ + ID: scheduleIdentifier, + Name: name, + Endpoints: endpoints, + CronExpression: cronExpression, + Task: task, + } + + err = handler.ScheduleService.CreateSchedule(schedule) + if err != nil { + return nil, err + } + + return schedule, nil +} diff --git a/api/http/handler/schedules/schedule_delete.go b/api/http/handler/schedules/schedule_delete.go new file mode 100644 index 000000000..33526c568 --- /dev/null +++ b/api/http/handler/schedules/schedule_delete.go @@ -0,0 +1,32 @@ +package schedules + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer" +) + +func (handler *Handler) scheduleDelete(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} + } + + handler.JobScheduler.UnscheduleTask(portainer.ScheduleID(scheduleID)) + + scheduleFolder := handler.FileService.GetScheduleFolder(portainer.ScheduleID(scheduleID)) + err = handler.FileService.RemoveDirectory(scheduleFolder) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the files associated to the schedule on the filesystem", err} + } + + err = handler.ScheduleService.DeleteSchedule(portainer.ScheduleID(scheduleID)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the schedule from the database", err} + } + + return response.Empty(w) +} diff --git a/api/http/handler/schedules/schedule_inspect.go b/api/http/handler/schedules/schedule_inspect.go new file mode 100644 index 000000000..9b721801e --- /dev/null +++ b/api/http/handler/schedules/schedule_inspect.go @@ -0,0 +1,27 @@ +package schedules + +import ( + "net/http" + + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" +) + +func (handler *Handler) scheduleInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} + } + + schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} + } + + return response.JSON(w, schedule) +} diff --git a/api/http/handler/schedules/schedule_list.go b/api/http/handler/schedules/schedule_list.go new file mode 100644 index 000000000..4bc658b91 --- /dev/null +++ b/api/http/handler/schedules/schedule_list.go @@ -0,0 +1,18 @@ +package schedules + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/response" +) + +// GET request on /api/schedules +func (handler *Handler) scheduleList(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + schedules, err := handler.ScheduleService.Schedules() + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve schedules from the database", err} + } + + return response.JSON(w, schedules) +} diff --git a/api/http/handler/schedules/schedule_update.go b/api/http/handler/schedules/schedule_update.go new file mode 100644 index 000000000..561cabc17 --- /dev/null +++ b/api/http/handler/schedules/schedule_update.go @@ -0,0 +1,87 @@ +package schedules + +import ( + "net/http" + + httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/request" + "github.com/portainer/libhttp/response" + "github.com/portainer/portainer" + "github.com/portainer/portainer/cron" +) + +type scheduleUpdatePayload struct { + Name *string + Image *string + CronExpression *string + Endpoints []portainer.EndpointID +} + +func (payload *scheduleUpdatePayload) Validate(r *http.Request) error { + return nil +} + +func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { + scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id") + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} + } + + var payload scheduleUpdatePayload + err = request.DecodeAndValidateJSONPayload(r, &payload) + if err != nil { + return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} + } + + schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} + } + + updateTaskSchedule := updateSchedule(schedule, &payload) + if updateTaskSchedule { + taskContext := handler.createTaskExecutionContext(schedule.ID, schedule.Endpoints) + schedule.Task.(cron.ScriptTask).SetContext(taskContext) + + err := handler.JobScheduler.UpdateScheduledTask(schedule.ID, schedule.CronExpression, schedule.Task) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} + } + } + + err = handler.ScheduleService.UpdateSchedule(portainer.ScheduleID(scheduleID), schedule) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist schedule changes inside the database", err} + } + + return response.JSON(w, schedule) +} + +func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool { + updateTaskSchedule := false + + if payload.Name != nil { + schedule.Name = *payload.Name + } + + if payload.Endpoints != nil { + schedule.Endpoints = payload.Endpoints + updateTaskSchedule = true + } + + if payload.CronExpression != nil { + schedule.CronExpression = *payload.CronExpression + updateTaskSchedule = true + } + + if payload.Image != nil { + t := schedule.Task.(cron.ScriptTask) + t.Image = *payload.Image + + updateTaskSchedule = true + } + + return updateTaskSchedule +} diff --git a/api/http/handler/settings/settings_update.go b/api/http/handler/settings/settings_update.go index 18513931f..ad50dd65d 100644 --- a/api/http/handler/settings/settings_update.go +++ b/api/http/handler/settings/settings_update.go @@ -8,6 +8,7 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" "github.com/portainer/portainer" + "github.com/portainer/portainer/cron" "github.com/portainer/portainer/filesystem" ) @@ -78,7 +79,11 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) * if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval { settings.SnapshotInterval = *payload.SnapshotInterval - handler.JobScheduler.UpdateSnapshotJob(settings.SnapshotInterval) + + err := handler.JobScheduler.UpdateScheduledTask(0, "@every "+*payload.SnapshotInterval, cron.NewSnapshotTask(nil)) + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} + } } tlsError := handler.updateTLS(settings) diff --git a/api/http/server.go b/api/http/server.go index bd5b9fe30..d117471b6 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -15,6 +15,7 @@ import ( "github.com/portainer/portainer/http/handler/motd" "github.com/portainer/portainer/http/handler/registries" "github.com/portainer/portainer/http/handler/resourcecontrols" + "github.com/portainer/portainer/http/handler/schedules" "github.com/portainer/portainer/http/handler/settings" "github.com/portainer/portainer/http/handler/stacks" "github.com/portainer/portainer/http/handler/status" @@ -54,6 +55,7 @@ type Server struct { LDAPService portainer.LDAPService RegistryService portainer.RegistryService ResourceControlService portainer.ResourceControlService + ScheduleService portainer.ScheduleService SettingsService portainer.SettingsService StackService portainer.StackService SwarmStackManager portainer.SwarmStackManager @@ -81,6 +83,7 @@ func (server *Server) Start() error { AuthDisabled: server.AuthDisabled, } requestBouncer := security.NewRequestBouncer(requestBouncerParameters) + proxyManagerParameters := &proxy.ManagerParams{ ResourceControlService: server.ResourceControlService, TeamMembershipService: server.TeamMembershipService, @@ -90,6 +93,7 @@ func (server *Server) Start() error { SignatureService: server.SignatureService, } proxyManager := proxy.NewManager(proxyManagerParameters) + rateLimiter := security.NewRateLimiter(10, 1*time.Second, 1*time.Hour) var authHandler = auth.NewHandler(requestBouncer, rateLimiter, server.AuthDisabled) @@ -130,6 +134,13 @@ func (server *Server) Start() error { var resourceControlHandler = resourcecontrols.NewHandler(requestBouncer) resourceControlHandler.ResourceControlService = server.ResourceControlService + var schedulesHandler = schedules.NewHandler(requestBouncer) + schedulesHandler.ScheduleService = server.ScheduleService + schedulesHandler.EndpointService = server.EndpointService + schedulesHandler.FileService = server.FileService + schedulesHandler.JobService = server.JobService + schedulesHandler.JobScheduler = server.JobScheduler + var settingsHandler = settings.NewHandler(requestBouncer) settingsHandler.SettingsService = server.SettingsService settingsHandler.LDAPService = server.LDAPService @@ -203,6 +214,7 @@ func (server *Server) Start() error { UserHandler: userHandler, WebSocketHandler: websocketHandler, WebhookHandler: webhookHandler, + SchedulesHanlder: schedulesHandler, } if server.SSL { diff --git a/api/portainer.go b/api/portainer.go index 2c866f3da..d378486b2 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -220,7 +220,19 @@ type ( TLSKeyPath string `json:"TLSKey,omitempty"` } - // WebhookID represents an webhook identifier. + // ScheduleID represents a schedule identifier. + ScheduleID int + + // Schedule represents a task that is scheduled on one or multiple endpoints. + Schedule struct { + ID ScheduleID `json:"Id"` + Name string `json:"Name"` + Endpoints []EndpointID `json:"Endpoints"` + CronExpression string `json:"Schedule"` + Task Task `json:"Task"` + } + + // WebhookID represents a webhook identifier. WebhookID int // WebhookType represents the type of resource a webhook is related to @@ -552,6 +564,16 @@ type ( DeleteResourceControl(ID ResourceControlID) error } + // ScheduleService represents a service for managing schedule data + ScheduleService interface { + Schedule(ID ScheduleID) (*Schedule, error) + Schedules() ([]Schedule, error) + CreateSchedule(schedule *Schedule) error + UpdateSchedule(ID ScheduleID, schedule *Schedule) error + DeleteSchedule(ID ScheduleID) error + GetNextIdentifier() int + } + // TagService represents a service for managing tag data TagService interface { Tags() ([]Tag, error) @@ -605,6 +627,8 @@ type ( LoadKeyPair() ([]byte, []byte, error) WriteJSONToFile(path string, content interface{}) error FileExists(path string) (bool, error) + StoreScheduledJobFileFromBytes(scheduleIdentifier ScheduleID, data []byte) (string, error) + GetScheduleFolder(scheduleIdentifier ScheduleID) string } // GitService represents a service for managing Git @@ -615,12 +639,17 @@ type ( // JobScheduler represents a service to run jobs on a periodic basis JobScheduler interface { - ScheduleEndpointSyncJob(endpointFilePath, interval string) error - ScheduleSnapshotJob(interval string) error - UpdateSnapshotJob(interval string) + ScheduleTask(cronExpression string, task Task) error + UpdateScheduledTask(ID ScheduleID, cronExpression string, updatedTask Task) error + UnscheduleTask(ID ScheduleID) Start() } + // Task represents a process that can be scheduled + Task interface { + Run() + } + // Snapshotter represents a service used to create endpoint snapshots Snapshotter interface { CreateSnapshot(endpoint *Endpoint) (*Snapshot, error)