Skip to content

Instantly share code, notes, and snippets.

@fractaledmind
Last active January 22, 2023 20:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fractaledmind/4554d4e3f384dc56c10a9fe444a870bb to your computer and use it in GitHub Desktop.
Save fractaledmind/4554d4e3f384dc56c10a9fe444a870bb to your computer and use it in GitHub Desktop.
class JobRun < ActiveRecord::Base
FINISHED_RECOVERY_POINT = "FINISHED"
validates :job_class, presence: true
validates :job_id, presence: true
validates :serialized_job, presence: true
serialize :serialized_job, JSON
store :persisted_data
belongs_to :awaited_by, class_name: "JobRun", optional: true
has_many :batched_runs, class_name: "JobRun", foreign_key: "awaited_by_id"
scope :outstanding, lambda {
where.not(recovery_point: FINISHED_RECOVERY_POINT).or(where(recovery_point: [nil, ""]))
}
after_create_commit :enqueue_job, if: :staged?
after_update_commit :proceed_with_parent, if: :finished?
def self.enqueue!(job)
# force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
job.queue_name
create!(
staged: true,
job_class: job.class.name,
job_id: job.job_id,
serialized_job: job.serialize,
)
end
def self.await!(job, by:, return_to:)
create!(
staged: true,
awaited_by: by,
job_class: job.class.name,
job_id: job.job_id,
serialized_job: job.serialize,
)
by.update!(returning_to: return_to)
end
def proceed_with_parent
return unless finished?
return unless awaited_by.present?
return if awaited_by.batched_runs.outstanding.any?
awaited_by.proceed
end
def proceed
# when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
update!(
recovery_point: returning_to,
returning_to: nil
)
return if finished?
enqueue_job
end
def enqueue_job
job.enqueue
end
def job
return @job if defined? @job
job_class = job_class.constantize
@job = job_class.deserialize(serialized_job)
end
def finished?
recovery_point.to_s == FINISHED_RECOVERY_POINT
end
end
class AwaitingChildrenJobs < StandardError; end
class OnAppointmentAttendedJob < ActiveJob::Base
discard_on AwaitingChildrenJobs
# steps: charge_and_invoice -> notify_of_charge -> create_insurance_claim -> submit_to_insurance -> FINISH
def perform(appointment)
# Always setup any data that is required within steps but available at the start of the job
# via simple instance variables at the top of the `perform` method
@appointment = appointment
@copay_cents = appointment.service.copay_cents
@run = ::ActiveRecord::Base.transaction(isolation: :serializable) do
run = JobRun.find_by(
job_class: self.class,
job_id: job_id,
job_args: [appointment]
)
if run.present?
run.update!(
last_run_at: Time.current
)
else
JobRun.create!(
job_class: self.class,
job_id: job_id,
job_args: [appointment],
serialized_job: serialize,
recovery_point: :charge_and_invoice
)
end
end
return if @run.recovery_point == "FINISH"
idempotently_perform_step(:charge_and_invoice, progress_to: :notify_of_charge)
idempotently_perform_step(:notify_of_charge, progress_to: :create_insurance_claim)
idempotently_perform_step(:create_insurance_claim, progress_to: :submit_to_insurance)
idempotently_perform_step(:submit_to_insurance, progress_to: :FINISH)
end
private
def idempotently_perform_step(current_step_method, progress_to:)
# Don't re-run steps that the job run has already successfully performed
return unless @run.recovery_point == current_step_method.to_s
# Use a database transaction and a database lock for ACIDic guarantees + concurrency mitigation
@run.with_lock do
current_step_result = method(current_step_method).call
case current_step_result
in [:await, awaited_jobs]
Array(awaited_jobs).compact.each do |awaited_job|
# "Stage" child jobs
# Mark them as children of this current `@run` and provide the "step" in this run to return to
# once all of the children jobs have finished.
JobRun.await!(awaited_job, by: @run, returning_to: progress_to)
end
# after processing the current step and staging the children jobs,
# break the processing loop and stop this method from blocking in the primary worker.
# This job will continue once the background jobs all succeed,
# so we want to keep the primary worker queue free to process new work.
raise AwaitingChildrenJobs
else
# Only progress to the next step if the entire database transaction commits
@run.update!(recovery_point: progress_to)
end
end
end
def charge_and_invoice
# Data that is needed between steps needs to be stored in the database
# so that it is accessible on retries that skip this step.
# But, to save on unnecessary db queries if this job succeeds on the first run,
# we also store the data in an instance variable for access across methods
@invoice = @run.persisted_data[:invoice] = Invoice.create!(
customer: @appointment.customer,
amount_cents: @copay_cents,
)
@charge = @run.persisted_data[:charge] = Charge.create!(
invoice: invoice,
amount_cents: @copay_cents,
)
end
def notify_of_charge
# Fetch the charge record if starting this method from a retry
@charge ||= @run.persisted_data[:charge]
# Initialize an ActiveJob without performing it
job = ChargeJob.new(@charge.id)
# Enqueue the job through the database
# Using an `after_create_commit` hook, this job will be enqueued
# and performed only if the step-wrapping transaction commits.
JobRun.enqueue!(job)
end
def create_insurance_claim
# Fetch the charge record if starting this method from a retry
@charge ||= @run.persisted_data[:charge]
@insurance_claim = @run.persisted_data[:insurance_claim] = InsuranceClaim.create!(
appointment: @appointment,
copay: @charge,
)
end
def submit_to_insurance
# Fetch the insurance_claim record if starting this method from a retry
@insurance_claim ||= @run.persisted_data[:insurance_claim]
[:await, SubmitToInsuranceJob.new(@insurance_claim)]
end
end
class SubmitToInsuranceJob < ActiveJob::Base
after_perform :finish_job_run, if: :has_preexisting_job_run_record
def perform(insurance_claim)
return if API::InsuranceProvider.get(insurance_claim.id).present?
API::InsuranceProvider.post(insurance_claim.id)
end
private
def has_preexisting_job_run_record?
@run = JobRun.find_by(
job_class: self.class,
job_id: job_id,
job_args: [insurance_claim]
)
@run.present?
end
def finish_job_run
@run.update!(recovery_point: JobRun::FINISHED_RECOVERY_POINT)
end
end
class JobRun < ActiveRecord::Base
validates :job_class, presence: true
validates :job_id, presence: true
validates :serialized_job, presence: true
serialize :serialized_job, JSON
after_create_commit :enqueue_job, if: :staged?
def self.enqueue!(job)
# force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
job.queue_name
create!(
staged: true,
job_class: job.class.name,
job_id: job.job_id,
serialized_job: job.serialize,
)
end
def enqueue_job
job.enqueue
end
def job
return @job if defined? @job
job_class = job_class.constantize
@job = job_class.deserialize(serialized_job)
end
end
class OnAppointmentAttendedJob < ActiveJob::Base
def perform(appointment)
@run = ::ActiveRecord::Base.transaction(isolation: :serializable) do
run = JobRun.find_by(
job_class: self.class,
job_id: job_id,
job_args: [appointment]
)
if run.present?
run.update!(
last_run_at: Time.current
)
else
JobRun.create!(
job_class: self.class,
job_id: job_id,
job_args: [appointment],
serialized_job: serialize,
recovery_point: :charge_and_invoice
)
end
end
return if @run.completed_at.present?
# Because all "action" is now some form of database write,
# we can simply wrap the process in a single db transaction
@run.with_lock do
copay_cents = appointment.service.copay_cents
invoice = Invoice.create!(
customer: appointment.customer,
amount_cents: copay_cents,
)
charge = Charge.create!(
invoice: invoice,
amount_cents: copay_cents,
)
# Enqueue the job through the database
# Using an `after_create_commit` hook, this job will be enqueued
# and performed only if the step-wrapping transaction commits.
JobRun.enqueue!(ChargeJob.new(charge.id))
insurance_claim = InsuranceClaim.create!(
appointment: appointment,
copay: charge,
)
JobRun.enqueue!(SubmitToInsuranceJob.new(insurance_claim.id))
# Only complete the run if the entire database transaction commits
@run.update!(completed_at: Time.now)
end
end
end
class JobRun < ActiveRecord::Base
validates :job_class, presence: true
validates :job_id, presence: true
validates :serialized_job, presence: true
serialize :serialized_job, JSON
store :persisted_data
after_create_commit :enqueue_job, if: :staged?
def self.enqueue!(job)
# force the job to resolve the `queue_name`, so that we don't try to serialize a Proc into ActiveRecord
job.queue_name
create!(
staged: true,
job_class: job.class.name,
job_id: job.job_id,
serialized_job: job.serialize,
)
end
def enqueue_job
job.enqueue
end
def job
return @job if defined? @job
job_class = job_class.constantize
@job = job_class.deserialize(serialized_job)
end
end
class OnAppointmentAttendedJob < ActiveJob::Base
# steps: charge_and_invoice -> notify_of_charge -> create_insurance_claim -> submit_to_insurance -> FINISH
def perform(appointment)
# Always setup any data that is required within steps but available at the start of the job
# via simple instance variables at the top of the `perform` method
@appointment = appointment
@copay_cents = appointment.service.copay_cents
@run = ::ActiveRecord::Base.transaction(isolation: :serializable) do
run = JobRun.find_by(
job_class: self.class,
job_id: job_id,
job_args: [appointment]
)
if run.present?
run.update!(
last_run_at: Time.current
)
else
JobRun.create!(
job_class: self.class,
job_id: job_id,
job_args: [appointment],
serialized_job: serialize,
recovery_point: :charge_and_invoice
)
end
end
return if @run.recovery_point == "FINISH"
idempotently_perform_step(:charge_and_invoice, progress_to: :notify_of_charge)
idempotently_perform_step(:notify_of_charge, progress_to: :create_insurance_claim)
idempotently_perform_step(:create_insurance_claim, progress_to: :submit_to_insurance)
idempotently_perform_step(:submit_to_insurance, progress_to: :FINISH)
end
private
def idempotently_perform_step(current_step_method, progress_to:)
# Don't re-run steps that the job run has already successfully performed
return unless @run.recovery_point == current_step_method.to_s
# Use a database transaction and a database lock for ACIDic guarantees + concurrency mitigation
@run.with_lock do
method(current_step_method).call
# Only progress to the next step if the entire database transaction commits
@run.update!(recovery_point: progress_to)
end
end
def charge_and_invoice
# Data that is needed between steps needs to be stored in the database
# so that it is accessible on retries that skip this step.
# But, to save on unnecessary db queries if this job succeeds on the first run,
# we also store the data in an instance variable for access across methods
@invoice = @run.persisted_data[:invoice] = Invoice.create!(
customer: @appointment.customer,
amount_cents: @copay_cents,
)
@charge = @run.persisted_data[:charge] = Charge.create!(
invoice: invoice,
amount_cents: @copay_cents,
)
end
def notify_of_charge
# Fetch the charge record if starting this method from a retry
@charge ||= @run.persisted_data[:charge]
# Initialize an ActiveJob without performing it
job = ChargeJob.new(@charge.id)
# Enqueue the job through the database
# Using an `after_create_commit` hook, this job will be enqueued
# and performed only if the step-wrapping transaction commits.
JobRun.enqueue!(job)
end
def create_insurance_claim
# Fetch the charge record if starting this method from a retry
@charge ||= @run.persisted_data[:charge]
@insurance_claim = @run.persisted_data[:insurance_claim] = InsuranceClaim.create!(
appointment: @appointment,
copay: @charge,
)
end
def submit_to_insurance
# Fetch the insurance_claim record if starting this method from a retry
@insurance_claim ||= @run.persisted_data[:insurance_claim]
return if API::InsuranceProvider.get(@insurance_claim.id).present?
API::InsuranceProvider.post(@insurance_claim.id)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment