Created
September 5, 2017 16:58
-
-
Save kinnrot/b6a7c8c2b7962118597c173f446808bb to your computer and use it in GitHub Desktop.
Implements a distributed workflow on top of Delayed Job with Mongoid.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One step in a chain of job batches, stored as a Mongoid document.
#
# Subclasses override #schedule_jobs to enqueue their jobs and push each
# enqueued job onto #jobs. #execute collects the ids of those jobs and
# enqueues a Checkpoint for them.
class JobListItem
  include Mongoid::Document
  include Mongoid::Timestamps::Created

  # Items form a linked list: next_job points forward, prev_job is its inverse.
  belongs_to :next_job, :class_name => "JobListItem", :inverse_of => :prev_job, optional: true
  has_one :prev_job, :class_name => "JobListItem", :inverse_of => :next_job

  # Values stored in the status field.
  NONE = 0
  RUNNING = 1
  FINISHED = 2
  FAILED = 3

  field :status, type: Integer, default: NONE
  field :job_ids, type: Array

  # In-memory list that #schedule_jobs fills with the jobs it enqueued.
  attr_accessor :jobs

  # Marks the item RUNNING, schedules its jobs, stores their ids on the
  # document and enqueues a Checkpoint for them.
  #
  # If scheduling raises a StandardError, the error is logged, the item is
  # marked FAILED and nil is returned; no Checkpoint is enqueued.
  #
  # @return [true, nil] true on success, nil when scheduling failed
  def execute
    set(status: RUNNING)
    @jobs = []
    begin
      schedule_jobs
    rescue StandardError => ex
      # StandardError only: signals and exit requests must not be swallowed.
      Rails.logger.error "error on scheduling method #{ex} #{ex.backtrace}"
      set(status: FAILED)
      return
    end

    scheduled_ids = jobs.map { |job| job.job_id.to_s }
    # Assign through the setter; a bare `job_ids = ...` would only create a
    # local variable and leave the field unset.
    self.job_ids = scheduled_ids
    # Checkpoint#perform looks this record up by id, so persist it before
    # the checkpoint is enqueued.
    save!
    Checkpoint.perform_later(scheduled_ids, id.to_s)
    true
  end

  # Hook for subclasses: enqueue the jobs of this step and push each one onto
  # #jobs. The base implementation always raises.
  #
  # @raise [RuntimeError] when not overridden
  def schedule_jobs
    raise "need to override schedule jobs and fill job ids array please"
  end

  # Builds a new item of the receiving class and executes it.
  def self.perform
    new.execute
  end
end
# Job that watches the jobs scheduled by one JobListItem and, once they are
# done, starts the next item in the chain and removes the finished one.
class Checkpoint < OutProcessJob
  # @param job_ids [Array<String>] ids of the jobs scheduled by the item
  # @param job_list_item_id serialized id of the JobListItem being watched
  # @raise [RuntimeError] when a watched job failed (see #validate_completion)
  def perform(job_ids, job_list_item_id)
    job_list_item_id = deserialize_id job_list_item_id
    @job_list_item = JobListItem.find(job_list_item_id)

    completed = validate_completion(job_ids)
    if !completed && !Rails.env.test?
      # Not done yet: re-enqueue this checkpoint to look again in 10 seconds.
      Checkpoint.set(wait: 10.seconds).
        perform_later(job_ids, Job::Serializers.serialize_id(job_list_item_id))
      return
    end

    # In the test environment execution continues here even when the jobs
    # were not reported as complete.
    following = @job_list_item.next_job
    following.execute if following.present?
    @job_list_item.destroy
  end

  # Checks whether the watched jobs are done and records the outcome on the
  # job list item.
  #
  # NOTE(review): this assumes Delayed Job deletes jobs that succeed and keeps
  # permanently failed jobs with failed_at set (i.e. destroy_failed_jobs is
  # disabled) — confirm against the Delayed Job configuration.
  #
  # @param job_ids [Array<String>]
  # @return [Boolean] true when nothing is left to wait for (status is set to
  #   FINISHED unless job_ids is empty), false while jobs are still pending
  # @raise [RuntimeError] when a job failed; status is set to FAILED first
  def validate_completion(job_ids)
    return true if job_ids.empty?

    # Jobs still stored without a failure timestamp have not completed yet.
    any_job_left = jobs_selector(job_ids).where(failed_at: nil).exists?
    return false if any_job_left

    # Jobs still stored with a failure timestamp have failed.
    any_job_failed = jobs_selector(job_ids).ne(failed_at: nil).exists?
    if any_job_failed
      @job_list_item.set(status: JobListItem::FAILED)
      raise "checkpoint failed, performing some jobs failed chain stops"
    end

    @job_list_item.set(status: JobListItem::FINISHED)
    true
  end

  # Criteria matching every Delayed::Job whose handler mentions one of the
  # given job ids.
  #
  # @param job_ids [Array<String>] must not be empty
  def jobs_selector(job_ids)
    # Escape the id so it is matched literally inside the pattern.
    Delayed::Job.or(job_ids.map { |id| { :handler => /job_id: #{Regexp.escape(id.to_s)}/ } })
  end
end
# Chains job list items together in the order given and, optionally, starts
# the chain.
class JobListExecutor
  class << self
    # Links the given items and executes the first one.
    #
    # @return whatever the head item's #execute returns
    def run(*jobs)
      head = internal_make_list(jobs)
      head.execute
    end

    # Links the given items without executing anything.
    #
    # @return the first item, or nil when no items were given
    def make_list(*jobs)
      internal_make_list(jobs)
    end

    # Points each item's next_job at its successor (the last item is left
    # untouched) and saves every item in order.
    #
    # @param jobs [Array] items responding to next_job= and save!
    # @return the first item, or nil for an empty list
    def internal_make_list(jobs)
      last_position = jobs.length - 1
      jobs.each.with_index do |current, position|
        current.next_job = jobs[position + 1] if position < last_position
        current.save!
      end
      jobs.first
    end
  end
end
# tests
require 'test_helper'
# Tests for JobListItem chains. The inner jobs report each run back to this
# class so the tests can count how often every job was performed.
#
# NOTE(review): the execute tests assert right after #execute returns, so they
# presumably rely on jobs running inline in the test environment — confirm.
class JobCompositionTest < ActiveSupport::TestCase
  # Run counters keyed by job name; missing names count as 0.
  @result = Hash.new(0)

  class << self
    attr_reader :result

    # Records one run of the job called +job_name+.
    def report(job_name)
      @result[job_name] += 1
    end

    # Clears all run counters.
    def reset_result
      @result = Hash.new(0)
    end
  end

  def setup
    JobCompositionTest.reset_result
  end

  # Number of runs reported so far for +job_name+.
  def run_count(job_name)
    JobCompositionTest.result[job_name]
  end

  # Step that schedules two inner jobs.
  class FirstJob < JobListItem
    def schedule_jobs
      jobs.push InternalJob11.perform_later
      jobs.push InternalJob12.perform_later
    end
  end

  # Step that schedules a single inner job.
  class SecondJob < JobListItem
    def schedule_jobs
      jobs.push InternalJob2.perform_later
    end
  end

  class InternalJob11 < OutProcessJob
    def perform
      JobCompositionTest.report "11"
    end
  end

  class InternalJob12 < OutProcessJob
    def perform
      JobCompositionTest.report "12"
    end
  end

  class InternalJob2 < OutProcessJob
    def perform
      JobCompositionTest.report "2"
    end
  end

  test 'can create' do
    job = JobListItem.create!
    assert_not_nil job.id
  end

  test 'got created at' do
    job = JobListItem.create!
    assert_not_nil job.created_at
  end

  test 'can create with job ids' do
    job = JobListItem.create!(job_ids: ["sd", "sdf"])
    assert_not_empty job.job_ids
  end

  test 'can create with children' do
    child = JobListItem.create!
    job = JobListItem.create!(next_job: child)
    assert_not_nil job.next_job
  end

  test 'can create with next, prev is original' do
    child = JobListItem.create!
    job = JobListItem.create!(next_job: child)
    assert_equal job, job.next_job.prev_job
  end

  test 'can execute 1 job' do
    list_head = JobListExecutor.make_list(SecondJob.create)
    list_head.execute
    assert_equal 1, run_count("2"), "InternalJob2 didn't run exactly once"
  end

  test 'can execute 2 job' do
    list_head = JobListExecutor.make_list(FirstJob.create)
    list_head.execute
    assert_equal 1, run_count("12"), "InternalJob12 didn't run exactly once"
    assert_equal 1, run_count("11"), "InternalJob11 didn't run exactly once"
  end

  test 'can execute 2+2 job' do
    list_head = JobListExecutor.make_list(FirstJob.create, FirstJob.create)
    list_head.execute
    assert_equal 2, run_count("12"), "InternalJob12 didn't run exactly twice"
    assert_equal 2, run_count("11"), "InternalJob11 didn't run exactly twice"
  end

  test 'can execute 1+2 job' do
    list_head = JobListExecutor.make_list(FirstJob.create, SecondJob.create)
    list_head.execute
    assert_equal 1, run_count("12"), "InternalJob12 didn't run exactly once"
    assert_equal 1, run_count("11"), "InternalJob11 didn't run exactly once"
    assert_equal 1, run_count("2"), "InternalJob2 didn't run exactly once"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment