Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Last active August 2, 2022 14:33
Show Gist options
  • Save sorentwo/aa75df4f3991fce8f516bcaff1c7882c to your computer and use it in GitHub Desktop.
Save sorentwo/aa75df4f3991fce8f516bcaff1c7882c to your computer and use it in GitHub Desktop.
Oban Queue Circuit Breaker
defmodule QueueBreaker do
@moduledoc """
Automatic queue pausing/resuming based on accumulated errors.
## Example
Pause after 1000 errors:
QueueBreaker.attach("risky-queue", 1000)
"""
def attach(queue, limit) do
counter = :counters.new(1, [:write_concurrency])
conf = {queue, limit, counter}
:telemetry.attach("queue-break", [:oban, :job, :exception], &__MODULE__.handler/4, conf)
end
def handle(_, _, %{conf: conf, job: job, queue: queue}, {queue, limit, counter}) do
:counters.add(counter, 1, 1)
if :counters.get(counter, 1) >= limit do
Oban.pause_queue(conf.name, queue: queue)
30
|> :timer.seconds()
|> :timer.apply_after(__MODULE__, :reset, [conf, queue, counter])
end
end
def handle(_events, _measure, _meta, _conf), do: :ok
def reset(conf, queue, counter) do
:counters.put(counter, 1, 0)
Oban.resume_queue(conf.name, queue: queue)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment