Skip to content

Instantly share code, notes, and snippets.

@imranismail
Last active April 3, 2021 21:12
Show Gist options
  • Save imranismail/7c2a8e5def4c62f279adb44a6d8f99ef to your computer and use it in GitHub Desktop.
Save imranismail/7c2a8e5def4c62f279adb44a6d8f99ef to your computer and use it in GitHub Desktop.
Process native queue with backpressure using Erlang's :queue and Elixir's GenStage spawning a Process for each job in queue

Usage

defmodule MyApp.Application do
  
  def start(_type, _args) do
    ...
    
    children = [
      worker(Worker.Producer, []),
      worker(Worker.Consumer, []),
      ...
    ]
    ...
  end
end

for product <- Repo.all(Product) do
  Worker.queue({Marketplace.Amazon, :get_and_update_inventory_quantity, [product]})
end
# Elixir :queue wrapper with modified behavior for empty queues to mimic Enumerable module in Elixir
defmodule Queue do
def insert(queue, item), do: :queue.in(item, queue)
def insert_last(queue, item), do: :queue.in_r(item, queue)
def member?(queue, item), do: :queue.member(item, queue)
def filter(queue, fun), do: :queue.filter(fun, queue)
def split(queue, n) do
length = Queue.length(queue)
if length >= n do
:queue.split(n, queue)
else
:queue.split(length, queue)
end
end
def take(queue, n) do
{q1, _} = split(queue, n)
q1
end
def pop(queue) do
case :queue.out(queue) do
{{:value, item}, new_queue} -> {item, new_queue}
{:empty, new_queue} -> {nil, new_queue}
end
end
def pop_last(queue) do
case :queue.out_r(queue) do
{{:value, item}, new_queue} -> {item, new_queue}
{:empty, new_queue} -> {nil, new_queue}
end
end
def first(queue) do
unless is_empty?(queue) do
:queue.get(queue)
end
end
def last(queue) do
unless is_empty?(queue) do
:queue.get_r(queue)
end
end
def head(queue) do
unless is_empty?(queue) do
:queue.head(queue)
end
end
def tail(queue) do
if is_empty?(queue) do
queue
else
:queue.tail(queue)
end
end
def drop(queue) do
if is_empty?(queue) do
queue
else
:queue.drop(queue)
end
end
def drop_last(queue) do
if is_empty?(queue) do
queue
else
:queue.drop_r(queue)
end
end
defdelegate length(queue), to: :queue, as: :len
defdelegate from_list(queue), to: :queue
defdelegate to_list(queue), to: :queue
defdelegate is_empty?(queue), to: :queue, as: :is_empty
defdelegate is_queue?(queue), to: :queue, as: :is_queue
defdelegate join(queue1, queue2), to: :queue
defdelegate reverse(queue), to: :queue
defdelegate new(), to: :queue
end
defmodule Worker do
require Logger
alias :timer, as: Timer
def queue(job) do
Process.send(Worker.Producer, {:queue, job}, [])
end
defmodule Producer do
use GenStage
def start_link(initial \\ %{demand: 0, queue: Queue.new()}) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(state) do
{:producer, state}
end
def process_queue do
Process.send_after(self(), :process_queue, Timer.seconds(5))
end
def handle_demand(demand, state) do
process_queue()
{:noreply, [], Map.update!(state, :demand, &(&1 + demand))}
end
def handle_info(:process_queue, state) do
process_queue()
supply_jobs(state)
end
def handle_info({:queue, job}, state) do
state
|> Map.update!(:queue, &Queue.insert(&1, job))
|> supply_jobs()
end
defp supply_jobs(%{queue: queue, demand: demand}) do
{jobs, new_queue} = Queue.split(queue, demand)
jobs = Queue.to_list(jobs)
{:noreply, jobs, %{demand: demand - Enum.count(jobs), queue: queue}}
end
end
defmodule Consumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
children = [
worker(Worker.Processor, [], restart: :transient),
]
opts = [
strategy: :one_for_one,
subscribe_to: [{Worker.Producer, max_demand: 10, min_demand: 1}]
]
{:ok, children, opts}
end
end
defmodule Processor do
def start_link({module, function, arguments}) do
Logger.debug("[Worker.Processor]: Processing job #{inspect({module, function, arguments})}...")
apply(module, function, arguments)
end
def start_link(function) when is_function(function) do
Logger.debug("[Worker.Processor]: Processing job #{inspect(function)}...")
function.()
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment