Merge branch 'develop' into webpack

pull/2670/head
Chaim Lev-Ari 2018-11-06 14:57:13 +02:00
commit 04a7944b32
21 changed files with 1080 additions and 203 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/portainer/portainer/bolt/migrator" "github.com/portainer/portainer/bolt/migrator"
"github.com/portainer/portainer/bolt/registry" "github.com/portainer/portainer/bolt/registry"
"github.com/portainer/portainer/bolt/resourcecontrol" "github.com/portainer/portainer/bolt/resourcecontrol"
"github.com/portainer/portainer/bolt/schedule"
"github.com/portainer/portainer/bolt/settings" "github.com/portainer/portainer/bolt/settings"
"github.com/portainer/portainer/bolt/stack" "github.com/portainer/portainer/bolt/stack"
"github.com/portainer/portainer/bolt/tag" "github.com/portainer/portainer/bolt/tag"
@ -49,6 +50,7 @@ type Store struct {
UserService *user.Service UserService *user.Service
VersionService *version.Service VersionService *version.Service
WebhookService *webhook.Service WebhookService *webhook.Service
ScheduleService *schedule.Service
} }
// NewStore initializes a new Store and the associated services // NewStore initializes a new Store and the associated services
@ -240,5 +242,11 @@ func (store *Store) initServices() error {
} }
store.WebhookService = webhookService store.WebhookService = webhookService
scheduleService, err := schedule.NewService(store.db)
if err != nil {
return err
}
store.ScheduleService = scheduleService
return nil return nil
} }

View File

@ -0,0 +1,129 @@
package schedule
import (
"github.com/portainer/portainer"
"github.com/portainer/portainer/bolt/internal"
"github.com/boltdb/bolt"
)
const (
// BucketName represents the name of the bucket where this service stores data.
BucketName = "schedules"
)
// Service represents a service for managing schedule data.
type Service struct {
db *bolt.DB
}
// NewService creates a new instance of a service.
func NewService(db *bolt.DB) (*Service, error) {
err := internal.CreateBucket(db, BucketName)
if err != nil {
return nil, err
}
return &Service{
db: db,
}, nil
}
// Schedule returns a schedule by ID.
func (service *Service) Schedule(ID portainer.ScheduleID) (*portainer.Schedule, error) {
var schedule portainer.Schedule
identifier := internal.Itob(int(ID))
err := internal.GetObject(service.db, BucketName, identifier, &schedule)
if err != nil {
return nil, err
}
return &schedule, nil
}
// UpdateSchedule updates a schedule.
func (service *Service) UpdateSchedule(ID portainer.ScheduleID, schedule *portainer.Schedule) error {
identifier := internal.Itob(int(ID))
return internal.UpdateObject(service.db, BucketName, identifier, schedule)
}
// DeleteSchedule deletes a schedule.
func (service *Service) DeleteSchedule(ID portainer.ScheduleID) error {
identifier := internal.Itob(int(ID))
return internal.DeleteObject(service.db, BucketName, identifier)
}
// Schedules return a array containing all the schedules.
func (service *Service) Schedules() ([]portainer.Schedule, error) {
var schedules = make([]portainer.Schedule, 0)
err := service.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(BucketName))
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var schedule portainer.Schedule
err := internal.UnmarshalObject(v, &schedule)
if err != nil {
return err
}
schedules = append(schedules, schedule)
}
return nil
})
return schedules, err
}
// SchedulesByJobType return a array containing all the schedules
// with the specified JobType.
func (service *Service) SchedulesByJobType(jobType portainer.JobType) ([]portainer.Schedule, error) {
var schedules = make([]portainer.Schedule, 0)
err := service.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(BucketName))
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var schedule portainer.Schedule
err := internal.UnmarshalObject(v, &schedule)
if err != nil {
return err
}
if schedule.JobType == jobType {
schedules = append(schedules, schedule)
}
}
return nil
})
return schedules, err
}
// CreateSchedule assign an ID to a new schedule and saves it.
func (service *Service) CreateSchedule(schedule *portainer.Schedule) error {
return service.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(BucketName))
// We manually manage sequences for schedules
err := bucket.SetSequence(uint64(schedule.ID))
if err != nil {
return err
}
data, err := internal.MarshalObject(schedule)
if err != nil {
return err
}
return bucket.Put(internal.Itob(int(schedule.ID)), data)
})
}
// GetNextIdentifier returns the next identifier for a schedule.
func (service *Service) GetNextIdentifier() int {
return internal.GetNextIdentifier(service.db, BucketName)
}

View File

@ -110,25 +110,100 @@ func initSnapshotter(clientFactory *docker.ClientFactory) portainer.Snapshotter
return docker.NewSnapshotter(clientFactory) return docker.NewSnapshotter(clientFactory)
} }
func initJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter, flags *portainer.CLIFlags) (portainer.JobScheduler, error) { func initJobScheduler() portainer.JobScheduler {
jobScheduler := cron.NewJobScheduler(endpointService, snapshotter) return cron.NewJobScheduler()
}
if *flags.ExternalEndpoints != "" { func loadSnapshotSystemSchedule(jobScheduler portainer.JobScheduler, snapshotter portainer.Snapshotter, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error {
log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.") if !*flags.Snapshot {
err := jobScheduler.ScheduleEndpointSyncJob(*flags.ExternalEndpoints, *flags.SyncInterval) return nil
}
schedules, err := scheduleService.SchedulesByJobType(portainer.SnapshotJobType)
if err != nil {
return err
}
if len(schedules) != 0 {
return nil
}
snapshotJob := &portainer.SnapshotJob{}
snapshotSchedule := &portainer.Schedule{
ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()),
Name: "system_snapshot",
CronExpression: "@every " + *flags.SnapshotInterval,
JobType: portainer.SnapshotJobType,
SnapshotJob: snapshotJob,
}
snapshotJobContext := cron.NewSnapshotJobContext(endpointService, snapshotter)
snapshotJobRunner := cron.NewSnapshotJobRunner(snapshotJob, snapshotJobContext)
err = jobScheduler.CreateSchedule(snapshotSchedule, snapshotJobRunner)
if err != nil {
return err
}
return scheduleService.CreateSchedule(snapshotSchedule)
}
func loadEndpointSyncSystemSchedule(jobScheduler portainer.JobScheduler, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error {
if *flags.ExternalEndpoints == "" {
return nil
}
log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.")
schedules, err := scheduleService.SchedulesByJobType(portainer.EndpointSyncJobType)
if err != nil {
return err
}
if len(schedules) != 0 {
return nil
}
endpointSyncJob := &portainer.EndpointSyncJob{}
endointSyncSchedule := &portainer.Schedule{
ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()),
Name: "system_endpointsync",
CronExpression: "@every " + *flags.SyncInterval,
JobType: portainer.EndpointSyncJobType,
EndpointSyncJob: endpointSyncJob,
}
endpointSyncJobContext := cron.NewEndpointSyncJobContext(endpointService, *flags.ExternalEndpoints)
endpointSyncJobRunner := cron.NewEndpointSyncJobRunner(endpointSyncJob, endpointSyncJobContext)
err = jobScheduler.CreateSchedule(endointSyncSchedule, endpointSyncJobRunner)
if err != nil {
return err
}
return scheduleService.CreateSchedule(endointSyncSchedule)
}
func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, fileService portainer.FileService) error {
schedules, err := scheduleService.Schedules()
if err != nil {
return err
}
for _, schedule := range schedules {
jobContext := cron.NewScriptExecutionJobContext(jobService, endpointService, fileService)
jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext)
err = jobScheduler.CreateSchedule(&schedule, jobRunner)
if err != nil { if err != nil {
return nil, err return err
} }
} }
if *flags.Snapshot { return nil
err := jobScheduler.ScheduleSnapshotJob(*flags.SnapshotInterval)
if err != nil {
return nil, err
}
}
return jobScheduler, nil
} }
func initStatus(endpointManagement, snapshot bool, flags *portainer.CLIFlags) *portainer.Status { func initStatus(endpointManagement, snapshot bool, flags *portainer.CLIFlags) *portainer.Status {
@ -416,7 +491,19 @@ func main() {
snapshotter := initSnapshotter(clientFactory) snapshotter := initSnapshotter(clientFactory)
jobScheduler, err := initJobScheduler(store.EndpointService, snapshotter, flags) jobScheduler := initJobScheduler()
err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService)
if err != nil {
log.Fatal(err)
}
err = loadEndpointSyncSystemSchedule(jobScheduler, store.ScheduleService, store.EndpointService, flags)
if err != nil {
log.Fatal(err)
}
err = loadSnapshotSystemSchedule(jobScheduler, snapshotter, store.ScheduleService, store.EndpointService, flags)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -509,6 +596,7 @@ func main() {
RegistryService: store.RegistryService, RegistryService: store.RegistryService,
DockerHubService: store.DockerHubService, DockerHubService: store.DockerHubService,
StackService: store.StackService, StackService: store.StackService,
ScheduleService: store.ScheduleService,
TagService: store.TagService, TagService: store.TagService,
TemplateService: store.TemplateService, TemplateService: store.TemplateService,
WebhookService: store.WebhookService, WebhookService: store.WebhookService,

View File

@ -1,60 +0,0 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
type (
endpointSnapshotJob struct {
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
}
)
func newEndpointSnapshotJob(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) endpointSnapshotJob {
return endpointSnapshotJob{
endpointService: endpointService,
snapshotter: snapshotter,
}
}
func (job endpointSnapshotJob) Snapshot() error {
endpoints, err := job.endpointService.Endpoints()
if err != nil {
return err
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := job.snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("cron error: endpoint snapshot error (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = job.endpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
return err
}
}
return nil
}
func (job endpointSnapshotJob) Run() {
err := job.Snapshot()
if err != nil {
log.Printf("cron error: snapshot job error (err=%s)\n", err)
}
}

View File

@ -9,44 +9,103 @@ import (
"github.com/portainer/portainer" "github.com/portainer/portainer"
) )
type ( // EndpointSyncJobRunner is used to run a EndpointSyncJob
endpointSyncJob struct { type EndpointSyncJobRunner struct {
endpointService portainer.EndpointService job *portainer.EndpointSyncJob
endpointFilePath string context *EndpointSyncJobContext
} }
synchronization struct { // EndpointSyncJobContext represents the context of execution of a EndpointSyncJob
endpointsToCreate []*portainer.Endpoint type EndpointSyncJobContext struct {
endpointsToUpdate []*portainer.Endpoint endpointService portainer.EndpointService
endpointsToDelete []*portainer.Endpoint endpointFilePath string
} }
fileEndpoint struct { // NewEndpointSyncJobContext returns a new context that can be used to execute a EndpointSyncJob
Name string `json:"Name"` func NewEndpointSyncJobContext(endpointService portainer.EndpointService, endpointFilePath string) *EndpointSyncJobContext {
URL string `json:"URL"` return &EndpointSyncJobContext{
TLS bool `json:"TLS,omitempty"`
TLSSkipVerify bool `json:"TLSSkipVerify,omitempty"`
TLSCACert string `json:"TLSCACert,omitempty"`
TLSCert string `json:"TLSCert,omitempty"`
TLSKey string `json:"TLSKey,omitempty"`
}
)
const (
// ErrEmptyEndpointArray is an error raised when the external endpoint source array is empty.
ErrEmptyEndpointArray = portainer.Error("External endpoint source is empty")
)
func newEndpointSyncJob(endpointFilePath string, endpointService portainer.EndpointService) endpointSyncJob {
return endpointSyncJob{
endpointService: endpointService, endpointService: endpointService,
endpointFilePath: endpointFilePath, endpointFilePath: endpointFilePath,
} }
} }
// NewEndpointSyncJobRunner returns a new runner that can be scheduled
func NewEndpointSyncJobRunner(job *portainer.EndpointSyncJob, context *EndpointSyncJobContext) *EndpointSyncJobRunner {
return &EndpointSyncJobRunner{
job: job,
context: context,
}
}
type synchronization struct {
endpointsToCreate []*portainer.Endpoint
endpointsToUpdate []*portainer.Endpoint
endpointsToDelete []*portainer.Endpoint
}
type fileEndpoint struct {
Name string `json:"Name"`
URL string `json:"URL"`
TLS bool `json:"TLS,omitempty"`
TLSSkipVerify bool `json:"TLSSkipVerify,omitempty"`
TLSCACert string `json:"TLSCACert,omitempty"`
TLSCert string `json:"TLSCert,omitempty"`
TLSKey string `json:"TLSKey,omitempty"`
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *EndpointSyncJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *EndpointSyncJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *EndpointSyncJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
}
// Run triggers the execution of the endpoint synchronization process.
func (runner *EndpointSyncJobRunner) Run() {
data, err := ioutil.ReadFile(runner.context.endpointFilePath)
if endpointSyncError(err) {
return
}
var fileEndpoints []fileEndpoint
err = json.Unmarshal(data, &fileEndpoints)
if endpointSyncError(err) {
return
}
if len(fileEndpoints) == 0 {
log.Println("background job error (endpoint synchronization). External endpoint source is empty")
return
}
storedEndpoints, err := runner.context.endpointService.Endpoints()
if endpointSyncError(err) {
return
}
convertedFileEndpoints := convertFileEndpoints(fileEndpoints)
sync := prepareSyncData(storedEndpoints, convertedFileEndpoints)
if sync.requireSync() {
err = runner.context.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
if endpointSyncError(err) {
return
}
log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete))
}
}
func endpointSyncError(err error) bool { func endpointSyncError(err error) bool {
if err != nil { if err != nil {
log.Printf("cron error: synchronization job error (err=%s)\n", err) log.Printf("background job error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err)
return true return true
} }
return false return false
@ -126,8 +185,7 @@ func (sync synchronization) requireSync() bool {
return false return false
} }
// TMP: endpointSyncJob method to access logger, should be generic func prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization {
func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization {
endpointsToCreate := make([]*portainer.Endpoint, 0) endpointsToCreate := make([]*portainer.Endpoint, 0)
endpointsToUpdate := make([]*portainer.Endpoint, 0) endpointsToUpdate := make([]*portainer.Endpoint, 0)
endpointsToDelete := make([]*portainer.Endpoint, 0) endpointsToDelete := make([]*portainer.Endpoint, 0)
@ -164,43 +222,3 @@ func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []port
endpointsToDelete: endpointsToDelete, endpointsToDelete: endpointsToDelete,
} }
} }
func (job endpointSyncJob) Sync() error {
data, err := ioutil.ReadFile(job.endpointFilePath)
if endpointSyncError(err) {
return err
}
var fileEndpoints []fileEndpoint
err = json.Unmarshal(data, &fileEndpoints)
if endpointSyncError(err) {
return err
}
if len(fileEndpoints) == 0 {
return ErrEmptyEndpointArray
}
storedEndpoints, err := job.endpointService.Endpoints()
if endpointSyncError(err) {
return err
}
convertedFileEndpoints := convertFileEndpoints(fileEndpoints)
sync := job.prepareSyncData(storedEndpoints, convertedFileEndpoints)
if sync.requireSync() {
err = job.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
if endpointSyncError(err) {
return err
}
log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete))
}
return nil
}
func (job endpointSyncJob) Run() {
log.Println("cron: synchronization job started")
err := job.Sync()
endpointSyncError(err)
}

View File

@ -0,0 +1,76 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// ScriptExecutionJobRunner is used to run a ScriptExecutionJob
type ScriptExecutionJobRunner struct {
job *portainer.ScriptExecutionJob
context *ScriptExecutionJobContext
}
// ScriptExecutionJobContext represents the context of execution of a ScriptExecutionJob
type ScriptExecutionJobContext struct {
jobService portainer.JobService
endpointService portainer.EndpointService
fileService portainer.FileService
}
// NewScriptExecutionJobContext returns a new context that can be used to execute a ScriptExecutionJob
func NewScriptExecutionJobContext(jobService portainer.JobService, endpointService portainer.EndpointService, fileService portainer.FileService) *ScriptExecutionJobContext {
return &ScriptExecutionJobContext{
jobService: jobService,
endpointService: endpointService,
fileService: fileService,
}
}
// NewScriptExecutionJobRunner returns a new runner that can be scheduled
func NewScriptExecutionJobRunner(job *portainer.ScriptExecutionJob, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner {
return &ScriptExecutionJobRunner{
job: job,
context: context,
}
}
// Run triggers the execution of the job.
// It will iterate through all the endpoints specified in the context to
// execute the script associated to the job.
func (runner *ScriptExecutionJobRunner) Run() {
scriptFile, err := runner.context.fileService.GetFileContent(runner.job.ScriptPath)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve script file (err=%s)\n", err)
return
}
for _, endpointID := range runner.job.Endpoints {
endpoint, err := runner.context.endpointService.Endpoint(endpointID)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err)
return
}
err = runner.context.jobService.Execute(endpoint, "", runner.job.Image, scriptFile)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err)
}
}
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *ScriptExecutionJobRunner) GetJobType() portainer.JobType {
return portainer.ScriptExecutionJobType
}

84
api/cron/job_snapshot.go Normal file
View File

@ -0,0 +1,84 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// SnapshotJobRunner is used to run a SnapshotJob
type SnapshotJobRunner struct {
job *portainer.SnapshotJob
context *SnapshotJobContext
}
// SnapshotJobContext represents the context of execution of a SnapshotJob
type SnapshotJobContext struct {
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
}
// NewSnapshotJobContext returns a new context that can be used to execute a SnapshotJob
func NewSnapshotJobContext(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *SnapshotJobContext {
return &SnapshotJobContext{
endpointService: endpointService,
snapshotter: snapshotter,
}
}
// NewSnapshotJobRunner returns a new runner that can be scheduled
func NewSnapshotJobRunner(job *portainer.SnapshotJob, context *SnapshotJobContext) *SnapshotJobRunner {
return &SnapshotJobRunner{
job: job,
context: context,
}
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *SnapshotJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
}
// Run triggers the execution of the job.
// It will iterate through all the endpoints available in the database to
// create a snapshot of each one of them.
func (runner *SnapshotJobRunner) Run() {
endpoints, err := runner.context.endpointService.Endpoints()
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
return
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := runner.context.snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = runner.context.endpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
return
}
}
}

View File

@ -1,76 +1,80 @@
package cron package cron
import ( import (
"log"
"github.com/portainer/portainer" "github.com/portainer/portainer"
"github.com/robfig/cron" "github.com/robfig/cron"
) )
// JobScheduler represents a service for managing crons. // JobScheduler represents a service for managing crons
type JobScheduler struct { type JobScheduler struct {
cron *cron.Cron cron *cron.Cron
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
endpointFilePath string
endpointSyncInterval string
} }
// NewJobScheduler initializes a new service. // NewJobScheduler initializes a new service
func NewJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *JobScheduler { func NewJobScheduler() *JobScheduler {
return &JobScheduler{ return &JobScheduler{
cron: cron.New(), cron: cron.New(),
endpointService: endpointService,
snapshotter: snapshotter,
} }
} }
// ScheduleEndpointSyncJob schedules a cron job to synchronize the endpoints from a file // CreateSchedule schedules the execution of a job via a runner
func (scheduler *JobScheduler) ScheduleEndpointSyncJob(endpointFilePath string, interval string) error { func (scheduler *JobScheduler) CreateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
runner.SetScheduleID(schedule.ID)
scheduler.endpointFilePath = endpointFilePath return scheduler.cron.AddJob(schedule.CronExpression, runner)
scheduler.endpointSyncInterval = interval
job := newEndpointSyncJob(endpointFilePath, scheduler.endpointService)
err := job.Sync()
if err != nil {
return err
}
return scheduler.cron.AddJob("@every "+interval, job)
} }
// ScheduleSnapshotJob schedules a cron job to create endpoint snapshots // UpdateSchedule updates a specific scheduled job by re-creating a new cron
func (scheduler *JobScheduler) ScheduleSnapshotJob(interval string) error { // and adding all the existing jobs. It will then re-schedule the new job
job := newEndpointSnapshotJob(scheduler.endpointService, scheduler.snapshotter) // via the specified JobRunner parameter.
go job.Snapshot() // NOTE: the cron library do not support updating schedules directly
// hence the work-around
func (scheduler *JobScheduler) UpdateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
cronEntries := scheduler.cron.Entries()
newCron := cron.New()
return scheduler.cron.AddJob("@every "+interval, job) for _, entry := range cronEntries {
}
// UpdateSnapshotJob will update the schedules to match the new snapshot interval if entry.Job.(portainer.JobRunner).GetScheduleID() == schedule.ID {
func (scheduler *JobScheduler) UpdateSnapshotJob(interval string) {
// TODO: the cron library do not support removing/updating schedules.
// As a work-around we need to re-create the cron and reschedule the jobs.
// We should update the library.
jobs := scheduler.cron.Entries()
scheduler.cron.Stop()
scheduler.cron = cron.New() var jobRunner cron.Job = runner
if entry.Job.(portainer.JobRunner).GetJobType() == portainer.SnapshotJobType {
jobRunner = entry.Job
}
for _, job := range jobs { err := newCron.AddJob(schedule.CronExpression, jobRunner)
switch job.Job.(type) { if err != nil {
case endpointSnapshotJob: return err
scheduler.ScheduleSnapshotJob(interval) }
case endpointSyncJob:
scheduler.ScheduleEndpointSyncJob(scheduler.endpointFilePath, scheduler.endpointSyncInterval)
default:
log.Println("Unsupported job")
} }
newCron.Schedule(entry.Schedule, entry.Job)
} }
scheduler.cron.Stop()
scheduler.cron = newCron
scheduler.cron.Start()
return nil
}
// RemoveSchedule remove a scheduled job by re-creating a new cron
// and adding all the existing jobs except for the one specified via scheduleID.
// NOTE: the cron library do not support removing schedules directly
// hence the work-around
func (scheduler *JobScheduler) RemoveSchedule(scheduleID portainer.ScheduleID) {
cronEntries := scheduler.cron.Entries()
newCron := cron.New()
for _, entry := range cronEntries {
if entry.Job.(portainer.JobRunner).GetScheduleID() == scheduleID {
continue
}
newCron.Schedule(entry.Schedule, entry.Job)
}
scheduler.cron.Stop()
scheduler.cron = newCron
scheduler.cron.Start() scheduler.cron.Start()
} }

View File

@ -16,7 +16,7 @@ import (
"github.com/portainer/portainer/archive" "github.com/portainer/portainer/archive"
) )
// JobService represnts a service that handles jobs on the host // JobService represents a service that handles the execution of jobs
type JobService struct { type JobService struct {
DockerClientFactory *ClientFactory DockerClientFactory *ClientFactory
} }

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"encoding/pem" "encoding/pem"
"io/ioutil" "io/ioutil"
"strconv"
"github.com/portainer/portainer" "github.com/portainer/portainer"
@ -32,6 +33,8 @@ const (
PrivateKeyFile = "portainer.key" PrivateKeyFile = "portainer.key"
// PublicKeyFile represents the name on disk of the file containing the public key. // PublicKeyFile represents the name on disk of the file containing the public key.
PublicKeyFile = "portainer.pub" PublicKeyFile = "portainer.pub"
// ScheduleStorePath represents the subfolder where schedule files are stored.
ScheduleStorePath = "schedules"
) )
// Service represents a service for managing files and directories. // Service represents a service for managing files and directories.
@ -318,3 +321,33 @@ func (service *Service) getContentFromPEMFile(filePath string) ([]byte, error) {
block, _ := pem.Decode(fileContent) block, _ := pem.Decode(fileContent)
return block.Bytes, nil return block.Bytes, nil
} }
// GetScheduleFolder returns the absolute path on the FS for a schedule based
// on its identifier.
func (service *Service) GetScheduleFolder(scheduleIdentifier portainer.ScheduleID) string {
return path.Join(service.fileStorePath, ScheduleStorePath, strconv.Itoa(int(scheduleIdentifier)))
}
// StoreScheduledJobFileFromBytes creates a subfolder in the ScheduleStorePath and stores a new file from bytes.
// It returns the path to the folder where the file is stored.
func (service *Service) StoreScheduledJobFileFromBytes(scheduleIdentifier portainer.ScheduleID, data []byte) (string, error) {
identifier := strconv.Itoa(int(scheduleIdentifier))
scheduleStorePath := path.Join(ScheduleStorePath, identifier)
err := service.createDirectoryInStore(scheduleStorePath)
if err != nil {
return "", err
}
filePath := path.Join(scheduleStorePath, createScheduledJobFileName(identifier))
r := bytes.NewReader(data)
err = service.createFileInStore(filePath, r)
if err != nil {
return "", err
}
return path.Join(service.fileStorePath, filePath), nil
}
func createScheduledJobFileName(identifier string) string {
return "job_" + identifier + ".sh"
}

View File

@ -13,6 +13,7 @@ import (
"github.com/portainer/portainer/http/handler/motd" "github.com/portainer/portainer/http/handler/motd"
"github.com/portainer/portainer/http/handler/registries" "github.com/portainer/portainer/http/handler/registries"
"github.com/portainer/portainer/http/handler/resourcecontrols" "github.com/portainer/portainer/http/handler/resourcecontrols"
"github.com/portainer/portainer/http/handler/schedules"
"github.com/portainer/portainer/http/handler/settings" "github.com/portainer/portainer/http/handler/settings"
"github.com/portainer/portainer/http/handler/stacks" "github.com/portainer/portainer/http/handler/stacks"
"github.com/portainer/portainer/http/handler/status" "github.com/portainer/portainer/http/handler/status"
@ -49,6 +50,7 @@ type Handler struct {
UserHandler *users.Handler UserHandler *users.Handler
WebSocketHandler *websocket.Handler WebSocketHandler *websocket.Handler
WebhookHandler *webhooks.Handler WebhookHandler *webhooks.Handler
SchedulesHanlder *schedules.Handler
} }
// ServeHTTP delegates a request to the appropriate subhandler. // ServeHTTP delegates a request to the appropriate subhandler.
@ -99,6 +101,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.StripPrefix("/api", h.WebSocketHandler).ServeHTTP(w, r) http.StripPrefix("/api", h.WebSocketHandler).ServeHTTP(w, r)
case strings.HasPrefix(r.URL.Path, "/api/webhooks"): case strings.HasPrefix(r.URL.Path, "/api/webhooks"):
http.StripPrefix("/api", h.WebhookHandler).ServeHTTP(w, r) http.StripPrefix("/api", h.WebhookHandler).ServeHTTP(w, r)
case strings.HasPrefix(r.URL.Path, "/api/schedules"):
http.StripPrefix("/api", h.SchedulesHanlder).ServeHTTP(w, r)
case strings.HasPrefix(r.URL.Path, "/"): case strings.HasPrefix(r.URL.Path, "/"):
h.FileHandler.ServeHTTP(w, r) h.FileHandler.ServeHTTP(w, r)
} }

View File

@ -0,0 +1,40 @@
package schedules
import (
"net/http"
"github.com/gorilla/mux"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/portainer"
"github.com/portainer/portainer/http/security"
)
// Handler is the HTTP handler used to handle schedule operations.
type Handler struct {
*mux.Router
ScheduleService portainer.ScheduleService
EndpointService portainer.EndpointService
FileService portainer.FileService
JobService portainer.JobService
JobScheduler portainer.JobScheduler
}
// NewHandler creates a handler to manage schedule operations.
func NewHandler(bouncer *security.RequestBouncer) *Handler {
h := &Handler{
Router: mux.NewRouter(),
}
h.Handle("/schedules",
bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleList))).Methods(http.MethodGet)
h.Handle("/schedules",
bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleCreate))).Methods(http.MethodPost)
h.Handle("/schedules/{id}",
bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleInspect))).Methods(http.MethodGet)
h.Handle("/schedules/{id}",
bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleUpdate))).Methods(http.MethodPut)
h.Handle("/schedules/{id}",
bouncer.AdministratorAccess(httperror.LoggerHandler(h.scheduleDelete))).Methods(http.MethodDelete)
return h
}

View File

@ -0,0 +1,174 @@
package schedules
import (
"errors"
"net/http"
"github.com/asaskevich/govalidator"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
"github.com/portainer/portainer"
"github.com/portainer/portainer/cron"
)
type scheduleFromFilePayload struct {
Name string
Image string
CronExpression string
Endpoints []portainer.EndpointID
File []byte
}
type scheduleFromFileContentPayload struct {
Name string
CronExpression string
Image string
Endpoints []portainer.EndpointID
FileContent string
}
func (payload *scheduleFromFilePayload) Validate(r *http.Request) error {
name, err := request.RetrieveMultiPartFormValue(r, "Name", false)
if err != nil {
return err
}
payload.Name = name
image, err := request.RetrieveMultiPartFormValue(r, "Image", false)
if err != nil {
return err
}
payload.Image = image
cronExpression, err := request.RetrieveMultiPartFormValue(r, "Schedule", false)
if err != nil {
return err
}
payload.CronExpression = cronExpression
var endpoints []portainer.EndpointID
err = request.RetrieveMultiPartFormJSONValue(r, "Endpoints", &endpoints, false)
if err != nil {
return err
}
payload.Endpoints = endpoints
file, _, err := request.RetrieveMultiPartFormFile(r, "File")
if err != nil {
return portainer.Error("Invalid Script file. Ensure that the file is uploaded correctly")
}
payload.File = file
return nil
}
func (payload *scheduleFromFileContentPayload) Validate(r *http.Request) error {
if govalidator.IsNull(payload.Name) {
return portainer.Error("Invalid schedule name")
}
if govalidator.IsNull(payload.Image) {
return portainer.Error("Invalid schedule image")
}
if govalidator.IsNull(payload.CronExpression) {
return portainer.Error("Invalid cron expression")
}
if payload.Endpoints == nil || len(payload.Endpoints) == 0 {
return portainer.Error("Invalid endpoints payload")
}
if govalidator.IsNull(payload.FileContent) {
return portainer.Error("Invalid script file content")
}
return nil
}
// POST /api/schedules?method=file/string
func (handler *Handler) scheduleCreate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
method, err := request.RetrieveQueryParameter(r, "method", false)
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", err}
}
switch method {
case "string":
return handler.createScheduleFromFileContent(w, r)
case "file":
return handler.createScheduleFromFile(w, r)
default:
return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method. Valid values are: file or string", errors.New(request.ErrInvalidQueryParameter)}
}
}
func (handler *Handler) createScheduleFromFileContent(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
var payload scheduleFromFileContentPayload
err := request.DecodeAndValidateJSONPayload(r, &payload)
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err}
}
schedule, err := handler.createSchedule(payload.Name, payload.Image, payload.CronExpression, payload.Endpoints, []byte(payload.FileContent))
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err}
}
return response.JSON(w, schedule)
}
func (handler *Handler) createScheduleFromFile(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
payload := &scheduleFromFilePayload{}
err := payload.Validate(r)
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err}
}
schedule, err := handler.createSchedule(payload.Name, payload.Image, payload.CronExpression, payload.Endpoints, payload.File)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err}
}
return response.JSON(w, schedule)
}
func (handler *Handler) createSchedule(name, image, cronExpression string, endpoints []portainer.EndpointID, file []byte) (*portainer.Schedule, error) {
scheduleIdentifier := portainer.ScheduleID(handler.ScheduleService.GetNextIdentifier())
scriptPath, err := handler.FileService.StoreScheduledJobFileFromBytes(scheduleIdentifier, file)
if err != nil {
return nil, err
}
job := &portainer.ScriptExecutionJob{
Endpoints: endpoints,
Image: image,
ScriptPath: scriptPath,
ScheduleID: scheduleIdentifier,
}
schedule := &portainer.Schedule{
ID: scheduleIdentifier,
Name: name,
CronExpression: cronExpression,
JobType: portainer.ScriptExecutionJobType,
ScriptExecutionJob: job,
}
jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService)
jobRunner := cron.NewScriptExecutionJobRunner(job, jobContext)
err = handler.JobScheduler.CreateSchedule(schedule, jobRunner)
if err != nil {
return nil, err
}
err = handler.ScheduleService.CreateSchedule(schedule)
if err != nil {
return nil, err
}
return schedule, nil
}

View File

@ -0,0 +1,42 @@
package schedules
import (
"errors"
"net/http"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
"github.com/portainer/portainer"
)
func (handler *Handler) scheduleDelete(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id")
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err}
}
schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID))
if err == portainer.ErrObjectNotFound {
return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err}
} else if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err}
}
if schedule.JobType == portainer.SnapshotJobType || schedule.JobType == portainer.EndpointSyncJobType {
return &httperror.HandlerError{http.StatusBadRequest, "Cannot remove system schedules", errors.New("Cannot remove system schedule")}
}
scheduleFolder := handler.FileService.GetScheduleFolder(portainer.ScheduleID(scheduleID))
err = handler.FileService.RemoveDirectory(scheduleFolder)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the files associated to the schedule on the filesystem", err}
}
err = handler.ScheduleService.DeleteSchedule(portainer.ScheduleID(scheduleID))
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to remove the schedule from the database", err}
}
return response.Empty(w)
}

View File

@ -0,0 +1,27 @@
package schedules
import (
"net/http"
"github.com/portainer/libhttp/response"
"github.com/portainer/portainer"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
)
func (handler *Handler) scheduleInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id")
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err}
}
schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID))
if err == portainer.ErrObjectNotFound {
return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err}
} else if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err}
}
return response.JSON(w, schedule)
}

View File

@ -0,0 +1,18 @@
package schedules
import (
"net/http"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/response"
)
// GET request on /api/schedules
func (handler *Handler) scheduleList(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
schedules, err := handler.ScheduleService.Schedules()
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to retrieve schedules from the database", err}
}
return response.JSON(w, schedules)
}

View File

@ -0,0 +1,85 @@
package schedules
import (
"net/http"
httperror "github.com/portainer/libhttp/error"
"github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response"
"github.com/portainer/portainer"
"github.com/portainer/portainer/cron"
)
type scheduleUpdatePayload struct {
Name *string
Image *string
CronExpression *string
Endpoints []portainer.EndpointID
}
func (payload *scheduleUpdatePayload) Validate(r *http.Request) error {
return nil
}
func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
scheduleID, err := request.RetrieveNumericRouteVariableValue(r, "id")
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err}
}
var payload scheduleUpdatePayload
err = request.DecodeAndValidateJSONPayload(r, &payload)
if err != nil {
return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err}
}
schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID))
if err == portainer.ErrObjectNotFound {
return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err}
} else if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err}
}
updateJobSchedule := updateSchedule(schedule, &payload)
if updateJobSchedule {
jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService)
jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext)
err := handler.JobScheduler.UpdateSchedule(schedule, jobRunner)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update job scheduler", err}
}
}
err = handler.ScheduleService.UpdateSchedule(portainer.ScheduleID(scheduleID), schedule)
if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to persist schedule changes inside the database", err}
}
return response.JSON(w, schedule)
}
func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool {
updateJobSchedule := false
if payload.Name != nil {
schedule.Name = *payload.Name
}
if payload.Endpoints != nil {
schedule.ScriptExecutionJob.Endpoints = payload.Endpoints
updateJobSchedule = true
}
if payload.CronExpression != nil {
schedule.CronExpression = *payload.CronExpression
updateJobSchedule = true
}
if payload.Image != nil {
schedule.ScriptExecutionJob.Image = *payload.Image
updateJobSchedule = true
}
return updateJobSchedule
}

View File

@ -16,6 +16,7 @@ type Handler struct {
LDAPService portainer.LDAPService LDAPService portainer.LDAPService
FileService portainer.FileService FileService portainer.FileService
JobScheduler portainer.JobScheduler JobScheduler portainer.JobScheduler
ScheduleService portainer.ScheduleService
} }
// NewHandler creates a handler to manage settings operations. // NewHandler creates a handler to manage settings operations.

View File

@ -77,8 +77,10 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) *
} }
if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval { if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval {
settings.SnapshotInterval = *payload.SnapshotInterval err := handler.updateSnapshotInterval(settings, *payload.SnapshotInterval)
handler.JobScheduler.UpdateSnapshotJob(settings.SnapshotInterval) if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update snapshot interval", err}
}
} }
tlsError := handler.updateTLS(settings) tlsError := handler.updateTLS(settings)
@ -94,6 +96,27 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) *
return response.JSON(w, settings) return response.JSON(w, settings)
} }
func (handler *Handler) updateSnapshotInterval(settings *portainer.Settings, snapshotInterval string) error {
settings.SnapshotInterval = snapshotInterval
schedules, err := handler.ScheduleService.SchedulesByJobType(portainer.SnapshotJobType)
if err != nil {
return err
}
if len(schedules) != 0 {
snapshotSchedule := schedules[0]
snapshotSchedule.CronExpression = "@every " + snapshotInterval
err := handler.JobScheduler.UpdateSchedule(&snapshotSchedule, nil)
if err != nil {
return err
}
}
return nil
}
func (handler *Handler) updateTLS(settings *portainer.Settings) *httperror.HandlerError { func (handler *Handler) updateTLS(settings *portainer.Settings) *httperror.HandlerError {
if (settings.LDAPSettings.TLSConfig.TLS || settings.LDAPSettings.StartTLS) && !settings.LDAPSettings.TLSConfig.TLSSkipVerify { if (settings.LDAPSettings.TLSConfig.TLS || settings.LDAPSettings.StartTLS) && !settings.LDAPSettings.TLSConfig.TLSSkipVerify {
caCertPath, _ := handler.FileService.GetPathForTLSFile(filesystem.LDAPStorePath, portainer.TLSFileCA) caCertPath, _ := handler.FileService.GetPathForTLSFile(filesystem.LDAPStorePath, portainer.TLSFileCA)

View File

@ -15,6 +15,7 @@ import (
"github.com/portainer/portainer/http/handler/motd" "github.com/portainer/portainer/http/handler/motd"
"github.com/portainer/portainer/http/handler/registries" "github.com/portainer/portainer/http/handler/registries"
"github.com/portainer/portainer/http/handler/resourcecontrols" "github.com/portainer/portainer/http/handler/resourcecontrols"
"github.com/portainer/portainer/http/handler/schedules"
"github.com/portainer/portainer/http/handler/settings" "github.com/portainer/portainer/http/handler/settings"
"github.com/portainer/portainer/http/handler/stacks" "github.com/portainer/portainer/http/handler/stacks"
"github.com/portainer/portainer/http/handler/status" "github.com/portainer/portainer/http/handler/status"
@ -54,6 +55,7 @@ type Server struct {
LDAPService portainer.LDAPService LDAPService portainer.LDAPService
RegistryService portainer.RegistryService RegistryService portainer.RegistryService
ResourceControlService portainer.ResourceControlService ResourceControlService portainer.ResourceControlService
ScheduleService portainer.ScheduleService
SettingsService portainer.SettingsService SettingsService portainer.SettingsService
StackService portainer.StackService StackService portainer.StackService
SwarmStackManager portainer.SwarmStackManager SwarmStackManager portainer.SwarmStackManager
@ -81,6 +83,7 @@ func (server *Server) Start() error {
AuthDisabled: server.AuthDisabled, AuthDisabled: server.AuthDisabled,
} }
requestBouncer := security.NewRequestBouncer(requestBouncerParameters) requestBouncer := security.NewRequestBouncer(requestBouncerParameters)
proxyManagerParameters := &proxy.ManagerParams{ proxyManagerParameters := &proxy.ManagerParams{
ResourceControlService: server.ResourceControlService, ResourceControlService: server.ResourceControlService,
TeamMembershipService: server.TeamMembershipService, TeamMembershipService: server.TeamMembershipService,
@ -90,6 +93,7 @@ func (server *Server) Start() error {
SignatureService: server.SignatureService, SignatureService: server.SignatureService,
} }
proxyManager := proxy.NewManager(proxyManagerParameters) proxyManager := proxy.NewManager(proxyManagerParameters)
rateLimiter := security.NewRateLimiter(10, 1*time.Second, 1*time.Hour) rateLimiter := security.NewRateLimiter(10, 1*time.Second, 1*time.Hour)
var authHandler = auth.NewHandler(requestBouncer, rateLimiter, server.AuthDisabled) var authHandler = auth.NewHandler(requestBouncer, rateLimiter, server.AuthDisabled)
@ -130,11 +134,19 @@ func (server *Server) Start() error {
var resourceControlHandler = resourcecontrols.NewHandler(requestBouncer) var resourceControlHandler = resourcecontrols.NewHandler(requestBouncer)
resourceControlHandler.ResourceControlService = server.ResourceControlService resourceControlHandler.ResourceControlService = server.ResourceControlService
var schedulesHandler = schedules.NewHandler(requestBouncer)
schedulesHandler.ScheduleService = server.ScheduleService
schedulesHandler.EndpointService = server.EndpointService
schedulesHandler.FileService = server.FileService
schedulesHandler.JobService = server.JobService
schedulesHandler.JobScheduler = server.JobScheduler
var settingsHandler = settings.NewHandler(requestBouncer) var settingsHandler = settings.NewHandler(requestBouncer)
settingsHandler.SettingsService = server.SettingsService settingsHandler.SettingsService = server.SettingsService
settingsHandler.LDAPService = server.LDAPService settingsHandler.LDAPService = server.LDAPService
settingsHandler.FileService = server.FileService settingsHandler.FileService = server.FileService
settingsHandler.JobScheduler = server.JobScheduler settingsHandler.JobScheduler = server.JobScheduler
settingsHandler.ScheduleService = server.ScheduleService
var stackHandler = stacks.NewHandler(requestBouncer) var stackHandler = stacks.NewHandler(requestBouncer)
stackHandler.FileService = server.FileService stackHandler.FileService = server.FileService
@ -203,6 +215,7 @@ func (server *Server) Start() error {
UserHandler: userHandler, UserHandler: userHandler,
WebSocketHandler: websocketHandler, WebSocketHandler: websocketHandler,
WebhookHandler: webhookHandler, WebhookHandler: webhookHandler,
SchedulesHanlder: schedulesHandler,
} }
if server.SSL { if server.SSL {

View File

@ -220,7 +220,44 @@ type (
TLSKeyPath string `json:"TLSKey,omitempty"` TLSKeyPath string `json:"TLSKey,omitempty"`
} }
// WebhookID represents an webhook identifier. // ScheduleID represents a schedule identifier.
ScheduleID int
// JobType represents a job type
JobType int
// ScriptExecutionJob represents a scheduled job that can execute a script via a privileged container
ScriptExecutionJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
Endpoints []EndpointID
Image string
ScriptPath string
}
// SnapshotJob represents a scheduled job that can create endpoint snapshots
SnapshotJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
}
// EndpointSyncJob represents a scheduled job that synchronize endpoints based on an external file
EndpointSyncJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
}
// Schedule represents a scheduled job.
// It only contains a pointer to one of the JobRunner implementations
// based on the JobType
Schedule struct {
ID ScheduleID `json:"Id"`
Name string
CronExpression string
JobType JobType
ScriptExecutionJob *ScriptExecutionJob
SnapshotJob *SnapshotJob
EndpointSyncJob *EndpointSyncJob
}
// WebhookID represents a webhook identifier.
WebhookID int WebhookID int
// WebhookType represents the type of resource a webhook is related to // WebhookType represents the type of resource a webhook is related to
@ -552,6 +589,17 @@ type (
DeleteResourceControl(ID ResourceControlID) error DeleteResourceControl(ID ResourceControlID) error
} }
// ScheduleService represents a service for managing schedule data
ScheduleService interface {
Schedule(ID ScheduleID) (*Schedule, error)
Schedules() ([]Schedule, error)
SchedulesByJobType(jobType JobType) ([]Schedule, error)
CreateSchedule(schedule *Schedule) error
UpdateSchedule(ID ScheduleID, schedule *Schedule) error
DeleteSchedule(ID ScheduleID) error
GetNextIdentifier() int
}
// TagService represents a service for managing tag data // TagService represents a service for managing tag data
TagService interface { TagService interface {
Tags() ([]Tag, error) Tags() ([]Tag, error)
@ -605,6 +653,8 @@ type (
LoadKeyPair() ([]byte, []byte, error) LoadKeyPair() ([]byte, []byte, error)
WriteJSONToFile(path string, content interface{}) error WriteJSONToFile(path string, content interface{}) error
FileExists(path string) (bool, error) FileExists(path string) (bool, error)
StoreScheduledJobFileFromBytes(scheduleIdentifier ScheduleID, data []byte) (string, error)
GetScheduleFolder(scheduleIdentifier ScheduleID) string
} }
// GitService represents a service for managing Git // GitService represents a service for managing Git
@ -615,12 +665,20 @@ type (
// JobScheduler represents a service to run jobs on a periodic basis // JobScheduler represents a service to run jobs on a periodic basis
JobScheduler interface { JobScheduler interface {
ScheduleEndpointSyncJob(endpointFilePath, interval string) error CreateSchedule(schedule *Schedule, runner JobRunner) error
ScheduleSnapshotJob(interval string) error UpdateSchedule(schedule *Schedule, runner JobRunner) error
UpdateSnapshotJob(interval string) RemoveSchedule(ID ScheduleID)
Start() Start()
} }
// JobRunner represents a service that can be used to run a job
JobRunner interface {
Run()
GetScheduleID() ScheduleID
SetScheduleID(ID ScheduleID)
GetJobType() JobType
}
// Snapshotter represents a service used to create endpoint snapshots // Snapshotter represents a service used to create endpoint snapshots
Snapshotter interface { Snapshotter interface {
CreateSnapshot(endpoint *Endpoint) (*Snapshot, error) CreateSnapshot(endpoint *Endpoint) (*Snapshot, error)
@ -779,3 +837,15 @@ const (
// ServiceWebhook is a webhook for restarting a docker service // ServiceWebhook is a webhook for restarting a docker service
ServiceWebhook ServiceWebhook
) )
const (
_ JobType = iota
// ScriptExecutionJobType is a non-system job used to execute a script against a list of
// endpoints via privileged containers
ScriptExecutionJobType
// SnapshotJobType is a system job used to create endpoint snapshots
SnapshotJobType
// EndpointSyncJobType is a system job used to synchronize endpoints from
// an external definition store
EndpointSyncJobType
)