Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A benchmark for measuring the throughput of a single Oban queue
defmodule Metrics do
  @moduledoc false

  # Slot layout of the per-event :counters array.
  @count_ix 1
  @total_ix 2
  @min_ix 3
  @max_ix 4

  # Largest signed 64-bit integer. The min slot starts here so that the first
  # recorded duration always replaces it; starting at 0 would pin the reported
  # minimum to 0 forever, because durations are never negative.
  @min_sentinel 9_223_372_036_854_775_807

  # Builds a fresh 4-slot counter array (count, total, min, max) with the min
  # slot primed to the sentinel. The remaining slots start at zero.
  def new_counter do
    counter = :counters.new(4, [])
    :counters.put(counter, @min_ix, @min_sentinel)
    counter
  end

  # Creates the counter array for `event`, stores it in :persistent_term under
  # the event name so report/1 can find it, and attaches handle/4 as the
  # telemetry handler with the counter as its config.
  def attach(event) do
    counter = new_counter()
    :persistent_term.put(event, counter)
    :telemetry.attach(to_name(event), event, &__MODULE__.handle/4, counter)
  end

  # Telemetry handler: records one measurement. The duration is converted from
  # native time units to whole milliseconds, so sub-millisecond events count as 0.
  #
  # NOTE(review): min/max are updated with a separate get and put, which is not
  # atomic. If events are emitted from several processes at once, an update can
  # be lost; the values are best-effort.
  def handle(_event, %{duration: duration}, _meta, counter) do
    duration = System.convert_time_unit(duration, :native, :millisecond)

    :counters.add(counter, @count_ix, 1)
    :counters.add(counter, @total_ix, duration)
    :counters.put(counter, @min_ix, min(:counters.get(counter, @min_ix), duration))
    :counters.put(counter, @max_ix, max(:counters.get(counter, @max_ix), duration))
  end

  # Prints the average, minimum and maximum recorded duration (milliseconds)
  # for `event`. When nothing was recorded it prints "n/a" placeholders instead
  # of dividing by zero.
  def report(event) do
    counter = :persistent_term.get(event)

    IO.puts("\n#{to_name(event)} Timing")
    IO.puts("Avg\tMin\tMax")

    case :counters.get(counter, @count_ix) do
      0 ->
        IO.puts("n/a\tn/a\tn/a")

      cnt ->
        tot = :counters.get(counter, @total_ix)
        min = :counters.get(counter, @min_ix)
        max = :counters.get(counter, @max_ix)
        avg = Float.floor(tot / cnt, 2)

        IO.puts("#{avg}\t#{min}\t#{max}")
    end
  end

  # The inspected event list doubles as the telemetry handler id and the
  # report heading.
  defp to_name(event) do
    inspect(event)
  end
end
defmodule CountWorker do
  @moduledoc false

  use Oban.Worker

  # Maximum number of jobs handed to a single Oban.insert_all/1 call.
  @batch_size 5000

  # Oban callback: decodes the counter reference carried in the job args and
  # increments its first slot by one. The binary is produced by insert/2 in
  # this module, so it is decoded without the :safe option.
  @impl Oban.Worker
  def perform(%{args: %{"bin_cnt" => bin_cnt}}) do
    bin_cnt
    |> Base.decode64!()
    |> :erlang.binary_to_term()
    |> :counters.add(1, 1)
  end

  # Inserts exactly `total` jobs, each carrying the Base64-encoded `counter`
  # reference, in concurrent batches of at most @batch_size. Writes one "."
  # per completed batch and returns the list of IO.write/1 results.
  #
  # Batches are derived from `total` rather than iterating `1..div(total, 5000)`:
  # that range is descending (1, 0) for totals below 5000, and it silently
  # dropped any remainder, leaving report/3 waiting for jobs that never existed.
  def insert(counter, total) do
    bin_cnt =
      counter
      |> :erlang.term_to_binary()
      |> Base.encode64()

    total
    |> batch_sizes()
    |> Task.async_stream(
      fn size -> Oban.insert_all(for _ <- 1..size, do: new(%{bin_cnt: bin_cnt})) end,
      # A bulk insert may take longer than the default 5s task timeout.
      timeout: :infinity
    )
    |> Enum.map(fn _ -> IO.write(".") end)
  end

  # Prints one progress row (jobs per second, elapsed seconds, jobs completed),
  # then sleeps a second and recurses until `count` reaches `total`.
  # `ellapsed` must be positive because it is used as a divisor; the printed
  # elapsed value is `ellapsed - 1`.
  def report(counter, ellapsed, total) do
    count = :counters.get(counter, 1)
    rate = div(count, ellapsed)

    IO.puts("#{rate}\t\t#{ellapsed - 1}\t\t#{count}")

    if count < total do
      Process.sleep(1000)
      report(counter, ellapsed + 1, total)
    end
  end

  # Splits `total` into a list of batch sizes: as many full batches as fit,
  # plus one trailing partial batch when there is a remainder.
  defp batch_sizes(total) when total > 0 do
    full_batches = List.duplicate(@batch_size, div(total, @batch_size))

    case rem(total, @batch_size) do
      0 -> full_batches
      remainder -> full_batches ++ [remainder]
    end
  end

  defp batch_sizes(_total), do: []
end
# Benchmark driver: parse CLI options, reset the Oban tables, enqueue the jobs
# while the queue is paused, then resume the queue and report throughput until
# every job has run or the timeout expires.
{parsed, _, _} =
  OptionParser.parse(
    System.argv(),
    switches: [limit: :integer, cooldown: :integer, total: :integer, timeout: :integer]
  )

# Raises with a message naming the offending keys when an unknown option is given.
opts = Keyword.validate!(parsed, limit: 10, cooldown: 5, total: 1_000_000, timeout: 60_000)

IO.puts("PREPARING...")

# Fail here if the repo cannot start, rather than on the first query.
# A repo that is already running is fine.
case Oban.Test.Repo.start_link() do
  {:ok, _pid} -> :ok
  {:error, {:already_started, _pid}} -> :ok
end

Oban.Test.Repo.query!("TRUNCATE oban_jobs", [], log: false)
Oban.Test.Repo.query!("TRUNCATE oban_peers", [], log: false)

# The queue starts paused so that all jobs are enqueued before any is processed.
{:ok, _oban} =
  Oban.start_link(
    repo: Oban.Test.Repo,
    queues: [default: [limit: opts[:limit], dispatch_cooldown: opts[:cooldown], paused: true]]
  )

IO.puts("INSERTING...")

counter = :counters.new(1, [])
:ok = :counters.put(counter, 1, 0)

CountWorker.insert(counter, opts[:total])

metrics = [
  [:oban, :job, :stop],
  [:oban, :producer, :stop],
  [:oban, :engine, :fetch_jobs, :stop]
]

for metric <- metrics, do: Metrics.attach(metric)

IO.puts("\nSTARTING...")
IO.puts("Jobs/Sec\tEllapsed\tTotal")

Oban.resume_queue(queue: :default)

# The reporter loops until all jobs are counted; shut it down if the
# overall timeout elapses first.
task = Task.async(CountWorker, :report, [counter, 1, opts[:total]])
Task.yield(task, opts[:timeout]) || Task.shutdown(task)

for metric <- metrics, do: Metrics.report(metric)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment