Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Created February 7, 2021 16:16
Show Gist options
  • Save sorentwo/9e828628019d4a294179bf7aac77598c to your computer and use it in GitHub Desktop.
Save sorentwo/9e828628019d4a294179bf7aac77598c to your computer and use it in GitHub Desktop.
Benchmark comparing a basic Oban worker to the Chunk worker in Oban Pro
# Boot the repo, then an Oban instance with a single `alpha` queue (limit 20)
# and the Pro ChunkManager plugin. The start results are intentionally not
# matched, exactly as before.
Oban.Pro.Repo.start_link()

oban_opts = [
  repo: Oban.Pro.Repo,
  queues: [alpha: 20],
  plugins: [Oban.Pro.Plugins.ChunkManager]
]

Oban.start_link(oban_opts)

# Publish the pid of the process running this script so the workers can look
# it up and send their `{:done, id}` messages to it.
:persistent_term.put(:chunk_bench_pid, self())
defmodule BasicWorker do
  @moduledoc """
  Plain `Oban.Worker` on the `alpha` queue that handles one job per call.

  Each job hashes its args (the hash is discarded; presumably a stand-in for
  real work) and then notifies the process registered under
  `:chunk_bench_pid` with `{:done, id}`.
  """

  use Oban.Worker, queue: :alpha

  @impl Worker
  def perform(%{args: job_args}) do
    _ = :erlang.phash2(job_args)

    bench_pid = :persistent_term.get(:chunk_bench_pid)
    send(bench_pid, {:done, job_args["id"]})

    :ok
  end
end
defmodule ChunkWorker do
  @moduledoc """
  `Oban.Pro.Workers.Chunk` worker on the `alpha` queue, configured with
  `size: 1000` and `timeout: 500`.

  Each call hashes the args of every job in the chunk at once (the hash is
  discarded) and then notifies the process registered under
  `:chunk_bench_pid` with `{:done, id}`, where `id` is the largest `"id"`
  found in the chunk.
  """

  use Oban.Pro.Workers.Chunk, queue: :alpha, size: 1000, timeout: 500

  @impl Chunk
  def process(jobs) do
    chunk_args = for job <- jobs, do: job.args
    _ = :erlang.phash2(chunk_args)

    # Only the highest id is reported: the benchmark loop waits for the id
    # equal to the total job count.
    highest_id =
      chunk_args
      |> Enum.map(fn job_args -> job_args["id"] end)
      |> Enum.max()

    bench_pid = :persistent_term.get(:chunk_bench_pid)
    send(bench_pid, {:done, highest_id})

    :ok
  end
end
# Empties the `oban_beats` and `oban_jobs` tables (in that order) with query
# logging disabled. Returns the result of the last TRUNCATE.
reset_tables = fn ->
  ["TRUNCATE oban_beats", "TRUNCATE oban_jobs"]
  |> Enum.map(fn sql -> Oban.Pro.Repo.query!(sql, [], log: false) end)
  |> List.last()
end
# Enqueues exactly `num_jobs` jobs for `worker`, with args `%{id: n}` for
# n in 1..num_jobs, inserting them in batches of 5_000 via `Oban.insert_all/1`.
#
# The range starts at 1 rather than 0: `0..num_jobs` produced `num_jobs + 1`
# jobs, so the benchmark processed one job more than it reported. For the
# chunk worker the extra job may also leave a partial chunk, since the
# benchmark counts are multiples of the configured chunk size -- TODO confirm
# against ChunkManager behaviour. The highest id is still `num_jobs`, which
# is the id the benchmark loop waits for.
#
# `num_jobs` must be a positive integer; `1..0` would be a descending range
# and would enqueue ids 1 and 0.
#
# Returns `:ok`.
insert_jobs = fn worker, num_jobs when is_integer(num_jobs) and num_jobs > 0 ->
  1..num_jobs
  |> Enum.chunk_every(5_000)
  |> Enum.each(fn ids ->
    ids
    |> Enum.map(&worker.new(%{id: &1}))
    |> Oban.insert_all()
  end)
end
# Removes every message currently in this process's mailbox. Anonymous
# functions cannot call themselves by name, so the function is passed to
# itself for recursion.
flush_mailbox = fn flush ->
  receive do
    _stale -> flush.(flush)
  after
    0 -> :ok
  end
end

# Runs every worker/count combination: reset the tables, enqueue the jobs
# while the queue is paused, then time how long it takes from resuming the
# queue until the `{:done, count}` message arrives.
for worker <- [BasicWorker, ChunkWorker], count <- [10_000, 50_000, 100_000] do
  reset_tables.()

  :ok = Oban.pause_queue(queue: "alpha")

  insert_jobs.(worker, count)

  # Earlier runs leave `{:done, id}` messages behind (the selective receive
  # below consumes only one message per run). Without draining them, a stale
  # `{:done, 10_000}` from a larger BasicWorker run would end the ChunkWorker
  # 10_000 run immediately. Draining after the inserts also gives jobs still
  # in flight from the previous run time to finish and deliver their messages.
  flush_mailbox.(flush_mailbox)

  IO.puts("[#{worker}] #{count} jobs starting...")

  {micro, outcome} =
    :timer.tc(fn ->
      :ok = Oban.resume_queue(queue: "alpha")

      receive do
        {:done, ^count} -> :ok
      after
        120_000 -> :failure
      end
    end)

  # A timeout still aborts the benchmark, but with an explicit reason instead
  # of an opaque MatchError.
  if outcome == :failure do
    raise "[#{worker}] #{count} jobs did not complete within 120000ms"
  end

  milli = System.convert_time_unit(micro, :microsecond, :millisecond)

  IO.puts("[#{worker}] #{count} jobs completed in #{milli}ms")

  reset_tables.()
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment