Created
January 12, 2017 17:25
-
-
Save jclosure/b58fa0e82feb32b13432b56f441efb4d to your computer and use it in GitHub Desktop.
Using GenStage to create a demand-driven Message Pump with back-pressure control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule QueueMessagePump do | |
@moduledoc """ | |
# Add to mix.exs | |
defp applications(_) do | |
[:gen_stage] | |
end | |
defp deps(_) do | |
[{:gen_stage, "~> 0.10.0"}] | |
end | |
# Start the producer | |
QueueMessagePump.start_link() | |
# Start multiple consumers | |
Printer.start_link() | |
Printer.start_link() | |
Printer.start_link() | |
Printer.start_link() | |
At this point, all consumers must have sent their demand which we were | |
not able to fulfill. Now by calling `sync_notify`, the event shall be | |
broadcasted to all consumers at once as we have buffered the demand in | |
the producer: | |
QueueMessagePump.sync_notify(:hello_world) | |
""" | |
alias Experimental.GenStage | |
use GenStage | |
@doc "Starts the message pump." | |
def start_link() do | |
GenStage.start_link(__MODULE__, :ok, name: __MODULE__) | |
end | |
@doc "Sends an event and returns only after the event is dispatched." | |
def sync_notify(event, timeout \\ 5000) do | |
GenStage.call(__MODULE__, {:notify, event}, timeout) | |
end | |
## Callbacks | |
def init(:ok) do | |
{:producer, {:queue.new, 0}, dispatcher: GenStage.DemandDispatcher} | |
end | |
def handle_call({:notify, event}, from, {queue, pending_demand}) do | |
queue = :queue.in({from, event}, queue) | |
dispatch_events(queue, pending_demand, []) | |
end | |
def handle_demand(incoming_demand, {queue, pending_demand}) do | |
dispatch_events(queue, incoming_demand + pending_demand, []) | |
end | |
defp dispatch_events(queue, 0, events) do | |
{:noreply, Enum.reverse(events), {queue, 0}} | |
end | |
defp dispatch_events(queue, demand, events) do | |
case :queue.out(queue) do | |
{{:value, {from, event}}, queue} -> | |
GenStage.reply(from, :ok) | |
dispatch_events(queue, demand - 1, [event | events]) | |
{:empty, queue} -> | |
{:noreply, Enum.reverse(events), {queue, demand}} | |
end | |
end | |
end | |
defmodule Printer do | |
alias Experimental.GenStage | |
use GenStage | |
@doc "Starts the consumer." | |
def start_link() do | |
GenStage.start_link(__MODULE__, :ok) | |
end | |
def init(:ok) do | |
# Starts a permanent subscription to the broadcaster | |
# which will automatically start requesting items. | |
{:consumer, :ok, subscribe_to: [QueueBroadcaster]} | |
end | |
def handle_events(events, _from, state) do | |
for event <- events do | |
IO.inspect {self(), event} | |
end | |
{:noreply, [], state} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment