To get started, install Oban, Oban Pro, and their dependencies. Note that installing
`oban_pro` requires a working Pro license.
# Packages needed by this notebook. `:oban_pro` is resolved from the
# "oban" repo (see the `repo:` option) rather than the default one.
deps = [
  {:ecto_sql, "~> 3.6"},
  {:postgrex, "~> 0.16"},
  {:oban, "~> 2.11"},
  {:oban_pro, "~> 0.10", repo: "oban"}
]

Mix.install(deps)
:ok
We'll also need an Ecto repo and a database prepared for Oban:
# Configure the demo repo (looked up under the :pro_demo app) to use the
# "oban_pro_demo" database.
Application.put_env(:pro_demo, Repo, [database: "oban_pro_demo"])
defmodule Repo do
  @moduledoc """
  Ecto repository for the demo, using the Postgres adapter and reading its
  configuration from the `:pro_demo` application environment.
  """

  use Ecto.Repo, otp_app: :pro_demo, adapter: Ecto.Adapters.Postgres
end
defmodule Migration0 do
  @moduledoc """
  Single migration that prepares the demo database for Oban and Oban Pro.
  """

  use Ecto.Migration

  @doc """
  Invokes `Oban.Migrations.up/0` followed by
  `Oban.Pro.Migrations.Producers.change/0`.
  """
  # NOTE(review): calling `up/0` from `change/0` presumably makes this
  # migration one-directional; acceptable for a throwaway demo database.
  def change do
    Oban.Migrations.up()
    Oban.Pro.Migrations.Producers.change()
  end
end
# Reset the demo database: tear down and re-create the storage, start the
# repo under a supervisor, then run the migration. The storage_down/storage_up
# results are not matched, so their return values are ignored.
adapter = Repo.__adapter__()
repo_config = Repo.config()

adapter.storage_down(repo_config)
adapter.storage_up(repo_config)

{:ok, _} = Supervisor.start_link([Repo], strategy: :one_for_one)

Ecto.Migrator.run(Repo, [{0, Migration0}], :up, all: true)
[]
08:07:58.086 [info] Migrations already up
# Start Oban against the demo repo with the Pro SmartEngine, logging
# disabled, a `default` queue set to 3, and the Relay plugin enabled.
oban_opts = [
  engine: Oban.Pro.Queue.SmartEngine,
  log: false,
  repo: Repo,
  queues: [default: 3],
  plugins: [Oban.Pro.Plugins.Relay]
]

{:ok, _pid} = Oban.start_link(oban_opts)
{:ok, #PID<0.3882.0>}
The Relay plugin brings task-style `async`/`await` flows to Oban jobs. Pass a Job changeset to `Relay.async/1` and it will insert the job for execution on any running node. The job's result, whether it's a success or failure, is returned to the calling process. You then use `Relay.await/1` or `Relay.await_many/1` to synchronously wait on the results.
The following (highly contrived) example uses workers to generate a requested number of random bytes. Naturally, a real worker would do more...work...but this demonstrates that the job's return value can be any term, not just UTF8/JSON compatible data.
alias Oban.Pro.Plugins.Relay
defmodule Demo.SyncJob do
  @moduledoc """
  Demo worker that returns a requested number of random bytes.
  """

  use Oban.Worker

  # Reads the "bytes" arg and returns `{:ok, binary}` where the binary is
  # produced by `:crypto.strong_rand_bytes/1`.
  @impl true
  def perform(%{args: %{"bytes" => size}}) do
    random = :crypto.strong_rand_bytes(size)
    {:ok, random}
  end
end
# Build one SyncJob changeset per requested size, hand each to Relay.async/1,
# then wait for all of the results together.
byte_sizes = [16, 32, 64, 128, 256]

relays =
  for size <- byte_sizes do
    %{bytes: size}
    |> Demo.SyncJob.new()
    |> Relay.async()
  end

Relay.await_many(relays)
[
ok: <<47, 82, 31, 48, 93, 197, 32, 108, 129, 43, 43, 201, 75, 154, 183, 196>>,
ok: <<226, 35, 220, 22, 177, 5, 21, 90, 128, 45, 106, 74, 56, 137, 108, 5, 139, 227, 160, 39, 132,
131, 161, 45, 162, 187, 185, 119, 175, 45, 126, 105>>,
ok: <<211, 4, 126, 0, 70, 73, 218, 37, 86, 247, 129, 107, 180, 59, 25, 167, 11, 56, 108, 132, 2,
129, 35, 246, 186, 16, 75, 222, 146, 80, 212, 184, 89, 11, 13, 51, 153, 153, 19, 236, 161, 50,
24, 86, 70, 115, 254, ...>>,
ok: <<28, 155, 63, 219, 199, 94, 111, 119, 188, 108, 100, 175, 110, 177, 132, 175, 25, 224, 236,
166, 181, 83, 85, 89, 104, 77, 4, 7, 142, 191, 244, 118, 246, 165, 94, 82, 131, 178, 87, 117,
225, 14, 150, 40, 20, 106, ...>>,
ok: <<35, 121, 231, 204, 101, 243, 158, 40, 142, 10, 214, 12, 48, 139, 27, 83, 64, 13, 196, 167,
156, 191, 188, 137, 236, 112, 227, 101, 212, 18, 206, 139, 116, 71, 88, 91, 129, 212, 78, 37,
219, 209, 172, 33, 157, ...>>
]
Workflows are a chain of related jobs with optional dependencies between them. Dependencies between jobs guarantee that a downstream job won't run until the upstream job completes successfully. Directed dependencies paired with the `recorded` option allow downstream jobs to fetch the output of upstream jobs.
To demonstrate, we'll make a (similarly contrived) workflow that simulates a multi-step API interaction. The first job fetches an authentication token, subsequent jobs use the token to fetch some data, and the final job digests everything that was fetched.
defmodule Demo.WorkerA do
  @moduledoc """
  Workflow step that simulates fetching an authentication token: the token
  is the job's `"api_key"` arg with its graphemes shuffled.
  """

  use Oban.Pro.Workers.Workflow, recorded: true

  # Returns `{:ok, token}`; the worker is declared with `recorded: true`.
  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    shuffled = api_key |> String.graphemes() |> Enum.shuffle()

    {:ok, to_string(shuffled)}
  end
end
defmodule Demo.WorkerB do
  @moduledoc """
  Workflow step that simulates fetching data from a URL using the token
  recorded by the workflow job named `"a"`.
  """

  use Oban.Pro.Workers.Workflow, recorded: true

  # Looks up the single workflow job named "a", fetches its recorded output,
  # and returns `{:ok, {token, url}}`. The matches are assertive: anything
  # other than exactly one "a" job, or a failed fetch, raises a MatchError.
  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    # The workflow job stream is consumed inside a transaction.
    find_token_job = fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.filter(fn dep -> dep.meta["name"] == "a" end)
    end

    {:ok, [upstream]} = Repo.transaction(find_token_job)
    {:ok, token} = fetch_recorded(upstream)

    {:ok, {token, url}}
  end
end
defmodule Demo.WorkerC do
  @moduledoc """
  Final workflow step: gathers the recorded output of the workflow jobs named
  `"b"`, `"c"` and `"d"` and prints it.
  """

  use Oban.Pro.Workers.Workflow

  # Collects the matching workflow jobs, fetches each recorded result through
  # `Demo.WorkerB.fetch_recorded/1`, inspects the list, and returns `:ok`.
  @impl true
  def process(job) do
    # The workflow job stream is consumed inside a transaction.
    collect_outputs = fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.filter(fn dep -> dep.meta["name"] in ~w(b c d) end)
    end

    {:ok, fetched} = Repo.transaction(collect_outputs)

    fetched
    |> Enum.map(&Demo.WorkerB.fetch_recorded/1)
    |> IO.inspect()

    :ok
  end
end
alias Demo.{WorkerA, WorkerB, WorkerC}
# Assemble the workflow: :a runs first, :b/:c/:d each depend on :a, and :e
# depends on all three of them. Then insert every job at once.
workflow =
  WorkerA.new_workflow()
  |> WorkerA.add(:a, WorkerA.new(%{api_key: "23kl239bjljlk309af"}))
  |> WorkerB.add(:b, WorkerB.new(%{url: "elixir-lang.org"}), deps: [:a])
  |> WorkerB.add(:c, WorkerB.new(%{url: "www.erlang.org"}), deps: [:a])
  |> WorkerB.add(:d, WorkerB.new(%{url: "getoban.pro"}), deps: [:a])
  |> WorkerC.add(:e, WorkerC.new(%{}), deps: [:b, :c, :d])

Oban.insert_all(workflow)
[
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{"api_key" => "23kl239bjljlk309af"},
attempt: 0,
attempted_at: nil,
attempted_by: nil,
cancelled_at: nil,
completed_at: nil,
conf: nil,
conflict?: false,
discarded_at: nil,
errors: [],
id: 46,
inserted_at: ~U[2022-02-21 14:08:09.020047Z],
max_attempts: 20,
meta: %{
"deps" => [],
"name" => "a",
"recorded" => true,
"workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
},
priority: 0,
queue: "default",
replace: nil,
scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
state: "available",
tags: [],
unique: nil,
unsaved_error: nil,
worker: "Demo.WorkerA"
},
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{"url" => "elixir-lang.org"},
attempt: 0,
attempted_at: nil,
attempted_by: nil,
cancelled_at: nil,
completed_at: nil,
conf: nil,
conflict?: false,
discarded_at: nil,
errors: [],
id: 47,
inserted_at: ~U[2022-02-21 14:08:09.020047Z],
max_attempts: 20,
meta: %{
"deps" => ["a"],
"name" => "b",
"recorded" => true,
"workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
},
priority: 0,
queue: "default",
replace: nil,
scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
state: "available",
tags: [],
unique: nil,
unsaved_error: nil,
worker: "Demo.WorkerB"
},
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{"url" => "www.erlang.org"},
attempt: 0,
attempted_at: nil,
attempted_by: nil,
cancelled_at: nil,
completed_at: nil,
conf: nil,
conflict?: false,
discarded_at: nil,
errors: [],
id: 48,
inserted_at: ~U[2022-02-21 14:08:09.020047Z],
max_attempts: 20,
meta: %{
"deps" => ["a"],
"name" => "c",
"recorded" => true,
"workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
},
priority: 0,
queue: "default",
replace: nil,
scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
state: "available",
tags: [],
unique: nil,
unsaved_error: nil,
worker: "Demo.WorkerB"
},
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{"url" => "getoban.pro"},
attempt: 0,
attempted_at: nil,
attempted_by: nil,
cancelled_at: nil,
completed_at: nil,
conf: nil,
conflict?: false,
discarded_at: nil,
errors: [],
id: 49,
inserted_at: ~U[2022-02-21 14:08:09.020047Z],
max_attempts: 20,
meta: %{
"deps" => ["a"],
"name" => "d",
"recorded" => true,
"workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
},
priority: 0,
queue: "default",
replace: nil,
scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
state: "available",
tags: [],
unique: nil,
unsaved_error: nil,
worker: "Demo.WorkerB"
},
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{},
attempt: 0,
attempted_at: nil,
attempted_by: nil,
cancelled_at: nil,
completed_at: nil,
conf: nil,
conflict?: false,
discarded_at: nil,
errors: [],
id: 50,
inserted_at: ~U[2022-02-21 14:08:09.020047Z],
max_attempts: 20,
meta: %{
"deps" => ["b", "c", "d"],
"name" => "e",
"workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
},
priority: 0,
queue: "default",
replace: nil,
scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
state: "available",
tags: [],
unique: nil,
unsaved_error: nil,
worker: "Demo.WorkerC"
}
]
[
ok: {"93l2jlj3kl90baf2k3", "elixir-lang.org"},
ok: {"93l2jlj3kl90baf2k3", "www.erlang.org"},
ok: {"93l2jlj3kl90baf2k3", "getoban.pro"}
]