Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demonstrate how to integration-test batch callbacks
# Starts an Oban instance under the ExUnit test supervisor and shares the
# calling test process's Ecto sandbox connection with the instance's peer,
# producer, and plugin processes.
#
# `opts` is a keyword list of Oban options. These defaults are applied only
# when the caller did not supply the key:
#
#   * `:name` - `Oban`
#   * `:repo` - `MyApp.Repo`
#   * `:poll_interval` - ten minutes (presumably so periodic polling never
#     fires during a test; confirm against the Oban version in use)
#   * `:shutdown_grace_period` - `25`
#
# Returns the pid of the started Oban instance. `start_supervised!/2` raises
# if the instance cannot be started.
def start_supervised_oban!(opts) do
  opts =
    opts
    |> Keyword.put_new(:name, Oban)
    |> Keyword.put_new(:repo, MyApp.Repo)
    |> Keyword.put_new(:poll_interval, :timer.minutes(10))
    |> Keyword.put_new(:shutdown_grace_period, 25)

  # Always present after the `put_new/3` calls above.
  repo = Keyword.fetch!(opts, :repo)

  # The child id is the instance name, so a test can later stop this exact
  # instance with `stop_supervised/1`.
  oban_pid = start_supervised!({Oban, opts}, id: opts[:name])

  # For Oban < v2.11 you can remove the `{:_, Oban.Peer}` part
  registry_keys = [{:_, Oban.Peer}, {:_, {:producer, :_}}, {:_, {:plugin, :_}}]

  # The match spec selects only the pid (`:"$2"`) of every registry entry
  # whose key matches one of the patterns above.
  for key <- registry_keys,
      child_pid <- Registry.select(Oban.Registry, [{{key, :"$2", :_}, [], [:"$2"]}]) do
    # Grant the allowance on the repo this instance was configured with, not
    # a hard-coded module, so a caller-supplied `:repo` is honoured.
    Ecto.Adapters.SQL.Sandbox.allow(repo, self(), child_pid)
  end

  oban_pid
end
# Name under which this test module's own Oban instance is registered; it is
# also used as the supervised child id when starting and stopping it.
@name MyApp.SomeTest.Oban

# Options passed to `start_supervised_oban!/1`: the instance name, a `default`
# queue with a limit of 10, and the `BatchManager` and `Repeater` plugins
# (the latter configured with `interval: 25`).
@opts [
  name: @name,
  queues: [default: 10],
  plugins: [BatchManager, {Repeater, interval: 25}]
]
test "batch callback args are forwarded to callback jobs" do
  # Boot the Oban instance described by @opts for the duration of this test.
  _oban_pid = start_supervised_oban!(@opts)

  # Build a batch from two argument maps and insert it through that instance.
  batch_args = [%{ref: 1}, %{ref: 2}]
  Oban.insert_all(@name, MyBatch.new_batch(batch_args))

  # verify the batch callback

  # Stop the instance explicitly, using the same id it was started under.
  stop_supervised(@name)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment