Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Created January 16, 2024 18:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sorentwo/a9bb2d72651c78a2de38ad5ffe2088da to your computer and use it in GitHub Desktop.
Save sorentwo/a9bb2d72651c78a2de38ad5ffe2088da to your computer and use it in GitHub Desktop.
Benchmark for counting queries and transactions with Oban's Basic and Smart engines
# Ecto repo used by this benchmark script. It is declared against the
# `:oban_pro` OTP app and talks to Postgres through `Ecto.Adapters.Postgres`.
defmodule Oban.Pro.Repo do
  use Ecto.Repo,
    adapter: Ecto.Adapters.Postgres,
    otp_app: :oban_pro
end
# Boot the Postgres driver and the benchmark repo before anything issues a
# query. Start-up failures are surfaced right here instead of showing up later
# as an unrelated error from the first query.
{:ok, _started_apps} = Application.ensure_all_started(:postgrex)

case Oban.Pro.Repo.start_link() do
  {:ok, _pid} ->
    :ok

  # A repo that is already running (e.g. the script is re-evaluated in the
  # same VM) is fine to reuse.
  {:error, {:already_started, _pid}} ->
    :ok

  {:error, reason} ->
    raise "could not start Oban.Pro.Repo: #{inspect(reason)}"
end
defmodule BenchWorker do
  @moduledoc false

  use Oban.Worker, queue: :default

  # Each job bumps the shared counter by one. Once the counter has reached
  # `max`, the process whose pid was serialized into the args is sent
  # `:finished`.
  #
  # The add and the subsequent read are two separate operations, so more than
  # one job may observe a value >= `max` and send `:finished`.
  @impl Oban.Worker
  def perform(%{args: %{"max" => max, "bin_pid" => bin_pid, "bin_cnt" => bin_cnt}}) do
    notify_pid = base64_to_term(bin_pid)
    counter_ref = base64_to_term(bin_cnt)

    :ok = :counters.add(counter_ref, 1, 1)

    completed = :counters.get(counter_ref, 1)
    if completed >= max, do: send(notify_pid, :finished)

    :ok
  end

  # Serializes any term into a Base64 string so it can travel inside job args.
  def term_to_base64(term), do: Base.encode64(:erlang.term_to_binary(term))

  # Inverse of term_to_base64/1. Raises if `bin` is not valid Base64.
  def base64_to_term(bin), do: :erlang.binary_to_term(Base.decode64!(bin))
end
defmodule QueryTracker do
  # Telemetry handler that counts `[:oban, :pro, :repo, :query]` events by
  # incrementing slot 1 of a `:counters` reference.

  @handler_id "pro-query"
  @query_event [:oban, :pro, :repo, :query]

  # Registers handle/4 for the query event; `counter` is passed through as the
  # handler config. Returns whatever `:telemetry.attach/4` returns.
  def attach(counter),
    do: :telemetry.attach(@handler_id, @query_event, &__MODULE__.handle/4, counter)

  # Invoked once per event; the event name, measurements and metadata are
  # ignored, only the counter is incremented.
  def handle(_event_name, _measurements, _metadata, counter) do
    :counters.add(counter, 1, 1)
  end
end
# `query_counter` is incremented by the telemetry handler for every repo query
# event; `counter` is incremented by each executed benchmark job.
query_counter = :counters.new(1, [])
counter = :counters.new(1, [])

# Assert the attach result: on `{:error, :already_exists}` a handler is already
# registered under the same id, and that existing handler would not be given
# the freshly created `query_counter` above.
:ok = QueryTracker.attach(query_counter)
# Runs one benchmark round for `engine` with `count` jobs.
#
# Truncates the Oban tables, starts Oban with a paused `:default` queue,
# enqueues exactly `count` jobs, resets both counters, then resumes the queue
# and waits (up to 5 seconds) for the workers to report completion.
#
# Returns the difference between `txid_current()` sampled after and before the
# run. Raises "Timeout" if no `:finished` message arrives in time.
#
# NOTE(review): each `SELECT txid_current()` presumably consumes a transaction
# id of its own, so the returned difference likely includes the closing sample;
# the offset is the same for every engine. Confirm against the PostgreSQL docs
# before comparing against absolute numbers.
insert_and_await = fn engine, count ->
  Oban.Pro.Repo.query!("TRUNCATE oban_producers", [], log: false)
  Oban.Pro.Repo.query!("TRUNCATE oban_jobs", [], log: false)

  # The queue starts paused so no job runs before the counters are reset.
  # Matching on `{:ok, _}` fails fast if Oban could not start.
  {:ok, _supervisor} =
    Oban.start_link(
      engine: engine,
      repo: Oban.Pro.Repo,
      queues: [default: [limit: 20, paused: true]]
    )

  args = %{
    max: count,
    bin_pid: BenchWorker.term_to_base64(self()),
    bin_cnt: BenchWorker.term_to_base64(counter)
  }

  # Enqueue exactly `count` jobs. The previous `0..count` range produced
  # `count + 1` jobs while `max` was `count`. All jobs share the same args, so
  # a single changeset is built and duplicated.
  args
  |> BenchWorker.new(queue: :default)
  |> List.duplicate(count)
  |> Oban.insert_all()

  :ok = :counters.put(counter, 1, 0)
  :ok = :counters.put(query_counter, 1, 0)

  # Workers may send `:finished` more than once per round. Drop any leftovers
  # from an earlier round so they cannot end this round prematurely.
  fn ->
    receive do
      :finished -> true
    after
      0 -> false
    end
  end
  |> Stream.repeatedly()
  |> Stream.take_while(& &1)
  |> Stream.run()

  %{rows: [[min_txid]]} = Oban.Pro.Repo.query!("SELECT txid_current()", [], log: false)

  Oban.resume_queue(queue: :default, local_only: true)

  receive do
    :finished ->
      # Stop Oban before taking the closing sample.
      Oban
      |> Oban.Registry.whereis()
      |> Supervisor.stop()

      %{rows: [[max_txid]]} = Oban.Pro.Repo.query!("SELECT txid_current()", [], log: false)

      max_txid - min_txid
  after
    5_000 -> raise "Timeout"
  end
end
# Run every job count against both engines (Basic first, then Smart, for each
# count) and print one table row per run: engine, job count, counted repo
# queries, and the transaction id delta returned by `insert_and_await`.
Enum.each([100, 500, 1000, 5000], fn count ->
  Enum.each([Oban.Engines.Basic, Oban.Pro.Engines.Smart], fn engine ->
    transactions = insert_and_await.(engine, count)
    queries = :counters.get(query_counter, 1)
    label = String.pad_trailing(inspect(engine), 23, " ")

    IO.puts("| #{label} | #{count} | #{queries} | #{transactions} |")
  end)
end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment