Skip to content

Instantly share code, notes, and snippets.

@vicmargar
Created February 20, 2018 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vicmargar/c28d59dda98602447aac5b17efc16774 to your computer and use it in GitHub Desktop.
Save vicmargar/c28d59dda98602447aac5b17efc16774 to your computer and use it in GitHub Desktop.
GenStage: ProducerConsumer vs Consumer Supervisor
require Logger
defmodule P do
use GenStage
@doc "Starts the broadcaster."
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
defmodule SimpleClient do
def process_event(event) do
Task.start_link(fn ->
process(event)
Process.sleep(1000)
end)
end
def process({event, :q}) do
Logger.info "#{Time.utc_now()} Processing events in Q: #{inspect(event)}"
end
def process({event, :av}) do
Logger.info "#{Time.utc_now()} Processing events in AV: #{inspect(event)}"
end
end
defmodule Client do
use GenStage
import SimpleClient
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :ok}
end
def process_events([]), do: []
def process_events([event | events]) do
process(event)
Process.sleep(1000)
process_events(events)
end
def handle_events(events, _from, state) do
process_events(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
defmodule PC do
use GenStage
def start_link(provider) do
GenStage.start_link(__MODULE__, provider)
end
def init(provider) do
{:producer_consumer, provider, subscribe_to: [{P, selector: fn {_event, p} -> p == provider end}]}
end
def handle_events(events, _from, provider) do
{:noreply, events, provider}
end
end
defmodule PCS do
use ConsumerSupervisor
def start_link(provider) do
ConsumerSupervisor.start_link(__MODULE__, provider)
end
def init(provider: provider, max_demand: max_demand) do
children = [%{id: SimpleClient, restart: :transient, start: {SimpleClient, :process_event, []}}]
opts = [strategy: :one_for_one, subscribe_to: [{P, max_demand: max_demand, selector: fn {_event, p} -> p == provider end}]]
ConsumerSupervisor.init(children, opts)
end
end
defmodule GenStageTest do
def test_pc do
# producer
{:ok, _p} = P.start_link()
# producer consumers
{:ok, qpc} = PC.start_link(:q)
{:ok, avpc} = PC.start_link(:av)
# consumers
{:ok, q1} = Client.start_link()
{:ok, av1} = Client.start_link()
{:ok, av2} = Client.start_link()
GenStage.sync_subscribe(q1, to: qpc)
GenStage.sync_subscribe(av1, to: avpc)
GenStage.sync_subscribe(av2, to: avpc)
P.sync_notify({"hello 1", :q})
P.sync_notify({"hello 2", :q})
P.sync_notify({"hello 3", :q})
P.sync_notify({"hello 4", :q})
P.sync_notify({"hello 5", :av})
P.sync_notify({"hello 6", :av})
P.sync_notify({"hello 7", :av})
P.sync_notify({"hello 8", :av})
end
def test_pcs do
# producer
{:ok, _p} = P.start_link()
# consumer supervisors
{:ok, _qpc} = PCS.start_link(provider: :q, max_demand: 1)
{:ok, _avpc} = PCS.start_link(provider: :av, max_demand: 2)
P.sync_notify({"hello 1", :q})
P.sync_notify({"hello 2", :q})
P.sync_notify({"hello 3", :q})
P.sync_notify({"hello 4", :q})
P.sync_notify({"hello 5", :av})
P.sync_notify({"hello 6", :av})
P.sync_notify({"hello 7", :av})
P.sync_notify({"hello 8", :av})
end
end
@vicmargar
Copy link
Author

vicmargar commented Feb 20, 2018

Output:

The producer-consumer version works as expected with 3 concurrent processes (1 for :q, and 2 for :av)

iex(1)> GenStageTest.test_pc
[info] 13:34:48.688886 Processing events in AV: "hello 5"
[info] 13:34:48.688888 Processing events in AV: "hello 6"
[info] 13:34:48.688873 Processing events in Q: "hello 1"
[info] 13:34:49.719872 Processing events in AV: "hello 7"
[info] 13:34:49.720055 Processing events in AV: "hello 8"
[info] 13:34:49.720100 Processing events in Q: "hello 2"
[info] 13:34:50.720684 Processing events in Q: "hello 3"
[info] 13:34:51.721705 Processing events in Q: "hello 4"

The consumer supervisor version doesn't work as I'd expect.

iex(1)> GenStageTest.test_pcs
[info] 13:35:04.112282 Processing events in Q: "hello 1"
[info] 13:35:05.138644 Processing events in Q: "hello 2"
[info] 13:35:06.139409 Processing events in Q: "hello 3"
[info] 13:35:07.140738 Processing events in Q: "hello 4"
[info] 13:35:08.141492 Processing events in AV: "hello 5"
[info] 13:35:08.141649 Processing events in AV: "hello 6"
[info] 13:35:09.142459 Processing events in AV: "hello 7"
[info] 13:35:09.142633 Processing events in AV: "hello 8"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment