From 2ba348551ddd5545a0e5c7fcba14712ce2f6954e Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Thu, 4 Sep 2025 15:09:52 -0300 Subject: [PATCH] fix(scheduler): fix a data race in the job scheduler BE-12229 (#1146) --- api/scheduler/scheduler.go | 26 +++++++++++++++++++------- api/scheduler/scheduler_test.go | 29 +++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/api/scheduler/scheduler.go b/api/scheduler/scheduler.go index 69b79e408..9565f0e70 100644 --- a/api/scheduler/scheduler.go +++ b/api/scheduler/scheduler.go @@ -41,7 +41,10 @@ func NewScheduler(ctx context.Context) *Scheduler { if ctx != nil { go func() { <-ctx.Done() - s.Shutdown() + + if err := s.Shutdown(); err != nil { + log.Error().Err(err).Msg("failed to shutdown the scheduler") + } }() } @@ -55,21 +58,24 @@ func (s *Scheduler) Shutdown() error { } log.Debug().Msg("stopping scheduler") + ctx := s.crontab.Stop() <-ctx.Done() s.mu.Lock() + defer s.mu.Unlock() + for _, job := range s.crontab.Entries() { if cancel, ok := s.activeJobs[job.ID]; ok { cancel() } } - s.mu.Unlock() err := ctx.Err() if errors.Is(err, context.Canceled) { return nil } + return err } @@ -79,14 +85,15 @@ func (s *Scheduler) StopJob(jobID string) error { if err != nil { return errors.Wrapf(err, "failed convert jobID %q to int", jobID) } + entryID := cron.EntryID(id) s.mu.Lock() + defer s.mu.Unlock() + if cancel, ok := s.activeJobs[entryID]; ok { cancel() - delete(s.activeJobs, entryID) } - s.mu.Unlock() return nil } @@ -100,6 +107,7 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri cancelFn := func() { log.Debug().Msg("job cancelled, stopping") s.crontab.Remove(*entryID) + delete(s.activeJobs, *entryID) } jobFn := cron.FuncJob(func() { @@ -111,6 +119,10 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri var permErr *PermanentError if errors.As(err, &permErr) { log.Error().Err(permErr).Msg("job returned a permanent error, it will be stopped") + + s.mu.Lock() + defer s.mu.Unlock() + cancelFn() return @@ -119,11 +131,11 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri log.Error().Err(err).Msg("job returned an error, it will be rescheduled") }) - *entryID = s.crontab.Schedule(cron.Every(duration), jobFn) - s.mu.Lock() + defer s.mu.Unlock() + + *entryID = s.crontab.Schedule(cron.Every(duration), jobFn) s.activeJobs[*entryID] = cancelFn - s.mu.Unlock() return strconv.Itoa(int(*entryID)) } diff --git a/api/scheduler/scheduler_test.go b/api/scheduler/scheduler_test.go index aa5a13f3e..337068f49 100644 --- a/api/scheduler/scheduler_test.go +++ b/api/scheduler/scheduler_test.go @@ -8,13 +8,18 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var jobInterval = time.Second +func requireNoShutdownErr(t *testing.T, fn func() error) { + require.NoError(t, fn(), "scheduler must be shutdown without error") +} + func Test_ScheduledJobRuns(t *testing.T) { s := NewScheduler(context.Background()) - defer s.Shutdown() + defer requireNoShutdownErr(t, s.Shutdown) ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval) @@ -23,6 +28,7 @@ func Test_ScheduledJobRuns(t *testing.T) { workDone = true cancel() + return nil }) @@ -32,7 +38,7 @@ func Test_ScheduledJobRuns(t *testing.T) { func Test_JobCanBeStopped(t *testing.T) { s := NewScheduler(context.Background()) - defer s.Shutdown() + defer requireNoShutdownErr(t, s.Shutdown) ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval) @@ -41,9 +47,12 @@ func Test_JobCanBeStopped(t *testing.T) { workDone = true cancel() + return nil }) - s.StopJob(jobID) + + err := s.StopJob(jobID) + require.NoError(t, err) <-ctx.Done() assert.False(t, workDone, "job shouldn't had a chance to run") @@ -51,13 +60,15 @@ func Test_JobCanBeStopped(t *testing.T) { func Test_JobShouldStop_UponPermError(t *testing.T) { s := NewScheduler(context.Background()) - defer s.Shutdown() + defer requireNoShutdownErr(t, s.Shutdown) var acc int + ch := make(chan struct{}) s.StartJobEvery(jobInterval, func() error { acc++ close(ch) + return NewPermanentError(errors.New("failed")) }) @@ -68,13 +79,15 @@ func Test_JobShouldStop_UponPermError(t *testing.T) { func Test_JobShouldNotStop_UponError(t *testing.T) { s := NewScheduler(context.Background()) - defer s.Shutdown() + defer requireNoShutdownErr(t, s.Shutdown) var acc atomic.Int64 + ch := make(chan struct{}) s.StartJobEvery(jobInterval, func() error { if acc.Add(1) == 2 { close(ch) + return NewPermanentError(errors.New("failed")) } @@ -94,12 +107,12 @@ func Test_CanTerminateAllJobs_ByShuttingDownScheduler(t *testing.T) { var workDone bool s.StartJobEvery(jobInterval, func() error { workDone = true - cancel() + return nil }) - s.Shutdown() + requireNoShutdownErr(t, s.Shutdown) <-ctx.Done() assert.False(t, workDone, "job shouldn't had a chance to run") @@ -112,8 +125,8 @@ func Test_CanTerminateAllJobs_ByCancellingParentContext(t *testing.T) { var workDone bool s.StartJobEvery(jobInterval, func() error { workDone = true - cancel() + return nil })