Skip to content

Commit d0a3ef1

Browse files
committed
Make jobs migrator idempotent and self-healing
Idempotency helps when your argument list doesn't change; self-healing helps when the argument list is dynamically generated refs AE-3244
1 parent 3809894 commit d0a3ef1

File tree

4 files changed

+89
-12
lines changed

4 files changed

+89
-12
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# frozen_string_literal: true
2+
3+
class AddPreviousJobsShard < ActiveRecord::Migration[7.1]
4+
def change
5+
add_reference :switchman_shards, :previous_delayed_jobs_shard, foreign_key: { to_table: :switchman_shards }, index: false, if_not_exists: true
6+
end
7+
end

lib/switchman_inst_jobs/jobs_migrator.rb

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,58 @@ def transaction_on(shards, &)
3535
end
3636

3737
def migrate_shards(shard_map)
38-
source_shards = Set[]
38+
effective_map = shard_map.dup
39+
source_shards = Hash.new([].freeze)
3940
target_shards = Hash.new([].freeze)
40-
shard_map.each do |(shard, target_shard)|
41+
# Also add any incomplete moves to the source shards to ensure we clean up appropriately
42+
::Switchman::Shard.where.not(previous_delayed_jobs_shard_id: nil).each do |shard|
43+
effective_map[shard.id] ||= shard.delayed_jobs_shard.id
44+
end
45+
effective_map.each do |(shard, target_shard)|
4146
shard = ::Switchman::Shard.find(shard) unless shard.is_a?(::Switchman::Shard)
42-
source_shards << shard.delayed_jobs_shard.id
4347
target_shard = target_shard.try(:id) || target_shard
48+
# if a move was interrupted, the new shard is already set as the delayed_jobs_shard
49+
# but we still have the old shard stored in previous_delayed_jobs_shard and should
50+
# act as if we are moving from there in the first place
51+
if shard.previous_delayed_jobs_shard_id && shard.delayed_jobs_shard.id == target_shard
52+
source_shards[shard.previous_delayed_jobs_shard_id] += [shard.id]
53+
else
54+
source_shards[shard.delayed_jobs_shard.id] += [shard.id]
55+
end
4456
target_shards[target_shard] += [shard.id]
4557

4658
@validation_callbacks&.each do |proc|
4759
proc.call(shard:, target_shard: ::Switchman::Shard.find(target_shard))
4860
end
4961
end
5062

51-
# Do the updates in batches and then just clear redis instead of clearing them one at a time
52-
target_shards.each do |target_shard, shards|
53-
updates = { delayed_jobs_shard_id: target_shard, block_stranded: true }
54-
updates[:updated_at] = Time.zone.now if ::Switchman::Shard.column_names.include?("updated_at")
55-
::Switchman::Shard.where(id: shards).update_all(updates)
63+
::Switchman::Shard.transaction do
64+
# Do the updates in batches and then just clear redis instead of clearing them one at a time
65+
source_shards.each do |source_shard, shards|
66+
updates = { previous_delayed_jobs_shard_id: source_shard }
67+
::Switchman::Shard.where(id: shards).update_all(updates)
68+
end
69+
target_shards.each do |target_shard, shards|
70+
updates = { delayed_jobs_shard_id: target_shard, block_stranded: true }
71+
updates[:updated_at] = Time.zone.now if ::Switchman::Shard.column_names.include?("updated_at")
72+
::Switchman::Shard.where(id: shards).update_all(updates)
73+
end
5674
end
5775
clear_shard_cache(default: ::Switchman::Shard.exists?(id: target_shards.values.flatten, default: true))
5876

5977
::Switchman::Shard.clear_cache
6078
# rubocop:disable Style/CombinableLoops
6179
# We first migrate strands so that we can stop blocking strands before we migrate unstranded jobs
62-
source_shards.each do |s|
80+
source_shards.keys.each do |s|
6381
::Switchman::Shard.lookup(s).activate(::Delayed::Backend::ActiveRecord::AbstractJob) { migrate_strands }
6482
end
6583

66-
source_shards.each do |s|
84+
source_shards.keys.each do |s|
6785
::Switchman::Shard.lookup(s).activate(::Delayed::Backend::ActiveRecord::AbstractJob) { migrate_everything }
6886
end
69-
ensure_unblock_stranded_for(shard_map.map(&:first))
87+
ensure_unblock_stranded_for(effective_map.map(&:first))
7088
# rubocop:enable Style/CombinableLoops
89+
::Switchman::Shard.where(id: effective_map.map(&:first)).update_all(previous_delayed_jobs_shard_id: nil)
7190
end
7291

7392
# if :migrate_strands ran on any shards that fell into scenario 1, then

lib/switchman_inst_jobs/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module SwitchmanInstJobs
4-
VERSION = "4.3.0"
4+
VERSION = "4.3.1"
55
end

spec/lib/switchman_inst_jobs/jobs_migrator_spec.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,4 +529,55 @@ def activate_target_shard
529529
described_class.clear_callbacks!
530530
end
531531
end
532+
533+
context "fault tolerance" do
534+
before do
535+
@times_called = 0
536+
original_method = described_class.method(:migrate_everything)
537+
allow(described_class).to receive(:migrate_everything).with(no_args) do
538+
@times_called += 1
539+
raise "test failure" if @times_called == 1
540+
original_method.call
541+
end
542+
end
543+
544+
it "should do work when called twice in a row" do
545+
activate_source_shard do
546+
3.times { Kernel.delay.sleep(0.3) }
547+
end
548+
expect(Delayed::Job.count).to eq 3
549+
550+
shard1.delayed_jobs_shard_id = Switchman::Shard.default.id
551+
shard1.save!
552+
expect { described_class.migrate_shards({ shard1 => shard1 }) }.to raise_error("test failure")
553+
expect(Delayed::Job.count).to eq 3
554+
expect { described_class.migrate_shards({ shard1 => shard1 }) }.not_to raise_error
555+
expect(Delayed::Job.count).to eq 0
556+
557+
activate_target_shard do
558+
expect(Delayed::Job.count).to eq 3
559+
end
560+
end
561+
562+
it "should cleanup partial migrations implicitly" do
563+
activate_source_shard do
564+
3.times { Kernel.delay.sleep(0.3) }
565+
end
566+
expect(Delayed::Job.count).to eq 3
567+
568+
shard1.delayed_jobs_shard_id = Switchman::Shard.default.id
569+
shard1.save!
570+
expect(::Switchman::Shard.where.not(previous_delayed_jobs_shard_id: nil).count).to eq 0
571+
expect { described_class.migrate_shards({ shard1 => shard1 }) }.to raise_error("test failure")
572+
expect(Delayed::Job.count).to eq 3
573+
expect(::Switchman::Shard.where.not(previous_delayed_jobs_shard_id: nil).count).to eq 1
574+
expect { described_class.migrate_shards({}) }.not_to raise_error
575+
expect(Delayed::Job.count).to eq 0
576+
577+
activate_target_shard do
578+
expect(Delayed::Job.count).to eq 3
579+
end
580+
expect(::Switchman::Shard.where.not(previous_delayed_jobs_shard_id: nil).count).to eq 0
581+
end
582+
end
532583
end

0 commit comments

Comments
 (0)