You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
portainer/api/scheduler/scheduler.go

130 lines
2.5 KiB

package scheduler
import (
"context"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
)
type Scheduler struct {
crontab *cron.Cron
activeJobs map[cron.EntryID]context.CancelFunc
mu sync.Mutex
}
type PermanentError struct {
err error
}
func NewPermanentError(err error) *PermanentError {
return &PermanentError{err: err}
}
func (e *PermanentError) Error() string {
return e.err.Error()
}
func NewScheduler(ctx context.Context) *Scheduler {
crontab := cron.New(cron.WithChain(cron.Recover(cron.DefaultLogger)))
crontab.Start()
s := &Scheduler{
crontab: crontab,
activeJobs: make(map[cron.EntryID]context.CancelFunc),
}
if ctx != nil {
go func() {
<-ctx.Done()
s.Shutdown()
}()
}
return s
}
// Shutdown stops the scheduler and waits for it to stop if it is running; otherwise does nothing.
func (s *Scheduler) Shutdown() error {
if s.crontab == nil {
return nil
}
log.Debug().Msg("stopping scheduler")
ctx := s.crontab.Stop()
<-ctx.Done()
s.mu.Lock()
for _, job := range s.crontab.Entries() {
if cancel, ok := s.activeJobs[job.ID]; ok {
cancel()
}
}
s.mu.Unlock()
err := ctx.Err()
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
// StopJob stops the job from being run in the future
func (s *Scheduler) StopJob(jobID string) error {
id, err := strconv.Atoi(jobID)
if err != nil {
return errors.Wrapf(err, "failed convert jobID %q to int", jobID)
}
entryID := cron.EntryID(id)
s.mu.Lock()
if cancel, ok := s.activeJobs[entryID]; ok {
cancel()
}
s.mu.Unlock()
return nil
}
// StartJobEvery schedules a new periodic job with a given duration.
// Returns job id that could be used to stop the given job.
// When job run returns an error, that job won't be run again.
func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) string {
ctx, cancel := context.WithCancel(context.Background())
jobFn := cron.FuncJob(func() {
err := job()
if err == nil {
return
}
var permErr *PermanentError
if errors.As(err, &permErr) {
log.Error().Err(permErr).Msg("job returned a permanent error, it will be stopped")
cancel()
return
}
log.Error().Err(err).Msg("job returned an error, it will be rescheduled")
})
entryID := s.crontab.Schedule(cron.Every(duration), jobFn)
s.mu.Lock()
s.activeJobs[entryID] = cancel
s.mu.Unlock()
go func(entryID cron.EntryID) {
<-ctx.Done()
log.Debug().Msg("job cancelled, stopping")
s.crontab.Remove(entryID)
}(entryID)
return strconv.Itoa(int(entryID))
}