Skip to content

Instantly share code, notes, and snippets.

@jclosure
Created January 12, 2017 17:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jclosure/b58fa0e82feb32b13432b56f441efb4d to your computer and use it in GitHub Desktop.
Save jclosure/b58fa0e82feb32b13432b56f441efb4d to your computer and use it in GitHub Desktop.
Using GenStage to create a demand-driven Message Pump with back-pressure control
defmodule QueueMessagePump do
@moduledoc """
# Add to mix.exs
defp applications(_) do
[:gen_stage]
end
defp deps(_) do
[{:gen_stage, "~> 0.10.0"}]
end
# Start the producer
QueueMessagePump.start_link()
# Start multiple consumers
Printer.start_link()
Printer.start_link()
Printer.start_link()
Printer.start_link()
At this point, all consumers must have sent their demand which we were
not able to fulfill. Now by calling `sync_notify`, the event shall be
broadcasted to all consumers at once as we have buffered the demand in
the producer:
QueueMessagePump.sync_notify(:hello_world)
"""
alias Experimental.GenStage
use GenStage
@doc "Starts the message pump."
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.DemandDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
defmodule Printer do
alias Experimental.GenStage
use GenStage
@doc "Starts the consumer."
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# Starts a permanent subscription to the broadcaster
# which will automatically start requesting items.
{:consumer, :ok, subscribe_to: [QueueBroadcaster]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect {self(), event}
end
{:noreply, [], state}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment