Created February 21, 2022 14:15
Demonstration of using Oban job output with Pro's Relay and Workflow

Recording Output with Oban Pro


To get started, install Oban, Pro, and our dependencies. Note that the oban_pro install requires a working Pro license.

  {:ecto_sql, "~> 3.6"},
  {:postgrex, "~> 0.16"},
  {:oban, "~> 2.11"},
  {:oban_pro, "~> 0.10", repo: "oban"}

We'll also need an Ecto repo and a database prepared for Oban:

Application.put_env(:pro_demo, Repo, database: "oban_pro_demo")

defmodule Repo do
  use Ecto.Repo,
    adapter: Ecto.Adapters.Postgres,
    otp_app: :pro_demo

defmodule Migration0 do
  use Ecto.Migration

  def change do

{:ok, _} = Supervisor.start_link([Repo], strategy: :one_for_one), [{0, Migration0}], :up, all: true)

{:ok, _pid} =
    engine: Oban.Pro.Queue.SmartEngine,
    log: false,
    repo: Repo,
    queues: [default: 3],
    plugins: [
Relay — Await Multiple Jobs

The Relay plugin brings task-style async/await flows to Oban jobs. Pass a Job changeset to Relay.async/1 and it will insert the job for execution on any running node. The job's result, whether it's a success or failure, is returned to the calling process. You then use Relay.await/1 or Relay.await_many/1 to synchronously wait on the results.

The following (highly contrived) example uses workers to generate a requested number of random bytes. Naturally, a real worker would do this demonstrates that the job's return value can be any term, not just UTF8/JSON compatible data.

alias Oban.Pro.Plugins.Relay

defmodule Demo.SyncJob do
  use Oban.Worker

  @impl true
  def perform(%{args: %{"bytes" => bytes}}) do
    {:ok, :crypto.strong_rand_bytes(bytes)}

[16, 32, 64, 128, 256]
|>{bytes: &1}))
|> Relay.await_many()
  ok: <<47, 82, 31, 48, 93, 197, 32, 108, 129, 43, 43, 201, 75, 154, 183, 196>>,
  ok: <<226, 35, 220, 22, 177, 5, 21, 90, 128, 45, 106, 74, 56, 137, 108, 5, 139, 227, 160, 39, 132,
    131, 161, 45, 162, 187, 185, 119, 175, 45, 126, 105>>,
  ok: <<211, 4, 126, 0, 70, 73, 218, 37, 86, 247, 129, 107, 180, 59, 25, 167, 11, 56, 108, 132, 2,
    129, 35, 246, 186, 16, 75, 222, 146, 80, 212, 184, 89, 11, 13, 51, 153, 153, 19, 236, 161, 50,
    24, 86, 70, 115, 254, ...>>,
  ok: <<28, 155, 63, 219, 199, 94, 111, 119, 188, 108, 100, 175, 110, 177, 132, 175, 25, 224, 236,
    166, 181, 83, 85, 89, 104, 77, 4, 7, 142, 191, 244, 118, 246, 165, 94, 82, 131, 178, 87, 117,
    225, 14, 150, 40, 20, 106, ...>>,
  ok: <<35, 121, 231, 204, 101, 243, 158, 40, 142, 10, 214, 12, 48, 139, 27, 83, 64, 13, 196, 167,
    156, 191, 188, 137, 236, 112, 227, 101, 212, 18, 206, 139, 116, 71, 88, 91, 129, 212, 78, 37,
    219, 209, 172, 33, 157, ...>>

Workflows — Use Results from Upstream Jobs

Workflows are a chain of related jobs with optional dependencies between them. Dependencies between jobs guarantees that a downstream job won't run until the upstream job completes successfully. Directed dependencies paired with a recorded option allow downstream jobs to fetch the output of upstream jobs.

To demonstrate, we'll make a (similarly contrived) workflow that simulates a multi-step API interaction. The first job fetches an authentication token, subsequent jobs use the token to fetch some data, and the final job digests everything that was fetched.

defmodule Demo.WorkerA do
  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    token =
      |> String.graphemes()
      |> Enum.shuffle()
      |> to_string()

    {:ok, token}

defmodule Demo.WorkerB do
  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    {:ok, [token_job]} =
      Repo.transaction(fn ->
        |> stream_workflow_jobs()
        |> Stream.filter(&(&1.meta["name"] == "a"))
        |> Enum.to_list()

    {:ok, token} = fetch_recorded(token_job)

    {:ok, {token, url}}

defmodule Demo.WorkerC do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(job) do
    {:ok, output_jobs} =
      Repo.transaction(fn ->
        |> stream_workflow_jobs()
        |> Stream.filter(&(&1.meta["name"] in ~w(b c d)))
        |> Enum.to_list()

    |> IO.inspect()


alias Demo.{WorkerA, WorkerB, WorkerC}

|> WorkerA.add(:a,{api_key: "23kl239bjljlk309af"}))
|> WorkerB.add(:b,{url: ""}), deps: [:a])
|> WorkerB.add(:c,{url: ""}), deps: [:a])
|> WorkerB.add(:d,{url: ""}), deps: [:a])
|> WorkerC.add(:e,{}), deps: [:b, :c, :d])
|> Oban.insert_all()
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{"api_key" => "23kl239bjljlk309af"},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 46,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => [],
      "name" => "a",
      "recorded" => true,
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerA"
