mirror of https://github.com/portainer/portainer
fix(scheduler): fix a data race in the job scheduler BE-12229 (#1146)
parent
110f88f22d
commit
2ba348551d
|
@ -41,7 +41,10 @@ func NewScheduler(ctx context.Context) *Scheduler {
|
||||||
if ctx != nil {
|
if ctx != nil {
|
||||||
go func() {
|
go func() {
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
s.Shutdown()
|
|
||||||
|
if err := s.Shutdown(); err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to shutdown the scheduler")
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,21 +58,24 @@ func (s *Scheduler) Shutdown() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Msg("stopping scheduler")
|
log.Debug().Msg("stopping scheduler")
|
||||||
|
|
||||||
ctx := s.crontab.Stop()
|
ctx := s.crontab.Stop()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
for _, job := range s.crontab.Entries() {
|
for _, job := range s.crontab.Entries() {
|
||||||
if cancel, ok := s.activeJobs[job.ID]; ok {
|
if cancel, ok := s.activeJobs[job.ID]; ok {
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
err := ctx.Err()
|
err := ctx.Err()
|
||||||
if errors.Is(err, context.Canceled) {
|
if errors.Is(err, context.Canceled) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,14 +85,15 @@ func (s *Scheduler) StopJob(jobID string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "failed convert jobID %q to int", jobID)
|
return errors.Wrapf(err, "failed convert jobID %q to int", jobID)
|
||||||
}
|
}
|
||||||
|
|
||||||
entryID := cron.EntryID(id)
|
entryID := cron.EntryID(id)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
if cancel, ok := s.activeJobs[entryID]; ok {
|
if cancel, ok := s.activeJobs[entryID]; ok {
|
||||||
cancel()
|
cancel()
|
||||||
delete(s.activeJobs, entryID)
|
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -100,6 +107,7 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri
|
||||||
cancelFn := func() {
|
cancelFn := func() {
|
||||||
log.Debug().Msg("job cancelled, stopping")
|
log.Debug().Msg("job cancelled, stopping")
|
||||||
s.crontab.Remove(*entryID)
|
s.crontab.Remove(*entryID)
|
||||||
|
delete(s.activeJobs, *entryID)
|
||||||
}
|
}
|
||||||
|
|
||||||
jobFn := cron.FuncJob(func() {
|
jobFn := cron.FuncJob(func() {
|
||||||
|
@ -111,6 +119,10 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri
|
||||||
var permErr *PermanentError
|
var permErr *PermanentError
|
||||||
if errors.As(err, &permErr) {
|
if errors.As(err, &permErr) {
|
||||||
log.Error().Err(permErr).Msg("job returned a permanent error, it will be stopped")
|
log.Error().Err(permErr).Msg("job returned a permanent error, it will be stopped")
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
cancelFn()
|
cancelFn()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -119,11 +131,11 @@ func (s *Scheduler) StartJobEvery(duration time.Duration, job func() error) stri
|
||||||
log.Error().Err(err).Msg("job returned an error, it will be rescheduled")
|
log.Error().Err(err).Msg("job returned an error, it will be rescheduled")
|
||||||
})
|
})
|
||||||
|
|
||||||
*entryID = s.crontab.Schedule(cron.Every(duration), jobFn)
|
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
*entryID = s.crontab.Schedule(cron.Every(duration), jobFn)
|
||||||
s.activeJobs[*entryID] = cancelFn
|
s.activeJobs[*entryID] = cancelFn
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
return strconv.Itoa(int(*entryID))
|
return strconv.Itoa(int(*entryID))
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,13 +8,18 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var jobInterval = time.Second
|
var jobInterval = time.Second
|
||||||
|
|
||||||
|
// requireNoShutdownErr invokes fn and uses require.NoError to check that it
// returned a nil error. The tests pass s.Shutdown as fn, typically via defer,
// so that the scheduler's shutdown error is checked rather than discarded.
func requireNoShutdownErr(t *testing.T, fn func() error) {
|
||||||
|
require.NoError(t, fn(), "scheduler must be shutdown without error")
|
||||||
|
}
|
||||||
|
|
||||||
func Test_ScheduledJobRuns(t *testing.T) {
|
func Test_ScheduledJobRuns(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
defer s.Shutdown()
|
defer requireNoShutdownErr(t, s.Shutdown)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
|
||||||
|
|
||||||
|
@ -23,6 +28,7 @@ func Test_ScheduledJobRuns(t *testing.T) {
|
||||||
workDone = true
|
workDone = true
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -32,7 +38,7 @@ func Test_ScheduledJobRuns(t *testing.T) {
|
||||||
|
|
||||||
func Test_JobCanBeStopped(t *testing.T) {
|
func Test_JobCanBeStopped(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
defer s.Shutdown()
|
defer requireNoShutdownErr(t, s.Shutdown)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*jobInterval)
|
||||||
|
|
||||||
|
@ -41,9 +47,12 @@ func Test_JobCanBeStopped(t *testing.T) {
|
||||||
workDone = true
|
workDone = true
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
s.StopJob(jobID)
|
|
||||||
|
err := s.StopJob(jobID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
assert.False(t, workDone, "job shouldn't had a chance to run")
|
assert.False(t, workDone, "job shouldn't had a chance to run")
|
||||||
|
@ -51,13 +60,15 @@ func Test_JobCanBeStopped(t *testing.T) {
|
||||||
|
|
||||||
func Test_JobShouldStop_UponPermError(t *testing.T) {
|
func Test_JobShouldStop_UponPermError(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
defer s.Shutdown()
|
defer requireNoShutdownErr(t, s.Shutdown)
|
||||||
|
|
||||||
var acc int
|
var acc int
|
||||||
|
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
s.StartJobEvery(jobInterval, func() error {
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
acc++
|
acc++
|
||||||
close(ch)
|
close(ch)
|
||||||
|
|
||||||
return NewPermanentError(errors.New("failed"))
|
return NewPermanentError(errors.New("failed"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -68,13 +79,15 @@ func Test_JobShouldStop_UponPermError(t *testing.T) {
|
||||||
|
|
||||||
func Test_JobShouldNotStop_UponError(t *testing.T) {
|
func Test_JobShouldNotStop_UponError(t *testing.T) {
|
||||||
s := NewScheduler(context.Background())
|
s := NewScheduler(context.Background())
|
||||||
defer s.Shutdown()
|
defer requireNoShutdownErr(t, s.Shutdown)
|
||||||
|
|
||||||
var acc atomic.Int64
|
var acc atomic.Int64
|
||||||
|
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
s.StartJobEvery(jobInterval, func() error {
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
if acc.Add(1) == 2 {
|
if acc.Add(1) == 2 {
|
||||||
close(ch)
|
close(ch)
|
||||||
|
|
||||||
return NewPermanentError(errors.New("failed"))
|
return NewPermanentError(errors.New("failed"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,12 +107,12 @@ func Test_CanTerminateAllJobs_ByShuttingDownScheduler(t *testing.T) {
|
||||||
var workDone bool
|
var workDone bool
|
||||||
s.StartJobEvery(jobInterval, func() error {
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
workDone = true
|
workDone = true
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
s.Shutdown()
|
requireNoShutdownErr(t, s.Shutdown)
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
assert.False(t, workDone, "job shouldn't had a chance to run")
|
assert.False(t, workDone, "job shouldn't had a chance to run")
|
||||||
|
@ -112,8 +125,8 @@ func Test_CanTerminateAllJobs_ByCancellingParentContext(t *testing.T) {
|
||||||
var workDone bool
|
var workDone bool
|
||||||
s.StartJobEvery(jobInterval, func() error {
|
s.StartJobEvery(jobInterval, func() error {
|
||||||
workDone = true
|
workDone = true
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue