Last active
January 22, 2023 20:56
-
-
Save fractaledmind/4554d4e3f384dc56c10a9fe444a870bb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Database-backed record of one ActiveJob execution (a "run").
#
# A run stores the serialized job so that it can be enqueued only after the
# creating transaction commits, tracks progress through `recovery_point`, and
# may be linked to a parent run (`awaited_by`) that is resumed once every run
# in its batch has finished.
class JobRun < ActiveRecord::Base
  # `recovery_point` value that marks a run as complete.
  FINISHED_RECOVERY_POINT = "FINISHED"

  validates :job_class, presence: true
  validates :job_id, presence: true
  validates :serialized_job, presence: true

  serialize :serialized_job, JSON
  store :persisted_data

  belongs_to :awaited_by, class_name: "JobRun", optional: true
  has_many :batched_runs, class_name: "JobRun", foreign_key: "awaited_by_id"

  # Runs whose recovery point is not FINISHED_RECOVERY_POINT; runs with a
  # nil or blank recovery point are matched explicitly by the second clause.
  scope :outstanding, lambda {
    where.not(recovery_point: FINISHED_RECOVERY_POINT).or(where(recovery_point: [nil, ""]))
  }

  after_create_commit :enqueue_job, if: :staged?
  after_update_commit :proceed_with_parent, if: :finished?

  # Stages +job+ by persisting it as a run. The `after_create_commit` hook
  # enqueues it once the record's creation has been committed.
  #
  # @param job [ActiveJob::Base] an initialized job that has not been enqueued
  # @return [JobRun] the newly created run
  # @raise [ActiveRecord::RecordInvalid] if the run fails validation
  def self.enqueue!(job)
    # force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
    job.queue_name

    create!(
      staged: true,
      job_class: job.class.name,
      job_id: job.job_id,
      serialized_job: job.serialize,
    )
  end

  # Stages +job+ as a child of the run +by+ and records on +by+ the recovery
  # point it should move to once all of its children have finished.
  #
  # @param job [ActiveJob::Base] the child job to stage
  # @param by [JobRun] the parent run that waits for the child
  # @param return_to [String, Symbol] recovery point the parent resumes at
  # @return [true] the result of updating the parent
  # @raise [ActiveRecord::RecordInvalid] if either write fails validation
  def self.await!(job, by:, return_to:)
    # Resolve a Proc-based `queue_name` before serializing, as `enqueue!` does.
    job.queue_name

    # Both writes share one transaction so that the child is only enqueued
    # (after commit) when the parent already knows where to resume.
    transaction do
      create!(
        staged: true,
        awaited_by: by,
        job_class: job.class.name,
        job_id: job.job_id,
        serialized_job: job.serialize,
      )
      by.update!(returning_to: return_to)
    end
  end

  # Resumes the parent run when this run is finished and no sibling run in
  # the parent's batch is still outstanding. Does nothing for runs without a
  # parent.
  def proceed_with_parent
    return unless finished?
    return unless awaited_by.present?
    return if awaited_by.batched_runs.outstanding.any?

    awaited_by.proceed
  end

  # Moves this run to the recovery point stored in `returning_to`, clears
  # `returning_to`, and re-enqueues the job unless that point is the finished
  # marker.
  def proceed
    # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
    update!(
      recovery_point: returning_to,
      returning_to: nil
    )
    return if finished?

    enqueue_job
  end

  # Enqueues the deserialized job.
  def enqueue_job
    job.enqueue
  end

  # Rebuilds the job instance from `serialized_job` and memoizes it.
  #
  # @return [ActiveJob::Base]
  def job
    return @job if defined?(@job)

    # Call the attribute reader directly: assigning to a local named
    # `job_class` would shadow it and leave `nil` on the right-hand side.
    @job = job_class.constantize.deserialize(serialized_job)
  end

  # @return [Boolean] whether the recovery point equals FINISHED_RECOVERY_POINT
  def finished?
    recovery_point.to_s == FINISHED_RECOVERY_POINT
  end
end
# Signal raised by a parent job after it has staged child jobs, so that the
# parent stops running until it is resumed.
class AwaitingChildrenJobs < StandardError
end
# Multi-step job run after an appointment is attended. Progress is recorded
# in a JobRun so that a retry resumes at the first step that has not yet
# committed, and the last step hands off to a child job that the run awaits.
class OnAppointmentAttendedJob < ActiveJob::Base
  # Raised once child jobs have been staged; the job is discarded rather than
  # retried, and is enqueued again through JobRun#proceed.
  discard_on AwaitingChildrenJobs

  # steps: charge_and_invoice -> notify_of_charge -> create_insurance_claim -> submit_to_insurance -> FINISHED
  #
  # @param appointment the attended appointment; it must respond to
  #   `service.copay_cents` and `customer`
  def perform(appointment)
    # Always setup any data that is required within steps but available at the start of the job
    # via simple instance variables at the top of the `perform` method
    @appointment = appointment
    @copay_cents = appointment.service.copay_cents
    @run = find_or_create_run(appointment)

    return if @run.finished?

    idempotently_perform_step(:charge_and_invoice, progress_to: :notify_of_charge)
    idempotently_perform_step(:notify_of_charge, progress_to: :create_insurance_claim)
    idempotently_perform_step(:create_insurance_claim, progress_to: :submit_to_insurance)
    # Use the same terminal marker that JobRun#finished? checks for.
    idempotently_perform_step(:submit_to_insurance, progress_to: JobRun::FINISHED_RECOVERY_POINT)
  end

  private

  # Finds the run for this job, or creates it at the first step.
  #
  # @return [JobRun]
  def find_or_create_run(appointment)
    ::ActiveRecord::Base.transaction(isolation: :serializable) do
      run = JobRun.find_by(
        job_class: self.class.name,
        job_id: job_id,
        job_args: [appointment]
      )

      if run
        run.update!(last_run_at: Time.current)
        # `update!` returns true, so hand back the record explicitly.
        run
      else
        JobRun.create!(
          job_class: self.class.name,
          job_id: job_id,
          job_args: [appointment],
          serialized_job: serialize,
          recovery_point: :charge_and_invoice
        )
      end
    end
  end

  # Runs one step if the run is currently at that step.
  #
  # A step that returns `[:await, job_or_jobs]` stages those jobs as children
  # of the run and raises AwaitingChildrenJobs after the transaction commits;
  # any other return value advances the run to +progress_to+.
  #
  # @param current_step_method [Symbol] name of the step method to call
  # @param progress_to [String, Symbol] recovery point to move to afterwards
  # @raise [AwaitingChildrenJobs] when the step staged child jobs
  def idempotently_perform_step(current_step_method, progress_to:)
    # Don't re-run steps that the job run has already successfully performed
    return unless @run.recovery_point == current_step_method.to_s

    awaiting_children = false

    # Use a database transaction and a database lock for ACIDic guarantees + concurrency mitigation
    @run.with_lock do
      case send(current_step_method)
      in [:await, awaited_jobs] if Array(awaited_jobs).compact.any?
        Array(awaited_jobs).compact.each do |awaited_job|
          # "Stage" child jobs
          # Mark them as children of this current `@run` and provide the "step" in this run to return to
          # once all of the children jobs have finished.
          JobRun.await!(awaited_job, by: @run, return_to: progress_to)
        end
        awaiting_children = true
      else
        # Only progress to the next step if the entire database transaction commits
        @run.update!(recovery_point: progress_to)
      end
    end

    # Raise only after the transaction has committed: an exception raised
    # inside `with_lock` would roll back the child runs that were just staged.
    # Stopping here keeps the worker free while the children run.
    raise AwaitingChildrenJobs if awaiting_children
  end

  def charge_and_invoice
    # Data that is needed between steps needs to be stored in the database
    # so that it is accessible on retries that skip this step.
    # But, to save on unnecessary db queries if this job succeeds on the first run,
    # we also store the data in an instance variable for access across methods
    @invoice = @run.persisted_data[:invoice] = Invoice.create!(
      customer: @appointment.customer,
      amount_cents: @copay_cents,
    )
    @charge = @run.persisted_data[:charge] = Charge.create!(
      invoice: @invoice,
      amount_cents: @copay_cents,
    )
  end

  def notify_of_charge
    # Fetch the charge record if starting this method from a retry
    @charge ||= @run.persisted_data[:charge]
    # Initialize an ActiveJob without performing it
    job = ChargeJob.new(@charge.id)
    # Enqueue the job through the database
    # Using an `after_create_commit` hook, this job will be enqueued
    # and performed only if the step-wrapping transaction commits.
    JobRun.enqueue!(job)
  end

  def create_insurance_claim
    # Fetch the charge record if starting this method from a retry
    @charge ||= @run.persisted_data[:charge]
    @insurance_claim = @run.persisted_data[:insurance_claim] = InsuranceClaim.create!(
      appointment: @appointment,
      copay: @charge,
    )
  end

  # @return [Array] `[:await, job]`, asking the step runner to stage the job
  #   as a child of this run
  def submit_to_insurance
    # Fetch the insurance_claim record if starting this method from a retry
    @insurance_claim ||= @run.persisted_data[:insurance_claim]
    [:await, SubmitToInsuranceJob.new(@insurance_claim)]
  end
end
# Posts an insurance claim to the insurance provider unless the provider
# already has it, then marks this job's JobRun (if one exists) as finished.
class SubmitToInsuranceJob < ActiveJob::Base
  # The condition must name the predicate including its trailing `?`.
  after_perform :finish_job_run, if: :has_preexisting_job_run_record?

  # @param insurance_claim the claim to submit; it must respond to `id`
  def perform(insurance_claim)
    # Skip the POST when the provider already returns something for this id.
    return if API::InsuranceProvider.get(insurance_claim.id).present?

    API::InsuranceProvider.post(insurance_claim.id)
  end

  private

  # Looks up the run staged for this job and memoizes it in `@run`.
  #
  # The lookup uses the job class name and job id only: `perform`'s argument
  # is not in scope here, and the `JobRun.enqueue!` / `JobRun.await!` calls
  # that stage runs do not write `job_args`.
  #
  # @return [Boolean] whether a run exists for this job
  def has_preexisting_job_run_record?
    @run = JobRun.find_by(
      job_class: self.class.name,
      job_id: job_id
    )
    @run.present?
  end

  # Marks the run as finished.
  def finish_job_run
    @run.update!(recovery_point: JobRun::FINISHED_RECOVERY_POINT)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Database-backed record of a job, used to enqueue the job only after the
# transaction that created the record has committed.
class JobRun < ActiveRecord::Base
  validates :job_class, presence: true
  validates :job_id, presence: true
  validates :serialized_job, presence: true

  serialize :serialized_job, JSON

  after_create_commit :enqueue_job, if: :staged?

  # Stages +job+ by persisting it as a run. The `after_create_commit` hook
  # enqueues it once the record's creation has been committed.
  #
  # @param job [ActiveJob::Base] an initialized job that has not been enqueued
  # @return [JobRun] the newly created run
  # @raise [ActiveRecord::RecordInvalid] if the run fails validation
  def self.enqueue!(job)
    # force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
    job.queue_name

    create!(
      staged: true,
      job_class: job.class.name,
      job_id: job.job_id,
      serialized_job: job.serialize,
    )
  end

  # Enqueues the deserialized job.
  def enqueue_job
    job.enqueue
  end

  # Rebuilds the job instance from `serialized_job` and memoizes it.
  #
  # @return [ActiveJob::Base]
  def job
    return @job if defined?(@job)

    # Call the attribute reader directly: assigning to a local named
    # `job_class` would shadow it and leave `nil` on the right-hand side.
    @job = job_class.constantize.deserialize(serialized_job)
  end
end
# Single-transaction variant: every action is a database write (including
# staging follow-up jobs through JobRun), so the whole job is wrapped in one
# locked transaction and marked complete inside it.
class OnAppointmentAttendedJob < ActiveJob::Base
  # @param appointment the attended appointment; it must respond to
  #   `service.copay_cents` and `customer`
  def perform(appointment)
    @run = ::ActiveRecord::Base.transaction(isolation: :serializable) do
      run = JobRun.find_by(
        job_class: self.class.name,
        job_id: job_id,
        job_args: [appointment]
      )

      if run
        run.update!(last_run_at: Time.current)
        # `update!` returns true, so hand back the record explicitly.
        run
      else
        JobRun.create!(
          job_class: self.class.name,
          job_id: job_id,
          job_args: [appointment],
          serialized_job: serialize,
          recovery_point: :charge_and_invoice
        )
      end
    end

    return if @run.completed_at.present?

    # Because all "action" is now some form of database write,
    # we can simply wrap the process in a single db transaction
    @run.with_lock do
      copay_cents = appointment.service.copay_cents
      invoice = Invoice.create!(
        customer: appointment.customer,
        amount_cents: copay_cents,
      )
      charge = Charge.create!(
        invoice: invoice,
        amount_cents: copay_cents,
      )

      # Enqueue the job through the database
      # Using an `after_create_commit` hook, this job will be enqueued
      # and performed only if the step-wrapping transaction commits.
      JobRun.enqueue!(ChargeJob.new(charge.id))

      insurance_claim = InsuranceClaim.create!(
        appointment: appointment,
        copay: charge,
      )
      JobRun.enqueue!(SubmitToInsuranceJob.new(insurance_claim.id))

      # Only complete the run if the entire database transaction commits
      @run.update!(completed_at: Time.current)
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Database-backed record of a job. It enqueues staged jobs only after the
# creating transaction commits and keeps per-run data in `persisted_data`
# so that later steps and retries can read it.
class JobRun < ActiveRecord::Base
  validates :job_class, presence: true
  validates :job_id, presence: true
  validates :serialized_job, presence: true

  serialize :serialized_job, JSON
  store :persisted_data

  after_create_commit :enqueue_job, if: :staged?

  # Stages +job+ by persisting it as a run. The `after_create_commit` hook
  # enqueues it once the record's creation has been committed.
  #
  # @param job [ActiveJob::Base] an initialized job that has not been enqueued
  # @return [JobRun] the newly created run
  # @raise [ActiveRecord::RecordInvalid] if the run fails validation
  def self.enqueue!(job)
    # force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
    job.queue_name

    create!(
      staged: true,
      job_class: job.class.name,
      job_id: job.job_id,
      serialized_job: job.serialize,
    )
  end

  # Enqueues the deserialized job.
  def enqueue_job
    job.enqueue
  end

  # Rebuilds the job instance from `serialized_job` and memoizes it.
  #
  # @return [ActiveJob::Base]
  def job
    return @job if defined?(@job)

    # Call the attribute reader directly: assigning to a local named
    # `job_class` would shadow it and leave `nil` on the right-hand side.
    @job = job_class.constantize.deserialize(serialized_job)
  end
end
# Multi-step job run after an appointment is attended. Each step runs in its
# own locked transaction and the run's `recovery_point` only advances when
# that transaction commits, so a retry skips steps that already succeeded.
class OnAppointmentAttendedJob < ActiveJob::Base
  # steps: charge_and_invoice -> notify_of_charge -> create_insurance_claim -> submit_to_insurance -> FINISH
  #
  # @param appointment the attended appointment; it must respond to
  #   `service.copay_cents` and `customer`
  def perform(appointment)
    # Always setup any data that is required within steps but available at the start of the job
    # via simple instance variables at the top of the `perform` method
    @appointment = appointment
    @copay_cents = appointment.service.copay_cents

    @run = ::ActiveRecord::Base.transaction(isolation: :serializable) do
      run = JobRun.find_by(
        job_class: self.class.name,
        job_id: job_id,
        job_args: [appointment]
      )

      if run
        run.update!(last_run_at: Time.current)
        # `update!` returns true, so hand back the record explicitly.
        run
      else
        JobRun.create!(
          job_class: self.class.name,
          job_id: job_id,
          job_args: [appointment],
          serialized_job: serialize,
          recovery_point: :charge_and_invoice
        )
      end
    end

    return if @run.recovery_point == "FINISH"

    idempotently_perform_step(:charge_and_invoice, progress_to: :notify_of_charge)
    idempotently_perform_step(:notify_of_charge, progress_to: :create_insurance_claim)
    idempotently_perform_step(:create_insurance_claim, progress_to: :submit_to_insurance)
    idempotently_perform_step(:submit_to_insurance, progress_to: :FINISH)
  end

  private

  # Runs one step if the run is currently at that step, then advances the
  # run to +progress_to+ within the same transaction.
  #
  # @param current_step_method [Symbol] name of the step method to call
  # @param progress_to [String, Symbol] recovery point to move to afterwards
  def idempotently_perform_step(current_step_method, progress_to:)
    # Don't re-run steps that the job run has already successfully performed
    return unless @run.recovery_point == current_step_method.to_s

    # Use a database transaction and a database lock for ACIDic guarantees + concurrency mitigation
    @run.with_lock do
      send(current_step_method)
      # Only progress to the next step if the entire database transaction commits
      @run.update!(recovery_point: progress_to)
    end
  end

  def charge_and_invoice
    # Data that is needed between steps needs to be stored in the database
    # so that it is accessible on retries that skip this step.
    # But, to save on unnecessary db queries if this job succeeds on the first run,
    # we also store the data in an instance variable for access across methods
    @invoice = @run.persisted_data[:invoice] = Invoice.create!(
      customer: @appointment.customer,
      amount_cents: @copay_cents,
    )
    @charge = @run.persisted_data[:charge] = Charge.create!(
      invoice: @invoice,
      amount_cents: @copay_cents,
    )
  end

  def notify_of_charge
    # Fetch the charge record if starting this method from a retry
    @charge ||= @run.persisted_data[:charge]
    # Initialize an ActiveJob without performing it
    job = ChargeJob.new(@charge.id)
    # Enqueue the job through the database
    # Using an `after_create_commit` hook, this job will be enqueued
    # and performed only if the step-wrapping transaction commits.
    JobRun.enqueue!(job)
  end

  def create_insurance_claim
    # Fetch the charge record if starting this method from a retry
    @charge ||= @run.persisted_data[:charge]
    @insurance_claim = @run.persisted_data[:insurance_claim] = InsuranceClaim.create!(
      appointment: @appointment,
      copay: @charge,
    )
  end

  def submit_to_insurance
    # Fetch the insurance_claim record if starting this method from a retry
    @insurance_claim ||= @run.persisted_data[:insurance_claim]
    # Skip the POST when the provider already returns something for this id.
    return if API::InsuranceProvider.get(@insurance_claim.id).present?

    API::InsuranceProvider.post(@insurance_claim.id)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment