Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Created February 21, 2022 14:15
Show Gist options
  • Save sorentwo/8f64f42b9294475c0991c232ca47035c to your computer and use it in GitHub Desktop.
Save sorentwo/8f64f42b9294475c0991c232ca47035c to your computer and use it in GitHub Desktop.
Demonstration of using Oban job output with Pro's Relay and Workflow

Recording Output with Oban Pro

Setup

To get started, install Oban, Pro, and our dependencies. Note that the oban_pro install requires a working Pro license.

Mix.install([
  {:ecto_sql, "~> 3.6"},
  {:postgrex, "~> 0.16"},
  {:oban, "~> 2.11"},
  {:oban_pro, "~> 0.10", repo: "oban"}
])
:ok

We'll also need an Ecto repo and a database prepared for Oban:

# Register the repo's settings under the :pro_demo application, which is the
# otp_app the Repo module is declared with. Only the database name is set here.
Application.put_env(:pro_demo, Repo, database: "oban_pro_demo")

defmodule Repo do
  @moduledoc """
  Ecto repository backing the demo.

  Uses the Postgres adapter and reads its configuration from the `:pro_demo`
  application environment.
  """

  use Ecto.Repo, otp_app: :pro_demo, adapter: Ecto.Adapters.Postgres
end

defmodule Migration0 do
  @moduledoc """
  Single migration that prepares the database for Oban and Oban Pro.

  It applies Oban's own migrations first and then Pro's producers migration.
  """

  use Ecto.Migration

  alias Oban.Migrations, as: ObanMigrations
  alias Oban.Pro.Migrations.Producers

  # NOTE(review): `ObanMigrations.up/0` is called from `change/0`, so this
  # migration is only meaningful when run in the :up direction.
  def change do
    ObanMigrations.up()
    Producers.change()
  end
end

# Ask the adapter to tear down and then create the repo's storage. Both return
# values are ignored, so a failure in either step does not stop the script.
adapter = Repo.__adapter__()
repo_config = Repo.config()

adapter.storage_down(repo_config)
adapter.storage_up(repo_config)

# The repo must be running before migrations can be executed against it.
{:ok, _} = Supervisor.start_link([Repo], strategy: :one_for_one)

# Run the in-memory migration (version 0) upwards; no migration files are used.
Ecto.Migrator.run(Repo, [{0, Migration0}], :up, all: true)
[]

08:07:58.086 [info]  Migrations already up

# Options for the demo's Oban instance: Pro's SmartEngine, logging disabled,
# one `default` queue limited to 3, and the Relay plugin used by the
# async/await example.
oban_opts = [
  engine: Oban.Pro.Queue.SmartEngine,
  log: false,
  repo: Repo,
  queues: [default: 3],
  plugins: [Oban.Pro.Plugins.Relay]
]

{:ok, _pid} = Oban.start_link(oban_opts)
{:ok, #PID<0.3882.0>}

Relay — Await Multiple Jobs

The Relay plugin brings task-style async/await flows to Oban jobs. Pass a Job changeset to Relay.async/1 and it will insert the job for execution on any running node. The job's result, whether it's a success or failure, is returned to the calling process. You then use Relay.await/1 or Relay.await_many/1 to synchronously wait on the results.

The following (highly contrived) example uses workers to generate a requested number of random bytes. Naturally, a real worker would do more...work...but this demonstrates that the job's return value can be any term, not just UTF-8/JSON-compatible data.

alias Oban.Pro.Plugins.Relay

defmodule Demo.SyncJob do
  @moduledoc """
  Worker that returns a requested number of random bytes.

  The job's `"bytes"` argument is the number of bytes to generate; the result
  is returned as `{:ok, binary}`.
  """

  use Oban.Worker

  @impl true
  def perform(%{args: %{"bytes" => size}}), do: {:ok, :crypto.strong_rand_bytes(size)}
end

# Start one relayed job per requested size, then block until every job has
# reported its result back to this process.
[16, 32, 64, 128, 256]
|> Enum.map(fn size ->
  %{bytes: size}
  |> Demo.SyncJob.new()
  |> Relay.async()
end)
|> Relay.await_many()
[
  ok: <<47, 82, 31, 48, 93, 197, 32, 108, 129, 43, 43, 201, 75, 154, 183, 196>>,
  ok: <<226, 35, 220, 22, 177, 5, 21, 90, 128, 45, 106, 74, 56, 137, 108, 5, 139, 227, 160, 39, 132,
    131, 161, 45, 162, 187, 185, 119, 175, 45, 126, 105>>,
  ok: <<211, 4, 126, 0, 70, 73, 218, 37, 86, 247, 129, 107, 180, 59, 25, 167, 11, 56, 108, 132, 2,
    129, 35, 246, 186, 16, 75, 222, 146, 80, 212, 184, 89, 11, 13, 51, 153, 153, 19, 236, 161, 50,
    24, 86, 70, 115, 254, ...>>,
  ok: <<28, 155, 63, 219, 199, 94, 111, 119, 188, 108, 100, 175, 110, 177, 132, 175, 25, 224, 236,
    166, 181, 83, 85, 89, 104, 77, 4, 7, 142, 191, 244, 118, 246, 165, 94, 82, 131, 178, 87, 117,
    225, 14, 150, 40, 20, 106, ...>>,
  ok: <<35, 121, 231, 204, 101, 243, 158, 40, 142, 10, 214, 12, 48, 139, 27, 83, 64, 13, 196, 167,
    156, 191, 188, 137, 236, 112, 227, 101, 212, 18, 206, 139, 116, 71, 88, 91, 129, 212, 78, 37,
    219, 209, 172, 33, 157, ...>>
]

Workflows — Use Results from Upstream Jobs

Workflows are a chain of related jobs with optional dependencies between them. Dependencies between jobs guarantee that a downstream job won't run until its upstream jobs complete successfully. Directed dependencies paired with the recorded: true option allow downstream jobs to fetch the output of upstream jobs.

To demonstrate, we'll make a (similarly contrived) workflow that simulates a multi-step API interaction. The first job fetches an authentication token, subsequent jobs use the token to fetch some data, and the final job digests everything that was fetched.

defmodule Demo.WorkerA do
  @moduledoc """
  First step of the demo workflow: simulates fetching an authentication token.

  The "token" is the job's `"api_key"` argument with its graphemes shuffled.
  The worker is declared with `recorded: true` and returns `{:ok, token}`.
  """

  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    shuffled = api_key |> String.graphemes() |> Enum.shuffle()

    {:ok, List.to_string(shuffled)}
  end
end

defmodule Demo.WorkerB do
  @moduledoc """
  Middle step of the demo workflow: simulates fetching data with the token.

  Looks up the workflow job named `"a"`, reads its recorded output, and returns
  `{:ok, {token, url}}`, where `url` is this job's `"url"` argument. The worker
  is declared with `recorded: true`.
  """

  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    # The workflow's jobs are consumed inside a transaction.
    # NOTE(review): presumably required because the stream is database-backed.
    find_token_jobs = fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.filter(fn upstream -> upstream.meta["name"] == "a" end)
    end

    # Exactly one job named "a" is expected; anything else fails the match.
    {:ok, [token_job]} = Repo.transaction(find_token_jobs)
    {:ok, token} = fetch_recorded(token_job)

    {:ok, {token, url}}
  end
end

defmodule Demo.WorkerC do
  @moduledoc """
  Final step of the demo workflow: gathers what the upstream jobs produced.

  Collects the workflow jobs named `"b"`, `"c"` and `"d"`, fetches each one's
  recorded output through `Demo.WorkerB.fetch_recorded/1`, prints the list, and
  returns `:ok`. This worker is not declared with `recorded: true`.
  """

  use Oban.Pro.Workers.Workflow

  @impl true
  def process(job) do
    # The workflow's jobs are consumed inside a transaction.
    # NOTE(review): presumably required because the stream is database-backed.
    collect_upstream = fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.filter(fn upstream -> upstream.meta["name"] in ~w(b c d) end)
    end

    {:ok, upstream_jobs} = Repo.transaction(collect_upstream)

    recorded = Enum.map(upstream_jobs, &Demo.WorkerB.fetch_recorded/1)
    IO.inspect(recorded)

    :ok
  end
end

alias Demo.{WorkerA, WorkerB, WorkerC}

# Workflow shape: `a` produces the token; `b`, `c` and `d` each depend on `a`
# and fetch one URL; `e` depends on all three fetches.
fetch_urls = [b: "elixir-lang.org", c: "www.erlang.org", d: "getoban.pro"]

seeded =
  WorkerA.add(WorkerA.new_workflow(), :a, WorkerA.new(%{api_key: "23kl239bjljlk309af"}))

fetch_urls
|> Enum.reduce(seeded, fn {name, url}, workflow ->
  WorkerB.add(workflow, name, WorkerB.new(%{url: url}), deps: [:a])
end)
|> WorkerC.add(:e, WorkerC.new(%{}), deps: [:b, :c, :d])
|> Oban.insert_all()
[
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{"api_key" => "23kl239bjljlk309af"},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 46,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => [],
      "name" => "a",
      "recorded" => true,
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    },
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerA"
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{"url" => "elixir-lang.org"},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 47,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => ["a"],
      "name" => "b",
      "recorded" => true,
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    },
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerB"
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{"url" => "www.erlang.org"},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 48,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => ["a"],
      "name" => "c",
      "recorded" => true,
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    },
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerB"
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{"url" => "getoban.pro"},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 49,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => ["a"],
      "name" => "d",
      "recorded" => true,
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    },
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerB"
  },
  %Oban.Job{
    __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
    args: %{},
    attempt: 0,
    attempted_at: nil,
    attempted_by: nil,
    cancelled_at: nil,
    completed_at: nil,
    conf: nil,
    conflict?: false,
    discarded_at: nil,
    errors: [],
    id: 50,
    inserted_at: ~U[2022-02-21 14:08:09.020047Z],
    max_attempts: 20,
    meta: %{
      "deps" => ["b", "c", "d"],
      "name" => "e",
      "workflow_id" => "1b3f27cc-ebb9-4d16-9bb3-d38f3f5963b1"
    },
    priority: 0,
    queue: "default",
    replace: nil,
    scheduled_at: ~U[2022-02-21 14:08:09.020047Z],
    state: "available",
    tags: [],
    unique: nil,
    unsaved_error: nil,
    worker: "Demo.WorkerC"
  }
]
[
  ok: {"93l2jlj3kl90baf2k3", "elixir-lang.org"},
  ok: {"93l2jlj3kl90baf2k3", "www.erlang.org"},
  ok: {"93l2jlj3kl90baf2k3", "getoban.pro"}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment