Skip to content

Commit 93ffead

Browse files
barkhayot and Barkhayot Juraev authored
[fix] race condition in NextRun() when multiple jobs complete simultaneously (#907)
* fix: select execute job on complete
* fix: update next job scheduling
* fix: handle test case
* feat: test next run with multiple jobs
* feat: test next run with concurrent completions
* fix: remove iteration param based on feedbacks

---------

Co-authored-by: Barkhayot Juraev <[email protected]>
1 parent 51570c3 commit 93ffead

File tree

2 files changed

+202
-16
lines changed

2 files changed

+202
-16
lines changed

job_test.go

Lines changed: 195 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -609,14 +609,14 @@ func TestJob_NextRuns(t *testing.T) {
609609
tests := []struct {
610610
name string
611611
jd JobDefinition
612-
assertion func(t *testing.T, iteration int, previousRun, nextRun time.Time)
612+
assertion func(t *testing.T, previousRun, nextRun time.Time)
613613
}{
614614
{
615615
"simple - milliseconds",
616616
DurationJob(
617617
100 * time.Millisecond,
618618
),
619-
func(t *testing.T, _ int, previousRun, nextRun time.Time) {
619+
func(t *testing.T, previousRun, nextRun time.Time) {
620620
assert.Equal(t, previousRun.UnixMilli()+100, nextRun.UnixMilli())
621621
},
622622
},
@@ -629,13 +629,11 @@ func TestJob_NextRuns(t *testing.T) {
629629
NewAtTime(0, 0, 0),
630630
),
631631
),
632-
func(t *testing.T, iteration int, previousRun, nextRun time.Time) {
632+
func(t *testing.T, previousRun, nextRun time.Time) {
633+
// With the fix for NextRun accuracy, the immediate run (Jan 1) is removed
634+
// from nextScheduled after it completes. So all intervals should be 14 days
635+
// (2 weeks as configured).
633636
diff := time.Hour * 14 * 24
634-
if iteration == 1 {
635-
// because the job is run immediately, the first run is on
636-
// Saturday 1/1/2000. The following run is then on Tuesday 1/11/2000
637-
diff = time.Hour * 10 * 24
638-
}
639637
assert.Equal(t, previousRun.Add(diff).Day(), nextRun.Day())
640638
},
641639
},
@@ -672,7 +670,7 @@ func TestJob_NextRuns(t *testing.T) {
672670
// skipping because there is no previous run
673671
continue
674672
}
675-
tt.assertion(t, i, nextRuns[i-1], nextRuns[i])
673+
tt.assertion(t, nextRuns[i-1], nextRuns[i])
676674
}
677675

678676
assert.NoError(t, s.Shutdown())
@@ -1197,3 +1195,191 @@ func TestWithIntervalFromCompletion_FirstRun(t *testing.T) {
11971195
assert.Less(t, timeSinceStart.Seconds(), 1.0,
11981196
"First run should happen quickly with WithStartImmediately")
11991197
}
1198+
1199+
func TestJob_NextRun_MultipleJobsSimultaneously(t *testing.T) {
1200+
// This test reproduces the bug where multiple jobs completing simultaneously
1201+
// would cause NextRun() to return stale values due to race conditions in
1202+
// nextScheduled cleanup.
1203+
1204+
testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
1205+
fakeClock := clockwork.NewFakeClockAt(testTime)
1206+
1207+
s := newTestScheduler(t,
1208+
WithClock(fakeClock),
1209+
WithLocation(time.UTC),
1210+
)
1211+
1212+
jobsCompleted := make(chan struct{}, 4)
1213+
1214+
// Create multiple jobs with different intervals that will complete around the same time
1215+
job1, err := s.NewJob(
1216+
DurationJob(1*time.Minute),
1217+
NewTask(func() {
1218+
jobsCompleted <- struct{}{}
1219+
}),
1220+
WithName("job1"),
1221+
WithStartAt(WithStartImmediately()),
1222+
)
1223+
require.NoError(t, err)
1224+
1225+
job2, err := s.NewJob(
1226+
DurationJob(2*time.Minute),
1227+
NewTask(func() {
1228+
jobsCompleted <- struct{}{}
1229+
}),
1230+
WithName("job2"),
1231+
WithStartAt(WithStartImmediately()),
1232+
)
1233+
require.NoError(t, err)
1234+
1235+
job3, err := s.NewJob(
1236+
DurationJob(3*time.Minute),
1237+
NewTask(func() {
1238+
jobsCompleted <- struct{}{}
1239+
}),
1240+
WithName("job3"),
1241+
WithStartAt(WithStartImmediately()),
1242+
)
1243+
require.NoError(t, err)
1244+
1245+
job4, err := s.NewJob(
1246+
DurationJob(4*time.Minute),
1247+
NewTask(func() {
1248+
jobsCompleted <- struct{}{}
1249+
}),
1250+
WithName("job4"),
1251+
WithStartAt(WithStartImmediately()),
1252+
)
1253+
require.NoError(t, err)
1254+
1255+
s.Start()
1256+
1257+
// Wait for all 4 jobs to complete their immediate run
1258+
for i := 0; i < 4; i++ {
1259+
<-jobsCompleted
1260+
}
1261+
1262+
// Give the scheduler time to process the completions and reschedule
1263+
time.Sleep(50 * time.Millisecond)
1264+
1265+
// Verify that NextRun() returns the correct next scheduled time for each job
1266+
// and not a stale value from the just-completed run
1267+
1268+
nextRun1, err := job1.NextRun()
1269+
require.NoError(t, err)
1270+
assert.Equal(t, testTime.Add(1*time.Minute), nextRun1, "job1 NextRun should be 1 minute from start")
1271+
1272+
nextRun2, err := job2.NextRun()
1273+
require.NoError(t, err)
1274+
assert.Equal(t, testTime.Add(2*time.Minute), nextRun2, "job2 NextRun should be 2 minutes from start")
1275+
1276+
nextRun3, err := job3.NextRun()
1277+
require.NoError(t, err)
1278+
assert.Equal(t, testTime.Add(3*time.Minute), nextRun3, "job3 NextRun should be 3 minutes from start")
1279+
1280+
nextRun4, err := job4.NextRun()
1281+
require.NoError(t, err)
1282+
assert.Equal(t, testTime.Add(4*time.Minute), nextRun4, "job4 NextRun should be 4 minutes from start")
1283+
1284+
// Advance time to trigger job1's next run
1285+
fakeClock.Advance(1 * time.Minute)
1286+
1287+
// Wait for job1 to complete
1288+
<-jobsCompleted
1289+
time.Sleep(50 * time.Millisecond)
1290+
1291+
// After job1's second run, it should be scheduled for +2 minutes from start
1292+
nextRun1, err = job1.NextRun()
1293+
require.NoError(t, err)
1294+
assert.Equal(t, testTime.Add(2*time.Minute), nextRun1, "job1 NextRun should be 2 minutes from start after first interval")
1295+
1296+
require.NoError(t, s.Shutdown())
1297+
}
1298+
1299+
func TestJob_NextRun_ConcurrentCompletions(t *testing.T) {
1300+
// This test verifies that when multiple jobs complete at exactly the same time,
1301+
// their NextRun() values are correctly updated without race conditions.
1302+
1303+
testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
1304+
fakeClock := clockwork.NewFakeClockAt(testTime)
1305+
1306+
s := newTestScheduler(t,
1307+
WithClock(fakeClock),
1308+
WithLocation(time.UTC), // Set scheduler to use UTC to match our test time
1309+
)
1310+
1311+
var wg sync.WaitGroup
1312+
jobCompletionBarrier := make(chan struct{})
1313+
1314+
// Create jobs that will all complete at the same instant
1315+
createJob := func(name string, interval time.Duration) Job {
1316+
job, err := s.NewJob(
1317+
DurationJob(interval),
1318+
NewTask(func() {
1319+
wg.Done()
1320+
<-jobCompletionBarrier // Wait until all jobs are ready to complete
1321+
}),
1322+
WithName(name),
1323+
WithStartAt(WithStartImmediately()),
1324+
)
1325+
require.NoError(t, err)
1326+
return job
1327+
}
1328+
1329+
wg.Add(4)
1330+
job1 := createJob("concurrent-job1", 1*time.Minute)
1331+
job2 := createJob("concurrent-job2", 2*time.Minute)
1332+
job3 := createJob("concurrent-job3", 3*time.Minute)
1333+
job4 := createJob("concurrent-job4", 4*time.Minute)
1334+
1335+
s.Start()
1336+
1337+
wg.Wait()
1338+
close(jobCompletionBarrier)
1339+
1340+
// Give the scheduler time to process all completions
1341+
time.Sleep(100 * time.Millisecond)
1342+
1343+
// Verify NextRun() for all jobs concurrently to stress test the race condition
1344+
var testWg sync.WaitGroup
1345+
testWg.Add(4)
1346+
1347+
go func() {
1348+
defer testWg.Done()
1349+
for i := 0; i < 10; i++ {
1350+
nextRun, err := job1.NextRun()
1351+
require.NoError(t, err)
1352+
assert.Equal(t, testTime.Add(1*time.Minute), nextRun)
1353+
}
1354+
}()
1355+
1356+
go func() {
1357+
defer testWg.Done()
1358+
for i := 0; i < 10; i++ {
1359+
nextRun, err := job2.NextRun()
1360+
require.NoError(t, err)
1361+
assert.Equal(t, testTime.Add(2*time.Minute), nextRun)
1362+
}
1363+
}()
1364+
1365+
go func() {
1366+
defer testWg.Done()
1367+
for i := 0; i < 10; i++ {
1368+
nextRun, err := job3.NextRun()
1369+
require.NoError(t, err)
1370+
assert.Equal(t, testTime.Add(3*time.Minute), nextRun)
1371+
}
1372+
}()
1373+
1374+
go func() {
1375+
defer testWg.Done()
1376+
for i := 0; i < 10; i++ {
1377+
nextRun, err := job4.NextRun()
1378+
require.NoError(t, err)
1379+
assert.Equal(t, testTime.Add(4*time.Minute), nextRun)
1380+
}
1381+
}()
1382+
1383+
testWg.Wait()
1384+
require.NoError(t, s.Shutdown())
1385+
}

scheduler.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -443,11 +443,11 @@ func (s *scheduler) updateNextScheduled(id uuid.UUID) {
443443
return
444444
}
445445
var newNextScheduled []time.Time
446+
now := s.now()
446447
for _, t := range j.nextScheduled {
447-
if t.Before(s.now()) {
448-
continue
448+
if t.After(now) { // Changed to match selectExecJobsOutCompleted
449+
newNextScheduled = append(newNextScheduled, t)
449450
}
450-
newNextScheduled = append(newNextScheduled, t)
451451
}
452452
j.nextScheduled = newNextScheduled
453453
s.jobs[id] = j
@@ -460,13 +460,13 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
460460
}
461461

462462
// if the job has nextScheduled time in the past,
463-
// we need to remove any that are in the past.
463+
// we need to remove any that are in the past or at the current time (just executed).
464464
var newNextScheduled []time.Time
465+
now := s.now()
465466
for _, t := range j.nextScheduled {
466-
if t.Before(s.now()) {
467-
continue
467+
if t.After(now) {
468+
newNextScheduled = append(newNextScheduled, t)
468469
}
469-
newNextScheduled = append(newNextScheduled, t)
470470
}
471471
j.nextScheduled = newNextScheduled
472472

0 commit comments

Comments
 (0)