Skip to content

Instantly share code, notes, and snippets.

@kinnrot
Created September 5, 2017 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kinnrot/b6a7c8c2b7962118597c173f446808bb to your computer and use it in GitHub Desktop.
Save kinnrot/b6a7c8c2b7962118597c173f446808bb to your computer and use it in GitHub Desktop.
Implement distributed workflow over delayed job mongoid
class JobListItem
include Mongoid::Document
include Mongoid::Timestamps::Created
belongs_to :next_job, :class_name => "JobListItem", :inverse_of => :prev_job, optional: true
has_one :prev_job, :class_name => "JobListItem", :inverse_of => :next_job
NONE=0
RUNNING = 1
FINISHED = 2
FAILED= 3
field :status, type: Integer, default: NONE
field :job_ids, type: Array
attr_accessor :jobs
def execute
set(status: JobListItem::RUNNING)
@jobs=[]
begin
schedule_jobs
rescue Exception => ex
Rails.logger.error "error on scheduling method #{ex} #{ex.backtrace}"
set(status: FAILED)
return
end
job_ids = jobs.map { |job| job.job_id.to_s }
Checkpoint.perform_later(job_ids, self.id.to_s)
save!
end
def schedule_jobs
raise "need to override schedule jobs and fill job ids array please"
end
def self.perform
self.new.execute
end
end
class Checkpoint < OutProcessJob
def perform(job_ids, job_list_item_id)
job_list_item_id = deserialize_id job_list_item_id
@job_list_item = JobListItem.find(job_list_item_id)
unless validate_completion(job_ids)
unless Rails.env.test?
# set myself to run in a minute
Checkpoint.set(wait: 10.seconds).
perform_later(job_ids, Job::Serializers.serialize_id(job_list_item_id))
return
end
end
if @job_list_item.next_job.present?
@job_list_item.next_job.execute
@job_list_item.destroy
else
@job_list_item.destroy
end
end
def validate_completion(job_ids)
return true if job_ids.empty?
any_job_left = jobs_selector(job_ids).exists?
any_job_failed = jobs_selector(job_ids).where(failed_at: nil).exists?
return false if any_job_left # not completed
if any_job_failed # some failed
@job_list_item.set(status: JobListItem::FAILED)
raise "checkpoint failed, performing some jobs failed chain stops"
end
@job_list_item.set(status: JobListItem::FINISHED)
true
end
def jobs_selector(job_ids)
Delayed::Job.or(job_ids.map{|id|{:handler => /.*job_id: #{id}.*/} })
end
end
class JobListExecutor
def self.run(*jobs)
internal_make_list(jobs).execute
end
def self.make_list(*jobs)
internal_make_list(jobs)
end
def self.internal_make_list(jobs)
jobs.each_with_index do |job, index|
if index+1 < jobs.length
job.next_job = jobs[index+1]
end
job.save!
end
jobs.first
end
end
# tests
require 'test_helper'
class JobCompositionTest < ActiveSupport::TestCase
def setup
@@result = {"11"=>0,"12"=>0,"2"=>0}
end
@@result = {}
def self.report(job_name)
@@result[job_name]+=1
end
class FirstJob < JobListItem
def schedule_jobs
jobs.push InternalJob11.perform_later
jobs.push InternalJob12.perform_later
end
end
class SecondJob < JobListItem
def schedule_jobs
jobs.push InternalJob2.perform_later
end
end
class InternalJob11 < OutProcessJob
def perform
JobCompositionTest.report "11"
end
end
class InternalJob12 < OutProcessJob
def perform
JobCompositionTest.report "12"
end
end
class InternalJob2 < OutProcessJob
def perform
JobCompositionTest.report "2"
end
end
test 'can create' do
job = JobListItem.create!()
assert_not_nil job.id
end
test 'got created at' do
job = JobListItem.create!()
assert_not_nil job.created_at
end
test 'can create with job ids' do
job = JobListItem.create!(job_ids: ["sd", "sdf"])
assert_not_empty job.job_ids
end
test 'can create with children' do
child = JobListItem.create!()
job = JobListItem.create!(next_job: child)
assert_not_nil job.next_job
end
test 'can create with next, prev is original' do
child = JobListItem.create!()
job = JobListItem.create!(next_job: child)
assert_equal job, job.next_job.prev_job
end
test 'can execute 1 job' do
list_head = JobListExecutor.make_list(SecondJob.create)
list_head.execute
assert_equal 1,@@result["2"], "second job didnt run"
end
test 'can execute 2 job' do
list_head = JobListExecutor.make_list(FirstJob.create)
list_head.execute
assert_equal 1,@@result["12"], "second job didnt run"
assert_equal 1, @@result["11"], "second job didnt run"
end
test 'can execute 2+2 job' do
list_head = JobListExecutor.make_list(FirstJob.create, FirstJob.create)
list_head.execute
assert_equal 2, @@result["12"], "second job didnt run"
assert_equal 2, @@result["11"], "second job didnt run"
end
test 'can execute 1+2 job' do
list_head = JobListExecutor.make_list(FirstJob.create, SecondJob.create)
list_head.execute
assert_equal 1,@@result["12"], "second job didnt run"
assert_equal 1,@@result["11"], "second job didnt run"
assert_equal 1,@@result["2"], "second job didnt run"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment