Skip to content

Instantly share code, notes, and snippets.

@robacarp
Created May 7, 2023 02:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robacarp/936b723fcaa51b5c3e79dcfed521aeff to your computer and use it in GitHub Desktop.
Save robacarp/936b723fcaa51b5c3e79dcfed521aeff to your computer and use it in GitHub Desktop.
Mosquito Fan-out-in example
# Fan-out / fan-in example built on Mosquito.
#
# A single job class plays three roles, selected by `fan_state`:
#
# * `"starting"` - the root run. Enqueues one branch run per entry in
#   `#branches`.
# * `"finished"` - the fan-in run. Deletes the shared bookkeeping and then
#   calls `#finish`.
# * anything else (branch runs are enqueued with `"branch"`) - performs the
#   work in `#each`, records its own progress, and enqueues the fan-in run
#   once every branch has reported `"finished"`.
#
# Bookkeeping is kept in a `Mosquito::Metadata` whose root key is the root
# run's id. It holds the number of branches that were fanned out plus one
# entry per branch run (keyed by that run's id) with its current state.
class FanOutInJob < Mosquito::QueuedJob
  # Metadata key under which the root run stores how many branches it enqueued.
  EXPECTED_BRANCHES_KEY = "expected_branches"

  params(
    root : String,
    fan_state : String = "starting",
    parent_job_id : String = "",
    branch : String = ""
  )

  # Entry point for every run; routes to the role selected by `fan_state`.
  def perform
    case fan_state
    when "starting"
      dispatch
    when "finished"
      purge_metadata
      finish
    else
      run_id = job_run_id.not_nil!

      fan_metadata[run_id] = "started"
      each
      fan_metadata[run_id] = "finished"

      # NOTE(review): the check and the enqueue below are separate steps, so
      # two branches finishing at nearly the same moment could both enqueue
      # the fan-in run. Confirm whether `finish` tolerates running twice.
      if all_finished?
        self.class.new(
          root: @root.not_nil!,
          parent_job_id: @parent_job_id.not_nil!,
          fan_state: "finished"
        ).enqueue
      end
    end
  end

  # Shared bookkeeping as seen from a branch or fan-in run: the metadata
  # rooted at the parent (root) run's id.
  def fan_metadata : Mosquito::Metadata
    Mosquito::Metadata.new(@parent_job_id.not_nil!)
  end

  # Shared bookkeeping as seen from the root run: the metadata rooted at this
  # run's own id, which is the id handed to branches as `parent_job_id`.
  def my_fan_metadata : Mosquito::Metadata
    Mosquito::Metadata.new(self.job_run_id.not_nil!)
  end

  # Deletes the shared bookkeeping for this fan-out.
  def purge_metadata
    fan_metadata.delete
  end

  # True once the number of branches reporting "finished" equals the number
  # of branches the root run recorded before enqueueing them.
  #
  # Comparing against the recorded count (instead of "every entry present is
  # finished") keeps an early branch from triggering the fan-in while its
  # siblings have not written any state yet.
  def all_finished? : Bool
    states = fan_metadata.to_h

    expected = states[EXPECTED_BRANCHES_KEY]?.try(&.to_i?)
    return false if expected.nil?

    # The expected-count entry holds a number, so it never counts as finished.
    states.values.count("finished") == expected
  end

  # Root-run role: records how many branches will run, then enqueues one
  # branch run per entry in `#branches`.
  def dispatch
    parent_job_id = self.job_run_id.not_nil!
    branch_names = branches

    # Written before anything is enqueued so that a branch picked up
    # immediately by another worker already sees the full expected count.
    # No per-branch "pending" marker is written after enqueueing: doing so
    # could overwrite a state the branch itself has already recorded.
    my_fan_metadata[EXPECTED_BRANCHES_KEY] = branch_names.size.to_s

    jobs = branch_names.map do |branch_name|
      self.class.new(
        root: @root.not_nil!,
        fan_state: "branch",
        parent_job_id: parent_job_id,
        branch: branch_name
      ).enqueue
    end

    log "Enqueued #{jobs.size} jobs: #{jobs.map(&.id).join(", ")}"
  end

  # The list of branches to fan out to; one branch run is enqueued per entry.
  def branches : Array(String)
    # ostensibly do something with @root, and produce some list of branches
    %w[one two three four five six seven eight nine ten]
  end

  # Branch-run work. This placeholder sleeps a random 1..10 seconds for a
  # random 1..10 steps and logs each step.
  def each
    # @branch is something different for each invocation
    count = Random.rand(1..10)

    count.times do |i|
      n = Random.rand(1..10)
      sleep n
      log "root: #{@root} branch: #{@branch} step: #{i}/#{count} took #{n} seconds"
    end
  end

  # Fan-in work, run by the "finished" run after the bookkeeping is purged.
  def finish
    # called only when all branches have finished
    log "#{@root} is all done, mate"
  end
end
# Kick off one fan-out run; the root name gets a random numeric suffix in 1..22.
root_name = "fizzbuzz-#{Random.rand(1..22)}"
FanOutInJob.new(root: root_name).enqueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment