From 78150a738fec9d078fd0f8b3a8ddc2ada6fe344a Mon Sep 17 00:00:00 2001
From: andres-portainer <91705312+andres-portainer@users.noreply.github.com>
Date: Wed, 16 Mar 2022 10:33:15 -0300
Subject: [PATCH] fix(scheduler): fix a data race in the scheduler EE-2716 (#6629)

---
 api/scheduler/scheduler.go      |  9 +++++++++
 api/scheduler/scheduler_test.go | 17 +++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/api/scheduler/scheduler.go b/api/scheduler/scheduler.go
index 529574f0c..bb4ce5461 100644
--- a/api/scheduler/scheduler.go
+++ b/api/scheduler/scheduler.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"log"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -14,6 +15,7 @@ import (
 type Scheduler struct {
 	crontab    *cron.Cron
 	activeJobs map[cron.EntryID]context.CancelFunc
+	mu         sync.Mutex
 }
 
 func NewScheduler(ctx context.Context) *Scheduler {
@@ -45,11 +47,13 @@ func (s *Scheduler) Shutdown() error {
 	ctx := s.crontab.Stop()
 	<-ctx.Done()
 
+	s.mu.Lock()
 	for _, job := range s.crontab.Entries() {
 		if cancel, ok := s.activeJobs[job.ID]; ok {
 			cancel()
 		}
 	}
+	s.mu.Unlock()
 
 	err := ctx.Err()
 	if err == context.Canceled {
@@ -65,9 +69,12 @@ func (s *Scheduler) StopJob(jobID string) error {
 		return errors.Wrapf(err, "failed convert jobID %q to int", jobID)
 	}
 	entryID := cron.EntryID(id)
+
+	s.mu.Lock()
 	if cancel, ok := s.activeJobs[entryID]; ok {
 		cancel()
 	}
+	s.mu.Unlock()
 
 	return nil
 }
@@ -87,7 +94,9 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri
 
 	entryID := s.crontab.Schedule(cron.Every(duration), j)
 
+	s.mu.Lock()
 	s.activeJobs[entryID] = cancel
+	s.mu.Unlock()
 
 	go func(entryID cron.EntryID) {
 		<-ctx.Done()
diff --git a/api/scheduler/scheduler_test.go b/api/scheduler/scheduler_test.go
index b7159a1a1..56377b4bf 100644
--- a/api/scheduler/scheduler_test.go
+++ b/api/scheduler/scheduler_test.go
@@ -2,6 +2,7 @@ package scheduler
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"testing"
 	"time"
@@ -101,3 +102,19 @@ func Test_CanTerminateAllJobs_ByCancellingParentContext(t *testing.T) {
 	<-ctx.Done()
 	assert.False(t, workDone, "job shouldn't had a chance to run")
 }
+
+func Test_StartJobEvery_Concurrently(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
+	s := NewScheduler(ctx)
+
+	f := func() error {
+		return errors.New("error")
+	}
+
+	go s.StartJobEvery(jobInterval, f)
+	s.StartJobEvery(jobInterval, f)
+
+	cancel()
+
+	<-ctx.Done()
+}