fix(scheduler): fix a data race in the scheduler EE-2716 (#6629)

pull/6596/merge
andres-portainer 2022-03-16 10:33:15 -03:00 committed by GitHub
parent ecf5e90783
commit 78150a738f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 0 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"log" "log"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -14,6 +15,7 @@ import (
type Scheduler struct { type Scheduler struct {
crontab *cron.Cron crontab *cron.Cron
activeJobs map[cron.EntryID]context.CancelFunc activeJobs map[cron.EntryID]context.CancelFunc
mu sync.Mutex
} }
func NewScheduler(ctx context.Context) *Scheduler { func NewScheduler(ctx context.Context) *Scheduler {
@ -45,11 +47,13 @@ func (s *Scheduler) Shutdown() error {
ctx := s.crontab.Stop() ctx := s.crontab.Stop()
<-ctx.Done() <-ctx.Done()
s.mu.Lock()
for _, job := range s.crontab.Entries() { for _, job := range s.crontab.Entries() {
if cancel, ok := s.activeJobs[job.ID]; ok { if cancel, ok := s.activeJobs[job.ID]; ok {
cancel() cancel()
} }
} }
s.mu.Unlock()
err := ctx.Err() err := ctx.Err()
if err == context.Canceled { if err == context.Canceled {
@ -65,9 +69,12 @@ func (s *Scheduler) StopJob(jobID string) error {
return errors.Wrapf(err, "failed convert jobID %q to int", jobID) return errors.Wrapf(err, "failed convert jobID %q to int", jobID)
} }
entryID := cron.EntryID(id) entryID := cron.EntryID(id)
s.mu.Lock()
if cancel, ok := s.activeJobs[entryID]; ok { if cancel, ok := s.activeJobs[entryID]; ok {
cancel() cancel()
} }
s.mu.Unlock()
return nil return nil
} }
@ -87,7 +94,9 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri
entryID := s.crontab.Schedule(cron.Every(duration), j) entryID := s.crontab.Schedule(cron.Every(duration), j)
s.mu.Lock()
s.activeJobs[entryID] = cancel s.activeJobs[entryID] = cancel
s.mu.Unlock()
go func(entryID cron.EntryID) { go func(entryID cron.EntryID) {
<-ctx.Done() <-ctx.Done()

View File

@ -2,6 +2,7 @@ package scheduler
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"testing" "testing"
"time" "time"
@ -101,3 +102,19 @@ func Test_CanTerminateAllJobs_ByCancellingParentContext(t *testing.T) {
<-ctx.Done() <-ctx.Done()
assert.False(t, workDone, "job shouldn't had a chance to run") assert.False(t, workDone, "job shouldn't had a chance to run")
} }
func Test_StartJobEvery_Concurrently(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
s := NewScheduler(ctx)
f := func() error {
return errors.New("error")
}
go s.StartJobEvery(jobInterval, f)
s.StartJobEvery(jobInterval, f)
cancel()
<-ctx.Done()
}